之前已经有小伙伴分享过用天勤替换米筐接口的办法,但由于他只使用了获取最近数据的接口,无法下载更早的数据来做回测。我的实现可以完整地用它来下载过去的历史数据,暂时还没有实现tick数据的处理。
import pandas as pd
from tqsdk import TqApi
from datetime import timedelta
from typing import List, Optional
import os
from tqsdk.auth import TqAuth
from tqsdk.tools import DataDownloader
from .setting import SETTINGS
from .constant import Exchange, Interval
from .object import BarData, HistoryRequest
# Timestamp alignment: an 8-hour offset expressed in nanoseconds, added to
# the raw kline "datetime" values before they are converted to datetimes.
TIME_GAP = 8 * 3600 * 10 ** 9

# Maps each vn.py bar interval to the bar duration in seconds passed to TqSdk.
# TICK maps to 0, which query_history treats as "not supported".
INTERVAL_VT2TQ = {
    Interval.TICK: 0,
    Interval.MINUTE: 60,
    Interval.HOUR: 3600,
    Interval.DAILY: 86400,
}
class TianqinClient:
    """
    Client for querying history data from Tianqin (TqSdk).

    Call init() once before query_history(); until then the set of known
    symbols is empty and every query returns None.
    """

    def __init__(self):
        """Read the Tianqin credentials from SETTINGS; no connection is opened."""
        self.inited: bool = False
        self.symbols: set = set()
        self.api = None

        self.username: str = SETTINGS["tqdata.username"]
        self.password: str = SETTINGS["tqdata.password"]

    def init(self) -> bool:
        """
        Connect to TqSdk and cache the set of available symbols.

        Returns True if the client is (or already was) initialised, and
        False when credentials are missing or the connection attempt fails.
        """
        if self.inited:
            return True

        if not self.username or not self.password:
            return False

        try:
            self.api = TqApi(auth=TqAuth(self.username, self.password))
            # Collect every contract symbol known to the API. A set keeps the
            # membership test in query_history O(1).
            self.symbols = {key for key, _ in self.api._data["quotes"].items()}
        except Exception:
            # Best effort: any connection/auth failure is reported as False,
            # but KeyboardInterrupt/SystemExit are no longer swallowed.
            return False

        self.inited = True
        return True

    def to_tq_symbol(self, symbol: str, exchange: Exchange) -> str:
        """
        Convert a vn.py symbol/exchange pair to a TqSdk symbol.

        TqSdk puts the exchange first: "<exchange>.<symbol>". A numeric
        suffix containing "88" maps to "KQ.m@<exchange>.<product>" and one
        containing "99" maps to "KQ.i@<exchange>.<product>".
        """
        # Index of the first digit; len(symbol) when there is none, so that an
        # empty or all-letter symbol yields an empty suffix instead of failing.
        count = next(
            (index for index, char in enumerate(symbol) if char.isdigit()),
            len(symbol),
        )
        product = symbol[:count]
        time_str = symbol[count:]

        # Check for continuous / index symbols.
        if "88" in time_str:
            return f"KQ.m@{exchange.value}.{product}"
        if "99" in time_str:
            return f"KQ.i@{exchange.value}.{product}"

        return f"{exchange.value}.{symbol}"

    def query_history(self, req: HistoryRequest) -> Optional[List[BarData]]:
        """
        Query history bar data from TqSdk.

        Returns None when the symbol is unknown or the interval is not
        supported (including TICK), otherwise a list of BarData. The list is
        empty when the download produced no file.
        """
        symbol = req.symbol
        exchange = req.exchange
        interval = req.interval
        start = req.start
        end = req.end

        tq_symbol = self.to_tq_symbol(symbol, exchange)
        if tq_symbol not in self.symbols:
            return None

        tq_interval = INTERVAL_VT2TQ.get(interval)
        if not tq_interval:
            return None

        # For querying night trading period data
        end += timedelta(1)

        total_num = int((end - start).total_seconds() / tq_interval)
        if total_num > 8964:
            # Too many bars for the recent-data API: download the full range.
            df = self.download_history(start, end, tq_symbol, tq_interval)
        else:
            # get_kline_serial can only fetch the most recent bars; a date
            # range cannot be specified, so filter afterwards.
            df = self.api.get_kline_serial(
                tq_symbol, tq_interval, 8000
            ).sort_values(by=["datetime"])

            # Timestamp alignment. Only needed on this path: the download
            # path already returns parsed datetimes.
            df["datetime"] = pd.DatetimeIndex(
                pd.to_datetime(df["datetime"] + TIME_GAP)
            ).tz_localize('UTC').tz_convert('Asia/Shanghai')

            # Filter by start / end time.
            df = df[
                (df['datetime'] >= start - timedelta(days=1))
                & (df['datetime'] < end)
            ]

        data: List[BarData] = []

        # download_history returns None when no CSV file was produced.
        if df is not None:
            for _, row in df.iterrows():
                bar = BarData(
                    symbol=symbol,
                    exchange=exchange,
                    interval=interval,
                    datetime=row["datetime"].to_pydatetime(),
                    open_price=row["open"],
                    high_price=row["high"],
                    low_price=row["low"],
                    close_price=row["close"],
                    volume=row["volume"],
                    open_interest=row.get("open_oi", 0),
                    gateway_name="TQ",
                )
                data.append(bar)

        return data

    def download_history(self, start, end, symbol: str, interval: int):
        """
        Download bars to a temporary CSV file and load them as a DataFrame.

        The file "tqdata.csv" is written to the current working directory and
        removed after reading. Returns None if the file was not created.
        NOTE(review): the fixed file name means concurrent downloads from the
        same directory would collide - confirm this is never done.
        """
        csv_file = "tqdata.csv"
        status = DataDownloader(
            api=self.api,
            symbol_list=[symbol],
            start_dt=start,
            end_dt=end,
            dur_sec=interval,
            csv_file_name=csv_file,
        )
        # Drive the API event loop until the download task reports completion.
        while not status.is_finished():
            self.api.wait_update()

        if not os.path.exists(csv_file):
            return None

        try:
            df = pd.read_csv(csv_file)
        finally:
            # Always remove the temporary file, even if parsing failed.
            os.unlink(csv_file)

        df["datetime"] = pd.to_datetime(df["datetime"])

        if interval > 0:
            # Bar columns are prefixed with the symbol; strip the prefix.
            fields = ("open", "high", "low", "close", "volume", "open_oi")
            df = df.rename(
                columns={f"{symbol}.{field}": field for field in fields}
            )

        return df
# Module-level client instance shared by importers of this module.
tqdata_client = TianqinClient()
回测以及CTA引擎,我都加了相应的处理,在setting里面配置完成后就可以直接使用。由于改动的代码有点多,我就不一一将相关的代码贴到这边了。
有兴趣的可以到我的Github那边去看一下我具体改了什么。
Traceback (most recent call last):
File "D:\project\vnpy2\vnpy\app\cta_backtester\ui\widget.py", line 303, in process_backtesting_finished_event
self.chart.set_data(df)
File "D:\project\vnpy2\vnpy\app\cta_backtester\ui\widget.py", line 831, in set_data
hist, x = np.histogram(df["net_pnl"], bins="auto")
File "<__array_function__ internals>", line 6, in histogram
File "C:\vnstudio\lib\site-packages\numpy\lib\histograms.py", line 792, in histogram
bin_edges, uniform_bins = _get_bin_edges(a, bins, range, weights)
File "C:\vnstudio\lib\site-packages\numpy\lib\histograms.py", line 448, in _get_bin_edges
endpoint=True, dtype=bin_type)
File "<__array_function__ internals>", line 6, in linspace
File "C:\vnstudio\lib\site-packages\numpy\core\function_base.py", line 128, in linspace
y = _nx.arange(0, num, dtype=dt).reshape((-1,) + (1,) * ndim(delta))
MemoryError: Unable to allocate 396. PiB for an array with shape (55718376022934928,) and data type float64
回测的时候,有一些策略交易量并不大,不知道为什么会引发这样的报错。报错的位置就是右下角盈亏分布的那个直方图。有哪位大神可以解答一下吗?我暂时的办法是在它报错的时候先不显示这个图。