原因见这个帖子:https://www.vnpy.com/forum/topic/4461-shuo-shi-r-breakerce-lue-de-wen-ti
内容如下:
"""
本文件主要实现合约的交易时间段
作者:hxxjava
日期:2020-8-1
"""
from typing import Callable,List,Dict, Tuple, Union
from enum import Enum
import datetime
import pytz
CHINA_TZ = pytz.timezone("Asia/Shanghai")
from vnpy.trader.utility import extract_vt_symbol
from vnpy.trader.constant import Interval
from rqdatac.utils import to_date
import rqdatac as rq
def get_listed_date(symbol:str):
'''
获得上市日期
'''
info = rq.instruments(symbol)
return to_date(info.listed_date)
def get_de_listed_date(symbol:str):
'''
获得交割日期
'''
info = rq.instruments(symbol)
return to_date(info.de_listed_date)
class Timeunit(Enum):
"""
时间单位
"""
SECOND = '1s'
MINUTE = '1m'
HOUR = '1h'
class TradeHours(object):
""" 合约交易时间段 """
def __init__(self,symbol:str):
self.symbol = symbol.upper()
self.init()
def init(self):
"""
初始化交易日字典及交易时间段数据列表
"""
self.listed_date = get_listed_date(self.symbol)
self.de_listed_date = get_de_listed_date(self.symbol)
self.trade_date_index = {} # 合约的交易日索引字典
self.trade_index_date = {} # 交易天数与交易日字典
trade_dates = rq.get_trading_dates(self.listed_date,self.de_listed_date) # 合约的所有的交易日
days = 0
for td in trade_dates:
self.trade_date_index[td] = days
self.trade_index_date[days] = td
days += 1
trading_hours = rq.get_trading_hours(self.symbol,date=self.listed_date,frequency='tick',expected_fmt='datetime')
self.time_dn_pairs = self._get_trading_times_dn(trading_hours)
trading_hours0 = [(CHINA_TZ.localize(start),CHINA_TZ.localize(stop)) for start,stop in trading_hours]
self.trade_date_index[self.listed_date] = (0,trading_hours0)
for day in range(1,days):
td = self.trade_index_date[day]
trade_datetimes = []
for (start,dn1),(stop,dn2) in self.time_dn_pairs:
#start:开始时间,dn1:相对交易日前推天数,
#stop :开始时间,dn2:相对开始时间后推天数
d = self.trade_index_date[day+dn1]
start_dt = CHINA_TZ.localize(datetime.datetime.combine(d,start))
stop_dt = CHINA_TZ.localize(datetime.datetime.combine(d,stop))
trade_datetimes.append((start_dt,stop_dt+datetime.timedelta(days=dn2)))
self.trade_date_index[td] = (day,trade_datetimes)
def _get_trading_times_dn(self,trading_hours:List[Tuple[datetime.datetime,datetime.datetime]]):
"""
交易时间跨天处理,不推荐外部使用 。
产生的结果:[((start1,dn11),(stop1,dn21)),((start2,dn12),(stop2,dn22)),...,((startN,dn1N),(stopN,dn2N))]
其中:
startN:开始时间,dn1N:相对交易日前推天数,
stopN:开始时间,dn2N:相对开始时间后推天数
"""
ilen = len(trading_hours)
if ilen == 0:
return []
start_stops = []
for start,stop in trading_hours:
start_stops.insert(0,(start.time(),stop.time()))
pre_start,pre_stop = start_stops[0]
dn1 = 0
dn2 = 1 if pre_start > pre_stop else 0
time_dn_pairs = [((pre_start,dn1),(pre_stop,dn2))]
for start,stop in start_stops[1:]:
if start > pre_start:
dn1 -= 1
dn2 = 1 if start > stop else 0
time_dn_pairs.insert(0,((start,dn1),(stop,dn2)))
pre_start,pre_stop = start,stop
return time_dn_pairs
def get_date_tradetimes(self,date:datetime.date):
"""
得到合约date日期的交易时间段
"""
idx,trade_times = self.trade_date_index.get(date,(None,[]))
return idx,trade_times
def get_trade_datetimes(self,dt:datetime,allday:bool=False):
"""
得到合约date日期的交易时间段
"""
# 得到最早的交易时间
idx0,trade_times0 = self.get_date_tradetimes(self.listed_date)
start0,stop0 = trade_times0[0]
if dt < start0:
return None,[]
# 首先找到dt日期自上市以来的交易天数
date,dn = dt.date(),0
days = None
while date < self.de_listed_date:
days,ths = self.trade_date_index.get(date,(None,[]))
if not days:
dn += 1
date = (dt+datetime.timedelta(days=dn)).date()
else:
break
# 如果超出交割日也没有找到,那这就不是一个有效的交易时间
if days is None:
return (None,[])
index_3 = [days,days+1,days-1] # 前后三天的
date_3d = []
for day in index_3:
date = self.trade_index_date.get(day,None)
date_3d.append(date)
# print(date_3d)
for date in date_3d:
if not date:
# print(f"{date} is not trade date")
continue
idx,trade_dts = self.get_date_tradetimes(date)
# print(f"{date} tradetimes {trade_dts}")
ilen = len(trade_dts)
if ilen > 0:
start0,stop = trade_dts[0] # start0 是date交易日的开始时间
start,stop0 = trade_dts[-1]
if dt<start0 or dt>stop0:
continue
for start,stop in trade_dts:
if dt>=start and dt < stop:
if allday:
return idx,trade_dts
else:
return idx,[(start,stop)]
return None,[]
def get_trade_time_perday(self):
"""
计算每日的交易总时长(单位:分钟)
"""
TTPD = datetime.timedelta(0,0,0)
datetimes = []
today = datetime.datetime.now().date()
for (start,dn1),(stop,dn2) in self.time_dn_pairs:
start_dt = CHINA_TZ.localize(datetime.datetime.combine(today,start)) + datetime.timedelta(days=dn1)
stop_dt = CHINA_TZ.localize(datetime.datetime.combine(today,stop)) + datetime.timedelta(days=dn2)
time_delta = stop_dt - start_dt
TTPD = TTPD + time_delta
return int(TTPD.seconds/60)
def get_trade_time_inday(self,dt:datetime,unit:Timeunit=Timeunit.MINUTE):
"""
计算dt在交易日内的分钟数
unit: '1s':second;'1m':minute;'1h';1h
"""
TTID = datetime.timedelta(0,0,0)
day,trade_times = self.get_trade_datetimes(dt,allday=True)
if not trade_times:
return None
for start,stop in trade_times:
if dt > stop:
time_delta = stop - start
TTID += time_delta
elif dt > start:
time_delta = dt - start
TTID += time_delta
break
else:
break
if unit == Timeunit.SECOND:
return TTID.seconds
elif unit == Timeunit.MINUTE:
return int(TTID.seconds/60)
elif unit == Timeunit.HOUR:
return int(TTID.seconds/3600)
else:
return TTID
def get_day_tradetimes(self,dt:datetime):
"""
得到合约日盘的交易时间段
"""
index,trade_times = self.get_trade_datetimes(dt,allday=True)
trade_times1 = []
if trade_times:
for start_dt,stop_dt in trade_times:
if start_dt.time() < datetime.time(18,0,0):
trade_times1.append((start_dt,stop_dt))
return index,trade_times1
return (index,trade_times1)
def get_night_tradetimes(self,dt:datetime):
"""
得到合约夜盘的交易时间段
"""
index,trade_times = self.get_trade_datetimes(dt,allday=True)
trade_times1 = []
if trade_times:
for start_dt,stop_dt in trade_times:
if start_dt.time() > datetime.time(18,0,0):
trade_times1.append((start_dt,stop_dt))
return index,trade_times1
return (index,trade_times1)
def convet_to_datetime(self,day:int,minutes:int):
"""
计算minutes在第day交易日内的datetime形式的时间
"""
date = self.trade_index_date.get(day,None)
if date is None:
return None
idx,trade_times = self.trade_date_index.get(date,(None,[]))
if not trade_times: # 不一定必要
return None
for (start,stop) in trade_times:
timedelta = stop - start
if minutes < int(timedelta.seconds/60):
return start + datetime.timedelta(minutes=minutes)
else:
minutes -= int(timedelta.seconds/60)
return None
def get_bar_window(self,dt:datetime,window:int,interval:Interval=Interval.MINUTE):
"""
计算dt所在K线的起止时间
"""
bar_windows = (None,None)
day,trade_times = self.get_trade_datetimes(dt,allday=True)
if not trade_times:
# print(f"day={day} trade_times={trade_times}")
return bar_windows
# 求每个交易日的交易时间分钟数
TTPD = self.get_trade_time_perday()
# 求dt在交易日内的分钟数
TTID = self.get_trade_time_inday(dt,unit=Timeunit.MINUTE)
# 得到dt时刻K线的起止时间
total_minites = day*TTPD + TTID
# 计算K线宽度(分钟数)
if interval == Interval.MINUTE:
bar_width = window
elif interval == Interval.HOUR:
bar_width = 60*window
elif interval == Interval.DAILY:
bar_width = TTPD*window
elif interval == Interval.WEEKLY:
bar_width = TTPD*window*5
else:
return bar_windows
# 求K线的开始时间的和结束的分钟形式
start_m = int(total_minites/bar_width)*bar_width
stop_m = start_m + bar_width
# 计算K开始时间的datetime形式
start_d = int(start_m / TTPD)
minites = start_m % TTPD
start_dt = self.convet_to_datetime(start_d,minites)
# print(f"start_d={start_d} minites={minites}---->{start_dt}")
# 计算K结束时间的datetime形式
stop_d = int(stop_m / TTPD)
minites = stop_m % TTPD
stop_dt = self.convet_to_datetime(stop_d,minites)
# print(f"stop_d={stop_d} minites={minites}---->{stop_dt}")
return start_dt,stop_dt
def get_date_start_stop(self,dt:datetime):
"""
获得dt所在交易日的开始和停止时间
"""
index,trade_times = self.get_trade_datetimes(dt,allday=True)
if trade_times:
valid_dt = False
for t1,t2 in trade_times:
if t1 < dt and dt < t2:
valid_dt = True
break
if valid_dt:
start_dt = trade_times[0][0]
stop_dt = trade_times[-1][1]
return True,(start_dt,stop_dt)
return False,(None,None)
def get_day_start_stop(self,dt:datetime):
"""
获得dt所在交易日日盘的开始和停止时间
"""
index,trade_times = self.get_day_tradetimes(dt)
if trade_times:
valid_dt = False
for t1,t2 in trade_times:
if t1 < dt and dt < t2:
valid_dt = True
break
if valid_dt:
start_dt = trade_times[0][0]
stop_dt = trade_times[-1][1]
return True,(start_dt,stop_dt)
return False,(None,None)
def get_night_start_stop(self,dt:datetime):
"""
获得dt所在交易日夜盘的开始和停止时间
"""
index,trade_times = self.get_night_tradetimes(dt)
if trade_times:
valid_dt = False
for t1,t2 in trade_times:
if t1 < dt and dt < t2:
valid_dt = True
break
if valid_dt:
start_dt = trade_times[0][0]
stop_dt = trade_times[-1][1]
return True,(start_dt,stop_dt)
return False,(None,None)
if __name__ == "__main__":
rq.init('xxxxx','******',("rqdatad-pro.ricequant.com",16011))
# vt_symbols = ["rb2010.SHFE","ag2012.SHFE","i2010.DCE"]
vt_symbols = ["ag2012.SHFE"]
date0 = datetime.date(2020,8,31)
dt0 = CHINA_TZ.localize(datetime.datetime(2020,8,31,9,20,15))
for vt_symbol in vt_symbols:
symbol,exchange = extract_vt_symbol(vt_symbol)
th = TradeHours(symbol)
# trade_hours = th.get_date_tradetimes(date0)
# print(f"\n{vt_symbol} {date0} trade_hours={trade_hours}")
days,trade_hours = th.get_trade_datetimes(dt0,allday=True)
print(f"\n{vt_symbol} {dt0} days:{days} trade_hours={trade_hours}")
if trade_hours:
day_start = trade_hours[0][0]
day_end = trade_hours[-1][1]
print(f"day_start={day_start} day_end={day_end}")
exit_time = day_end + datetime.timedelta(minutes=-5)
print(f"exit_time={exit_time}")
dt1 = CHINA_TZ.localize(datetime.datetime(2020,8,31,9,20,15))
dt2 = CHINA_TZ.localize(datetime.datetime(2020,9,1,1,1,15))
for dt in [dt1,dt2]:
in_trade,(start,stop) = th.get_date_start_stop(dt)
if (in_trade):
print(f"\n{vt_symbol} 时间 {dt} 交易日起止:{start,stop}")
else:
print(f"\n{vt_symbol} 时间 {dt} 非交易时间")
in_day,(start,stop) = th.get_day_start_stop(dt)
if (in_day):
print(f"\n{vt_symbol} 时间 {dt} 日盘起止:{start,stop}")
else:
print(f"\n{vt_symbol} 时间 {dt} 非日盘时间")
in_night,(start,stop) = th.get_night_start_stop(dt)
if in_night:
print(f"\n{vt_symbol} 时间 {dt} 夜盘起止:{start,stop}")
else:
print(f"\n{vt_symbol} 时间 {dt} 非夜盘时间")
代码如下:
from datetime import datetime,time,timedelta
from vnpy.app.cta_strategy import (
CtaTemplate,
StopOrder,
TickData,
BarData,
TradeData,
OrderData,
BarGenerator,
ArrayManager
)
from vnpy.trader.utility import extract_vt_symbol
from vnpy.usertools.trade_hour import TradeHours
class RBreakStrategy2(CtaTemplate):
""""""
author = "KeKe"
setup_coef = 0.25
break_coef = 0.2
enter_coef_1 = 1.07
enter_coef_2 = 0.07
fixed_size = 1
donchian_window = 30
trailing_long = 0.4
trailing_short = 0.4
multiplier = 3
buy_break = 0 # 突破买入价
sell_setup = 0 # 观察卖出价
sell_enter = 0 # 反转卖出价
buy_enter = 0 # 反转买入价
buy_setup = 0 # 观察买入价
sell_break = 0 # 突破卖出价
intra_trade_high = 0
intra_trade_low = 0
day_high = 0
day_open = 0
day_close = 0
day_low = 0
tend_high = 0
tend_low = 0
parameters = ["setup_coef", "break_coef", "enter_coef_1", "enter_coef_2", "fixed_size", "donchian_window"]
variables = ["buy_break", "sell_setup", "sell_enter", "buy_enter", "buy_setup", "sell_break"]
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
""""""
super(RBreakStrategy2, self).__init__(
cta_engine, strategy_name, vt_symbol, setting
)
self.bg = BarGenerator(self.on_bar)
self.am = ArrayManager()
self.bars = []
symbol,exchange = vt_symbol.split('.')
self.trade_hour = TradeHours(symbol)
self.trade_datetimes = None
self.exit_time = None
def on_init(self):
"""
Callback when strategy is inited.
"""
self.write_log("策略初始化")
self.load_bar(10)
def on_start(self):
"""
Callback when strategy is started.
"""
self.write_log("策略启动")
def on_stop(self):
"""
Callback when strategy is stopped.
"""
self.write_log("策略停止")
def on_tick(self, tick: TickData):
"""
Callback of new tick data update.
"""
self.bg.update_tick(tick)
def is_new_day(self,dt:datetime):
"""
判断dt时间是否在当天的交易时间段内
"""
if not self.trade_datetimes:
return True
day_start = self.trade_datetimes[0][0]
day_end = self.trade_datetimes[-1][1]
if day_start<=dt and dt < day_end:
return False
return True
def on_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
self.cancel_all()
am = self.am
am.update_bar(bar)
if not am.inited:
return
# 判断是否是下一交易日
self.new_day = self.is_new_day(bar.datetime)
if self.new_day:
# 计算下一交易日的交易时间段
days,self.trade_datetimes = self.trade_hour.get_trade_datetimes(bar.datetime,allday=True)
# 计算退出时间
# print(f"trade_datetimes={self.trade_datetimes}")
if self.trade_datetimes:
day_end = self.trade_datetimes[-1][1]
self.exit_time = day_end + timedelta(minutes=-5)
if not self.trade_datetimes:
# 不是个有效的K线,不可以处理,
# 为什么会有K线推送?因为非交易时段接口的行为是不可理喻的
return
self.bars.append(bar)
if len(self.bars) <= 2:
return
else:
self.bars.pop(0)
last_bar = self.bars[-2]
# New Day
if self.new_day: # 如果是新交易日
if self.day_open:
self.buy_setup = self.day_low - self.setup_coef * (self.day_high - self.day_close) # 观察买入价
self.sell_setup = self.day_high + self.setup_coef * (self.day_close - self.day_low) # 观察卖出价
self.buy_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_high # 反转买入价
self.sell_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_low # 反转卖出价
self.buy_break = self.buy_setup + self.break_coef * (self.sell_setup - self.buy_setup) # 突破买入价
self.sell_break = self.sell_setup - self.break_coef * (self.sell_setup - self.buy_setup) # 突破卖出价
self.day_open = bar.open_price
self.day_high = bar.high_price
self.day_close = bar.close_price
self.day_low = bar.low_price
# Today
else:
self.day_high = max(self.day_high, bar.high_price)
self.day_low = min(self.day_low, bar.low_price)
self.day_close = bar.close_price
if not self.sell_setup:
return
self.tend_high, self.tend_low = am.donchian(self.donchian_window)
if bar.datetime < self.exit_time:
if self.pos == 0:
self.intra_trade_low = bar.low_price
self.intra_trade_high = bar.high_price
if self.tend_high > self.sell_setup:
long_entry = max(self.buy_break, self.day_high)
self.buy(long_entry, self.fixed_size, stop=True)
self.short(self.sell_enter, self.multiplier * self.fixed_size, stop=True)
elif self.tend_low < self.buy_setup:
short_entry = min(self.sell_break, self.day_low)
self.short(short_entry, self.fixed_size, stop=True)
self.buy(self.buy_enter, self.multiplier * self.fixed_size, stop=True)
elif self.pos > 0:
self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
long_stop = self.intra_trade_high * (1 - self.trailing_long / 100)
self.sell(long_stop, abs(self.pos), stop=True)
elif self.pos < 0:
self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
short_stop = self.intra_trade_low * (1 + self.trailing_short / 100)
self.cover(short_stop, abs(self.pos), stop=True)
# Close existing position
else:
if self.pos > 0:
self.sell(bar.close_price * 0.99, abs(self.pos))
elif self.pos < 0:
self.cover(bar.close_price * 1.01, abs(self.pos))
self.put_event()
def on_order(self, order: OrderData):
"""
Callback of new order data update.
"""
pass
def on_trade(self, trade: TradeData):
"""
Callback of new trade data update.
"""
self.put_event()
def on_stop_order(self, stop_order: StopOrder):
"""
Callback of stop order update.
"""
pass
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
""""""
super(DynaRBreakStrategy, self).__init__(
cta_engine, strategy_name, vt_symbol, setting
)
self.bg = BarGenerator(self.on_bar) # 这是1分钟K线生成器
self.am = ArrayManager()
self.bars = []
可以看出它的self.bg是1分钟K线的产生器。
def on_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
self.cancel_all()
am = self.am
am.update_bar(bar)
if not am.inited:
return
self.bars.append(bar)
if len(self.bars) <= 2:
return
else:
self.bars.pop(0)
last_bar = self.bars[-2]
# New Day
if last_bar.datetime.date() != bar.datetime.date(): # 这样可能只可以判断夜盘和日盘
if self.day_open:
self.buy_setup = self.day_low - self.setup_coef * (self.day_high - self.day_close) # 观察买入价
self.sell_setup = self.day_high + self.setup_coef * (self.day_close - self.day_low) # 观察卖出价
self.buy_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_high # 反转买入价
self.sell_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_low # 反转卖出价
self.buy_break = self.buy_setup + self.break_coef * (self.sell_setup - self.buy_setup) # 突破买入价
self.sell_break = self.sell_setup - self.break_coef * (self.sell_setup - self.buy_setup) # 突破卖出价
self.day_open = bar.open_price
self.day_high = bar.high_price
self.day_close = bar.close_price
self.day_low = bar.low_price
# Today
else:
self.day_high = max(self.day_high, bar.high_price)
self.day_low = min(self.day_low, bar.low_price)
self.day_close = bar.close_price
if not self.sell_setup:
return
self.tend_high, self.tend_low = am.donchian(self.donchian_window)
if bar.datetime.time() < self.exit_time: # self.exit_time==14:55,----》0:00~14:55是可以下单的
if self.pos == 0:
self.intra_trade_low = bar.low_price
self.intra_trade_high = bar.high_price
if self.tend_high > self.sell_setup:
long_entry = max(self.buy_break, self.day_high)
self.buy(long_entry, self.fixed_size, stop=True)
self.short(self.sell_enter, self.multiplier * self.fixed_size, stop=True)
elif self.tend_low < self.buy_setup:
short_entry = min(self.sell_break, self.day_low)
self.short(short_entry, self.fixed_size, stop=True)
self.buy(self.buy_enter, self.multiplier * self.fixed_size, stop=True)
elif self.pos > 0:
self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
long_stop = self.intra_trade_high * (1 - self.trailing_long / 100)
self.sell(long_stop, abs(self.pos), stop=True)
elif self.pos < 0:
self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
short_stop = self.intra_trade_low * (1 + self.trailing_short / 100)
self.cover(short_stop, abs(self.pos), stop=True)
# Close existing position
else: # 14:55平仓,夜盘是不做的吗???
if self.pos > 0:
self.sell(bar.close_price * 0.99, abs(self.pos))
elif self.pos < 0:
self.cover(bar.close_price * 1.01, abs(self.pos))
self.put_event()
# New Day
if last_bar.datetime.date() != bar.datetime.date():
我们知道很多期货品种都可能有夜盘,而且日K线的起始时间是上一交易日的晚上21:00,这就不用多说了。也就是说连续的2个1分钟bar是没有办法判断跨交易日的。
如果这样可能判断错误,那么这几个变量:
self.day_open = bar.open_price
self.day_high = bar.high_price
self.day_close = bar.close_price
self.day_low = bar.low_price
就有点名不副实了。
目前的这个R-Breaker又不是完全没有夜盘,因为 0:00~14:55 可以下单的,14:55平仓,夜盘是不做的,是有什么说法吗?
要知道象白银之类的合约的夜盘是21:00-2:30,从21:00到次日00:00有3小时之多呢,大概1/3的时间被剔除出可以交易时间,是什么原因呢?
可以考虑合约的交易时间段来判断交易日的开始和结束,陈老师觉得是否有必要呢?这是我的一点点意见,不知道是否正确,望批评指正!
发生条件:登录vnpy后,运行策略,连续2天没有重新启动。
修改vnpy\trader\ui\widget.py中的TradeMonitor:
class TradeMonitor(BaseMonitor):
"""
Monitor for trade data.
"""
event_type = EVENT_TRADE
data_key = "tradeid" # hxxjava chanage
sorting = True
headers: Dict[str, dict] = {
"tradeid": {"display": "成交号 ", "cell": BaseCell, "update": False},
"orderid": {"display": "委托号", "cell": BaseCell, "update": False},
"symbol": {"display": "代码", "cell": BaseCell, "update": False},
"exchange": {"display": "交易所", "cell": EnumCell, "update": False},
"direction": {"display": "方向", "cell": DirectionCell, "update": False},
"offset": {"display": "开平", "cell": EnumCell, "update": False},
"price": {"display": "价格", "cell": BaseCell, "update": False},
"volume": {"display": "数量", "cell": BaseCell, "update": False},
"datetime": {"display": "时间", "cell": TimeCell, "update": False},
"gateway_name": {"display": "接口", "cell": BaseCell, "update": False},
}
ReqQryInstrument : 请求查询合约,填空可以查询到所有合约。
响应:OnRspQryInstrument
◇ 1.函数原型
virtual int ReqQryInstrument(CThostFtdcQryInstrumentField *pQryInstrument, int nRequestID) = 0;
◇ 2.参数
pQryInstrument:查询合约
struct CThostFtdcQryInstrumentField
{
TThostFtdcInstrumentIDType InstrumentID; ///合约代码
TThostFtdcExchangeIDType ExchangeID; ///交易所代码
TThostFtdcExchangeInstIDType ExchangeInstID; ///合约在交易所的代码
TThostFtdcInstrumentIDType ProductID;///产品代码
};
nRequestID:请求ID,对应响应里的nRequestID,无递增规则,由用户自行维护。
◇ 3.返回
0,代表成功。
-1,表示网络连接失败;
-2,表示未处理请求超过许可数;
-3,表示每秒发送请求数超过许可数。
请求查询合约响应,当执行ReqQryInstrument后,该方法被调用。
◇ 1.函数原型
virtual void OnRspQryInstrument(CThostFtdcInstrumentField *pInstrument, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast) {};
◇ 2.参数pInstrument:
合约
struct CThostFtdcInstrumentField
{
TThostFtdcInstrumentIDType InstrumentID;///合约代码
TThostFtdcExchangeIDType ExchangeID; ///交易所代码
TThostFtdcInstrumentNameType InstrumentName; ///合约名称
TThostFtdcExchangeInstIDType ExchangeInstID;///合约在交易所的代码
TThostFtdcInstrumentIDType ProductID; ///产品代码
TThostFtdcProductClassType ProductClass; ///产品类型
TThostFtdcYearType DeliveryYear; ///交割年份
TThostFtdcMonthType DeliveryMonth;///交割月
TThostFtdcVolumeType MaxMarketOrderVolume; ///市价单最大下单量
TThostFtdcVolumeType MinMarketOrderVolume;///市价单最小下单量
TThostFtdcVolumeType MaxLimitOrderVolume; ///限价单最大下单量
TThostFtdcVolumeType MinLimitOrderVolume; ///限价单最小下单量
TThostFtdcVolumeMultipleType VolumeMultiple; ///合约数量乘数
TThostFtdcPriceType PriceTick; ///最小变动价位
TThostFtdcDateType CreateDate; ///创建日
TThostFtdcDateType OpenDate; ///上市日
TThostFtdcDateType ExpireDate;///到期日
TThostFtdcDateType StartDelivDate; ///开始交割日
TThostFtdcDateType EndDelivDate; ///结束交割日
TThostFtdcInstLifePhaseType InstLifePhase; ///合约生命周期状态
TThostFtdcBoolType IsTrading;///当前是否交易
TThostFtdcPositionTypeType PositionType; ///持仓类型
TThostFtdcPositionDateTypeType PositionDateType;///持仓日期类型
TThostFtdcRatioType LongMarginRatio;///多头保证金率
TThostFtdcRatioType ShortMarginRatio; ///空头保证金率
TThostFtdcMaxMarginSideAlgorithmType MaxMarginSideAlgorithm;///是否使用大额单边保证金算法
TThostFtdcInstrumentIDType UnderlyingInstrID;///基础商品代码
TThostFtdcPriceType StrikePrice;///执行价
TThostFtdcOptionsTypeType OptionsType;///期权类型
TThostFtdcUnderlyingMultipleType UnderlyingMultiple; ///合约基础商品乘数
TThostFtdcCombinationTypeType CombinationType;///组合类型
};
VolumeMultiple:合约乘数(同交易所)
PriceTick:最小变动价位(同交易所)
IsTrading:是否活跃(同交易所)
DeliveryYear:交割年份(同交易所)
DeliveryMonth:交割月(同交易所)
OpenDate:上市日(同交易所)
CreateDate:创建日(同交易所)
ExpireDate:到期日(同交易所)
StartDeliveDate:开始交割日(同交易所)
EndDelivDate:结束交割日(同交易所)
同交易所表示这些字段每天更新自交易所,其余字段为柜台设置值。如果发现有些字段值有误,则以此来判断是交易所问题还是CTP柜台设置问题。
pRspInfo:响应信息
struct CThostFtdcRspInfoField
{
TThostFtdcErrorIDType ErrorID; ///错误代码
TThostFtdcErrorMsgType ErrorMsg;///错误信息
};
nRequestID:返回用户操作请求的ID,该ID 由用户在操作请求时指定。
bIsLast:指示该次返回是否为针对nRequestID的最后一次返回。
ReqQryInstrumentMarginRate
请求查询合约保证金率,对应响应OnRspQryInstrumentMarginRate。如果InstrumentID填空,则返回持仓对应的合约保证金率,否则返回相应InstrumentID的保证金率。
目前无法通过一次查询得到所有合约保证金率,如果要查询所有,则需要通过多次查询得到。
◇ 1.函数原型
virtual int ReqQryInstrumentMarginRate(CThostFtdcQryInstrumentMarginRateField *pQryInstrumentMarginRate, int nRequestID) = 0;
◇ 2.参数pQryInstrumentMarginRate:
查询合约保证金率
struct CThostFtdcQryInstrumentMarginRateField
{
///经纪公司代码
TThostFtdcBrokerIDType BrokerID;
///投资者代码
TThostFtdcInvestorIDType InvestorID;
///合约代码
TThostFtdcInstrumentIDType InstrumentID;
///投机套保标志
TThostFtdcHedgeFlagType HedgeFlag;
///交易所代码
TThostFtdcExchangeIDType ExchangeID;
///投资单元代码
TThostFtdcInvestUnitIDType InvestUnitID;
};
nRequestID:请求ID,对应响应里的nRequestID,无递增规则,由用户自行维护。
◇ 3.返回
0,代表成功。
-1,表示网络连接失败;
-2,表示未处理请求超过许可数;
-3,表示每秒发送请求数超过许可数。
OnRspQryInstrumentMarginRate
请求查询合约保证金率响应,当执行ReqQryInstrumentMarginRate后,该方法被调用。
◇ 1.函数原型
virtual void OnRspQryInstrumentMarginRate(CThostFtdcInstrumentMarginRateField *pInstrumentMarginRate, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast) {};
◇ 2.参数 ///:
合约保证金率
struct CThostFtdcInstrumentMarginRateField
{
TThostFtdcInstrumentIDType InstrumentID;///合约代码
TThostFtdcInvestorRangeType InvestorRange;///投资者范围
TThostFtdcBrokerIDType BrokerID; ///经纪公司代码
TThostFtdcInvestorIDType InvestorID;///投资者代码
TThostFtdcHedgeFlagType HedgeFlag; ///投机套保标志
TThostFtdcRatioType LongMarginRatioByMoney;///多头保证金率
TThostFtdcMoneyType LongMarginRatioByVolume;///多头保证金费
TThostFtdcRatioType ShortMarginRatioByMoney; ///空头保证金率
TThostFtdcMoneyType ShortMarginRatioByVolume; ///空头保证金费
TThostFtdcBoolType IsRelative;///是否相对交易所收取
TThostFtdcExchangeIDType ExchangeID;///交易所代码
TThostFtdcInvestUnitIDType InvestUnitID; ///投资单元代码
};
pRspInfo:响应信息
struct CThostFtdcRspInfoField
{
TThostFtdcErrorIDType ErrorID;///错误代码
TThostFtdcErrorMsgType ErrorMsg;///错误信息
};
nRequestID:返回用户操作请求的ID,该ID 由用户在操作请求时指定。
bIsLast:指示该次返回是否为针对nRequestID的最后一次返回。
ReqQryInstrumentCommissionRate
请求查询合约手续费率,对应响应OnRspQryInstrumentCommissionRate。如果InstrumentID填空,则返回持仓对应的合约手续费率。
目前无法通过一次查询得到所有合约手续费率,如果要查询所有,则需要通过多次查询得到。
◇ 1.函数原型
virtual int ReqQryInstrumentCommissionRate(CThostFtdcQryInstrumentCommissionRateField *pQryInstrumentCommissionRate, int nRequestID) = 0;
◇ 2.参数pQryInstrumentCommissionRate:
查询手续费率
struct CThostFtdcQryInstrumentCommissionRateField
{
TThostFtdcBrokerIDType BrokerID; ///经纪公司代码
TThostFtdcInvestorIDType InvestorID;///投资者代码
TThostFtdcInstrumentIDType InstrumentID;///合约代码
TThostFtdcExchangeIDType ExchangeID;///交易所代码
TThostFtdcInvestUnitIDType InvestUnitID;///投资单元代码
};
InstrumentID:返回手续费率对应的合约。
但是如果在柜台没有设置具体合约的手续费率,则默认会返回产品的手续费率,InstrumentID就为对应产品ID。
nRequestID:请求ID,对应响应里的nRequestID,无递增规则,由用户自行维护。
◇ 3.返回
0,代表成功。
-1,表示网络连接失败;
-2,表示未处理请求超过许可数;
-3,表示每秒发送请求数超过许可数。
OnRspQryInstrumentCommissionRate
请求查询合约手续费率响应,当执行ReqQryInstrumentCommissionRate后,该方法被调用。
◇ 1.函数原型
virtual void OnRspQryInstrumentCommissionRate(CThostFtdcInstrumentCommissionRateField *pInstrumentCommissionRate, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast) {};
◇ 2.参数pInstrumentCommissionRate:合约手续费率
struct CThostFtdcInstrumentCommissionRateField
{
TThostFtdcInstrumentIDType InstrumentID; ///合约代码
TThostFtdcInvestorRangeType InvestorRange; ///投资者范围
TThostFtdcBrokerIDType BrokerID;///经纪公司代码
TThostFtdcInvestorIDType InvestorID; ///投资者代码
TThostFtdcRatioType OpenRatioByMoney; ///开仓手续费率
TThostFtdcRatioType OpenRatioByVolume; ///开仓手续费
TThostFtdcRatioType CloseRatioByMoney;///平仓手续费率
TThostFtdcRatioType CloseRatioByVolume;///平仓手续费
TThostFtdcRatioType CloseTodayRatioByMoney;///平今手续费率
TThostFtdcRatioType CloseTodayRatioByVolume;///平今手续费
TThostFtdcExchangeIDType ExchangeID; ///交易所代码
TThostFtdcBizTypeType BizType;///业务类型
TThostFtdcInvestUnitIDType InvestUnitID;///投资单元代码
};
pRspInfo:
响应信息
struct CThostFtdcRspInfoField
{
TThostFtdcErrorIDType ErrorID; ///错误代码
TThostFtdcErrorMsgType ErrorMsg; ///错误信息
};
nRequestID:返回用户操作请求的ID,该ID 由用户在操作请求时指定。
bIsLast:指示该次返回是否为针对nRequestID的最后一次返回。
令:
合约查询结果 = C
保证金率查询结果 = M
手续费查询结果 = S
则:
C["VolumeMultiple"]
if M["Is_Relative"] == 1:
多头保证金率 = C["LongMarginRatio"] + M["LongMarginRatioByMoney"]
空头保证金率 = C["ShortMarginRatio"] + M["ShortMarginRatioByMoney"]
else:
多头保证金率 = M["LongMarginRatioByMoney"]
空头保证金率 = M["ShortMarginRatioByMoney"]
if S.open_ratio_bymoney == 0.0:
开仓手续费= [FeeType.LOT,S["OpenRatioByVolume"] ]
平仓手续费= [FeeType.LOT,S["CloseRatioByVolume"] ]
平今手续费= [FeeType.LOT,S["CloseTodayRatioByVolume"] ]
else:
开仓手续费 = [FeeType.RATE,S["OpenRatioByMoney"] ]
平仓手续费 = [FeeType.RATE,S["CloseRatioByMoney"] ]
平今手续费 = [FeeType.RATE,S["CloseTodayRatioByMoney"] ]
在回调函数TdApi.onQryInstrument()得到的返回结果是这样的:
以rb2010为例:
{
'InstrumentID': 'rb2010',
'ExchangeID': 'SHFE',
'InstrumentName': '螺纹钢2010',
'ExchangeInstID': 'rb2010',
'ProductID': 'rb',
'ProductClass': '1',
'DeliveryYear': 2020,
'DeliveryMonth': 10,
'MaxMarketOrderVolume': 30,
'MinMarketOrderVolume': 1,
'MaxLimitOrderVolume': 500,
'MinLimitOrderVolume': 1,
'VolumeMultiple': 10,
'PriceTick': 1.0,
'CreateDate': '20190912',
'OpenDate': '20191016',
'ExpireDate': '20201015',
'StartDelivDate': '20201016',
'EndDelivDate': '20201022',
'InstLifePhase': '1',
'IsTrading': 1,
'PositionType': '2',
'PositionDateType': '1',
'LongMarginRatio': 0.1,
'ShortMarginRatio': 0.1,
'MaxMarginSideAlgorithm': '1',
'UnderlyingInstrID': '',
'StrikePrice': 0.0,
'OptionsType': '\x00',
'UnderlyingMultiple': 0.0,
'CombinationType': '0'
}
在回调函数TdApi.onQryInstrumentMarginRate()中得到返回结果这样是的:
以rb2010为例:
{
'InstrumentID' : rb2010,
'InvestorRange' : 1,
'BrokerID' : 9999,
'InvestorID' : 147102,
'HedgeFlag' : 1,
'LongMarginRatioByMoney' : 0.1,
'LongMarginRatioByVolume' : 0.0,
'ShortMarginRatioByMoney' : 0.1,
'ShortMarginRatioByVolume' : 0.0,
'IsRelative' : 0,
'ExchangeID' : ,
'InvestUnitID' :
}
TdApi.reqQryInstrument()命令的返回中包括:
TdApi.reqQryInstrumentMarginRate()命令的返回包括:
1 这两个命令的返回值都包含中合约的保证金率,哪个是期货公司的实收的保证金率 ?
2 两条都是TdApi的命令,都是连接开户的期货公司的交易服务器的,还有必要执行TdApi.reqQryInstrumentMarginRate()专门获取吗 ?
class BarGenerator:
......
def __init__(self,on_bar: Callable,window: int = 0,on_window_bar: Callable = None,interval: Interval = Interval.MINUTE ):
... ...
def update_bar(self, bar: BarData) -> None:
"""
Update 1 minute bar into generator
"""
# If not inited, creaate window bar object
if not self.window_bar:
# Generate timestamp for bar data
if self.interval == Interval.MINUTE:
dt = bar.datetime.replace(second=0, microsecond=0)
else:
dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
self.window_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=dt,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price
)
# Otherwise, update high/low price into window bar
else:
self.window_bar.high_price = max(
self.window_bar.high_price, bar.high_price)
self.window_bar.low_price = min(
self.window_bar.low_price, bar.low_price)
# Update close price/volume into window bar
self.window_bar.close_price = bar.close_price
self.window_bar.volume += int(bar.volume)
self.window_bar.open_interest = bar.open_interest
# Check if window bar completed
finished = False
if self.interval == Interval.MINUTE:
# x-minute bar
if not (bar.datetime.minute + 1) % self.window:
finished = True
elif self.interval == Interval.HOUR:
if self.last_bar and bar.datetime.hour != self.last_bar.datetime.hour:
# 1-hour bar
if self.window == 1:
finished = True
# x-hour bar
else:
self.interval_count += 1
if not self.interval_count % self.window:
finished = True
self.interval_count = 0
if finished:
self.on_window_bar(self.window_bar)
self.window_bar = None
# Cache last bar object
self.last_bar = bar
class Interval(Enum):
"""
Interval of bar data.
"""
MINUTE = "1m"
HOUR = "1h"
DAILY = "d"
WEEKLY = "w"
BarGenerator虽然传入的Interval类型,但是它只考虑的Interval.MINUTE和Interval.HOUR两个单位,而它合成N分钟和N小时的K线是没有问题的。
也就是说,你不可以是Interval.DAILY和Interval.WEEKLY做单位,因为它使用的米筐接口的1分钟历史数据,没有使用米筐的1h和1d数据。
1) self.bgm = BarGenerator(self.on_bar, 4, self.on_month_bar,interval=Interval.WEEKLY)
2) self.bgm = BarGenerator(self.on_bar,20, self.on_month_bar,interval=Interval.DAILY)
因为BarGenerator没有考虑 Interval.DAILY和Interval.WEEKLY时间间隔
使用Interval.MINUTE作为参数时,window不可以超过59,它表示合成不了成功1小时的K线,而Interval.HOUR作为参数时,是对1小时K线进行计数,然后把的self.interval_count % self.window作为条件来判断是否查询window小时K线是否结束的,它可以用来表达日线以上周期K线。
所以创建日线以上周期K线,你最大只可以使用Interval.HOUR为单位,而且它又是参考自然时间的生成机制:
举例:
rb2010的交易时间段 :21:00-23:30(4根小时K线)09:00-10:15 10:30-11:30(3根小时K线) 13:30-15:00(2根小时K线),因此需要9根1小时K线合成
ag2012的交易时间段 :21:00-02:30 (6根小时K线)09:00-10:15 10:30-11:30 (3根小时K线)13:30-15:00(2根小时K线),因此需要11根1小时K线合成
IF88 沪深主力连续 交易时间段:09:30-11:30(3根小时K线),13:00-15:00(2根小时K线),每日时长:5小时,因此需要6根1小时K线合成
T2009 10年期国债2009 交易时间段:09:30-11:30(3根小时K线),13:00-15:15(3根小时K线),因此需要6根1小时K线合成
TS2103 2年期国债2103 交易时间段:09:30-11:30(3根小时K线),13:00-15:15(3根小时K线),因此需要6根1小时K线合成
如果看不明白上面的叙述,就静下心来慢慢想一些吧,想不明白就看看BarGenerator的update_bar()函数代码就明白了。
下面仅以rb2010和ag2012合约为例来说明,其他周期的类似。
rb2010的日K线产生器:
self.bgm = BarGenerator(self.on_bar, 9, self.on_day_bar,interval=Interval.HOUR)
ag2012的日K线产生器:
self.bgm = BarGenerator(self.on_bar, 11, self.on_day_bar,interval=Interval.HOUR)
rb2010的周K线产生器:
self.bgm = BarGenerator(self.on_bar, 45, self.on_week_bar,interval=Interval.HOUR)
ag2012的周K线产生器:
self.bgm = BarGenerator(self.on_bar, 55, self.on_week_bar,interval=Interval.HOUR)
rb2010的月K线产生器:
self.bgm = BarGenerator(self.on_bar, 180, self.on_month_bar,interval=Interval.HOUR)
ag2012的月K线产生器:
self.bgm = BarGenerator(self.on_bar, 220, self.on_month_bar,interval=Interval.HOUR)
这些合成的日K能够保证是日内对齐的,但周K和月K线并不能够保证是周内和月内对齐的,它取决你什么时候启动你的策略。
更好的创建方法先对BarGenerator进行扩展,实现考虑交易时间段的日K、周K的生成机制,当然创建时需要传入交易时间段参数。这里就不在说了,以后可以专门讨论。
VNPY的源代码目录众多,分门别类非常多,许多文件的内容都很类似,甚至函数名称都是一样的,如果不做特别设置,查询代码会把一些你不使用的app、api、gateway下的文件都查询出来,很是烦人,经过本人琢磨,发现这么设置就不错:
随便怎么命名都可以,我就为其取名VNPY,如下图所示:
1 把vnpy的源代码目录加入工作区
2 把自己的策略源代码目录加入工作区
查询代码设置步骤
- 1 点击查找图标;
- 2 输入查找字符串;
- 3 设置需要包含的目录或者文件,只输入文件夹名称会包含下面的文件和子文件夹下的文件。需要输入多个文件夹时,用逗号割开;
- 4 设置需要排除的文件,与步骤3类似,可以使用通配符,如图所示:
经过这样设置以后,当你输入任何要查找字符串的时候,就不会显示那些你不感兴趣的app、gateway下的子目录的文件了。本人目前感兴趣的是trader,event,chart,rpc,app\cta_strategy,app\rpc_service,gateway\ctp,api,user_tools这些子目录中的文件,你也可以根据自己的需要特别设定,以提高代码查询的效率。
进阶课程里看过陈晓优老师讲的很多策略例子,都是讲如何买和如何卖,如何止盈和止损的,又是仿真、优化、实盘的,看到人是心潮澎湃!
于是看是着手仿照例子编写自己的策略,策略经过计算买卖信号后,总是要下单的,那我下多大仓位呢? 回过头这时候参考例子才发现,几乎所有的仓位都是self.fixed_size=1,就没有讲如何动态决定仓位的例子!
于是VNPY的QQ群里问前辈、先知,无人回答,再在论坛里问老师,终于回答了:”不建议动态仓位,这么重要的事情必须交给手工完成!“,这个答复让我有点懵——做量化交易你让为手动决定仓位???
假设是按手续费率(而不是按手收费),那么开仓资金为:
KM = Price*N*Size*Margin (1+ FeeRate)
那么必须符合添加KM < Money,进而推出
N < Money/(Price*Size*Margin (1+ FeeRate))。
当然你不可能满仓干,也许还要一个最大开仓资金比例R,例如:
N = int(Money*R/(Price*Size*Margin (1+ FeeRate)))。
当R=40%表示用你账户里40%的资金,可以动态开仓的手数。这样不就可以动态开仓了吗?
当然实际开仓时的仓位计算可能比这复杂多了,比如你可以考虑交易合约的波动水平,需要考虑投资者愿意承担的风险水平等等,但不管怎么变化,策略动态开仓都必须要有如下这几个参数:
经过艰苦和漫长的代码研读和梳理,发现CTA策略交易中只有pos、trading和inited这些策略成员,没有与资金相关的东西。我们来看看这几个动态下单必须具备的参数是否提供了:
目前VNPY的CTA策略因为缺少上述几个关键参数,无法实现动态仓位交易。是不能也,而非不可以!
作为交易赚钱的CTA策略,怎么可以不与这些资金相关的参数打交道?因人而异的保证金和手续费不应该成为不提供这些参数的理由!
当多个策略在同时运行的时候,你的实际账户权益的消长,到底是哪个策略赚的,哪个策略赔的都无法说清楚,运行3天后就已经是一本糊涂账了,这怎么可以!
虽然有上面的困难,但是办法总比困难多!可以参考文华财经8.3或者库安9.0的办法(熟悉文华财经客户端的人应该都知道),它们的方法是用模组账户的方法来为每个用户模组创建一个虚拟的模组账户,很好地解决用户算法对资金、保证金和手续费等参数的设定!
策略账户的已经基本上实现了,目前只在测试中,且看我一步一步慢慢为大家分享......
有兴趣的可以先看看 策略账户界面展示。
按K线周期长度:
按K线参考起始时间划分:
VNPY对常规周期K线和自定义周期K线的处理是一视同仁的,统一使用BarGenerator()就可以了,它只使用1分钟K线数据,根据需要产生N分钟K线,至于产生出来是常规周期K线,还是自定义周期K线,主要看N是多少了。这样做的好处是自由!缺点是处理复杂些、速度慢些(不过反正不用手工处理,让计算机做,无所谓)。
它是按照自然时间来计算一根K线是否结束的。如对于30分钟K线,rb2010合约在每个交易日都会产生这样的一组K线:
起止时间 交易时长
21:00-21:30——30分钟
21:30-22:00——30分钟
22:00-22:30——30分钟
22:30-23:00——30分钟
23:00-23:30——30分钟
09:00-09:30——30分钟
09:30-10:00——30分钟
10:00-10:30——15分钟(?)
10:30-11:00——30分钟
11:00-11:30——30分钟
13:30-14:00——30分钟
14:00-14:30——30分钟
14:30-15:00——30分钟
目前vnpy的BarGenerator()就是这种K线参考机制。优点点是实现简单,无需考虑交易时段;缺点是明天都会有特别30分钟K是个另类:10:00-10:30的K线,其实只交易了15分钟!
熟悉文华财经8.3的应该熟悉下图的设置:
以合约的日交易时间的起点为参考,然后以等交易时间宽度产生K线。如文华财经的常规周期K线划分中的 “交易时间机制”就是这种情况。
如RB2010交易时间段:'21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00'。以交易日起点方式的30分钟K线分别是
起止时间 交易时长
21:00-21:30——30分钟
21:30-22:00——30分钟
22:00-22:30——30分钟
22:30-23:00——30分钟
23:00-23:30——30分钟
09:00-09:30——30分钟
09:30-10:00——30分钟
10:00-10:45——30分钟
10:45-11:15——30分钟
11:15-11:45——30分钟
13:45-14:15——30分钟
14:15-14:45——30分钟
14:45-15:00——15分钟(?)
这种K线时间参考机制的优点是不会模糊每日开市时的跳空行情,缺点是每日都可能产生一根交易时间不足30分钟的K线。
分别以合约的日盘交易时间起点和夜盘交易时间起点为参考,然后以等交易时间宽度产生K线。如文华财经的常规周期K线划分中的 “交易时间机制改进型”就是这种情况。
如RB2010交易时间段:夜盘:'21:00-02:30, 日盘:09:00-10:15,10:30-11:30,13:30-15:00'。以交易日起点方式的60分钟K线分别是
夜盘K线:
起止时间 交易时长
21:00-22:00——60分钟
22:00-23:00——60分钟
23:00-00:00——60分钟
00:00-01:00——60分钟
01:00-02:00——60分钟
02:00-02:30——30分钟(?)
日盘K线:
起止时间 交易时长
09:00-10:00——60分钟
10:00-11:15——60分钟
11:15-11:15——60分钟
11:15-14:15——60分钟
14:15-15:00——45分钟(?)
这种K线时间参考机制的优点是不会模糊各种日盘和夜盘开市时的跳空行情,缺点是每日都可能产生交易时长不足60分钟的K线。
以合约上市日的交易时间起点为参考,取出节假日,考虑交易时段,然后以等交易时间宽度产生K线。如FixedBarGenerator对N分钟K线的处理就是这种情况。FixedBarGenerator的原理参见和具体实现代码参见在前面的帖子里面已经描述得比较清楚了,这里不再重复。
参考合约上市日起点、等交易时间的好处是:所有的K线的交易时长都是想等的,它们的成交量是可类比的。因为交易时长不等的K线产生的所谓“地量”或者“天量”,有多大意义是可想而知的。这种K线产生机制的缺点是:可能产生跨交易日,跨日盘和夜盘的K线,这样可能造成观察不到交易日开盘跳空,日盘和夜盘开市时的跳空现象,因为可能那根K线还没有结束。
总有人声称“三行代码搞定国内期货10:15-10:30的K线“,并且上了精华版。三行代码可以搞定?那只是按下葫芦起了瓢的解决方法。
同样是国内期货,IF就没有这样的休市时间段,难倒你都要有这么做?自定义周期7分钟,19分钟K线你有怎么办?还有 再说了,就是改也最好是对BarGenerator扩展定制一个新的K线生成器,二不能直接修改BarGenerator的代码,修改它会导致在不同的合约、某些周期长度上出错,而你却浑然不知!
举例下面这些合约,都是没有10:15-10:30休市时间段的
合约代码:IC2008 名称:中证500指数2008 交易时间段:09:31-11:30,13:01-15:00
合约代码:IC888 名称:中证主力连续价差平滑 交易时间段:09:31-11:30,13:01-15:00
合约代码:IC99 名称:中证指数连续 交易时间段:09:31-11:30,13:01-15:00
合约代码:IC2009 名称:中证500指数2009 交易时间段:09:31-11:30,13:01-15:00
合约代码:IC2103 名称:中证500指数2103 交易时间段:09:31-11:30,13:01-15:00
合约代码:IC88A3 名称:中证次次主力连续 交易时间段:09:31-11:30,13:01-15:00
合约代码:IC88 名称:中证主力连续 交易时间段:09:31-11:30,13:01-15:00
合约代码:IC889 名称:中证主力连续价差平滑(后复权) 交易时间段:09:31-11:30,13:01-15:00
合约代码:IC2012 名称:中证500指数2012 交易时间段:09:31-11:30,13:01-15:00
合约代码:IC88A2 名称:中证次主力连续 交易时间段:09:31-11:30,13:01-15:00
合约代码:IF888 名称:沪深主力连续价差平滑 交易时间段:09:31-11:30,13:01-15:00
合约代码:IF2009 名称:IF2009 交易时间段:09:31-11:30,13:01-15:00
合约代码:IF2103 名称:IF2103 交易时间段:09:31-11:30,13:01-15:00
合约代码:IF2012 名称:IF2012 交易时间段:09:31-11:30,13:01-15:00
合约代码:IF889 名称:沪深主力连续价差平滑(后复权) 交易时间段:09:31-11:30,13:01-15:00
合约代码:IF2008 名称:IF2008 交易时间段:09:31-11:30,13:01-15:00
合约代码:IF88A2 名称:沪深次主力连续 交易时间段:09:31-11:30,13:01-15:00
合约代码:IF88 名称:沪深主力连续 交易时间段:09:31-11:30,13:01-15:00
合约代码:IF88A3 名称:沪深次次主力连续 交易时间段:09:31-11:30,13:01-15:00
合约代码:IF99 名称:沪深指数连续 交易时间段:09:31-11:30,13:01-15:00
合约代码:IH2103 名称:上证50指数2103 交易时间段:09:31-11:30,13:01-15:00
合约代码:IH88 名称:上证主力连续 交易时间段:09:31-11:30,13:01-15:00
合约代码:IH888 名称:上证主力连续价差平滑 交易时间段:09:31-11:30,13:01-15:00
合约代码:IH2012 名称:上证50指数2012 交易时间段:09:31-11:30,13:01-15:00
合约代码:IH2009 名称:上证50指数2009 交易时间段:09:31-11:30,13:01-15:00
合约代码:IH2008 名称:上证50指数2008 交易时间段:09:31-11:30,13:01-15:00
合约代码:IH88A3 名称:上证次次主力连续 交易时间段:09:31-11:30,13:01-15:00
合约代码:IH88A2 名称:上证次主力连续 交易时间段:09:31-11:30,13:01-15:00
合约代码:IH889 名称:上证主力连续价差平滑(后复权) 交易时间段:09:31-11:30,13:01-15:00
合约代码:IH99 名称:上证指数连续 交易时间段:09:31-11:30,13:01-15:00
合约代码:T2009 名称:10年期国债2009 交易时间段:09:31-11:30,13:01-15:15
合约代码:T888 名称:年期国债主力连续价差平滑 交易时间段:09:31-11:30,13:01-15:15
合约代码:T889 名称:年期国债主力连续价差平滑(后复权) 交易时间段:09:31-11:30,13:01-15:15
合约代码:T99 名称:年期国债指数连续 交易时间段:09:31-11:30,13:01-15:15
合约代码:T88 名称:年期国债主力连续 交易时间段:09:31-11:30,13:01-15:15
合约代码:T2012 名称:10年期国债2012 交易时间段:09:31-11:30,13:01-15:15
合约代码:T2103 名称:10年期国债2103 交易时间段:09:31-11:30,13:01-15:15
合约代码:TF2009 名称:5年期国债2009 交易时间段:09:31-11:30,13:01-15:15
合约代码:TF2012 名称:5年期国债2012 交易时间段:09:31-11:30,13:01-15:15
合约代码:TF88 名称:年期国债主力连续 交易时间段:09:31-11:30,13:01-15:15
合约代码:TS888 名称:年期国债主力连续价差平滑 交易时间段:09:31-11:30,13:01-15:15
合约代码:TF889 名称:年期国债主力连续价差平滑(后复权) 交易时间段:09:31-11:30,13:01-15:15
合约代码:TF99 名称:年期国债指数连续 交易时间段:09:31-11:30,13:01-15:15
合约代码:TF888 名称:年期国债主力连续价差平滑 交易时间段:09:31-11:30,13:01-15:15
合约代码:TF2103 名称:5年期国债2103 交易时间段:09:31-11:30,13:01-15:15
合约代码:TS2012 名称:2年期国债2012 交易时间段:09:31-11:30,13:01-15:15
合约代码:TS889 名称:年期国债主力连续价差平滑(后复权) 交易时间段:09:31-11:30,13:01-15:15
合约代码:TS99 名称:年期国债指数连续 交易时间段:09:31-11:30,13:01-15:15
合约代码:TS88 名称:年期国债主力连续 交易时间段:09:31-11:30,13:01-15:15
合约代码:TS2009 名称:2年期国债2009 交易时间段:09:31-11:30,13:01-15:15
合约代码:TS2103 名称:2年期国债2103 交易时间段:09:31-11:30,13:01-15:15
鱼和熊掌不可兼得,每种K线产生机制都有优点和缺点,至于您选择哪一种来产生你的K线,就看你的取舍了。明白与不明白个中的道理却是大相径庭的,所谓胸有成竹,才能够临危不乱。谨以本贴分享本人的一点浅薄之见,希望能够帮助到您!
CTA策略的交易活动可以使用下面这些来描述:
当K线周期为1,5,15分钟或者1日,1周时,它是没有问题的
当K线周期为10,20,30分钟或者一些自定义分钟周期如7分钟,还有1、2,4小时,由于合约的交易时段的不规则,导致某些K线的周期于其时间发生的交易时间不想等。
你开启CTA策略的时机是随机的,这导致self.laod_bar(20)的执行也是随机的,它从米筐或者数据库加载的1分钟数据也是随机,最终导致你所产生的上述K线也是随机的。那你已经这样的K线数据计算出来的指标在某种程度上可能也随机的。
不赘述原因了,举例吧:
RB2010交易时间段:'21:01-23:00,09:01-10:15,10:31-11:30,13:31-15:00'
假如你的K线每天从21:01开始计算K线,
如果K线的周期为10分钟,那么在10:10-10:20的那个K线其实只交易了5分钟
如果K线的周期为20分钟,那么在10:00-10:20的那个K线其实只交易了15分钟,在10:20-10:40的那个K线其实只交易了10分钟
如果K线的周期为30分钟,那么在10:00-10:30的那个K线其实只交易了15分钟
如果K线的周期为60分钟,那么在10:00-11:00的那个K线其实只交易了45分钟
如果K线的周期为120分钟,那么在10:00-14:00的那个K线其实只交易了105分钟
如果BarGenerator采用等交易时长产生K线,策略初始化时通过load_bar(n),读取1分钟历史K线,目前BarGenerator时是从n日之前的第一个1分钟K线区合约其他周期的K线的。
这导某些周期K线随n值不同,K线的起止时间会变化。而如果采用从上市日期开始计算等交易时长的K线位置,则无论何时初始化策略,K线的起止时间都是一样的。
1 等自然时长K线——无需考虑交易时段
2 等交易时长K线——需要考虑交易时段
3 从上市日起算K线——起止位置固定
4 从策略初始化时起算K线——起止位置不固定
我只是拿DemoStrategy的例子来说明问题。
陈老师在CTA策略实战进阶中的课程里讲的DemoStrategy策略:
class DemoStrategy(CtaTemplate):
""" 一个演示策略 """
author = "hxx"
fast_window = 10
slow_window = 20
fast_ma0 = 0
fast_ma1 = 0
slow_ma0 = 0
slow_ma1 = 0
parameters = [
"fast_window",
"slow_window"
]
variables = [
"fast_ma0",
"fast_ma1",
"slow_ma0",
"slow_ma1",
]
def __init__(
self,
cta_engine: Any,
strategy_name: str,
vt_symbol: str,
setting: dict
):
"""构造函数"""
super().__init__(cta_engine,strategy_name,vt_symbol,setting)
self.bg = NewBarGenerator(
on_bar=self.on_bar,
window=7,
on_window_bar=on_7min_bar,
interval=Interval.Minute)
self.am = NewArrayManager()
def on_init(self):
""""""
self.write_log("策略初始化")
self.load_bar(10) # 加载10日的7分钟K线
def on_start(self):
"""策略启动"""
self.write_log("策略启动")
def on_stop(self):
""" 策略停止 """
self.write_log(" 策略停止 ")
def on_tick(self,tick:TickData):
""" Tick更新 """
self.bg.update_tick(tick)
def on_bar(self, bar: BarData):
"""K线更新"""
self.bg.update_bar(bar)
def on_7min_bar(self, bar: BarData):
"""K线更新"""
am = self.am
am.update_bar(bar)
if not am.inited:
return
""" 计算均线 """
fast_ma = am.sma(self.fast_window,True)
self.fast_ma0 = fast_ma[-1]
self.fast_ma1 = fast_ma[-2]
slow_ma = am.sma(self.slow_window,True)
self.slow_ma0 = slow_ma[-1]
self.slow_ma1 = slow_ma[-2]
""" 定义金叉和死叉 """
cross_over = (self.fast_ma0>= self.fast_ma1 and
self.slow_ma0<self.slow_ma1)
cross_below = (self.slow_ma0>self.slow_ma1 and
self.slow_ma0<=self.slow_ma1)
if cross_over:
price = bar.close_price + 5
if not self.pos:
self.buy(price,1)
elif self.pos < 0:
self.cover(price,1)
self.buy(price,1)
elif cross_below:
price = bar.close_price - 5
if not self.pos:
self.short(price,1)
elif self.pos>0:
self.sell(price,1)
self.short(price,1)
# 更新图形界面
self.put_event()
def load_bar(
self,
days: int,
interval: Interval = Interval.MINUTE,
callback: Callable = None,
use_database: bool = False
):
"""
Load historical bar data for initializing strategy.
"""
if not callback:
callback = self.on_bar
self.cta_engine.load_bar(
self.vt_symbol,
days,
interval,
callback,
use_database
)
CtaEngine的load_bar()的代码是这样的:
def load_bar(
self,
vt_symbol: str,
days: int,
interval: Interval,
callback: Callable[[BarData], None],
use_database: bool
):
""""""
symbol, exchange = extract_vt_symbol(vt_symbol)
end = datetime.now(get_localzone())
start = end - timedelta(days)
bars = []
# Pass gateway and RQData if use_database set to True
if not use_database:
# Query bars from gateway if available
contract = self.main_engine.get_contract(vt_symbol)
if contract and contract.history_data:
req = HistoryRequest(
symbol=symbol,
exchange=exchange,
interval=interval,
start=start,
end=end
)
bars = self.main_engine.query_history(req, contract.gateway_name)
# Try to query bars from RQData, if not found, load from database.
else:
bars = self.query_bar_from_rq(symbol, exchange, interval, start, end)
if not bars:
bars = database_manager.load_bar_data(
symbol=symbol,
exchange=exchange,
interval=interval,
start=start,
end=end,
)
for bar in bars:
callback(bar)
CtaEngine.load_bar()对历史数据的处理逻辑是:
1 首先从rqdatac获取历史分钟数据,利用DemoStrategyd策略的on_bar()合成出7分钟K线,然后调用on_7min_bar()。这样self.am是一个默认缓冲为100个FIFO数组队列,利用它执行策略的买卖信号计算和下单。它的end是当前时间,start是10天前的时间。策略初始化后,从行情接口可以不断地收到tick数据,然后执行DemoStrategyd策略on_tick()-->on_bar()-->on_7min_bar()
2 如果rqdatac没有读取到历史分钟数据,那么就从本地数据库中读取。问题来了:
1) 如果本地数据库中有start-end之间的数据,可是最后的数据时间与当前时间有空档,例如最后保存的有30分钟空档,而初始化后行情接口虽然也是按照on_tick()-->on_bar()-->on_7min_bar()的过程不断地更新self.am,可是我们知道它所管理的缓冲K线可能是不连续的,这样会导致计算的错误!
2) 如果本地数据库中有start-end之间的数据,可是这些数据本身内部可能好几段历史返分钟数据,本身中间就有空档,导致self.am所管理的缓冲K线可能是不连续的,这样会导致计算的错误!
3) 本地数据可能是从rqdatac读取保存的,有谁的动作那么快,这边保存了历史数据,那边立刻启动策略,立刻初始化?所以当rqdatac没有历史数据,转而从本地数据库读取,这本身就会造成历史K线与新合成处理的数据空档。
理由如下:
1 实盘策略不要从数据库读取数据的选择,因为容易造成无法补救的K线数据空档;
2 它发生在rqdatac无法读取的情况下,自然无法再从rqdata补齐数据;
3 而行情接口通常不提供历史分钟K线数据功能;
4 可以没有数据,没有当然不会计算买卖点,也就不会交易。
5 如果简单使用有空档K线数据,发出了错误的买卖信号,进行了错误的交易,这是不应该的!
上一主题,“为K线图表添砖加瓦——让CTA策略的运行看得见”,可以说是失败了,原因已经找到,是因为掉到类变量和实例变量的坑里了。具体过程参加这个主题:
https://www.vnpy.com/forum/topic/3860-wei-kxian-tu-biao-tian-zhuan-jia-wa-rang-ctace-lue-de-yun-xing-kan-de-jian
1 解决了不同合约同时运行_kx_strategy策略时,K线图会互相影响的问题;
2 去掉策略管理器中的“K线图表”按钮,保持与原来的界面一致,在_kx_strategy策略中增加一个show_chart参数项目,如果想显示K线图,为它配置为True,否则不会显示K线图;
3 增加策略被移除时,删除该策略的K线图表功能
4 K线图表中的显示内容在_kx_strategy策略中配置,而不是一个固定的主图和附图搭配。参照我的init_kx_chart()方法,您也可以为自己的策略配置自己的K线主图和附图指标;
5 添加最后一根了临时K线的显示
vnpy\app\cta_strategy\base.py
vnpy\app\cta_strategy\engine.py
vnpy\app\cta_strategy\ui\widget.py
vnpy\app\cta_backtester\engine.py
"""
Defines constants and objects used in CtaStrategy App.
"""
from dataclasses import dataclass, field
from enum import Enum
from datetime import timedelta
from vnpy.trader.constant import Direction, Offset, Interval
APP_NAME = "CtaStrategy"
STOPORDER_PREFIX = "STOP"
class StopOrderStatus(Enum):
WAITING = "等待中"
CANCELLED = "已撤销"
TRIGGERED = "已触发"
class EngineType(Enum):
LIVE = "实盘"
BACKTESTING = "回测"
class BacktestingMode(Enum):
BAR = 1
TICK = 2
@dataclass
class StopOrder:
vt_symbol: str
direction: Direction
offset: Offset
price: float
volume: float
stop_orderid: str
strategy_name: str
lock: bool = False
vt_orderids: list = field(default_factory=list)
status: StopOrderStatus = StopOrderStatus.WAITING
EVENT_CTA_LOG = "eCtaLog"
EVENT_CTA_STRATEGY = "eCtaStrategy"
EVENT_CTA_STOPORDER = "eCtaStopOrder"
EVENT_CTA_TICK = "eCtaTick" # hxxjava add
EVENT_CTA_HISTORY_BAR = "eCtaHistoryBar" # hxxjava add
EVENT_CTA_BAR = "eCtaBar" # hxxjava add
EVENT_CTA_ORDER = "eCtaOrder" # hxxjava add
EVENT_CTA_TRADE = "eCtaTrade" # hxxjava add
INTERVAL_DELTA_MAP = {
Interval.MINUTE: timedelta(minutes=1),
Interval.HOUR: timedelta(hours=1),
Interval.DAILY: timedelta(days=1),
}
""""""
import importlib
import os
import traceback
from collections import defaultdict
from pathlib import Path
from typing import Any, Callable
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor
from copy import copy
from tzlocal import get_localzone
from vnpy.event import Event, EventEngine
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.trader.object import (
OrderRequest,
SubscribeRequest,
HistoryRequest,
LogData,
TickData,
BarData,
ContractData
)
from vnpy.trader.event import (
EVENT_TICK,
EVENT_ORDER,
EVENT_TRADE,
EVENT_POSITION
)
from vnpy.trader.constant import (
Direction,
OrderType,
Interval,
Exchange,
Offset,
Status
)
from vnpy.trader.utility import load_json, save_json, extract_vt_symbol, round_to
from vnpy.trader.database import database_manager
from vnpy.trader.rqdata import rqdata_client
from vnpy.trader.converter import OffsetConverter
from .base import (
APP_NAME,
EVENT_CTA_LOG,
EVENT_CTA_STRATEGY,
EVENT_CTA_STOPORDER,
EngineType,
StopOrder,
StopOrderStatus,
STOPORDER_PREFIX
)
from .template import CtaTemplate
STOP_STATUS_MAP = {
Status.SUBMITTING: StopOrderStatus.WAITING,
Status.NOTTRADED: StopOrderStatus.WAITING,
Status.PARTTRADED: StopOrderStatus.TRIGGERED,
Status.ALLTRADED: StopOrderStatus.TRIGGERED,
Status.CANCELLED: StopOrderStatus.CANCELLED,
Status.REJECTED: StopOrderStatus.CANCELLED
}
class CtaEngine(BaseEngine):
""""""
engine_type = EngineType.LIVE # live trading engine
setting_filename = "cta_strategy_setting.json"
data_filename = "cta_strategy_data.json"
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super(CtaEngine, self).__init__(
main_engine, event_engine, APP_NAME)
self.strategy_setting = {} # strategy_name: dict
self.strategy_data = {} # strategy_name: dict
self.classes = {} # class_name: stategy_class
self.strategies = {} # strategy_name: strategy
self.symbol_strategy_map = defaultdict(
list) # vt_symbol: strategy list
self.orderid_strategy_map = {} # vt_orderid: strategy
self.strategy_orderid_map = defaultdict(
set) # strategy_name: orderid list
self.stop_order_count = 0 # for generating stop_orderid
self.stop_orders = {} # stop_orderid: stop_order
self.init_executor = ThreadPoolExecutor(max_workers=1)
self.rq_client = None
self.rq_symbols = set()
self.vt_tradeids = set() # for filtering duplicate trade
self.offset_converter = OffsetConverter(self.main_engine)
def init_engine(self):
"""
"""
self.init_rqdata()
self.load_strategy_class()
self.load_strategy_setting()
self.load_strategy_data()
self.register_event()
self.write_log("CTA策略引擎初始化成功")
def close(self):
""""""
self.stop_all_strategies()
def register_event(self):
""""""
self.event_engine.register(EVENT_TICK, self.process_tick_event)
self.event_engine.register(EVENT_ORDER, self.process_order_event)
self.event_engine.register(EVENT_TRADE, self.process_trade_event)
self.event_engine.register(EVENT_POSITION, self.process_position_event)
def init_rqdata(self):
"""
Init RQData client.
"""
result = rqdata_client.init()
if result:
self.write_log("RQData数据接口初始化成功")
def query_bar_from_rq(
self, symbol: str, exchange: Exchange, interval: Interval, start: datetime, end: datetime
):
"""
Query bar data from RQData.
"""
req = HistoryRequest(
symbol=symbol,
exchange=exchange,
interval=interval,
start=start,
end=end
)
data = rqdata_client.query_history(req)
return data
def process_tick_event(self, event: Event):
""""""
tick = event.data
strategies = self.symbol_strategy_map[tick.vt_symbol]
if not strategies:
return
self.check_stop_order(tick)
for strategy in strategies:
if strategy.inited:
self.call_strategy_func(strategy, strategy.on_tick, tick)
def process_order_event(self, event: Event):
""""""
order = event.data
self.offset_converter.update_order(order)
strategy = self.orderid_strategy_map.get(order.vt_orderid, None)
if not strategy:
return
# Remove vt_orderid if order is no longer active.
vt_orderids = self.strategy_orderid_map[strategy.strategy_name]
if order.vt_orderid in vt_orderids and not order.is_active():
vt_orderids.remove(order.vt_orderid)
# For server stop order, call strategy on_stop_order function
if order.type == OrderType.STOP:
so = StopOrder(
vt_symbol=order.vt_symbol,
direction=order.direction,
offset=order.offset,
price=order.price,
volume=order.volume,
stop_orderid=order.vt_orderid,
strategy_name=strategy.strategy_name,
status=STOP_STATUS_MAP[order.status],
vt_orderids=[order.vt_orderid],
)
self.call_strategy_func(strategy, strategy.on_stop_order, so)
# Call strategy on_order function
self.call_strategy_func(strategy, strategy.on_order, order)
def process_trade_event(self, event: Event):
""""""
trade = event.data
# Filter duplicate trade push
if trade.vt_tradeid in self.vt_tradeids:
return
self.vt_tradeids.add(trade.vt_tradeid)
self.offset_converter.update_trade(trade)
strategy = self.orderid_strategy_map.get(trade.vt_orderid, None)
if not strategy:
return
# Update strategy pos before calling on_trade method
if trade.direction == Direction.LONG:
strategy.pos += trade.volume
else:
strategy.pos -= trade.volume
self.call_strategy_func(strategy, strategy.on_trade, trade)
# Sync strategy variables to data file
self.sync_strategy_data(strategy)
# Update GUI
self.put_strategy_event(strategy)
def process_position_event(self, event: Event):
""""""
position = event.data
self.offset_converter.update_position(position)
def check_stop_order(self, tick: TickData):
""""""
for stop_order in list(self.stop_orders.values()):
if stop_order.vt_symbol != tick.vt_symbol:
continue
long_triggered = (
stop_order.direction == Direction.LONG and tick.last_price >= stop_order.price
)
short_triggered = (
stop_order.direction == Direction.SHORT and tick.last_price <= stop_order.price
)
if long_triggered or short_triggered:
strategy = self.strategies[stop_order.strategy_name]
# To get excuted immediately after stop order is
# triggered, use limit price if available, otherwise
# use ask_price_5 or bid_price_5
if stop_order.direction == Direction.LONG:
if tick.limit_up:
price = tick.limit_up
else:
price = tick.ask_price_5
else:
if tick.limit_down:
price = tick.limit_down
else:
price = tick.bid_price_5
contract = self.main_engine.get_contract(stop_order.vt_symbol)
vt_orderids = self.send_limit_order(
strategy,
contract,
stop_order.direction,
stop_order.offset,
price,
stop_order.volume,
stop_order.lock
)
# Update stop order status if placed successfully
if vt_orderids:
# Remove from relation map.
self.stop_orders.pop(stop_order.stop_orderid)
strategy_vt_orderids = self.strategy_orderid_map[strategy.strategy_name]
if stop_order.stop_orderid in strategy_vt_orderids:
strategy_vt_orderids.remove(stop_order.stop_orderid)
# Change stop order status to cancelled and update to strategy.
stop_order.status = StopOrderStatus.TRIGGERED
stop_order.vt_orderids = vt_orderids
self.call_strategy_func(
strategy, strategy.on_stop_order, stop_order
)
self.put_stop_order_event(stop_order)
def send_server_order(
self,
strategy: CtaTemplate,
contract: ContractData,
direction: Direction,
offset: Offset,
price: float,
volume: float,
type: OrderType,
lock: bool
):
"""
Send a new order to server.
"""
# Create request and send order.
original_req = OrderRequest(
symbol=contract.symbol,
exchange=contract.exchange,
direction=direction,
offset=offset,
type=type,
price=price,
volume=volume,
)
# Convert with offset converter
req_list = self.offset_converter.convert_order_request(original_req, lock)
# Send Orders
vt_orderids = []
for req in req_list:
req.reference = strategy.strategy_name # Add strategy name as order reference
vt_orderid = self.main_engine.send_order(
req, contract.gateway_name)
# Check if sending order successful
if not vt_orderid:
continue
vt_orderids.append(vt_orderid)
self.offset_converter.update_order_request(req, vt_orderid)
# Save relationship between orderid and strategy.
self.orderid_strategy_map[vt_orderid] = strategy
self.strategy_orderid_map[strategy.strategy_name].add(vt_orderid)
return vt_orderids
def send_limit_order(
self,
strategy: CtaTemplate,
contract: ContractData,
direction: Direction,
offset: Offset,
price: float,
volume: float,
lock: bool
):
"""
Send a limit order to server.
"""
return self.send_server_order(
strategy,
contract,
direction,
offset,
price,
volume,
OrderType.LIMIT,
lock
)
def send_server_stop_order(
self,
strategy: CtaTemplate,
contract: ContractData,
direction: Direction,
offset: Offset,
price: float,
volume: float,
lock: bool
):
"""
Send a stop order to server.
Should only be used if stop order supported
on the trading server.
"""
return self.send_server_order(
strategy,
contract,
direction,
offset,
price,
volume,
OrderType.STOP,
lock
)
def send_local_stop_order(
self,
strategy: CtaTemplate,
direction: Direction,
offset: Offset,
price: float,
volume: float,
lock: bool
):
"""
Create a new local stop order.
"""
self.stop_order_count += 1
stop_orderid = f"{STOPORDER_PREFIX}.{self.stop_order_count}"
stop_order = StopOrder(
vt_symbol=strategy.vt_symbol,
direction=direction,
offset=offset,
price=price,
volume=volume,
stop_orderid=stop_orderid,
strategy_name=strategy.strategy_name,
lock=lock
)
self.stop_orders[stop_orderid] = stop_order
vt_orderids = self.strategy_orderid_map[strategy.strategy_name]
vt_orderids.add(stop_orderid)
self.call_strategy_func(strategy, strategy.on_stop_order, stop_order)
self.put_stop_order_event(stop_order)
return [stop_orderid]
def cancel_server_order(self, strategy: CtaTemplate, vt_orderid: str):
"""
Cancel existing order by vt_orderid.
"""
order = self.main_engine.get_order(vt_orderid)
if not order:
self.write_log(f"撤单失败,找不到委托{vt_orderid}", strategy)
return
req = order.create_cancel_request()
self.main_engine.cancel_order(req, order.gateway_name)
def cancel_local_stop_order(self, strategy: CtaTemplate, stop_orderid: str):
"""
Cancel a local stop order.
"""
stop_order = self.stop_orders.get(stop_orderid, None)
if not stop_order:
return
strategy = self.strategies[stop_order.strategy_name]
# Remove from relation map.
self.stop_orders.pop(stop_orderid)
vt_orderids = self.strategy_orderid_map[strategy.strategy_name]
if stop_orderid in vt_orderids:
vt_orderids.remove(stop_orderid)
# Change stop order status to cancelled and update to strategy.
stop_order.status = StopOrderStatus.CANCELLED
self.call_strategy_func(strategy, strategy.on_stop_order, stop_order)
self.put_stop_order_event(stop_order)
def send_order(
self,
strategy: CtaTemplate,
direction: Direction,
offset: Offset,
price: float,
volume: float,
stop: bool,
lock: bool
):
"""
"""
contract = self.main_engine.get_contract(strategy.vt_symbol)
if not contract:
self.write_log(f"委托失败,找不到合约:{strategy.vt_symbol}", strategy)
return ""
# Round order price and volume to nearest incremental value
price = round_to(price, contract.pricetick)
volume = round_to(volume, contract.min_volume)
if stop:
if contract.stop_supported:
return self.send_server_stop_order(strategy, contract, direction, offset, price, volume, lock)
else:
return self.send_local_stop_order(strategy, direction, offset, price, volume, lock)
else:
return self.send_limit_order(strategy, contract, direction, offset, price, volume, lock)
def cancel_order(self, strategy: CtaTemplate, vt_orderid: str):
"""
"""
if vt_orderid.startswith(STOPORDER_PREFIX):
self.cancel_local_stop_order(strategy, vt_orderid)
else:
self.cancel_server_order(strategy, vt_orderid)
def cancel_all(self, strategy: CtaTemplate):
"""
Cancel all active orders of a strategy.
"""
vt_orderids = self.strategy_orderid_map[strategy.strategy_name]
if not vt_orderids:
return
for vt_orderid in copy(vt_orderids):
self.cancel_order(strategy, vt_orderid)
def get_engine_type(self):
""""""
return self.engine_type
def get_pricetick(self, strategy: CtaTemplate):
"""
Return contract pricetick data.
"""
contract = self.main_engine.get_contract(strategy.vt_symbol)
if contract:
return contract.pricetick
else:
return None
def load_bar(
self,
vt_symbol: str,
days: int,
interval: Interval,
callback: Callable[[BarData], None],
use_database: bool
):
""""""
symbol, exchange = extract_vt_symbol(vt_symbol)
end = datetime.now(get_localzone())
start = end - timedelta(days)
bars = []
# Pass gateway and RQData if use_database set to True
if not use_database:
# Query bars from gateway if available
contract = self.main_engine.get_contract(vt_symbol)
if contract and contract.history_data:
req = HistoryRequest(
symbol=symbol,
exchange=exchange,
interval=interval,
start=start,
end=end
)
bars = self.main_engine.query_history(req, contract.gateway_name)
# Try to query bars from RQData, if not found, load from database.
else:
bars = self.query_bar_from_rq(symbol, exchange, interval, start, end)
if not bars:
bars = database_manager.load_bar_data(
symbol=symbol,
exchange=exchange,
interval=interval,
start=start,
end=end,
)
for bar in bars:
callback(bar)
def load_tick(
self,
vt_symbol: str,
days: int,
callback: Callable[[TickData], None]
):
""""""
symbol, exchange = extract_vt_symbol(vt_symbol)
end = datetime.now()
start = end - timedelta(days)
ticks = database_manager.load_tick_data(
symbol=symbol,
exchange=exchange,
start=start,
end=end,
)
for tick in ticks:
callback(tick)
def call_strategy_func(
self, strategy: CtaTemplate, func: Callable, params: Any = None
):
"""
Call function of a strategy and catch any exception raised.
"""
try:
if params:
func(params)
else:
func()
except Exception:
strategy.trading = False
strategy.inited = False
msg = f"触发异常已停止\n{traceback.format_exc()}"
self.write_log(msg, strategy)
def add_strategy(
self, class_name: str, strategy_name: str, vt_symbol: str, setting: dict
):
"""
Add a new strategy.
"""
if strategy_name in self.strategies:
self.write_log(f"创建策略失败,存在重名{strategy_name}")
return
strategy_class = self.classes.get(class_name, None)
if not strategy_class:
self.write_log(f"创建策略失败,找不到策略类{class_name}")
return
strategy = strategy_class(self, strategy_name, vt_symbol, setting)
self.strategies[strategy_name] = strategy
# Add vt_symbol to strategy map.
strategies = self.symbol_strategy_map[vt_symbol]
strategies.append(strategy)
# Update to setting file.
self.update_strategy_setting(strategy_name, setting)
self.put_strategy_event(strategy)
def init_strategy(self, strategy_name: str):
"""
Init a strategy.
"""
self.init_executor.submit(self._init_strategy, strategy_name)
def _init_strategy(self, strategy_name: str):
"""
Init strategies in queue.
"""
strategy = self.strategies[strategy_name]
if strategy.inited:
self.write_log(f"{strategy_name}已经完成初始化,禁止重复操作")
return
self.write_log(f"{strategy_name}开始执行初始化")
# Call on_init function of strategy
self.call_strategy_func(strategy, strategy.on_init)
# Restore strategy data(variables)
data = self.strategy_data.get(strategy_name, None)
if data:
for name in strategy.variables:
value = data.get(name, None)
if value:
setattr(strategy, name, value)
# Subscribe market data
contract = self.main_engine.get_contract(strategy.vt_symbol)
if contract:
req = SubscribeRequest(
symbol=contract.symbol, exchange=contract.exchange)
self.main_engine.subscribe(req, contract.gateway_name)
else:
self.write_log(f"行情订阅失败,找不到合约{strategy.vt_symbol}", strategy)
# Put event to update init completed status.
strategy.inited = True
self.put_strategy_event(strategy)
self.write_log(f"{strategy_name}初始化完成")
def start_strategy(self, strategy_name: str):
"""
Start a strategy.
"""
strategy = self.strategies[strategy_name]
if not strategy.inited:
self.write_log(f"策略{strategy.strategy_name}启动失败,请先初始化")
return
if strategy.trading:
self.write_log(f"{strategy_name}已经启动,请勿重复操作")
return
self.call_strategy_func(strategy, strategy.on_start)
strategy.trading = True
self.put_strategy_event(strategy)
def stop_strategy(self, strategy_name: str):
"""
Stop a strategy.
"""
strategy = self.strategies[strategy_name]
if not strategy.trading:
return
# Call on_stop function of the strategy
self.call_strategy_func(strategy, strategy.on_stop)
# Change trading status of strategy to False
strategy.trading = False
# Cancel all orders of the strategy
self.cancel_all(strategy)
# Sync strategy variables to data file
self.sync_strategy_data(strategy)
# Update GUI
self.put_strategy_event(strategy)
def edit_strategy(self, strategy_name: str, setting: dict):
"""
Edit parameters of a strategy.
"""
strategy = self.strategies[strategy_name]
strategy.update_setting(setting)
self.update_strategy_setting(strategy_name, setting)
self.put_strategy_event(strategy)
def remove_strategy(self, strategy_name: str):
"""
Remove a strategy.
"""
strategy = self.strategies[strategy_name]
if strategy.trading:
self.write_log(f"策略{strategy.strategy_name}移除失败,请先停止")
return
# Remove setting
self.remove_strategy_setting(strategy_name)
# Remove from symbol strategy map
strategies = self.symbol_strategy_map[strategy.vt_symbol]
strategies.remove(strategy)
# Remove from active orderid map
if strategy_name in self.strategy_orderid_map:
vt_orderids = self.strategy_orderid_map.pop(strategy_name)
# Remove vt_orderid strategy map
for vt_orderid in vt_orderids:
if vt_orderid in self.orderid_strategy_map:
self.orderid_strategy_map.pop(vt_orderid)
# Remove from strategies
self.strategies.pop(strategy_name)
return True
def load_strategy_class(self):
"""
Load strategy class from source code.
"""
path1 = Path(__file__).parent.joinpath("strategies")
self.load_strategy_class_from_folder(
path1, "vnpy.app.cta_strategy.strategies")
path2 = Path.cwd().joinpath("strategies")
self.load_strategy_class_from_folder(path2, "strategies")
def load_strategy_class_from_folder(self, path: Path, module_name: str = ""):
"""
Load strategy class from certain folder.
"""
for dirpath, dirnames, filenames in os.walk(str(path)):
for filename in filenames:
if filename.split(".")[-1] in ("py", "pyd", "so"):
strategy_module_name = ".".join([module_name, filename.split(".")[0]])
self.load_strategy_class_from_module(strategy_module_name)
def load_strategy_class_from_module(self, module_name: str):
"""
Load strategy class from module file.
"""
try:
module = importlib.import_module(module_name)
# print(f"{module_name}'s module:{module}") # hxxjava add
for name in dir(module):
# print(f"name:{name}") # hxxjava add
value = getattr(module, name)
if (isinstance(value, type) and issubclass(value, CtaTemplate) and value is not CtaTemplate):
self.classes[value.__name__] = value
# print(f"value.__name__:{value.__name__}") # hxxjava add
except: # noqa
msg = f"策略文件{module_name}加载失败,触发异常:\n{traceback.format_exc()}"
self.write_log(msg)
def load_strategy_data(self):
"""
Load strategy data from json file.
"""
self.strategy_data = load_json(self.data_filename)
def sync_strategy_data(self, strategy: CtaTemplate):
"""
Sync strategy data into json file.
"""
data = strategy.get_variables()
data.pop("inited") # Strategy status (inited, trading) should not be synced.
data.pop("trading")
self.strategy_data[strategy.strategy_name] = data
save_json(self.data_filename, self.strategy_data)
def get_all_strategy_class_names(self):
"""
Return names of strategy classes loaded.
"""
return list(self.classes.keys())
def get_strategy_class_parameters(self, class_name: str):
"""
Get default parameters of a strategy class.
"""
strategy_class = self.classes[class_name]
parameters = {}
for name in strategy_class.parameters:
parameters[name] = getattr(strategy_class, name)
return parameters
def get_strategy_parameters(self, strategy_name):
"""
Get parameters of a strategy.
"""
strategy = self.strategies[strategy_name]
return strategy.get_parameters()
def init_all_strategies(self):
"""
"""
for strategy_name in self.strategies.keys():
self.init_strategy(strategy_name)
def start_all_strategies(self):
"""
"""
for strategy_name in self.strategies.keys():
self.start_strategy(strategy_name)
def stop_all_strategies(self):
"""
"""
for strategy_name in self.strategies.keys():
self.stop_strategy(strategy_name)
def load_strategy_setting(self):
"""
Load setting file.
"""
self.strategy_setting = load_json(self.setting_filename)
for strategy_name, strategy_config in self.strategy_setting.items():
self.add_strategy(
strategy_config["class_name"],
strategy_name,
strategy_config["vt_symbol"],
strategy_config["setting"]
)
def update_strategy_setting(self, strategy_name: str, setting: dict):
"""
Update setting file.
"""
strategy = self.strategies[strategy_name]
self.strategy_setting[strategy_name] = {
"class_name": strategy.__class__.__name__,
"vt_symbol": strategy.vt_symbol,
"setting": setting,
}
save_json(self.setting_filename, self.strategy_setting)
def remove_strategy_setting(self, strategy_name: str):
"""
Update setting file.
"""
if strategy_name not in self.strategy_setting:
return
self.strategy_setting.pop(strategy_name)
save_json(self.setting_filename, self.strategy_setting)
def put_stop_order_event(self, stop_order: StopOrder):
"""
Put an event to update stop order status.
"""
event = Event(EVENT_CTA_STOPORDER, stop_order)
self.event_engine.put(event)
def put_strategy_event(self, strategy: CtaTemplate):
"""
Put an event to update strategy status.
"""
data = strategy.get_data()
event = Event(EVENT_CTA_STRATEGY, data)
self.event_engine.put(event)
#--------------------------------------------------------------------------------------------------
def get_position_detail(self, vt_symbol:str):
"""
查询long_pos,short_pos(持仓),long_pnl,short_pnl(盈亏),active_order(未成交字典)
收到PositionHolding类数据
"""
try:
return self.offset_converter.get_position_holding(vt_symbol)
except:
self.write_log(f"当前获取持仓信息为:{self.offset_converter.get_position_holding(vt_symbol)},等待获取持仓信息")
position_detail = OrderedDict()
position_detail.active_orders = {}
position_detail.long_pos = 0
position_detail.long_pnl = 0
position_detail.long_yd = 0
position_detail.long_td = 0
position_detail.long_pos_frozen = 0
position_detail.long_price = 0
position_detail.short_pos = 0
position_detail.short_pnl = 0
position_detail.short_yd = 0
position_detail.short_td = 0
position_detail.short_price = 0
position_detail.short_pos_frozen = 0
return position_detail
def write_log(self, msg: str, strategy: CtaTemplate = None):
"""
Create cta engine log event.
"""
if strategy:
msg = f"{strategy.strategy_name}: {msg}"
log = LogData(msg=msg, gateway_name="CtaStrategy")
event = Event(type=EVENT_CTA_LOG, data=log)
self.event_engine.put(event)
def send_email(self, msg: str, strategy: CtaTemplate = None):
"""
Send email to default receiver.
"""
if strategy:
subject = f"{strategy.strategy_name}"
else:
subject = "CTA策略引擎"
self.main_engine.send_email(subject, msg)
from vnpy.event import Event, EventEngine
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import QtCore, QtGui, QtWidgets
from vnpy.trader.ui.widget import (
BaseCell,
EnumCell,
MsgCell,
TimeCell,
BaseMonitor
)
from ..base import (
APP_NAME,
EVENT_CTA_LOG,
EVENT_CTA_STOPORDER,
EVENT_CTA_STRATEGY,
)
from ..engine import CtaEngine
from vnpy.usertools.kx_chart import NewChartWidget # hxxjava add
class CtaManager(QtWidgets.QWidget):
""""""
signal_log = QtCore.pyqtSignal(Event)
signal_strategy = QtCore.pyqtSignal(Event)
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
super(CtaManager, self).__init__()
self.main_engine = main_engine
self.event_engine = event_engine
self.cta_engine = main_engine.get_engine(APP_NAME)
self.managers = {}
self.init_ui()
self.register_event()
self.cta_engine.init_engine()
self.update_class_combo()
def init_ui(self):
""""""
self.setWindowTitle("CTA策略")
# Create widgets
self.class_combo = QtWidgets.QComboBox()
add_button = QtWidgets.QPushButton("添加策略")
add_button.clicked.connect(self.add_strategy)
init_button = QtWidgets.QPushButton("全部初始化")
init_button.clicked.connect(self.cta_engine.init_all_strategies)
start_button = QtWidgets.QPushButton("全部启动")
start_button.clicked.connect(self.cta_engine.start_all_strategies)
stop_button = QtWidgets.QPushButton("全部停止")
stop_button.clicked.connect(self.cta_engine.stop_all_strategies)
clear_button = QtWidgets.QPushButton("清空日志")
clear_button.clicked.connect(self.clear_log)
self.scroll_layout = QtWidgets.QVBoxLayout()
self.scroll_layout.addStretch()
scroll_widget = QtWidgets.QWidget()
scroll_widget.setLayout(self.scroll_layout)
scroll_area = QtWidgets.QScrollArea()
scroll_area.setWidgetResizable(True)
scroll_area.setWidget(scroll_widget)
self.log_monitor = LogMonitor(self.main_engine, self.event_engine)
self.stop_order_monitor = StopOrderMonitor(
self.main_engine, self.event_engine
)
# Set layout
hbox1 = QtWidgets.QHBoxLayout()
hbox1.addWidget(self.class_combo)
hbox1.addWidget(add_button)
hbox1.addStretch()
hbox1.addWidget(init_button)
hbox1.addWidget(start_button)
hbox1.addWidget(stop_button)
hbox1.addWidget(clear_button)
grid = QtWidgets.QGridLayout()
grid.addWidget(scroll_area, 0, 0, 2, 1)
grid.addWidget(self.stop_order_monitor, 0, 1)
grid.addWidget(self.log_monitor, 1, 1)
vbox = QtWidgets.QVBoxLayout()
vbox.addLayout(hbox1)
vbox.addLayout(grid)
self.setLayout(vbox)
def update_class_combo(self):
""""""
self.class_combo.addItems(
self.cta_engine.get_all_strategy_class_names()
)
def register_event(self):
""""""
self.signal_strategy.connect(self.process_strategy_event)
self.event_engine.register(
EVENT_CTA_STRATEGY, self.signal_strategy.emit
)
def process_strategy_event(self, event):
"""
Update strategy status onto its monitor.
"""
data = event.data
strategy_name = data["strategy_name"]
if strategy_name in self.managers:
manager = self.managers[strategy_name]
manager.update_data(data)
else:
manager = StrategyManager(self, self.cta_engine, data)
self.scroll_layout.insertWidget(0, manager)
self.managers[strategy_name] = manager
def remove_strategy(self, strategy_name):
""""""
manager = self.managers.pop(strategy_name)
manager.deleteLater()
def add_strategy(self):
""""""
class_name = str(self.class_combo.currentText())
if not class_name:
return
parameters = self.cta_engine.get_strategy_class_parameters(class_name)
editor = SettingEditor(parameters, class_name=class_name)
n = editor.exec_()
if n == editor.Accepted:
setting = editor.get_setting()
vt_symbol = setting.pop("vt_symbol")
strategy_name = setting.pop("strategy_name")
self.cta_engine.add_strategy(
class_name, strategy_name, vt_symbol, setting
)
def clear_log(self):
""""""
self.log_monitor.setRowCount(0)
def show(self):
""""""
self.showMaximized()
class StrategyManager(QtWidgets.QFrame):
"""
Manager for a strategy
"""
def __init__(
self, cta_manager: CtaManager, cta_engine: CtaEngine, data: dict
):
""""""
super(StrategyManager, self).__init__()
self.cta_manager = cta_manager
self.cta_engine = cta_engine
self.strategy_name = data["strategy_name"]
self._data = data
self.init_ui()
def init_ui(self):
""""""
self.setFixedHeight(300)
self.setFrameShape(self.Box)
self.setLineWidth(1)
self.init_button = QtWidgets.QPushButton("初始化")
self.init_button.clicked.connect(self.init_strategy)
self.start_button = QtWidgets.QPushButton("启动")
self.start_button.clicked.connect(self.start_strategy)
self.start_button.setEnabled(False)
self.stop_button = QtWidgets.QPushButton("停止")
self.stop_button.clicked.connect(self.stop_strategy)
self.stop_button.setEnabled(False)
self.edit_button = QtWidgets.QPushButton("编辑")
self.edit_button.clicked.connect(self.edit_strategy)
self.remove_button = QtWidgets.QPushButton("移除")
self.remove_button.clicked.connect(self.remove_strategy)
strategy_name = self._data["strategy_name"]
vt_symbol = self._data["vt_symbol"]
class_name = self._data["class_name"]
author = self._data["author"]
label_text = (
f"{strategy_name} - {vt_symbol} ({class_name} by {author})"
)
label = QtWidgets.QLabel(label_text)
label.setAlignment(QtCore.Qt.AlignCenter)
self.parameters_monitor = DataMonitor(self._data["parameters"])
self.variables_monitor = DataMonitor(self._data["variables"])
hbox = QtWidgets.QHBoxLayout()
hbox.addWidget(self.init_button)
hbox.addWidget(self.start_button)
hbox.addWidget(self.stop_button)
hbox.addWidget(self.edit_button)
hbox.addWidget(self.remove_button)
vbox = QtWidgets.QVBoxLayout()
vbox.addWidget(label)
vbox.addLayout(hbox)
vbox.addWidget(self.parameters_monitor)
vbox.addWidget(self.variables_monitor)
self.setLayout(vbox)
def update_data(self, data: dict):
""""""
self._data = data
self.parameters_monitor.update_data(data["parameters"])
self.variables_monitor.update_data(data["variables"])
# Update button status
variables = data["variables"]
inited = variables["inited"]
trading = variables["trading"]
if not inited:
return
self.init_button.setEnabled(False)
if trading:
self.start_button.setEnabled(False)
self.stop_button.setEnabled(True)
self.edit_button.setEnabled(False)
self.remove_button.setEnabled(False)
else:
self.start_button.setEnabled(True)
self.stop_button.setEnabled(False)
self.edit_button.setEnabled(True)
self.remove_button.setEnabled(True)
def init_strategy(self):
""""""
self.open_kx_chart() # hxxjava add
self.cta_engine.init_strategy(self.strategy_name)
def start_strategy(self):
""""""
self.cta_engine.start_strategy(self.strategy_name)
def stop_strategy(self):
""""""
self.cta_engine.stop_strategy(self.strategy_name)
def edit_strategy(self):
""""""
strategy_name = self._data["strategy_name"]
parameters = self.cta_engine.get_strategy_parameters(strategy_name)
editor = SettingEditor(parameters, strategy_name=strategy_name)
n = editor.exec_()
if n == editor.Accepted:
setting = editor.get_setting()
self.cta_engine.edit_strategy(strategy_name, setting)
def remove_strategy(self):
""""""
result = self.cta_engine.remove_strategy(self.strategy_name)
# Only remove strategy gui manager if it has been removed from engine
if result:
self.cta_manager.remove_strategy(self.strategy_name)
if self.kx_chart: # hxxjava add
self.kx_chart.close()
self.kx_chart = None
def open_kx_chart(self): # hxxjava add
strategy = self.cta_engine.strategies[self.strategy_name]
setting = self.cta_engine.strategy_setting[self.strategy_name]['setting']
show_chart = setting.get("show_chart",None)
self.kx_chart = None
if show_chart:
event_engine = self.cta_engine.event_engine
kx_interval = setting.get("kx_interval",None)
self.kx_chart = NewChartWidget(event_engine = event_engine,strategy_name = self.strategy_name)
self.kx_chart.setWindowTitle(f"K线图表:{self.strategy_name},周期:{kx_interval}")
strategy.init_kx_chart(self.kx_chart)
self.kx_chart.register_event() # 注册消息
self.kx_chart.show() # 显示K线图
class DataMonitor(QtWidgets.QTableWidget):
"""
Table monitor for parameters and variables.
"""
def __init__(self, data: dict):
""""""
super(DataMonitor, self).__init__()
self._data = data
self.cells = {}
self.init_ui()
def init_ui(self):
""""""
labels = list(self._data.keys())
self.setColumnCount(len(labels))
self.setHorizontalHeaderLabels(labels)
self.setRowCount(1)
self.verticalHeader().setSectionResizeMode(
QtWidgets.QHeaderView.Stretch
)
self.verticalHeader().setVisible(False)
self.setEditTriggers(self.NoEditTriggers)
for column, name in enumerate(self._data.keys()):
value = self._data[name]
cell = QtWidgets.QTableWidgetItem(str(value))
cell.setTextAlignment(QtCore.Qt.AlignCenter)
self.setItem(0, column, cell)
self.cells[name] = cell
def update_data(self, data: dict):
""""""
for name, value in data.items():
cell = self.cells[name]
cell.setText(str(value))
class StopOrderMonitor(BaseMonitor):
"""
Monitor for local stop order.
"""
event_type = EVENT_CTA_STOPORDER
data_key = "stop_orderid"
sorting = True
headers = {
"stop_orderid": {"display": "停止委托号","cell": BaseCell,"update": False,},
"vt_orderids": {"display": "限价委托号", "cell": BaseCell, "update": True},
"vt_symbol": {"display": "本地代码", "cell": BaseCell, "update": False},
"direction": {"display": "方向", "cell": EnumCell, "update": False},
"offset": {"display": "开平", "cell": EnumCell, "update": False},
"price": {"display": "价格", "cell": BaseCell, "update": False},
"volume": {"display": "数量", "cell": BaseCell, "update": False},
"status": {"display": "状态", "cell": EnumCell, "update": True},
"lock": {"display": "锁仓", "cell": BaseCell, "update": False},
"strategy_name": {"display": "策略名", "cell": BaseCell, "update": False},
}
class LogMonitor(BaseMonitor):
"""
Monitor for log data.
"""
event_type = EVENT_CTA_LOG
data_key = ""
sorting = False
headers = {
"time": {"display": "时间", "cell": TimeCell, "update": False},
"msg": {"display": "信息", "cell": MsgCell, "update": False},
}
def init_ui(self):
"""
Stretch last column.
"""
super(LogMonitor, self).init_ui()
self.horizontalHeader().setSectionResizeMode(
1, QtWidgets.QHeaderView.Stretch
)
def insert_new_row(self, data):
"""
Insert a new row at the top of table.
"""
super(LogMonitor, self).insert_new_row(data)
self.resizeRowToContents(0)
class SettingEditor(QtWidgets.QDialog):
"""
For creating new strategy and editing strategy parameters.
"""
def __init__(
self, parameters: dict, strategy_name: str = "", class_name: str = ""
):
""""""
super(SettingEditor, self).__init__()
self.parameters = parameters
self.strategy_name = strategy_name
self.class_name = class_name
self.edits = {}
self.init_ui()
def init_ui(self):
""""""
form = QtWidgets.QFormLayout()
# Add vt_symbol and name edit if add new strategy
if self.class_name:
self.setWindowTitle(f"添加策略:{self.class_name}")
button_text = "添加"
parameters = {"strategy_name": "", "vt_symbol": ""}
parameters.update(self.parameters)
else:
self.setWindowTitle(f"参数编辑:{self.strategy_name}")
button_text = "确定"
parameters = self.parameters
for name, value in parameters.items():
type_ = type(value)
edit = QtWidgets.QLineEdit(str(value))
if type_ is int:
validator = QtGui.QIntValidator()
edit.setValidator(validator)
elif type_ is float:
validator = QtGui.QDoubleValidator()
edit.setValidator(validator)
form.addRow(f"{name} {type_}", edit)
self.edits[name] = (edit, type_)
button = QtWidgets.QPushButton(button_text)
button.clicked.connect(self.accept)
form.addRow(button)
widget = QtWidgets.QWidget()
widget.setLayout(form)
scroll = QtWidgets.QScrollArea()
scroll.setWidgetResizable(True)
scroll.setWidget(widget)
vbox = QtWidgets.QVBoxLayout()
vbox.addWidget(scroll)
self.setLayout(vbox)
def get_setting(self):
""""""
setting = {}
if self.class_name:
setting["class_name"] = self.class_name
for name, tp in self.edits.items():
edit, type_ = tp
value_text = edit.text()
if type_ == bool:
if value_text == "True":
value = True
else:
value = False
else:
value = type_(value_text)
setting[name] = value
return setting
import os
import importlib
import traceback
from datetime import datetime
from threading import Thread
from pathlib import Path
from inspect import getfile
from vnpy.event import Event, EventEngine
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.trader.constant import Interval
from vnpy.trader.utility import extract_vt_symbol
from vnpy.trader.object import HistoryRequest
from vnpy.trader.rqdata import rqdata_client
from vnpy.trader.database import database_manager
from vnpy.app.cta_strategy import CtaTemplate
from vnpy.app.cta_strategy.backtesting import BacktestingEngine, OptimizationSetting
APP_NAME = "CtaBacktester"
EVENT_BACKTESTER_LOG = "eBacktesterLog"
EVENT_BACKTESTER_BACKTESTING_FINISHED = "eBacktesterBacktestingFinished"
EVENT_BACKTESTER_OPTIMIZATION_FINISHED = "eBacktesterOptimizationFinished"
from vnpy.app.cta_strategy.base import EngineType # hxxjava add
class BacktesterEngine(BaseEngine):
"""
For running CTA strategy backtesting.
"""
engine_type = EngineType.BACKTESTING # hxxjava add --- 供策略回测时使用
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super().__init__(main_engine, event_engine, APP_NAME)
self.classes = {}
self.backtesting_engine = None
self.thread = None
# Backtesting reuslt
self.result_df = None
self.result_statistics = None
# Optimization result
self.result_values = None
def init_engine(self):
""""""
self.write_log("初始化CTA回测引擎")
self.backtesting_engine = BacktestingEngine()
# Redirect log from backtesting engine outside.
self.backtesting_engine.output = self.write_log
self.load_strategy_class()
self.write_log("策略文件加载完成")
self.init_rqdata()
def init_rqdata(self):
"""
Init RQData client.
"""
result = rqdata_client.init()
if result:
self.write_log("RQData数据接口初始化成功")
def write_log(self, msg: str):
""""""
event = Event(EVENT_BACKTESTER_LOG)
event.data = msg
self.event_engine.put(event)
def load_strategy_class(self):
"""
Load strategy class from source code.
"""
app_path = Path(__file__).parent.parent
path1 = app_path.joinpath("cta_strategy", "strategies")
self.load_strategy_class_from_folder(
path1, "vnpy.app.cta_strategy.strategies")
path2 = Path.cwd().joinpath("strategies")
self.load_strategy_class_from_folder(path2, "strategies")
def load_strategy_class_from_folder(self, path: Path, module_name: str = ""):
"""
Load strategy class from certain folder.
"""
for dirpath, dirnames, filenames in os.walk(path):
for filename in filenames:
# Load python source code file
if filename.endswith(".py"):
strategy_module_name = ".".join(
[module_name, filename.replace(".py", "")])
self.load_strategy_class_from_module(strategy_module_name)
# Load compiled pyd binary file
elif filename.endswith(".pyd"):
strategy_module_name = ".".join(
[module_name, filename.split(".")[0]])
self.load_strategy_class_from_module(strategy_module_name)
def load_strategy_class_from_module(self, module_name: str):
"""
Load strategy class from module file.
"""
try:
module = importlib.import_module(module_name)
importlib.reload(module)
for name in dir(module):
value = getattr(module, name)
if (isinstance(value, type) and issubclass(value, CtaTemplate) and value is not CtaTemplate):
self.classes[value.__name__] = value
except: # noqa
msg = f"策略文件{module_name}加载失败,触发异常:\n{traceback.format_exc()}"
self.write_log(msg)
def reload_strategy_class(self):
""""""
self.classes.clear()
self.load_strategy_class()
self.write_log("策略文件重载刷新完成")
def get_strategy_class_names(self):
""""""
return list(self.classes.keys())
def run_backtesting(
self,
class_name: str,
vt_symbol: str,
interval: str,
start: datetime,
end: datetime,
rate: float,
slippage: float,
size: int,
pricetick: float,
capital: int,
inverse: bool,
setting: dict
):
""""""
self.result_df = None
self.result_statistics = None
engine = self.backtesting_engine
engine.clear_data()
engine.set_parameters(
vt_symbol=vt_symbol,
interval=interval,
start=start,
end=end,
rate=rate,
slippage=slippage,
size=size,
pricetick=pricetick,
capital=capital,
inverse=inverse
)
strategy_class = self.classes[class_name]
engine.add_strategy(
strategy_class,
setting
)
engine.load_data()
engine.run_backtesting()
self.result_df = engine.calculate_result()
self.result_statistics = engine.calculate_statistics(output=False)
# Clear thread object handler.
self.thread = None
# Put backtesting done event
event = Event(EVENT_BACKTESTER_BACKTESTING_FINISHED)
self.event_engine.put(event)
def start_backtesting(
self,
class_name: str,
vt_symbol: str,
interval: str,
start: datetime,
end: datetime,
rate: float,
slippage: float,
size: int,
pricetick: float,
capital: int,
inverse: bool,
setting: dict
):
if self.thread:
self.write_log("已有任务在运行中,请等待完成")
return False
self.write_log("-" * 40)
self.thread = Thread(
target=self.run_backtesting,
args=(
class_name,
vt_symbol,
interval,
start,
end,
rate,
slippage,
size,
pricetick,
capital,
inverse,
setting
)
)
self.thread.start()
return True
def get_result_df(self):
""""""
return self.result_df
def get_result_statistics(self):
""""""
return self.result_statistics
def get_result_values(self):
""""""
return self.result_values
def get_default_setting(self, class_name: str):
""""""
strategy_class = self.classes[class_name]
return strategy_class.get_class_parameters()
def run_optimization(
self,
class_name: str,
vt_symbol: str,
interval: str,
start: datetime,
end: datetime,
rate: float,
slippage: float,
size: int,
pricetick: float,
capital: int,
inverse: bool,
optimization_setting: OptimizationSetting,
use_ga: bool
):
""""""
if use_ga:
self.write_log("开始遗传算法参数优化")
else:
self.write_log("开始多进程参数优化")
self.result_values = None
engine = self.backtesting_engine
engine.clear_data()
engine.set_parameters(
vt_symbol=vt_symbol,
interval=interval,
start=start,
end=end,
rate=rate,
slippage=slippage,
size=size,
pricetick=pricetick,
capital=capital,
inverse=inverse
)
strategy_class = self.classes[class_name]
engine.add_strategy(
strategy_class,
{}
)
if use_ga:
self.result_values = engine.run_ga_optimization(
optimization_setting,
output=False
)
else:
self.result_values = engine.run_optimization(
optimization_setting,
output=False
)
# Clear thread object handler.
self.thread = None
self.write_log("多进程参数优化完成")
# Put optimization done event
event = Event(EVENT_BACKTESTER_OPTIMIZATION_FINISHED)
self.event_engine.put(event)
def start_optimization(
self,
class_name: str,
vt_symbol: str,
interval: str,
start: datetime,
end: datetime,
rate: float,
slippage: float,
size: int,
pricetick: float,
capital: int,
inverse: bool,
optimization_setting: OptimizationSetting,
use_ga: bool
):
if self.thread:
self.write_log("已有任务在运行中,请等待完成")
return False
self.write_log("-" * 40)
self.thread = Thread(
target=self.run_optimization,
args=(
class_name,
vt_symbol,
interval,
start,
end,
rate,
slippage,
size,
pricetick,
capital,
inverse,
optimization_setting,
use_ga
)
)
self.thread.start()
return True
def run_downloading(
self,
vt_symbol: str,
interval: str,
start: datetime,
end: datetime
):
"""
Query bar data from RQData.
"""
self.write_log(f"{vt_symbol}-{interval}开始下载历史数据")
try:
symbol, exchange = extract_vt_symbol(vt_symbol)
except ValueError:
self.write_log(f"{vt_symbol}解析失败,请检查交易所后缀")
self.thread = None
return
req = HistoryRequest(
symbol=symbol,
exchange=exchange,
interval=Interval(interval),
start=start,
end=end
)
contract = self.main_engine.get_contract(vt_symbol)
try:
# If history data provided in gateway, then query
if contract and contract.history_data:
data = self.main_engine.query_history(
req, contract.gateway_name
)
# Otherwise use RQData to query data
else:
data = rqdata_client.query_history(req)
if data:
database_manager.save_bar_data(data)
self.write_log(f"{vt_symbol}-{interval}历史数据下载完成")
else:
self.write_log(f"数据下载失败,无法获取{vt_symbol}的历史数据")
except Exception:
msg = f"数据下载失败,触发异常:\n{traceback.format_exc()}"
self.write_log(msg)
# Clear thread object handler.
self.thread = None
def start_downloading(
self,
vt_symbol: str,
interval: str,
start: datetime,
end: datetime
):
if self.thread:
self.write_log("已有任务在运行中,请等待完成")
return False
self.write_log("-" * 40)
self.thread = Thread(
target=self.run_downloading,
args=(
vt_symbol,
interval,
start,
end
)
)
self.thread.start()
return True
def get_all_trades(self):
""""""
return self.backtesting_engine.get_all_trades()
def get_all_orders(self):
""""""
return self.backtesting_engine.get_all_orders()
def get_all_daily_results(self):
""""""
return self.backtesting_engine.get_all_daily_results()
def get_history_data(self):
""""""
return self.backtesting_engine.history_data
def get_strategy_class_file(self, class_name: str):
""""""
strategy_class = self.classes[class_name]
file_path = getfile(strategy_class)
return file_path
from typing import Any,List,Dict,Tuple
import copy
from vnpy.app.cta_strategy import (
CtaTemplate,
BarGenerator,
ArrayManager,
StopOrder,
Direction
)
from vnpy.trader.engine import MainEngine,EventEngine
from vnpy.app.cta_strategy.engine import CtaEngine
from vnpy.event.engine import Event
from vnpy.trader.object import (
LogData,
TickData,
BarData,
TradeData,
OrderData,
)
from vnpy.app.cta_strategy import StopOrder
from vnpy.app.cta_strategy.base import EngineType
from vnpy.trader.constant import Interval
from vnpy.app.cta_strategy.base import (
APP_NAME,
EVENT_CTA_LOG,
EVENT_CTA_TICK,
EVENT_CTA_HISTORY_BAR,
EVENT_CTA_BAR,
EVENT_CTA_ORDER,
EVENT_CTA_TRADE,
EVENT_CTA_STOPORDER,
EVENT_CTA_STRATEGY,
)
from vnpy.usertools.kx_chart import ( # hxxjava add
NewChartWidget,
CandleItem,
VolumeItem,
LineItem,
SmaItem,
RsiItem,
MacdItem,
)
from vnpy.usertools.kx_chart import NewChartWidget # hxxjava add
class _kx_strategy(CtaTemplate):
""""""
author = "hxxjava"
kx_interval = 1
show_chart = False # 显示K线图表
parameters = [
"kx_interval",
"show_chart"
]
kx_count:int = 0
cta_manager = None
variables = ["kx_count"]
def __init__(
self,
cta_engine: Any,
strategy_name: str,
vt_symbol: str,
setting: dict,
):
super().__init__(cta_engine,strategy_name,vt_symbol,setting)
self.bg = BarGenerator(self.on_bar,self.kx_interval,self.on_Nmin_bar)
self.am = ArrayManager()
cta_engine:CtaEngine = self.cta_engine
self.engine_type = cta_engine.engine_type
self.even_engine = cta_engine.main_engine.event_engine
# 必须在这里声明,因为它们是实例变量
self.all_bars:List[BarData] = []
self.current_tick:[TickData] = None
self.current_bar:[BarData] = None
self.last_tick:[TickData] = None
def on_init(self):
"""
Callback when strategy is inited.
"""
self.load_bar(20)
if len(self.all_bars)>0:
self.send_event(EVENT_CTA_HISTORY_BAR,self.all_bars)
def on_start(self):
""" """
self.write_log("已开始")
def on_stop(self):
""""""
self.write_log("_kx_strategy 已停止")
def on_tick(self, tick: TickData):
"""
Callback of new tick data update.
"""
self.current_tick = tick # 记录最新tick
if self.inited:
# 先产生当前临时K线
self.cur_bar = self.get_cur_bar(tick)
if self.cur_bar:
# 发送当前临时K线更新消息
self.send_event(EVENT_CTA_BAR,self.cur_bar)
# 再更新tick,产生1分钟K线乃至N 分钟线
self.bg.update_tick(tick)
self.send_event(EVENT_CTA_TICK,tick)
self.last_tick = tick
def on_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
if self.inited:
self.write_log(f"I got a 1min BarData")
self.bg.update_bar(bar)
def on_Nmin_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
self.all_bars.append(bar)
self.kx_count = len(self.all_bars)
if self.inited:
self.write_log(f"I got a {self.kx_interval}min BarData")
self.send_event(EVENT_CTA_BAR,bar)
if self.current_tick:
# 当新N分钟K线产生的时候,立即产生新的临时K线
self.current_bar = None
self.get_cur_bar(self.current_tick)
self.put_event()
def on_trade(self, trade: TradeData):
"""
Callback of new trade data update.
"""
self.send_event(EVENT_CTA_TRADE,trade)
def on_order(self, order: OrderData):
"""
Callback of new order data update.
"""
self.send_event(EVENT_CTA_ORDER,order)
def on_stop_order(self, stop_order: StopOrder):
"""
Callback of stop order update.
"""
self.send_event(EVENT_CTA_STOPORDER,stop_order)
def get_cur_bar(self,tick:TickData)->BarData:
"""
产生临时K线,每个tick都会更新。除非把self.window_bar赋值为None,
不会产生新的K线,只会更新K线的量和加。
注意:self.last_tick是在BarGenerator中声明和改变的
"""
if not self.inited or not self.last_tick:
return None
if self.last_tick and tick.datetime < self.last_tick.datetime:
return None
if not self.current_bar:
# Generate timestamp for bar data
if self.bg.interval == Interval.MINUTE:
dt = tick.datetime.replace(second=0, microsecond=0)
else:
dt = tick.datetime.replace(minute=0, second=0, microsecond=0)
self.current_bar = BarData(
symbol=tick.symbol,
exchange=tick.exchange,
datetime=dt,
gateway_name=tick.gateway_name,
open_price=tick.last_price,
high_price=tick.last_price,
low_price=tick.last_price,
)
# Otherwise, update high/low price into window bar
else:
self.current_bar.high_price = max(self.current_bar.high_price, tick.last_price)
self.current_bar.low_price = min(self.current_bar.low_price, tick.last_price)
# Update last price/volume into window bar
self.current_bar.close_price = tick.last_price
volume_change = tick.volume - self.last_tick.volume
self.current_bar.volume += volume_change
self.current_bar.open_interest = tick.open_interest
return copy.deepcopy(self.current_bar)
def send_event(self,event_type:str,data:Any):
if self.engine_type==EngineType.LIVE and self.show_chart: # "如果显示K线图表"
self.even_engine.put(Event(event_type,(self.strategy_name,data)))
def init_kx_chart(self,kx_chart:NewChartWidget=None): # hxxjava add ----- 提供给外部调用
# self.write_log("init_kx_chart executed !!!")
if kx_chart:
kx_chart.add_plot("candle", hide_x_axis=True)
kx_chart.add_plot("volume", maximum_height=150)
kx_chart.add_plot("rsi", maximum_height=150)
kx_chart.add_plot("macd", maximum_height=150)
kx_chart.add_item(CandleItem, "candle", "candle")
kx_chart.add_item(VolumeItem, "volume", "volume")
kx_chart.add_item(LineItem, "line", "candle")
kx_chart.add_item(SmaItem, "sma", "candle")
kx_chart.add_item(RsiItem, "rsi", "rsi")
kx_chart.add_item(MacdItem, "macd", "macd")
kx_chart.add_last_price_line()
kx_chart.add_cursor()
启动VnTrader,进入策略管理界面,完成如下步骤:
1)从策略下拉框中选择_kx_strategy策略
2)点击添加策略按钮进入3界面
3)输入策略名称、vt_symbol、kx_interval和show_chart的值,注意kx_interval这里是你想要的K线周期,单位是分钟。show_chart参数为True标识需要显示K线图表,其他值则不显示。
4)初始化策略,如果参数为True的话,完成后显示K线图表窗口,并且显示20日里的历史K线图
5)按启动按钮启动策略,如果是交易时段,则K线图表就会显示最新收到的K线。提示还会实时显示未完成的临时K线
用户实现了自己的CTA策略,可能会放在多个合约上跑。用户策略里会声母一系列的策略成员变量,这些策略成员变量应该是每个策略实例是不同的。可是我发现事实不是这样的!——不同的合约竟然共用着一个了的策略成员变量!
下面代码保存在用户策略文件夹下的test_strategy.py文件中
from typing import Any,List,Dict,Tuple
import copy
from vnpy.app.cta_strategy import (
CtaTemplate,
BarGenerator,
ArrayManager,
StopOrder,
Direction
)
from vnpy.trader.engine import MainEngine,EventEngine
from vnpy.app.cta_strategy.engine import CtaEngine
from vnpy.event.engine import Event
from vnpy.trader.object import (
LogData,
TickData,
BarData,
TradeData,
OrderData,
)
class test_strategy(CtaTemplate):
""""""
author = "hxxjava"
kx_interval = 1
parameters = [
"kx_interval"
]
kx_count:int = 0
all_bars:List[BarData] =[]
variables = ["kx_count"]
relate_names:List[str] = []
def __init__(
self,
cta_engine: Any,
strategy_name: str,
vt_symbol: str,
setting: dict,
):
super().__init__(cta_engine,strategy_name,vt_symbol,setting)
self.bg = BarGenerator(self.on_bar,self.kx_interval,self.on_Nmin_bar)
self.am = ArrayManager()
self.relate_names.append(vt_symbol)
def on_init(self):
"""
Callback when strategy is inited.
"""
self.write_log("test_strategy 初始化")
self.load_bar(20)
self.write_log(f"relate_names={self.relate_names} !!!")
def on_start(self):
""" """
self.write_log(f"test_strategy 已开始 self.kx_interval={self.kx_interval}",)
def on_stop(self):
""""""
self.write_log("test_strategy 已停止")
def on_tick(self, tick: TickData):
"""
Callback of new tick data update.
"""
self.bg.update_tick(tick)
def on_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
if self.inited:
self.write_log(f"I got a 1min BarData")
self.bg.update_bar(bar)
def on_Nmin_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
self.all_bars.append(bar)
self.kx_count = len(self.all_bars)
if self.inited:
self.write_log(f"I got a {self.kx_interval}min BarData {self.kx_count}")
def on_trade(self, trade: TradeData):
"""
Callback of new trade data update.
"""
def on_order(self, order: OrderData):
"""
Callback of new order data update.
"""
def on_stop_order(self, stop_order: StopOrder):
"""
Callback of stop order update.
"""
用来存放该策略里20日的 all_bars数组中保存有ag2012.SHFE的所有20日里的所有10分钟K线数据
用来存放该策略里20日的 all_bars数组中,不只是保存有rb2010的所有5分钟K线数据,也保存有ag2012的10分钟K线数据。
不同的用户策略应该拥有各自不同的成员变量:
all_bars
relate_names
可是从实际的测试结果看,它们却是相同的,这是不应该的!
原因发生在cta_engine里的这个函数中:
def add_strategy(
self, class_name: str, strategy_name: str, vt_symbol: str, setting: dict
):
"""
Add a new strategy.
"""
if strategy_name in self.strategies:
self.write_log(f"创建策略失败,存在重名{strategy_name}")
return
strategy_class = self.classes.get(class_name, None)
if not strategy_class:
self.write_log(f"创建策略失败,找不到策略类{class_name}")
return
strategy = strategy_class(self, strategy_name, vt_symbol, setting)
self.strategies[strategy_name] = strategy
# Add vt_symbol to strategy map.
strategies = self.symbol_strategy_map[vt_symbol]
strategies.append(strategy)
# Update to setting file.
self.update_strategy_setting(strategy_name, setting)
self.put_strategy_event(strategy)
这里的的代码可以看出,不同的策略实例使用相同的策略类时,不是立即创建新的策略类实例,而是从self.classes字典中,根据class_name查询得到的。
VNPY系统确实是非常好的开发平台!可是做CTA交易的朋友一定有这样的感觉,策略在安静地运行,你却看不见它的K线,也无法直观地看到它的交易情况。经常来回地在第三方的行情客户端来回切换,为的是看看都对策略运动什么样一个情况。如果能够为每个CTA策略都给出一个观看图表的机会,那该有多好!
本人经过一周时间的努力下,已经基本让CTA策略可以看的见了。
1 实现一个给予ChartWizard的K线图NewChartWidget,它可以接受历史K线数据、tick数据、bar线数据、order数据、trader数据的推送
2 定义五个与行情相关的消息类型
为K线图表添砖加瓦——让CTA策略看得见(1)
EVENT_CTA_HISTORY_BAR —— 历史K线消息
EVENT_CTA_TICK —— TICK消息
EVENT_CTA_BAR —— BAR消息
EVENT_CTA_ORDER —— 委托单消息
EVENT_CTA_TRADE —— 成交单
EVENT_CTA_STOPORDER —— 停止单消息(系统本来就有)
3 利用策略的on_start(),on_tick(),on_bar(),on_order(),on_stop_order(),on_trade()接口,发送这些消息
4 可以接受实时的临时K线的显示
5 可以显示行情,也已显示委托、成交的发生位置
经过修改后,在CTA策略管理界面中,如上图所示,每个策略都会增加一个 “K线图表” 按钮,在已经新建好策略后,只有策略初始化后“K线图表” 按钮和“开始”按钮同时有效,如果你想观看该策略的K线图,先点击“K线图表” 按钮,就会显示一个空的K线图窗口,再和原来一样按“开始”按钮,K线图立即就把策略在初始化阶段读取的历史K线显示处理,当然你的你所关心的主图指标和附图指标可以是可以显示的。
目前只实现了几个代表性的几个指标,还不能给自由配置。未来应该是设计成可以在每个策略可以独立配置自己的K线图表的主图和附图的数量可指标,因为不同的策略的算法是不一样,周期也是可能是不一样的。
本地停止单在VNPY系统中的是在本地维护的,符合条件后立即转化为限价委托单而执行的。
CTA策略管理界面CtaManager中,包含有一个停止单监视器StopOrderMonitor,用来显示策略已经发出的停止单的。
headers = {
"stop_orderid": {"display": "停止委托号","cell": BaseCell,"update": False,},
"vt_orderids": {"display": "限价委托号", "cell": BaseCell, "update": True},
"vt_symbol": {"display": "本地代码", "cell": BaseCell, "update": False},
"direction": {"display": "方向", "cell": EnumCell, "update": False},
"offset": {"display": "开平", "cell": EnumCell, "update": False},
"price": {"display": "价格", "cell": BaseCell, "update": False},
"volume": {"display": "数量", "cell": BaseCell, "update": False},
"status": {"display": "状态", "cell": EnumCell, "update": True},
"lock": {"display": "锁仓", "cell": BaseCell, "update": False},
"strategy_name": {"display": "策略名", "cell": BaseCell, "update": False},
}
在实际使用的时候,大家是否有这种感觉,这么多停止单,你可以知道停止单的编号,却不知道它是:
@dataclass
class StopOrder:
vt_symbol: str
direction: Direction
offset: Offset
price: float
volume: float
stop_orderid: str
strategy_name: str
lock: bool = False
vt_orderids: list = field(default_factory=list)
status: StopOrderStatus = StopOrderStatus.WAITING
CtpGateway继承了BaseGateway,同时包含两个重要的接口:
self.td_api = CtpTdApi(self) # 交易接口
self.md_api = CtpMdApi(self) # 行情接口
另外还有1个向行情服务器订阅行情的订阅函数:
def subscribe(self, req: SubscribeRequest):
""""""
self.md_api.subscribe(req)
4个向交易服务器发送委托请求、取消委托请求、查询账户请求和一个查询持仓请求函数。
def send_order(self, req: OrderRequest):
"""发送委托请求"""
if req.type == OrderType.RFQ:
vt_orderid = self.td_api.send_rfq(req)
else:
vt_orderid = self.td_api.send_order(req)
return vt_orderid
def cancel_order(self, req: CancelRequest):
"""取消委托请求"""
self.td_api.cancel_order(req)
def query_account(self):
"""查询账户请求"""
self.td_api.query_account()
def query_position(self):
"""查询持仓请求"""
self.td_api.query_position()
CtpGateway有账户查询函数,用户持仓函数,可是没有历史委托和历史成交查询函数。
可它仅仅提供了可怜的5个主动执行交易的函数:
send_order()——发委托申请
cancel_order()——取消委托单
send_rfq()
query_account()——查询账户
query_position()——查询持仓
# vnpy.api.generator.ctp_td_source.function.cpp 提供了丰富的接口函数:
int TdApi::reqAuthenticate(const dict &req, int reqid)
int TdApi::reqUserLogin(const dict &req, int reqid)
int TdApi::reqUserLogout(const dict &req, int reqid)
int TdApi::reqUserPasswordUpdate(const dict &req, int reqid)
int TdApi::reqTradingAccountPasswordUpdate(const dict &req, int reqid)
int TdApi::reqUserAuthMethod(const dict &req, int reqid)
int TdApi::reqGenUserCaptcha(const dict &req, int reqid)
int TdApi::reqGenUserText(const dict &req, int reqid)
int TdApi::reqUserLoginWithCaptcha(const dict &req, int reqid)
int TdApi::reqUserLoginWithText(const dict &req, int reqid)
int TdApi::reqUserLoginWithOTP(const dict &req, int reqid)
int TdApi::reqOrderInsert(const dict &req, int reqid)
int TdApi::reqParkedOrderInsert(const dict &req, int reqid)
int TdApi::reqParkedOrderAction(const dict &req, int reqid)
int TdApi::reqOrderAction(const dict &req, int reqid)
int TdApi::reqQueryMaxOrderVolume(const dict &req, int reqid)
int TdApi::reqSettlementInfoConfirm(const dict &req, int reqid)
int TdApi::reqRemoveParkedOrder(const dict &req, int reqid)
int TdApi::reqRemoveParkedOrderAction(const dict &req, int reqid)
int TdApi::reqExecOrderInsert(const dict &req, int reqid)
int TdApi::reqExecOrderAction(const dict &req, int reqid)
int TdApi::reqForQuoteInsert(const dict &req, int reqid)
int TdApi::reqQuoteInsert(const dict &req, int reqid)
int TdApi::reqQuoteAction(const dict &req, int reqid)
int TdApi::reqBatchOrderAction(const dict &req, int reqid)
int TdApi::reqOptionSelfCloseInsert(const dict &req, int reqid)
int TdApi::reqOptionSelfCloseAction(const dict &req, int reqid)
int TdApi::reqCombActionInsert(const dict &req, int reqid)
int TdApi::reqQryOrder(const dict &req, int reqid)
int TdApi::reqQryTrade(const dict &req, int reqid)
int TdApi::reqQryInvestorPosition(const dict &req, int reqid)
int TdApi::reqQryTradingAccount(const dict &req, int reqid)
int TdApi::reqQryInvestor(const dict &req, int reqid)
int TdApi::reqQryTradingCode(const dict &req, int reqid)
int TdApi::reqQryInstrumentMarginRate(const dict &req, int reqid)
int TdApi::reqQryInstrumentCommissionRate(const dict &req, int reqid)
int TdApi::reqQryExchange(const dict &req, int reqid)
int TdApi::reqQryProduct(const dict &req, int reqid)
int TdApi::reqQryInstrument(const dict &req, int reqid)
int TdApi::reqQryDepthMarketData(const dict &req, int reqid)
int TdApi::reqQrySettlementInfo(const dict &req, int reqid)
int TdApi::reqQryTransferBank(const dict &req, int reqid)
int TdApi::reqQryInvestorPositionDetail(const dict &req, int reqid)
int TdApi::reqQryNotice(const dict &req, int reqid)
int TdApi::reqQrySettlementInfoConfirm(const dict &req, int reqid)
int TdApi::reqQryInvestorPositionCombineDetail(const dict &req, int reqid)
int TdApi::reqQryCFMMCTradingAccountKey(const dict &req, int reqid)
int TdApi::reqQryEWarrantOffset(const dict &req, int reqid)
int TdApi::reqQryInvestorProductGroupMargin(const dict &req, int reqid)
int TdApi::reqQryExchangeMarginRate(const dict &req, int reqid)
int TdApi::reqQryExchangeMarginRateAdjust(const dict &req, int reqid)
int TdApi::reqQryExchangeRate(const dict &req, int reqid)
int TdApi::reqQrySecAgentACIDMap(const dict &req, int reqid)
int TdApi::reqQryProductExchRate(const dict &req, int reqid)
int TdApi::reqQryProductGroup(const dict &req, int reqid)
int TdApi::reqQryMMInstrumentCommissionRate(const dict &req, int reqid)
int TdApi::reqQryMMOptionInstrCommRate(const dict &req, int reqid)
int TdApi::reqQryInstrumentOrderCommRate(const dict &req, int reqid)
int TdApi::reqQrySecAgentTradingAccount(const dict &req, int reqid)
int TdApi::reqQrySecAgentCheckMode(const dict &req, int reqid)
int TdApi::reqQrySecAgentTradeInfo(const dict &req, int reqid)
int TdApi::reqQryOptionInstrTradeCost(const dict &req, int reqid)
int TdApi::reqQryOptionInstrCommRate(const dict &req, int reqid)
int TdApi::reqQryExecOrder(const dict &req, int reqid)
int TdApi::reqQryForQuote(const dict &req, int reqid)
int TdApi::reqQryQuote(const dict &req, int reqid)
int TdApi::reqQryOptionSelfClose(const dict &req, int reqid)
int TdApi::reqQryInvestUnit(const dict &req, int reqid)
int TdApi::reqQryCombInstrumentGuard(const dict &req, int reqid)
int TdApi::reqQryCombAction(const dict &req, int reqid)
int TdApi::reqQryTransferSerial(const dict &req, int reqid)
int TdApi::reqQryAccountregister(const dict &req, int reqid)
int TdApi::reqQryContractBank(const dict &req, int reqid)
int TdApi::reqQryParkedOrder(const dict &req, int reqid)
int TdApi::reqQryParkedOrderAction(const dict &req, int reqid)
int TdApi::reqQryTradingNotice(const dict &req, int reqid)
int TdApi::reqQryBrokerTradingParams(const dict &req, int reqid)
int TdApi::reqQryBrokerTradingAlgos(const dict &req, int reqid)
int TdApi::reqQueryCFMMCTradingAccountToken(const dict &req, int reqid)
int TdApi::reqFromBankToFutureByFuture(const dict &req, int reqid)
int TdApi::reqFromFutureToBankByFuture(const dict &req, int reqid)
int TdApi::reqQueryBankAccountMoneyByFuture(const dict &req, int reqid)
这些函数中下面两个函数是否可以包含到CtpTdApi接口中,然后封装到CtpGateway中供用户策略中使用。
int TdApi::reqQryOrder(const dict &req, int reqid)
{
CThostFtdcQryOrderField myreq = CThostFtdcQryOrderField();
memset(&myreq, 0, sizeof(myreq));
getString(req, "BrokerID", myreq.BrokerID);
getString(req, "InvestorID", myreq.InvestorID);
getString(req, "InstrumentID", myreq.InstrumentID);
getString(req, "ExchangeID", myreq.ExchangeID);
getString(req, "OrderSysID", myreq.OrderSysID);
getString(req, "InsertTimeStart", myreq.InsertTimeStart); // 开始时间
getString(req, "InsertTimeEnd", myreq.InsertTimeEnd); // 结束时间
getString(req, "InvestUnitID", myreq.InvestUnitID);
int i = this->api->ReqQryOrder(&myreq, reqid);
return i;
};
int TdApi::reqQryTrade(const dict &req, int reqid)
{
CThostFtdcQryTradeField myreq = CThostFtdcQryTradeField();
memset(&myreq, 0, sizeof(myreq));
getString(req, "BrokerID", myreq.BrokerID);
getString(req, "InvestorID", myreq.InvestorID);
getString(req, "InstrumentID", myreq.InstrumentID);
getString(req, "ExchangeID", myreq.ExchangeID);
getString(req, "TradeID", myreq.TradeID);
getString(req, "TradeTimeStart", myreq.TradeTimeStart); // 开始时间
getString(req, "TradeTimeEnd", myreq.TradeTimeEnd); // 结束时间
getString(req, "InvestUnitID", myreq.InvestUnitID);
int i = this->api->ReqQryTrade(&myreq, reqid);
return i;
};
交易中,时常会关心上次盈亏和当前算法盈亏状况,这可以通过对开仓以来的所有成交单进行计算得到。
但是因为种种原因,客户端策略可能会丢失部分成交单,造成这种结果的原因很多,如网络故障、电脑宕机、
没有在全交易时段运行策略,都会导致部分成交单不能给被记录。
交易所存有完整的成交记录,而且ctp_td_source.function.cpp提供了全面的历史委托查询和历史成交查询函数,
为什么咱们不可以把这些功能多引入一些呢?
vnpy\gateway\da\da_gateway.py(798): self.reqQryTrade(da_req, self.reqid)
vnpy\gateway\tora\td.py(583): err = self._native_api.ReqQryTrade(info, self._get_new_req_id())
vnpy\gateway\uft\uft_gateway.py(489): self.reqQryTrade({}, self.reqid)
在策略中有需要查询或者计算上次完整交易的盈亏数值吗?
陈老师的进阶课中有讲BacktesingEngine是如何计算逐日盯市的,接近需要,
但不知道是否有更加规范的方法,比如从回测引擎或者实盘引擎里就可以查询到?