详细的请见中国人大网2000年12月17日发布的进入证券交易所参与集中竞价交易必须具备什么资格?
由此可以知道:
答案是肯定的,可以。
商品期货:
白天盘品种集合竞价时间是每个交易日08:55--09:00.其中08:55--08:59是指令申报时间,08:59--09:00是指令 撮合时间:
夜盘品种集合竞价时间是每个交易日20:55--21:00.其中20:55--20: 59是指令 申报时间,20:59--21:00是指令 撮合时间,夜盘品种白天不再进行集合竞价。
国债期货:
集合竞价时间是每个交易日09:10--09:15.其中09:10--09: 14是指令申报时间,09:14--09: 15是指令撮合时间。
这里要提醒大家如果您委托单子,但是系统显示的是“没开盘”,那么这是因为您是在非集合竞价报单时间段委托。
作为普通投资者,只要合法连接了CTP接口,您就同时登录了行您选择的经纪商的行情服务器和交易服务器,注意:经纪商的而不是交易所的,也就是您开户并且做了CTP入网认证的那个证券公司的。
行情服务器会实时通知各个合约的交易状态信息。 当您所交易的合约的交易状态为集合竞价状态时,您就可以同手动或者策略自动使用CtpGateway的send_order(),cancel_order()函数,调用交易接口(CtpTdApi)进行委托下单和委托撤单,但是委托下单的结果必须等到集合竞价撮合之后才会得到成交与否。
但是与连续竞价期间的委托不同,在集合竞价撮合结束前,是没有任何成交结果推送给客户端的,而绝大部分策略是依靠行情变化来生成交易信号的,这是最主要的困难!
当然非行情驱动的策略可以在没有行情变化到情况下,自动实现在集合竞价阶段实现委托下单,但是这样做的意义不大。因为由基本面信息、黑天鹅、自然灾害、战争等突发事件等非行情输入驱动的策略,通常是长线策略或者是需要由行情来证实的策略,无需急迫地在集合竞价阶段来参与,为什么不在今天的集合竞价结果出来之后在做打算?
当然,在解决了交易信号生成的情况下,用户策略是可以在集合竞价状态实现自动委托下单的,至于此时如何生成交易信号是另外一个话题了。
MTF wrote:
感谢分享!没看出来这种计算方法的额外信息,这个XMA主要的思路是什么呢?
没看出吗?XMA(x,N),总比MA(x,N) 提前 N//2 个周期,如果N是5,那么提前2天,如果是6提前3天,如果是7提前3天得到均值,虽然不是最终值,但是趋向最终值,它会不断地修正。
leo-2a6111b0fda7498e wrote:
大神,问下 是否可以再回测中, 进行subscribe的方式订阅合约?。我的逻辑上是开盘后计算找到合适的合约在做买入卖出的计算
不可以,因为回测中是不连接CTP网关接口的,它的数据全部来自历史数据,此时是无法大约数据的。
提到通达信函数XMA,人们最常见到的词汇是“未来函数”、“欺骗”、“陷阱”、“坑”......等等不好的字眼,仿佛XMA函数是个捉摸不定的未来函数,是你亏损的根源!
其实大家对这个函数不了解,如果你了解了它的实现机理,它的优点和不得已的缺点,扬长避短,是完全可以使用的。
未来叙述的方便,先指标一个供MA与XMA计算的数组:
位置:[ A B C D E F H I J K L M N O P Q R S T U ]
数据:[ 1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18. 19. 20.]
分析周期N=5
位置:[ A B C D E F H I J K L M N O P Q R S T U ]
MA5 [nan nan nan nan 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18.]
位置:[ A B C D E F H I J K L M N O P Q R S T U ]
XMA5:[ 2. 2.5 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18. 18.5 19. ]
位置:[ A B C D E F H I J K L M N O P Q R S T U W(21) X(22)]
XMA5:[ nan nan nan nan 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18. 19. 20. ]
它们的相同点是均值算法都是一样的,都是N个数的算术平均值,但是均值代表的位置是不同的。
N日MA均值是必须满足N个数据,不足时为nan,其所代表的位置是第N日的均值;
N日XMA均值是必须满足N//2+1个数据,不足时为没有意义,其所代表的位置是第N//2+1日的均值,它的计算是用前N//2个数数据、当前日和后N//2个数数据。当数据个数多于N//2个但不够N数的时候,是有几个数算几个数的均线。
以N=5为XMA例:
MA具有确定性,不会受到下一个数据的变化的影响,而其计算出来的结果一定是滞后的。
从这里我们可以看到,XMA其实中历史数据个数大于N的时候,它的结果能够代表人们通常理解的均值,但是因为它用的数据包含当前位置之后N//2日的数据,因为使用预测的数据,XMA随着计算位置的不同,其计算的数据范围发生了变化。
以下为用python实现XMA函数代码,已经与通达信的自带函数XMA函数做了严格的验证,数据相同的情况下,XMA的均值是完全相同的,也就是说是可靠移植的,可以信赖的!
如果有怀疑,可以自行验证的。
def XMA(src:np.ndarray,N:int,array=False) -> np.ndarray:
""" XMA函数的python实现 """
data_len = len(src)
half_len : int = (N // 2)+(1 if N % 2 else 0)
if data_len < half_len:
out = np.array([np.nan for i in range(data_len)],dtype=float)
if array:
return out
return out[-half_len:] if len(out) else np.array([],dtype=float)
head = np.array([talib.MA(src[0:ilen],ilen)[-1] for ilen in range(half_len,N)])
out = head
if data_len >= N:
body = talib.MA(src,N)[N-1:]
out = np.append(out,body)
tail = np.array([talib.MA(src[-ilen:],ilen)[-1] for ilen in range(N-1,half_len-1,-1)])
out = np.append(out,tail)
if array:
return out
return out[-half_len:]
class DynaArrayManager(ArrayManager):
"""
DynaArrayManager是对ArrayManager的扩展,解决它无法用于临时K线的指标计算问题。
作者:hxxjava
"""
def __init__(self, size: int = 100) -> None:
super().__init__(size)
self.bar_datetimes:List[datetime] = []
def update_bar(self, bar: BarData) -> None:
if not self.bar_datetimes or self.bar_datetimes[-1] < bar.datetime:
self.bar_datetimes.append(bar.datetime)
super().update_bar(bar)
else:
"""
Only Update all arrays in array manager with temporary bar data.
"""
self.open_array[-1] = bar.open_price
self.high_array[-1] = bar.high_price
self.low_array[-1] = bar.low_price
self.close_array[-1] = bar.close_price
self.volume_array[-1] = bar.volume
self.turnover_array[-1] = bar.turnover
self.open_interest_array[-1] = bar.open_interest
# ... ... 其他无关XMA部分略去
def xma(self,select:str="C",N:int=3,array: bool = False) -> np.ndarray:
""" XMA函数的 """
if select.upper() == "C":
src = self.close
elif select.upper() == "O":
src = self.open
elif select.upper() == "H":
src = self.high
elif select.upper() == "L":
src = self.low
else:
assert(False)
data_len = len(src)
half_len : int = (N // 2)+(1 if N % 2 else 0)
if data_len < half_len:
out = np.array([np.nan for i in range(data_len)],dtype=float)
if array:
return out
return out[-half_len:] if len(out) else np.array([],dtype=float)
head = np.array([talib.MA(src[0:ilen],ilen)[-1] for ilen in range(half_len,N)])
out = head
if data_len >= N:
body = talib.MA(src,N)[N-1:]
out = np.append(out,body)
tail = np.array([talib.MA(src[-ilen:],ilen)[-1] for ilen in range(N-1,half_len-1,-1)])
out = np.append(out,tail)
if array:
return out
return out[-half_len:]
class XmaItem(CandleItem):
""""""
def __init__(self, manager: BarManager,xma_window:int=10):
""""""
super().__init__(manager)
self.line_pen: QtGui.QPen = pg.mkPen(color=(100, 100, 255), width=2)
self.xma_window = xma_window
self.dyna_am = DynaArrayManager(xma_window)
self.xma_data: Dict[int, float] = {}
def update_history(self, history: List[BarData]) -> None:
""" reimpliment of update_history """
for bar in history:
self.dyna_am.update_bar(bar)
super().update_history(history)
def update_bar(self, bar: BarData) -> None:
""" reimpliment of update_bar """
self.dyna_am.update_bar(bar)
super().update_bar(bar)
def _get_xma_value(self, ix: int) -> float:
""" """
max_ix = self._manager.get_count() - 1
if ix < 0 or ix > max_ix:
return np.nan
# When initialize, calculate all rsi value
if not self.xma_data:
bars = self._manager.get_all_bars()
close_data = [bar.close_price for bar in bars]
sma_array = XMA(np.array(close_data), self.xma_window,array=True)
for n, value in enumerate(sma_array):
self.xma_data[n] = value
# Return if already calcualted
if ix != max_ix and ix in self.xma_data:
return self.xma_data[ix]
if self.dyna_am.inited:
values = self.dyna_am.xma(select='C',N=self.xma_window)
vlen = len(values)
# print(f"vlen={vlen},values={list(values)}")
start_ix = ix-vlen+1
tail_idxs = [(start_ix+i,values[i]) for i in range(vlen)]
for idx,xma in tail_idxs:
self.xma_data[idx] = xma
return self.xma_data[ix]
return np.nan
def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
""" """
# Create objects
picture = QtGui.QPicture()
painter = QtGui.QPainter(picture)
# Draw XMA Line
xma_value = self._get_xma_value(ix)
last_xma_value = self._get_xma_value(ix - 1)
if not (np.isnan(xma_value) or np.isnan(last_xma_value)):
# Set painter color
painter.setPen(self.line_pen)
# draw XMA line
start_point = QtCore.QPointF(ix-1, last_xma_value)
end_point = QtCore.QPointF(ix, xma_value)
painter.drawLine(start_point, end_point)
# Finish
painter.end()
return picture
def paint(self, painter: QtGui.QPainter, opt: QtWidgets.QStyleOptionGraphicsItem, w: QtWidgets.QWidget) -> None:
"""
Reimplement the paint method of parent class.
This function is called by external QGraphicsView.
"""
# return super().paint(painter, opt, w)
rect = opt.exposedRect
half = (self.xma_window // 2)
min_ix: int = int(rect.left()) - half
min_ix: int = max(min_ix,0)
max_ix: int = int(rect.right())
max_ix: int = min(max_ix, len(self._bar_picutures))
rect_area: tuple = (min_ix, max_ix)
if (
self._to_update
or rect_area != self._rect_area
or not self._item_picuture
):
self._to_update = False
self._rect_area = rect_area
# print(f"XmaItem _draw_item_picture:{min_ix}--{max_ix}")
self._draw_item_picture(min_ix, max_ix)
self._item_picuture.play(painter)
def _draw_item_picture(self, min_ix: int, max_ix: int) -> None:
"""
Draw the picture of item in specific range.
"""
self._item_picuture = QtGui.QPicture()
painter: QtGui.QPainter = QtGui.QPainter(self._item_picuture)
for ix in range(min_ix, max_ix):
bar_picture: QtGui.QPicture = self._bar_picutures[ix]
# if bar_picture is None:
bar: BarData = self._manager.get_bar(ix)
bar_picture = self._draw_bar_picture(ix, bar)
self._bar_picutures[ix] = bar_picture
bar_picture.play(painter)
painter.end()
def get_info_text(self, ix: int) -> str:
""""""
if ix in self.xma_data:
xma_value = self.xma_data[ix]
text = f"XMA({self.xma_window}): {xma_value:.2f}"
else:
text = "XMA({self.xma_window}) -"
return text
尾部不断变化的XMA曲线:
绘制K线图表时有两种选择,一种是不显示未结束的临时K线,另外一种是显示未结束的临时K线。
不知道您有没有发现,当您选择显示未结束的临时K线时,麻烦也就来了,计算机显得非常的慢!
什么原因?是因为python不适合做此类的图表显示吗?还是指标添加的太多?
非也,是因为我们图表控件的写作思路出了问题!
答案是:主要的原因处在临时K线的指标的计算上。让我们仔细地分析下......
无论什么指标,历史K线的指标的计算是一次的,即每根K线只计算一次,这是一定的!
但是计算临时K线的指标却不同,原理上是每个tick都可能改变K线的开、高、低、收和成交量等的值,那么收到一个tick都需要重新计算一下该临时K线的指标,收到的tick越快,显示的指标越多,计算的次数就越多,计算量就越大,这也是一定的!
目前我们的指标在计算历史K线的指标时,是把历史数据一次性的调用talib的相关函数, 把一个巨大数组进行计算一次性地出来,然后把每个指标对应地保存到一个按K线索引为键值对字典中,方便绘制图形的时候查询的。
目前vnpy就没有考虑计算临时K线的指标显示,更不用说考虑如何高效地计算计算临时K线的指标了。我探索了临时K线的显示了,做法是对于没有计算过的K线,查询self._manager中的bar的,将最后的若干bar的需要的属性值查询出来,然后用talib的相关函数来计算出此时临时K线的指标,然后保存到指标字典的对应键值下,方便绘制图形的时候查询的。不过正是这个做法,让临时K线的指标的计算效率极低!
临时K线的指标的计算效率低,与反复不断地查询和复制self._manager中的bar的属性有关,其实准备数据的时间比调用talib函数的时间长多了!
怎么办?把准备数据的时间分散到平时K线更新的函数中,用一个数组管理器来管理这些数据。也就是用存储空间换取一次性准备大量数据的时间。这样就可以提升临时K线的指标的计算效率了。
vnpy的ArrayManager很好,计算效率也不错。可是它不是为动态变化的临时K线设计的。因为每次调用update_bar它都会整体把数据平移一个位置,这对历史K线是可以的,但对应临时K线就不可以。我们希望它在更新临时K线时不要移动位置,只更新尾部(索引为-1)就OK了。
为此我们需要对其进行扩展,设计一个可以适应临时K线的动态数组管理器,我把它叫做DynaArrayManager。
DynaArrayManager的代码如下:
class DynaArrayManager(ArrayManager):
"""
DynaArrayManager是对ArrayManager的扩展,解决它无法用于临时K线的指标计算问题。
作者:hxxjava
"""
def __init__(self, size: int = 100) -> None:
super().__init__(size)
self.bar_datetimes:List[datetime] = []
def update_bar(self, bar: BarData) -> None:
if not self.bar_datetimes or self.bar_datetimes[-1] < bar.datetime:
self.bar_datetimes.append(bar.datetime)
super().update_bar(bar)
else:
"""
Only Update all arrays in array manager with temporary bar data.
"""
self.open_array[-1] = bar.open_price
self.high_array[-1] = bar.high_price
self.low_array[-1] = bar.low_price
self.close_array[-1] = bar.close_price
self.volume_array[-1] = bar.volume
self.turnover_array[-1] = bar.turnover
self.open_interest_array[-1] = bar.open_interest
def dmi(self,N:int=14,M:int=7,array: bool = False)-> Union[Tuple[np.ndarray, np.ndarray, np.ndarray,np.ndarray], Tuple[float, float, float,float]]:
"""
Directional Movement Indicator:
TR := SUM(MAX(MAX(HIGH-LOW,ABS(HIGH-REF(CLOSE,1))),ABS(LOW-REF(CLOSE,1))),N);
HD := HIGH-REF(HIGH,1); // 创新高量
LD := REF(LOW,1)-LOW; // 创新低量
DMP:= SUM(IFELSE(HD>0 && HD>LD,HD,0),N); // N日创新高量累计,赢家通吃——做多力量
DMM:= SUM(IFELSE(LD>0 && LD>HD,LD,0),N); // N日创新低量累计,赢家通吃——做空力量
PDI: DMP*100/TR,LINETHICK2; //做多力量
MDI: DMM*100/TR,LINETHICK2; //做空力量
ADX: MA(ABS(MDI-PDI)/(MDI+PDI)*100,M),LINETHICK2; // ADX:多空力量差值M日平滑
ADXR:(ADX+REF(ADX,M))/2,LINETHICK2; // ADR:ADX与M日前ADX的均值
"""
TR = SUM(talib.TRANGE(self.high,self.low,self.close),N)
# TR = SUM(MAX(MAX(self.high-self.low,ABS(self.high-REF(self.close,1))),ABS(self.low-REF(self.close,1))),N)
HD = self.high - REF(self.high,1)
LD = REF(self.low,1) - self.low
DMP = SUM(IIF((HD>0)&(HD>LD),HD,0),N)
DMM = SUM(IIF((LD>0)&(LD>HD),LD,0),N)
PDI = DMP*100/TR;
MDI = DMM*100/TR;
ADX = talib.MA(np.abs(MDI-PDI)/(MDI+PDI)*100,M) # ADX:多空力量差值M日平滑
ADXR = (ADX+REF(ADX,M))/2 # ADR:ADX与M日前ADX的均值
if array:
return PDI,MDI,ADX,ADXR
return PDI[-1],MDI[-1],ADX[-1],ADXR[-1]
def macd3(self, fast_period: int, slow_period: int, signal_period: int, array: bool = False) -> Union[Tuple[np.ndarray, np.ndarray, np.ndarray,np.ndarray], Tuple[float, float, float,float]]:
"""
MACD having three lines:(diff,dea,slow_dea) and macd histgram
"""
diff, dea, macd = talib.MACD(
self.close, fast_period, slow_period, signal_period
)
slow_dea = talib.EMA(dea,signal_period)
if array:
return diff, dea, macd, slow_dea
return diff[-1], dea[-1], macd[-1],slow_dea[-1]
下面是两个典型的ChartItem的实现方法,其中都用到了DynaArrayManager,并且都重新实现了ChartItem的update_history()和update_bar()两个方法,这两个方法就是为了用存储空间换取一次性准备大量数据的时间。
另外还要注意_get_macd_value()和_get_dmi_value()的后半段,它们的作用就是为了适应临时K线的指标计算的特点而特别这样写的。
class Macd3Item(ChartItem):
""" 三根线的MACD """
def __init__(self, manager: BarManager,short_window:int=12,long_window:int=26,M:int=9):
""""""
super().__init__(manager)
self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=1)
self.red_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=1)
self.green_pen: QtGui.QPen = pg.mkPen(color=(0, 255, 0), width=1)
self.magetan_pen: QtGui.QPen = pg.mkPen(color=(255, 0,255),width=1,style = QtCore.Qt.DashLine)
self._values_ranges: Dict[Tuple[int, int], Tuple[float, float]] = {}
self.short_window = short_window
self.long_window = long_window
self.M = M
self.dyna_am = DynaArrayManager(max(short_window,long_window) + 2*(M-1))
self.macd_data: Dict[int, List[float,float,float]] = {}
def update_history(self, history: List[BarData]) -> None:
""" reimpliment of update_history """
for bar in history:
self.dyna_am.update_bar(bar)
super().update_history(history)
def update_bar(self, bar: BarData) -> None:
""" reimpliment of update_bar """
self.dyna_am.update_bar(bar)
super().update_bar(bar)
def _get_macd_value(self, ix: int) -> List:
""""""
max_ix = self._manager.get_count() - 1
invalid_value = [np.nan,np.nan,np.nan,np.nan]
if ix < 0 or ix > max_ix:
return invalid_value
# When initialize, calculate all macd value
if not self.macd_data:
bars:List[BarData] = self._manager.get_all_bars()
close_prices = [bar.close_price for bar in bars]
diffs,deas,macds = talib.MACD(np.array(close_prices),
fastperiod=self.short_window,
slowperiod=self.long_window,
signalperiod=self.M)
slow_deas = talib.EMA(deas,self.M)
for n in range(0,len(diffs)):
self.macd_data[n] = [diffs[n],deas[n],macds[n],slow_deas[n]]
# Return if already calcualted
if ix != max_ix and ix in self.macd_data:
return self.macd_data[ix]
if self.dyna_am.inited:
diff,dea,macd,slow_dea = self.dyna_am.macd3(self.short_window,self.long_window,self.M)
self.macd_data[ix] = [diff,dea,macd,slow_dea]
return [diff,dea,macd,slow_dea]
return [np.nan,np.nan,np.nan,np.nan]
def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
""""""
# # Create objects
picture = QtGui.QPicture()
painter = QtGui.QPainter(picture)
if ix < 1:
return picture
macd_value = self._get_macd_value(ix)
last_macd_value = self._get_macd_value(ix - 1)
# # Draw macd lines
if np.isnan(macd_value[0]) or np.isnan(last_macd_value[0]):
# print("略过macd lines0")
pass
else:
end_point0 = QtCore.QPointF(ix, macd_value[0])
start_point0 = QtCore.QPointF(ix - 1, last_macd_value[0])
painter.setPen(self.white_pen)
painter.drawLine(start_point0, end_point0)
if np.isnan(macd_value[1]) or np.isnan(last_macd_value[1]):
# print("略过macd lines1")
pass
else:
end_point1 = QtCore.QPointF(ix, macd_value[1])
start_point1 = QtCore.QPointF(ix - 1, last_macd_value[1])
painter.setPen(self.yellow_pen)
painter.drawLine(start_point1, end_point1)
if np.isnan(macd_value[3]) or np.isnan(last_macd_value[3]):
pass
else:
end_point2 = QtCore.QPointF(ix, macd_value[3])
start_point2 = QtCore.QPointF(ix - 1, last_macd_value[3])
painter.setPen(self.magetan_pen)
painter.drawLine(start_point2, end_point2)
if not np.isnan(macd_value[2]):
if (macd_value[2]>0):
painter.setPen(self.red_pen)
painter.setBrush(pg.mkBrush(255,0,0))
else:
painter.setPen(self.green_pen)
painter.setBrush(pg.mkBrush(0,255,0))
painter.drawRect(QtCore.QRectF(ix-0.3,0,0.6,macd_value[2]))
else:
# print("略过macd lines2")
pass
painter.end()
return picture
def boundingRect(self) -> QtCore.QRectF:
""""""
min_y, max_y = self.get_y_range()
# print(f"{self.short_window,self.long_window,self.M} min_y, max_y={min_y, max_y}")
rect = QtCore.QRectF(
0,
min_y,
len(self._bar_picutures),
max_y
)
return rect
def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
"""
获得4个指标在y轴方向的范围
hxxjava 修改,2022-12-14
当显示范围改变时,min_ix,max_ix的值不为None,当显示范围不变时,min_ix,max_ix的值不为None,
"""
if not self.macd_data:
# 如果Ofcd数据没有计算过
return (-100,100)
this_range = (min_ix,max_ix)
if this_range == (None,None):
# 如果是查询全范围
min_ix,max_ix = self._rect_area # 显示索引范围
max_ix = min(max_ix,len(self.macd_data)) - 1 # 数据索引范围
this_range = min_ix,max_ix
if this_range in self._values_ranges:
# 查询范围已经存在,直接返回已经计算过的y范围值
result = self._values_ranges[this_range]
return result
# 查询范围不存在,重新计算y范围值
macd_list = list(self.macd_data.values())[min_ix:max_ix + 1]
ndarray = np.array(macd_list)
if ndarray.shape[0] == 0:
return (-100,100)
# 求值范围内的的MACD值的最大和最小值
max_price = np.nanmax(ndarray)
min_price = np.nanmin(ndarray)
if np.isnan(max_price) or np.isnan(max_price):
return (-100,100)
# 保存y方向范围,同时返回结果
result = (min_price, max_price)
self._values_ranges[this_range] = result
return result
def get_info_text(self, ix: int) -> str:
""" """
barscount = len(self._manager._bars) # hxxjava debug
if ix in self.macd_data:
[diff,dea,macd,slow_dea] = self.macd_data[ix]
words = [
f"Macd3{(self.short_window,self.long_window,self.M)}:",
f"diff {diff:.3f}",
f"dea {dea:.3f}",
f"slow_dea={slow_dea:.3f}",
f"macd {macd:.3f}",
]
text = "\n".join(words)
else:
text = "diff - \ndea - \nslow_dea - \nmacd -"
return text
class DmiItem(ChartItem):
""" """
def __init__(self, manager: BarManager,N:int=14,M:int=7):
""""""
super().__init__(manager)
self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=1)
self.magenta_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 255), width=1)
self.red_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=1)
self.green_pen: QtGui.QPen = pg.mkPen(color=(0, 255, 0), width=1)
self.ref_pen: QtGui.QPen = pg.mkPen(color=(127, 127, 127,127), width=1, style = QtCore.Qt.DashLine)
self._values_ranges: Dict[Tuple[int, int], Tuple[float, float]] = {}
self.N = N
self.M = M
self.dyna_am = DynaArrayManager(2*max(N,M))
self.dmi_data: Dict[int, Tuple[float,float,float,float]] = {} # (PDI,MDI,ADX,ADXR)
def update_history(self, history: List[BarData]) -> None:
""" reimpliment of update_history """
for bar in history:
self.dyna_am.update_bar(bar)
super().update_history(history)
def update_bar(self, bar: BarData) -> None:
""" reimpliment of update_bar """
self.dyna_am.update_bar(bar)
super().update_bar(bar)
def _get_dmi_value(self, ix: int) -> Tuple[float,float,float,float]:
""""""
max_ix = self._manager.get_count()-1
invalid_data = (np.nan,np.nan,np.nan,np.nan)
if ix < 0 or ix > max_ix:
print(f"DmiItem{ix},invalid_data1")
return invalid_data
# When initialize, calculate all macd value
if not self.dmi_data:
bars:List[BarData] = self._manager.get_all_bars()
highs = np.array([bar.high_price for bar in bars])
lows = np.array([bar.low_price for bar in bars])
closes = np.array([bar.close_price for bar in bars])
pdi,mdi,adx,adxr = DMI(high=highs,low=lows,close=closes,N=self.N,M=self.M,array=True)
for n in range(0,len(adx)):
self.dmi_data[n] = (pdi[n],mdi[n],adx[n],adxr[n])
# Return if already calcualted
if ix != max_ix and ix in self.dmi_data:
return self.dmi_data[ix]
if self.dyna_am.inited:
pdi,mdi,adx,adxr = self.dyna_am.dmi(N=self.N,M=self.M)
self.dmi_data[ix] = [pdi,mdi,adx,adxr]
return [pdi,mdi,adx,adxr]
return invalid_data
def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
""""""
# # Create objects
picture = QtGui.QPicture()
painter = QtGui.QPainter(picture)
if ix > self.N + self.M:
# 画参考线
painter.setPen(self.ref_pen)
for ref in [20.0,50,80]:
painter.drawLine(QtCore.QPointF(ix-0.5,ref),QtCore.QPointF(ix+0.5,ref))
# 画4根线
dmi_value = self._get_dmi_value(ix)
last_dmi_value = self._get_dmi_value(ix - 1)
pens = [self.white_pen,self.yellow_pen,self.magenta_pen,self.green_pen]
for i in range(4):
end_point0 = QtCore.QPointF(ix, dmi_value[i])
start_point0 = QtCore.QPointF(ix - 1, last_dmi_value[i])
painter.setPen(pens[i])
painter.drawLine(start_point0, end_point0)
# 多空颜色标示
pdi,mdi = dmi_value[0],dmi_value[1]
if not(np.isnan(pdi) or np.isnan(mdi)):
if abs(pdi - mdi) > 1e-2:
painter.setPen(pg.mkPen(color=(168, 0, 0) if pdi > mdi else (0, 168, 0),width=3))
painter.drawLine(QtCore.QPointF(ix,pdi),QtCore.QPointF(ix,mdi))
painter.end()
return picture
def boundingRect(self) -> QtCore.QRectF:
""""""
min_y, max_y = self.get_y_range()
rect = QtCore.QRectF(
0,
min_y,
len(self._bar_picutures),
max_y
)
return rect
def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
""" """
return (0.0,100.0)
def get_info_text(self, ix: int) -> str:
""" """
if ix in self.dmi_data:
[pdi,mdi,adx,adxr] = self.dmi_data.get(ix,None)
words = [
f"DMI{self.N,self.M}:",
f"PDI {pdi:.2f}",
f"MDI {mdi:.2f}",
f"ADX {adx:.2f}",
f"ADXR {adxr:.2f}",
]
text = "\n".join(words)
else:
text = f"DMI{self.N,self.M}:-"
return text
这样写作的,再加载您的K线图表时,你会发现即使显示是需要显示临时K线指标,显示的速度也会快多了!!!
tick数据模拟器可以从数据库读取历史tick数据,以一定的速度将其按时间顺序放入消息队列中,以此实现历史行情回放。
它可以用来复盘或者调试策略,加快策略的开发进度。
tick数据模拟器回放的tick数据不是凭空生造的,它是真实的历史行情。通常先用vnpy中的DataRecorder录制您感兴趣的tick数据,它会把tick数据记录到vnpy数据库中,
然后就可以利用tick数据模拟器在休市时间进行回放了。
"""
tick数据模拟器
作者:hxxjava
时间:2022-12-10
功能:从数据库读取历史tick数据,以一定的速度将其按时间顺序放入消息队列中,以此实现历史行情回放。
"""
from typing import List,Dict
from datetime import datetime,timedelta
from time import sleep
from vnpy.event.engine import Event,EventEngine
from vnpy.trader.engine import MainEngine,BaseEngine
from vnpy.trader.constant import InstrumentStatus
from vnpy.trader.object import TickData,StatusData
from vnpy.trader.event import EVENT_TICK,EVENT_STATUS
from vnpy.trader.utility import extract_vt_symbol,CHINA_TZ
from vnpy.trader.database import get_database
from vnpy.trader.app import BaseApp
from threading import Thread
EVENT_TICKSIM_START = "eTickSimStart."
EVENT_TICKSIM_PAUSE = "eTickSimPause."
EVENT_TICKSIM_STOP = "eTickSimStop."
EVENT_TICKSIM_FINISHED = "eTickSimFinished."
TICK_SIM_ENGINE_NAME = "tick_sim"
class TickSimEngine(BaseEngine):
""" tick数据模拟器 """
def __init__(self, main_engine: MainEngine, event_engine: EventEngine) -> None:
""" constructor """
super(TickSimEngine, self).__init__(main_engine, event_engine, TICK_SIM_ENGINE_NAME)
self.ticks:List[TickData] = []
self.db = get_database()
self.speed = 1.0
self._active = False
self._pause = False
self.gateway_name = "CTP"
self.register_event()
print(f"tick数据模拟器已经安装。")
def register_event(self) -> None:
""" """
# self.event_engine.register(EVENT_TICK,self.process_tick_event)
self.event_engine.register(EVENT_TICKSIM_START,self.process_ticksim_start_event)
self.event_engine.register(EVENT_TICKSIM_PAUSE,self.process_ticksim_pause_event)
self.event_engine.register(EVENT_TICKSIM_STOP,self.process_ticksim_stop_event)
self.event_engine.register(EVENT_TICKSIM_FINISHED,self.process_ticksim_finished)
def load_ticks(self) -> None:
""" """
symbol,exchange = extract_vt_symbol(self.vt_symbol)
self.ticks = self.db.load_tick_data(symbol=symbol,exchange=exchange,start=self.start_time,end=self.end_time)
print(f"TickSimulator load {len(self.ticks)} ticks")
def process_tick_event(self,event:Event) -> None:
""" """
tick:Dict = event.data
print(f"recieve:{tick}")
def process_ticksim_start_event(self,event:Event) -> None:
""" """
data:Dict = event.data
print(f"I got EVENT_TICKSIM_START:{data}")
self.start_simulator(**data)
def process_ticksim_pause_event(self,event:Event) -> None:
""" """
self._pause = not self._pause
print(f"I got EVENT_TICKSIM_PAUSE:{data}")
def process_ticksim_stop_event(self,event:Event) -> None:
""" """
self._active = True
print(f"I got EVENT_TICKSIM_STOP:{data}")
def process_ticksim_finished(self,event:Event) -> None:
""" """
# 送出收盘交易状态
tick:TickData = event.data
next_second:datetime = tick.datetime + timedelta(seconds=1)
next_second.replace(second=0,microsecond=0)
status = StatusData(
gateway_name=self.gateway_name,
symbol=tick.symbol,
exchange=tick.exchange,
settlement_group_id="100",
instrument_status=InstrumentStatus.CLOSE,
trading_segment_sn = 100,
enter_time = next_second.strftime("%H:%M"),
)
self.event_engine.put(Event(EVENT_STATUS,data=status))
self.stop()
print(f"finish_time:{tick.datetime}")
def start(self) -> None:
""" """
self._active = True
self.thread = Thread(target=self.run)
self.thread.start()
def stop(self) -> None:
""" """
self._active = False
self._pause = False
self.speed = 1.0
self.ticks.clear()
def run(self) -> None:
""" 仿真线程函数 """
while self._active:
# 遍历发送
if not self._pause:
if self.ticks:
tick = self.ticks.pop(0)
tick.gateway_name = self.gateway_name
event = Event(EVENT_TICK,tick)
# print(f"EVENT_TICK event data = {tick}")
self.event_engine.put(event)
if len(self.ticks) == 0:
print(f"all ticks are send out !")
self.event_engine.put(Event(EVENT_TICKSIM_FINISHED,data=tick))
sleep(0.5/self.speed)
print(f"TickSimulator is stoped !")
def start_simulator(self,vt_symbol:str,start_time:datetime,end_time:datetime,speed:float=1.0,gateway_name:str='CTP') -> None:
""" 启动tick模拟器 """
self.vt_symbol = vt_symbol
self.start_time = start_time
self.end_time = end_time
self.speed = speed
self.gateway_name = gateway_name
self.load_ticks()
self.start()
这里给出最简单的使用方法,具体您可以发挥想象力。
""" """
event_engine = EventEngine()
main_engine = MainEngine(event_engine=event_engine)
main_engine.add_engine(TickSimEngine)
tick_sim_engine:TickSimEngine = main_engine.get_engine(TICK_SIM_ENGINE_NAME)
end_time = datetime.now().replace(tzinfo=CHINA_TZ)
start_time = datetime(2022,12,9,22,50,0).replace(tzinfo=CHINA_TZ)
tick_sim_engine.start_simulator(
vt_symbol='p2301.DCE', # 回放合约
start_time=start_time, # 开始时间
end_time=end_time, # 结束时间
speed=10.0 # 回放速度
)
data = {
"vt_symbol":'p2301.DCE',
"start_time":datetime(2022,12,9,22,50,0).replace(tzinfo=CHINA_TZ),
"end_time": datetime.now().replace(tzinfo=CHINA_TZ),
"speed":10,
"gateway_name":"CTP"
} # 意义同上
event_engine.put(Event(EVENT_TICKSIM_START,data))
event_engine.put(Event(EVENT_TICKSIM_PAUSE))
event_engine.put(Event(EVENT_TICKSIM_STOP))
无论3.2~3.5中的代码在哪里,event_engine必须和3.1加载tick数据模器时是同一个消息引擎。
逆鳞 wrote:
请问是否需要在object加上StatusData呢?运行的时候说StatusData找不到
寻找我之前关于合约交易状态信息的帖子,一路读下来,就自然能够找到StatusData在
vnpy.trader.object中的定义,在CTP gateway中的接收,分发和各种用途了。
K线图表通常有主图和多个副图组成,它是我们观察合约 K线和指标的工具。但有时往往因为要显示的指标副图太多,去观察效果受到太大的影响。我们可以通过关闭全部或者是一个富途的方式来让显示变得更为简洁和细致。最方便的方式是使用鼠标来进行操作,而很多市面上的第三方软件也多采用此方法。可是看看维恩派的 K线图表因为没有考虑用户使用时候的这个需求,想实现这个功能还是需要费一番心思的。
那么怎么做呢?初步设想是:
修改文件vnpy\chart\widget.py
在引用部分加入下面内容:
from typing import Callable # hxxjava add
PlotItem的扩展类MyPlotItem:
class MyPlotItem(pg.PlotItem):
"""
MyPlotItem:re-implement of PlotItem.
hxxjava add
"""
def __init__(self,name:str,on_mouseDblClicked:Callable=None,*args,**kwargs) -> None:
""" add name attribute and on_mouseDblClicked callback for PlotItem """
super().__init__(*args,**kwargs)
self.name = name
self.on_mouseDblClicked = on_mouseDblClicked
def mouseDoubleClickEvent(self, event: QtGui.QMouseEvent) -> None:
""" re-implement PlotItem's mouseDoubleClickEvent """
super().mouseDoubleClickEvent(event)
if self.on_mouseDblClicked:
self.on_mouseDblClicked(self)
为class ChartWidget添加下面的鼠标双击的下面的回调函数。需要说明的是这里有个约定:容纳主图的PlotItem名称必须为"candle",其他的名称认为是副图,无需纠结是不是太过特别订制了。
def on_mouseDblClicked(self,plot:pg.PlotItem):
"""
所有内含图表被双击的回调函数。 hxxjava add
"""
if plot.name == "candle":
# 选择所有副图
others = [pi for pi in self._plots.values() if pi.name != plot.name]
else:
# 选择其他副图
others = [pi for pi in self._plots.values() if pi.name not in [plot.name,'candle']]
# 求当前幅图的高度变化量
delta_h = 0
for pi in others:
delta_h += pi.height()*(1 if pi.isVisible() else -1)
plot.setFixedHeight(plot.height()+delta_h)
# 隐藏/显示未变双击的图表
for pi in others:
pi.setVisible(not pi.isVisible())
修改class ChartWidget的add_plot()函数,代码如下:
def add_plot(
self,
plot_name: str,
minimum_height: int = 80,
maximum_height: int = None,
hide_x_axis: bool = False
) -> None:
"""
Add plot area.
"""
# Create plot object
# plot: pg.PlotItem = pg.PlotItem(axisItems={'bottom': self._get_new_x_axis()})
plot: pg.PlotItem = MyPlotItem(axisItems={'bottom': self._get_new_x_axis()},name=plot_name,on_mouseDblClicked=self.on_mouseDblClicked)
plot.setMenuEnabled(False)
plot.setClipToView(True)
plot.hideAxis('left')
plot.showAxis('right')
plot.setDownsampling(mode='peak')
plot.setRange(xRange=(0, 1), yRange=(0, 1))
plot.hideButtons()
plot.setMinimumHeight(minimum_height)
if maximum_height:
plot.setMaximumHeight(maximum_height)
if hide_x_axis:
plot.hideAxis("bottom")
if not self._first_plot:
self._first_plot = plot
# Connect view change signal to update y range function
view: pg.ViewBox = plot.getViewBox()
view.sigXRangeChanged.connect(self._update_y_range)
view.setMouseEnabled(x=True, y=True) # hxxjava change,old---view.setMouseEnabled(x=True, y=False)
# Set right axis
right_axis: pg.AxisItem = plot.getAxis('right')
right_axis.setWidth(60)
right_axis.tickFont = NORMAL_FONT
# Connect x-axis link
if self._plots:
first_plot: pg.PlotItem = list(self._plots.values())[0]
plot.setXLink(first_plot)
# Store plot object in dict
self._plots[plot_name] = plot
# Add plot onto the layout
self._layout.nextRow()
self._layout.addItem(plot)
完成了第2部分的代码后,您就可以使用ChartWidget像以往一样创建你的K线图表了。
双击其中的MACD副图后,K线图表窗口MACD副图会占据其他副图的显示区域,再次双击该MACD副图,K线图表窗口中的其他副图就再次显示出来了。
双击其中的的delta副图后,K线图表窗口delta副图会占据其他副图的显示区域,再次双击该delta副图,K线图表窗口中的其他副图就再次显示出来了。
实盘中,如果你用K线图表对你的CTA策略生成的K线进行显示,你会发现明明已经休市了或者收盘了,最新的日K线、30分钟、5分钟K线本应该能够有了,可是迟迟见不到这些K线的生成。
以国内期货举例:
目前vnpy的BarGenerator没有出错,但出现这种情况有违常理!
为什么会出现情况?原因是因为你的BarGenerator的生成K线的机制导致的,因为这些大K线是由1分钟K线合成的,就是说它们依赖1分钟K线的生成。
目前1分钟K线只是单纯由tick数据推动的,当收到下一分钟tick才判断当前1分钟K线的结束。如果遇到了中间休市时间或者收盘时间,网关接口就不再有新的tick推送,这样最后1分钟K线也就一直呆在BarGenerator中,无法推动5分钟、30分钟、日等大K线的生成,这就是目前BarGenerator的问题所在。
交易所用合约交易状态通知交易客户端交易合约的状态已经发生了变化。它表明交易合约当前的交易时间段,在每个交易时间的开始和结束时推送,时间为分钟的开始。这个信息正好可以用于BarGenerator结束各个休市和收盘前1分钟的K线生成,进而一举解决比1分钟大的K线的生成。
套用一句宋丹丹的话,要把大象关进冰箱总共分三步:第一步把冰箱门打开,第二步把冰箱塞进冰箱中,第三步把冰箱门关上!
让BarGenerator在休市和收市时及时生成K线分五步:
在vnpy_ctastrategy\engine.py中增加下面的内容:
class MyCtaEngine(CtaEngine):
"""
CTA策略引擎,对CtaEngine的功能进行扩展。
功能:
1. 订阅集合竞价tick数据,并且转发给各个已经初始化的CTA策略;
2. 订阅交易状态消息数据,并且转发给各个已经初始化的CTA策略;
3. 条件单的功能:包括发送、监视、更新和取消条件单的功能。
"""
condition_filename = "condition_order.json" # 历史条件单存储文件
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super().__init__(main_engine,event_engine)
self.condition_orders:Dict[str,ConditionOrder] = {} # strategy_name: ConditionOrder
self.triggered_condition_orders:List[ConditionOrder] = [] # 已经触发点条件单,为流控设计
def load_active_condtion_orders(self):
""" """
return {}
def register_event(self):
""""""
super().register_event()
self.event_engine.register(EVENT_AUCTION_TICK, self.process_auction_tick_event)
self.event_engine.register(EVENT_STATUS, self.process_status_event)
def process_auction_tick_event(self,event:Event):
""" 集合竞价消息处理 hxxjava add """
tick:TickData = event.data
strategies:List[CtaTemplate] = self.symbol_strategy_map[tick.vt_symbol]
if not strategies:
return
for strategy in strategies:
if strategy.inited:
# 执行策略的集合竞价消息处理
self.call_strategy_func(strategy, strategy.on_auction_tick, tick)
def process_status_event(self,event:Event):
""" 交易状态消息处理 hxxjava add """
status:StatusData = event.data
strategies:List[CtaTemplate] = []
# step1: find strategies related to this status data
vt_instrument0 = get_vt_instrument(status.vt_symbol)
if vt_instrument0 == status.vt_symbol:
# 交易品种的交易状态
for vt_symbol in self.symbol_strategy_map.keys():
vt_instrument = get_vt_instrument(vt_symbol)
if vt_instrument == vt_instrument0:
# 交易品种的交易状态属于策略交易的合约
strategies.extend(self.symbol_strategy_map[vt_symbol])
else:
# 单独合约的交易状态
strategies.extend(self.symbol_strategy_map.get(status.vt_symbol,[]))
if not strategies:
return
# step 2: push status data to all relate strategies
for strategy in strategies:
if strategy.inited:
# 执行策略的集合竞价消息处理
self.call_strategy_func(strategy, strategy.on_status, status)
def process_tick_event(self,event:Event):
""" 用tick的价格检查条件单 """
super().process_tick_event(event)
tick:TickData = event.data
all_condition_orders = [order for order in self.condition_orders.values() \
if order.vt_symbol == tick.vt_symbol and order.status == CondOrderStatus.WAITING]
for order in all_condition_orders:
# 检查条件单是否满足条件
self.check_condition_order(order,tick)
def check_condition_order(self,order:ConditionOrder,tick:TickData):
""" 检查条件单是否满足条件 """
strategy = self.strategies.get(order.strategy_name,None)
if not strategy or not strategy.trading:
return False
price = tick.last_price
is_be = order.condition == Condition.BE and price >= order.price
is_le = order.condition == Condition.LE and price <= order.price
is_bt = order.condition == Condition.BT and price > order.price
is_lt = order.condition == Condition.LT and price < order.price
if is_be or is_le or is_bt or is_lt:
# 满足触发条件
if order.execute_price == ExecutePrice.MARKET:
# 取市场价
price = tick.last_price
elif order.execute_price == ExecutePrice.EXTREME:
# 取极限价
price = tick.limit_up if order.direction == Direction.LONG else tick.limit_down
else:
# 取设定价
price = order.price
# 执行委托
order_ids = strategy.send_order(
direction = order.direction,
offset=order.offset,
price=price,
volume=order.volume
)
if order_ids:
order.trigger_time = tick.datetime
order.status = CondOrderStatus.TRIGGERED
order.vt_orderids = order_ids
self.call_strategy_func(strategy,strategy.on_condition_order,order)
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
def find_condition_order(self,vt_orderid:str):
""" 根据委托单号查询所属条件单 """
corder:ConditionOrder = None
for order in self.condition_orders.values():
if vt_orderid in order.vt_orderids:
corder = order
break
return corder
def process_trade_event(self, event: Event):
""" 委托单推送处理 """
super().process_trade_event(event)
trade:TradeData = event.data
vt_orderid = trade.vt_orderid
corder = self.find_condition_order(vt_orderid)
if corder:
# 该成交单属于某个条件单
strategy = self.strategies.get(corder.strategy_name,None)
if strategy and strategy.trading:
# 找到了该条件单属实策略实例且正在交易中
# 累计条件单的成交量
corder.traded += trade.volume
# 推送该条件单给策略
self.call_strategy_func(strategy,strategy.on_condition_order,corder)
# 刷新条件单列表控件
self.event_engine.put(Event(EVENT_CONDITION_ORDER,corder))
def send_condition_order(self,order:ConditionOrder):
""" """
strategy = self.strategies.get(order.strategy_name,None)
if not strategy or not strategy.trading:
return False
if order.cond_orderid not in self.condition_orders:
self.condition_orders[order.cond_orderid] = order
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
return False
def cancel_condition_order(self,cond_orderid:str):
""" """
order:ConditionOrder = self.condition_orders.get(cond_orderid,None)
if not order:
return False
order.status = CondOrderStatus.CANCELLED
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
def cancel_all_condition_orders(self,strategy_name:str):
""" """
for order in self.condition_orders.values():
if order.strategy_name == strategy_name and order.status == CondOrderStatus.WAITING:
order.status = CondOrderStatus.CANCELLED
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
在vnpy_ctastrategy\template.py中增加一个on_status()虚函数,它是接收交易状态信息数据推送的接口,on_status()代码如下:
class CtaTemplate(ABC):
""""""
author: str = ""
parameters: list = []
variables: list = []
def __init__(
self,
cta_engine: Any,
strategy_name: str,
vt_symbol: str,
setting: dict,
) -> None:
""""""
self.cta_engine: Any = cta_engine
self.strategy_name: str = strategy_name
self.vt_symbol: str = vt_symbol
self.inited: bool = False
self.trading: bool = False
self.pos: int = 0
# Copy a new variables list here to avoid duplicate insert when multiple
# strategy instances are created with the same strategy class.
self.variables = copy(self.variables)
self.variables.insert(0, "inited")
self.variables.insert(1, "trading")
self.variables.insert(2, "pos")
self.update_setting(setting)
def update_setting(self, setting: dict) -> None:
"""
Update strategy parameter wtih value in setting dict.
"""
for name in self.parameters:
if name in setting:
setattr(self, name, setting[name])
@classmethod
def get_class_parameters(cls) -> dict:
"""
Get default parameters dict of strategy class.
"""
class_parameters: dict = {}
for name in cls.parameters:
class_parameters[name] = getattr(cls, name)
return class_parameters
def get_parameters(self) -> dict:
"""
Get strategy parameters dict.
"""
strategy_parameters: dict = {}
for name in self.parameters:
strategy_parameters[name] = getattr(self, name)
return strategy_parameters
def get_variables(self) -> dict:
"""
Get strategy variables dict.
"""
strategy_variables: dict = {}
for name in self.variables:
strategy_variables[name] = getattr(self, name)
return strategy_variables
def get_data(self) -> dict:
"""
Get strategy data.
"""
strategy_data: dict = {
"strategy_name": self.strategy_name,
"vt_symbol": self.vt_symbol,
"class_name": self.__class__.__name__,
"author": self.author,
"parameters": self.get_parameters(),
"variables": self.get_variables(),
}
return strategy_data
@virtual
def on_init(self) -> None:
"""
Callback when strategy is inited.
"""
pass
@virtual
def on_inited(self): # hxxjava add
"""
Callback when strategy is inited.
"""
pass
@virtual
def on_start(self):
"""
Callback when strategy is started.
"""
pass
@virtual
def on_stop(self) -> None:
"""
Callback when strategy is stopped.
"""
pass
@virtual
def on_auction_tick(self, tick: TickData):
"""
Callback of new tick data update. # hxxjava add for auction tick
"""
pass
@virtual
def on_status(self, status: StatusData=None):
"""
Callback of new status data update. # hxxjava add for trading status
"""
pass
@virtual
def on_tick(self, tick: TickData) -> None:
"""
Callback of new tick data update.
"""
pass
@virtual
def on_bar(self, bar: BarData) -> None:
"""
Callback of new bar data update.
"""
pass
@virtual
def on_trade(self, trade: TradeData) -> None:
"""
Callback of new trade data update.
"""
pass
@virtual
def on_order(self, order: OrderData) -> None:
"""
Callback of new order data update.
"""
pass
@virtual
def on_stop_order(self, stop_order: StopOrder) -> None:
"""
Callback of stop order update.
"""
pass
def buy(
self,
price: float,
volume: float,
stop: bool = False,
lock: bool = False,
net: bool = False
) -> list:
"""
Send buy order to open a long position.
"""
return self.send_order(
Direction.LONG,
Offset.OPEN,
price,
volume,
stop,
lock,
net
)
def sell(
self,
price: float,
volume: float,
stop: bool = False,
lock: bool = False,
net: bool = False
) -> list:
"""
Send sell order to close a long position.
"""
return self.send_order(
Direction.SHORT,
Offset.CLOSE,
price,
volume,
stop,
lock,
net
)
def short(
self,
price: float,
volume: float,
stop: bool = False,
lock: bool = False,
net: bool = False
) -> list:
"""
Send short order to open as short position.
"""
return self.send_order(
Direction.SHORT,
Offset.OPEN,
price,
volume,
stop,
lock,
net
)
def cover(
self,
price: float,
volume: float,
stop: bool = False,
lock: bool = False,
net: bool = False
) -> list:
"""
Send cover order to close a short position.
"""
return self.send_order(
Direction.LONG,
Offset.CLOSE,
price,
volume,
stop,
lock,
net
)
def send_order(
self,
direction: Direction,
offset: Offset,
price: float,
volume: float,
stop: bool = False,
lock: bool = False,
net: bool = False
) -> list:
"""
Send a new order.
"""
if self.trading:
vt_orderids: list = self.cta_engine.send_order(
self, direction, offset, price, volume, stop, lock, net
)
return vt_orderids
else:
return []
def cancel_order(self, vt_orderid: str) -> None:
"""
Cancel an existing order.
"""
if self.trading:
self.cta_engine.cancel_order(self, vt_orderid)
def cancel_all(self) -> None:
"""
Cancel all orders sent by strategy.
"""
if self.trading:
self.cta_engine.cancel_all(self)
def write_log(self, msg: str) -> None:
"""
Write a log message.
"""
self.cta_engine.write_log(msg, self)
def get_engine_type(self) -> EngineType:
"""
Return whether the cta_engine is backtesting or live trading.
"""
return self.cta_engine.get_engine_type()
def get_pricetick(self) -> float:
"""
Return pricetick data of trading contract.
"""
return self.cta_engine.get_pricetick(self)
def load_bar(
self,
days: int,
interval: Interval = Interval.MINUTE,
callback: Callable = None,
use_database: bool = False
) -> None:
"""
Load historical bar data for initializing strategy.
"""
if not callback:
callback: Callable = self.on_bar
bars: List[BarData] = self.cta_engine.load_bar(
self.vt_symbol,
days,
interval,
callback,
use_database
)
for bar in bars:
callback(bar)
def load_tick(self, days: int) -> None:
"""
Load historical tick data for initializing strategy.
"""
ticks: List[TickData] = self.cta_engine.load_tick(self.vt_symbol, days, self.on_tick)
for tick in ticks:
self.on_tick(tick)
def put_event(self) -> None:
"""
Put an strategy data event for ui update.
"""
if self.inited:
self.cta_engine.put_strategy_event(self)
def send_email(self, msg) -> None:
"""
Send email to default receiver.
"""
if self.inited:
self.cta_engine.send_email(msg, self)
def sync_data(self) -> None:
"""
Sync strategy variables value into disk storage.
"""
if self.trading:
self.cta_engine.sync_strategy_data(self)
def get_trading_hours(self):
"""
Return trading_hours of trading hours. # hxxjava add
"""
return self.cta_engine.get_trading_hours(self)
def get_actual_days(self,last_time:datetime,days:int): # hxxjava add
"""
得到从last_time开始往前days天的实际天数。
"""
if days <= 0:
return 0
th = TradingHours(self.get_trading_hours())
# 找到有效的最后时间
till_time = last_time
while not th.get_trade_hours(till_time):
till_time = timedelta(minutes=1)
availble_days = 0
#找到有些多开始时间
from_time = till_time
while availble_days <= days:
from_time -= timedelta(days=1)
if th.get_trade_hours(from_time):
availble_days += 1
actual_days = (last_time-from_time).days
# print(f"till_time={till_time},from_time{from_time},days={days},acutal_days={actual_days}")
return actual_days
def get_contract(self):
"""
Return trading_hours of trading contract. # hxxjava add
"""
return self.cta_engine.get_contract(self)
@virtual
def on_condition_order(self, cond_order: ConditionOrder):
"""
Callback of condition order update.
"""
pass
def send_condition_order(self,order:ConditionOrder): # hxxjava add
""" """
if not self.trading:
return False
return self.cta_engine.send_condition_order(order)
def cancel_condition_order(self,cond_orderid:str): # hxxjava add
""" """
return self.cta_engine.cancel_condition_order(cond_orderid)
def cancel_all_condition_orders(self): # hxxjava add
""" """
return self.cta_engine.cancel_all_condition_orders(self)
def send_margin_ratio_request(self): # hxxjava add
""" """
main_engine = self.cta_engine.main_engine
contract:ContractData = self.get_contract()
symbol,exchange = extract_vt_symbol(self.vt_symbol)
req = MarginRequest(symbol=symbol,exchange=exchange)
main_engine.send_margin_ratio_request(req,contract.gateway_name)
def send_commission_request(self): # hxxjava add
""" """
main_engine = self.cta_engine.main_engine
contract:ContractData = self.get_contract()
symbol,exchange = extract_vt_symbol(self.vt_symbol)
req = CommissionRequest(symbol=symbol,exchange=exchange)
main_engine.send_commission_request(req,contract.gateway_name)
修改vnpy\trader\utility.py中的BarGenerator,为其添加下面的update_status()函数:
class BarGenerator:
... ...
def update_status(self,status:StatusData=None):
""" """
# if status:
# hh,mm = status.enter_time.split(':')
# st_time = datetime.now().replace(hour=int(hh),minute=int(mm),second=0,microsecond=0,tzinfo=CHINA_TZ)
if self.bar:
# 只要接收到交易状态信息,一定是整分钟,立即推送当前分钟bar
self.on_bar(self.bar)
self.bar = None
在您的CTA策略中增加一个on_status()函数,这里只给出一个CTA策略与本文主题相关部分的代码,其他部分用省略号表示,一般的代码如下:
class XxxStrategy(CtaTemplate):
""" XXX交易策略 """
self.window = 30
... ...
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
""""""
super(GsjyStrategy, self).__init__(cta_engine, strategy_name, vt_symbol, setting)
... ...
self.bg : BarGenerator:BarGenerator(self.on_bar,self.window,self.on_xmin_bar)
... ...
def on_inited(self):
"""
Callback after strategy is inited.
"""
self.bg.update_status() # 用来强制生成最后的1分钟K线
self.write_log("策略初始化结束。")
def on_auction_tick(self, tick: TickData):
"""
Callback of auction tick data update.
"""
self.bg.update_auction_tick(tick)
def on_status(self,status:StatusData=None): #
"""
收到合约交易状态信息时,更新所有的K线生成器 。
注意:合约交易状态信息推送只会在策略初始化后才执行!
"""
self.bg.update_status(status=status)
def on_tick(self, tick: TickData):
"""
Callback of new tick data update.
"""
self.bg.update_tick(tick)
def on_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
self.bg.update_bar(bar)
def on_xmin_bar(self, bar: BarData):
""" """
# 这里应该是利用大周期bar进行出入场交易信号计算和委托交易指令发出的代码
pass
... ...
修改vnpy_ctastrategy__init__.py中的CtaStrategyApp,将其engine_class更改为扩展的MyCtaEngine:
class CtaStrategyApp(BaseApp):
""""""
app_name = APP_NAME
app_module = __module__
app_path = Path(__file__).parent
display_name = "CTA策略"
# engine_class = CtaEngine
engine_class = MyCtaEngine # hxxjava add
widget_name = "CtaManager"
icon_name = str(app_path.joinpath("ui", "cta.ico"))
在ChartWidget基础上创建的指标是不能改变主图或副图的参数的,如何做才能给让指标传递不同参数?
以MacdItem为例,步骤如下:
MacdItem的实现见:为K线图表添砖加瓦——解决MACD绘图部件显示范围BUG,这里就不再贴代码。
def add_item(
self,
item_class: Type[ChartItem],
item_name: str,
plot_name: str,
**kwargs # hxxjava add
) -> None:
"""
Add chart item.
"""
# item: ChartItem = item_class(self._manager)
item: ChartItem = item_class(self._manager) if not kwargs else item_class(self._manager,**kwargs) # hxxjava change
self._items[item_name] = item
plot: pg.PlotItem = self._plots.get(plot_name)
plot.addItem(item)
self._item_plot_map[item] = plot
class MyChartWidget(QtWidgets.QFrame):
"""
订单流K线图表控件,
"""
def __init__(self,title:str="K线图表"):
""""""
super().__init__()
self.init_ui()
self.setWindowTitle(title)
def init_ui(self):
""""""
self.resize(1400, 800)
# Create chart widget
self.chart = ChartWidget()
self.chart.add_plot("candle", hide_x_axis=True)
self.chart.add_plot("fast_macd", maximum_height=100,hide_x_axis=True)
self.chart.add_plot("slow_macd", maximum_height=100,hide_x_axis=True)
self.chart.add_plot("volume", maximum_height=100,hide_x_axis=False)
self.chart.add_item(CandleItem, "candle", "candle")
self.chart.add_item(MacdItem, "fast_macd", "fast_macd",short_window=6,long_window=19,M=9) # 相当于MACD(6,19,9)
self.chart.add_item(MacdItem, "slow_macd", "slow_macd",short_window=19,long_window=39,M=9) # 相当于MACD(19,39,9)
self.chart.add_item(VolumeItem, "volume", "volume")
self.chart.add_cursor()
最近经常有朋友问MacdItem绘图部件显示范围有问题,因为老是有任务在手上,一直没有时间修改,现修改过了,测试没有问题,代码如下:
class MacdItem(ChartItem):
""""""
def __init__(self, manager: BarManager,short_window:int=12,long_window:int=26,M:int=9):
""""""
super().__init__(manager)
self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=1)
self.red_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=1)
self.green_pen: QtGui.QPen = pg.mkPen(color=(0, 255, 0), width=1)
self._values_ranges: Dict[Tuple[int, int], Tuple[float, float]] = {}
self.last_range:Tuple[int, int] = (-1,-1) # 最新显示K线索引范围
self.short_window = short_window
self.long_window = long_window
self.M = M
self.macd_data: Dict[int, List[float,float,float]] = {}
def get_macd_value(self, ix: int) -> List:
""""""
if ix < 0:
return (0.0,0.0,0.0)
# When initialize, calculate all macd value
if not self.macd_data:
bars = self._manager.get_all_bars()
close_data = [bar.close_price for bar in bars]
diffs,deas,macds = talib.MACD(np.array(close_data),
fastperiod=self.short_window,
slowperiod=self.long_window,
signalperiod=self.M)
for n in range(0,len(diffs)):
self.macd_data[n] = [diffs[n],deas[n],macds[n]]
# Return if already calcualted
if ix in self.macd_data:
return self.macd_data[ix]
# Else calculate new value
close_data = []
for n in range(ix-self.long_window-self.M+1, ix + 1):
bar = self._manager.get_bar(n)
close_data.append(bar.close_price)
diffs,deas,macds = talib.MACD(np.array(close_data),
fastperiod=self.short_window,
slowperiod=self.long_window,
signalperiod=self.M)
diff,dea,macd = diffs[-1],deas[-1],macds[-1]
self.macd_data[ix] = [diff,dea,macd]
return [diff,dea,macd]
def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
""""""
macd_value = self.get_macd_value(ix)
last_macd_value = self.get_macd_value(ix - 1)
# # Create objects
picture = QtGui.QPicture()
painter = QtGui.QPainter(picture)
# # Draw macd lines
if np.isnan(macd_value[0]) or np.isnan(last_macd_value[0]):
# print("略过macd lines0")
pass
else:
end_point0 = QtCore.QPointF(ix, macd_value[0])
start_point0 = QtCore.QPointF(ix - 1, last_macd_value[0])
painter.setPen(self.white_pen)
painter.drawLine(start_point0, end_point0)
if np.isnan(macd_value[1]) or np.isnan(last_macd_value[1]):
# print("略过macd lines1")
pass
else:
end_point1 = QtCore.QPointF(ix, macd_value[1])
start_point1 = QtCore.QPointF(ix - 1, last_macd_value[1])
painter.setPen(self.yellow_pen)
painter.drawLine(start_point1, end_point1)
if not np.isnan(macd_value[2]):
if (macd_value[2]>0):
painter.setPen(self.red_pen)
painter.setBrush(pg.mkBrush(255,0,0))
else:
painter.setPen(self.green_pen)
painter.setBrush(pg.mkBrush(0,255,0))
painter.drawRect(QtCore.QRectF(ix-0.3,0,0.6,macd_value[2]))
else:
# print("略过macd lines2")
pass
painter.end()
return picture
def boundingRect(self) -> QtCore.QRectF:
""""""
min_y, max_y = self.get_y_range()
rect = QtCore.QRectF(
0,
min_y,
len(self._bar_picutures),
max_y
)
return rect
def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
"""
获得3个指标在y轴方向的范围
hxxjava 修改,2022-11-15
当显示范围改变时,min_ix,max_ix的值不为None,当显示范围不变时,min_ix,max_ix的值不为None,
"""
if not self.macd_data:
# 如果MACD三个数据没有计算过
return (-100,100)
last_range = (min_ix,max_ix)
if last_range == (None,None):
# 本次索引范围未改变
if self.last_range in self._values_ranges:
# 如果y方向范围已经保存,读取y方向范围
result = self._values_ranges[self.last_range]
return adjust_range(result)
else:
# 如果y方向范围没有保存,从macd_data重新计算y方向范围
min_ix,max_ix = 0,len(self.macd_data)-1
macd_list = list(self.macd_data.values())[min_ix:max_ix + 1]
ndarray = np.array(macd_list)
max_price = np.nanmax(ndarray)
min_price = np.nanmin(ndarray)
# 保存y方向范围,同时返回结果
result = (min_price, max_price)
if result == (np.nan,np.nan):
# 前面的未计算出有效值会出现这种情况
return (-100,100)
self.last_range = (min_ix,max_ix)
self._values_ranges[self.last_range] = result
return adjust_range(result)
""" 以下为显示范围变化时 """
if last_range in self._values_ranges:
# 该范围已经保存过y方向范围
# 取得y方向范围,返回结果
self.last_range = last_range
result = self._values_ranges[last_range]
return adjust_range(result)
# 该范围没有保存过y方向范围,从macd_data重新计算y方向范围
min_ix = max(0,min_ix)
max_ix = min(max_ix,len(self.macd_data)-1)
macd_list = list(self.macd_data.values())[min_ix:max_ix+1]
ndarray = np.array(macd_list)
max_price = np.nanmax(ndarray)
min_price = np.nanmin(ndarray)
# 取得y方向范围,返回结果
result = (min_price, max_price)
if result == (np.nan,np.nan):
# 前面的未计算出有效值会出现这种情况
return (-100,100)
# print(f"result1={result}")
self.last_range = (min_ix,max_ix)
self._values_ranges[self.last_range] = result
self.min_ix,self.max_ix = last_range
return adjust_range(result)
def get_info_text(self, ix: int) -> str:
""" """
barscount = len(self._manager._bars) # hxxjava debug
if ix in self.macd_data:
[diff,dea,macd] = self.macd_data[ix]
words = [
f"diff {diff:.3f}"," ",
f"dea {dea:.3f}"," ",
f"macd {macd:.3f}",
f"barscount={ix,barscount}"
]
text = "\n".join(words)
else:
text = "diff - \ndea - \nmacd -"
return text
duke wrote:
谢谢大神,问题解决了,另外发现一个BUG,if bar.datetime< self.exit_time: 应该改为if bar.datetime.time() < self.exit_time:
你的修改是错误的,bar.datetime.time()的类型是time类型,self.exit_time的类型也是datetime。
本人在对郑商所行情数据中时间秒以下的部分进行补充的修改一文中,给出来对郑商所行情数据中时间秒以下的部分进行了补充,ctp_gateway也已经按照预想的那样工作了。之后启动DataRecorder对多个合约进行tick数据的录制。可是在价差tick数据的时候却发现对有夜盘的合约进行录制的时间,如果在23:00以后还有tick数据推送的话,会出现日期错误,具体的表现是:2022-10-19 23:00:00以后的tick数据中的datetime字段的日期部分居然是2022-10-20!也就是说我昨天晚上就录到了今天晚上23:00:00以后tick数据(当然是未来时间),这是错误的!
打开mysql数据库(我用的是mysql数据库):
select symbol,datetime from dbtickdata where time(datetime) > "22:59:59.5";
+--------+-------------------------+
| symbol | datetime |
+--------+-------------------------+
| i2301 | 2022-10-19 22:59:59.522 |
| i2301 | 2022-10-19 23:00:00.013 |
| i2301 | 2022-10-19 23:00:00.038 |
| i2301 | 2022-10-20 23:00:00.038 | ---- 日期错误,应该是2022-10-19
| p2301 | 2022-10-19 23:00:00.025 |
| p2301 | 2022-10-20 23:00:00.025 | ---- 日期错误,应该是2022-10-19
| rb2301 | 2022-10-19 22:59:59.500 |
| rb2301 | 2022-10-19 23:00:00.000 |
| TA301 | 2022-10-19 22:59:59.500 |
| TA301 | 2022-10-19 22:59:59.750 |
| TA301 | 2022-10-19 22:59:59.875 |
+--------+-------------------------+
注意:我发现此问题的时间是2022-10-20 16:00!
经过仔细研读ctp_gateway.py的CtpMdApi的onRtnDepthMarketData(),具体的实现见下面的代码,发现对大商所合约tick的datetime错误导致的。
按照ctp接口规范文档知道,大商所合约的深度行情中没有日期字段,需要客户端收到后自行用本地日期去替代。
CtpMdApi中用定时器维护了一个self.current_date的本地日期成员,其主要作用就是做为了补齐大商所合约的深度行情日期和时间之用的。
因为CtpMdApi每次重新连接之后,再次订阅一些合约的行情的时候,必然会推送一条该合约在交易所中最后更新的tick时间给订阅方。只要在每天8:59(不含)之前重新连接大商所的行情服务器,就一定会收到一个日期为当日,时间为该合约最后一次推送的tick,而其中的补足日期是不对的!
请看看上面的证据中,发生日期错误的全部是大商所的合约,分析正确 !
def onRtnDepthMarketData(self, data: dict) -> None:
"""行情数据推送"""
# 过滤没有时间戳的异常行情数据
if not data["UpdateTime"]:
return
# 过滤还没有收到合约数据前的行情推送
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map.get(symbol, None)
if not contract:
return
# 对大商所的交易日字段取本地日期
if not data["ActionDay"] or contract.exchange == Exchange.DCE:
date_str: str = self.current_date
else:
date_str: str = data["ActionDay"]
# 这里错了,按照这样的字符串"%Y%m%d %H:%M:%S.%f",会错把tick.datetime的微秒当成0.1秒计数
# timestamp: str = f"{date_str} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}"
timestamp: str = f"{date_str} {data['UpdateTime']}." + str(data['UpdateMillisec']*1000).zfill(6) # hxxjava edit
dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
dt: datetime = CHINA_TZ.localize(dt)
# hxxjava add start
if contract.exchange == Exchange.CZCE:
# 对郑商所配置的秒以下的部分进行特别处理
key = f"{symbol}.{contract.exchange.value}"
# 读取上次的tick时间
last_time:datetime = self.czce_last_times.get(key,None)
if last_time:
# 取得上次时间的整秒时间
dt1 = last_time - timedelta(microseconds = last_time.microsecond)
if dt == dt1:
# 如果有收到一次秒内的tick,计算应该添加到秒以下的部分
next_second = dt1 + timedelta(seconds=1)
delta = next_second - last_time
microsecond = (delta.seconds*1000000 + delta.microseconds) / 2
# 时间调整为上次tick时间到下一秒到中间值
dt = last_time + timedelta(microseconds = microsecond)
# 更新最新tick时间字典
self.czce_last_times[key] = dt
# hxxjava add end
tick: TickData = TickData(
symbol=symbol,
exchange=contract.exchange,
datetime=dt,
name=contract.name,
volume=data["Volume"],
turnover=data["Turnover"],
open_interest=data["OpenInterest"],
last_price=adjust_price(data["LastPrice"]),
limit_up=data["UpperLimitPrice"],
limit_down=data["LowerLimitPrice"],
open_price=adjust_price(data["OpenPrice"]),
high_price=adjust_price(data["HighestPrice"]),
low_price=adjust_price(data["LowestPrice"]),
pre_close=adjust_price(data["PreClosePrice"]),
bid_price_1=adjust_price(data["BidPrice1"]),
ask_price_1=adjust_price(data["AskPrice1"]),
bid_volume_1=data["BidVolume1"],
ask_volume_1=data["AskVolume1"],
gateway_name=self.gateway_name
)
if data["BidVolume2"] or data["AskVolume2"]:
tick.bid_price_2 = adjust_price(data["BidPrice2"])
tick.bid_price_3 = adjust_price(data["BidPrice3"])
tick.bid_price_4 = adjust_price(data["BidPrice4"])
tick.bid_price_5 = adjust_price(data["BidPrice5"])
tick.ask_price_2 = adjust_price(data["AskPrice2"])
tick.ask_price_3 = adjust_price(data["AskPrice3"])
tick.ask_price_4 = adjust_price(data["AskPrice4"])
tick.ask_price_5 = adjust_price(data["AskPrice5"])
tick.bid_volume_2 = data["BidVolume2"]
tick.bid_volume_3 = data["BidVolume3"]
tick.bid_volume_4 = data["BidVolume4"]
tick.bid_volume_5 = data["BidVolume5"]
tick.ask_volume_2 = data["AskVolume2"]
tick.ask_volume_3 = data["AskVolume3"]
tick.ask_volume_4 = data["AskVolume4"]
tick.ask_volume_5 = data["AskVolume5"]
self.gateway.on_tick(tick)
方法1. 问题已经找到,怎么保证这个日期一定是正确的,可不是一件简单的事情,因为它牵涉到交易时间段和节假日的计算问题!
方法2. 有一个简单的办法,既然正确日期不好计算,可以对大商所的tick与当前本地时间进行比较,如果tick的datetime是未来时间,直接该tick抛弃掉,这样也不会有太大影响。
打开vnpy_ctpgateway.py文件,对class CtpMdApi进行如下修改,具体的修改请查找 '# hxxjava'就可以找到修改的语句了。
1、CtpMdApi中增加self.current_time,用来记录本地时间
class CtpMdApi(MdApi):
""""""
def __init__(self, gateway: CtpGateway) -> None:
"""构造函数"""
super().__init__()
self.gateway: CtpGateway = gateway
self.gateway_name: str = gateway.gateway_name
self.reqid: int = 0
self.connect_status: bool = False
self.login_status: bool = False
self.subscribed: Set = set()
self.userid: str = ""
self.password: str = ""
self.brokerid: str = ""
# self.current_date: str = datetime.now().strftime("%Y%m%d") # hxxjava comments
self.current_time: datetime = datetime.now() # hxxjava adds
self.current_date: str = self.current_time.strftime("%Y%m%d") # hxxjava adds
self.czce_last_times:Dict[str,datetime] = {} # hxxjava add
2、修改CtpMdApi的update_date()函数
def update_date(self) -> None:
"""更新当前日期"""
# self.current_date = datetime.now().strftime("%Y%m%d") # hxxjava comments
self.current_time = datetime.now() # hxxjava adds
self.current_date = self.current_time.strftime("%Y%m%d") # hxxjava adds
3、修改CtpMdApi的深度行情推送函数onRtnDepthMarketData()
def onRtnDepthMarketData(self, data: dict) -> None:
"""行情数据推送"""
# 过滤没有时间戳的异常行情数据
if not data["UpdateTime"]:
return
# 过滤还没有收到合约数据前的行情推送
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map.get(symbol, None)
if not contract:
return
# 对大商所的交易日字段取本地日期
if not data["ActionDay"] or contract.exchange == Exchange.DCE:
date_str: str = self.current_date
else:
date_str: str = data["ActionDay"]
# 这里错了,按照这样的字符串"%Y%m%d %H:%M:%S.%f",会错把tick.datetime的微秒当成0.1秒计数
# timestamp: str = f"{date_str} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}"
timestamp: str = f"{date_str} {data['UpdateTime']}." + str(data['UpdateMillisec']*1000).zfill(6) # hxxjava edit
dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
dt: datetime = CHINA_TZ.localize(dt)
# hxxjava add start
if contract.exchange == Exchange.CZCE:
# 对郑商所配置的秒以下的部分进行特别处理
key = f"{symbol}.{contract.exchange.value}"
# 读取上次的tick时间
last_time:datetime = self.czce_last_times.get(key,None)
if last_time:
# 取得上次时间的整秒时间
dt1 = last_time - timedelta(microseconds = last_time.microsecond)
if dt == dt1:
# 如果有收到一次秒内的tick,计算应该添加到秒以下的部分
next_second = dt1 + timedelta(seconds=1)
delta = next_second - last_time
microsecond = (delta.seconds*1000000 + delta.microseconds) / 2
# 时间调整为上次tick时间到下一秒到中间值
dt = last_time + timedelta(microseconds = microsecond)
# 更新最新tick时间字典
self.czce_last_times[key] = dt
elif contract.exchange == Exchange.DCE:
# 对郑商所配置的秒以下的部分进行特别处理
if dt > CHINA_TZ.localize(self.current_time) + timedelta(seconds = 10):
# 如果大商所的补足本地日期的dt超前每2秒更新一次的本地时间2秒,判断为无数的行情数据,做丢弃处理。
# 但是为了降低对本地时间与交易所时间同步要求,放宽为10秒或者更多都可以
return
# hxxjava add end
tick: TickData = TickData(
symbol=symbol,
exchange=contract.exchange,
datetime=dt,
name=contract.name,
volume=data["Volume"],
turnover=data["Turnover"],
open_interest=data["OpenInterest"],
last_price=adjust_price(data["LastPrice"]),
limit_up=data["UpperLimitPrice"],
limit_down=data["LowerLimitPrice"],
open_price=adjust_price(data["OpenPrice"]),
high_price=adjust_price(data["HighestPrice"]),
low_price=adjust_price(data["LowestPrice"]),
pre_close=adjust_price(data["PreClosePrice"]),
bid_price_1=adjust_price(data["BidPrice1"]),
ask_price_1=adjust_price(data["AskPrice1"]),
bid_volume_1=data["BidVolume1"],
ask_volume_1=data["AskVolume1"],
gateway_name=self.gateway_name
)
if data["BidVolume2"] or data["AskVolume2"]:
tick.bid_price_2 = adjust_price(data["BidPrice2"])
tick.bid_price_3 = adjust_price(data["BidPrice3"])
tick.bid_price_4 = adjust_price(data["BidPrice4"])
tick.bid_price_5 = adjust_price(data["BidPrice5"])
tick.ask_price_2 = adjust_price(data["AskPrice2"])
tick.ask_price_3 = adjust_price(data["AskPrice3"])
tick.ask_price_4 = adjust_price(data["AskPrice4"])
tick.ask_price_5 = adjust_price(data["AskPrice5"])
tick.bid_volume_2 = data["BidVolume2"]
tick.bid_volume_3 = data["BidVolume3"]
tick.bid_volume_4 = data["BidVolume4"]
tick.bid_volume_5 = data["BidVolume5"]
tick.ask_volume_2 = data["AskVolume2"]
tick.ask_volume_3 = data["AskVolume3"]
tick.ask_volume_4 = data["AskVolume4"]
tick.ask_volume_5 = data["AskVolume5"]
self.gateway.on_tick(tick)
Teller wrote:
感谢大佬回复!
大佬,修改后我遇到这样的报错,是不是我少import了什么包?
我自己尝试import了pytz包,但是没成功。
答复:
ctp_gateway.py的引用部分有
import pytz
下面是常量定义:
CHINA_TZ = pytz.timezone("Asia/Shanghai") # 中国时区
然后就可以使用了,目的是把CTP接口中的所有行情、委托单和成绩单这涉及到时间的字段都同一到中国时区。
难道你的ctp_gateway.py文件中没有这些内容?
Teller wrote:
感谢大佬分享!
我想按您说的修改一下代码,但发现有一行代码不同,用原代码还是用您的代码呀,还是说两行代码是一个意思。
我学python没多久,很多地方看不懂,在这里跪谢大佬。
答复:
去掉前面的,保留后一句。
守望长城2020-6-11-艾瑞巴蒂 wrote:
大佬, 还有一个问题, K线图运行久了会报错, 我绘制的是Tick 图, 报错如下, 不知道咋解决
看了你的vnpy的目录构成,貌似您用的版本还是很久以前的老版本了,至少不是最新版本。
"H:\vnstudio\lib\site-packages\vnpy\app\cta_strategy\backtesting.py"
如果是新的版本,应该是这样的:
"H:\vnstudio\lib\site-packages\vnpy_ctastrategy\backtesting.py"
由此推断你的pyqtgraph的版本也是老的版本。
随着vnpy的版本升级,pyqtgraph的版本也会采用新的版本,你升级到3.0及以后版本再试试看。
守望长城2020-6-11-艾瑞巴蒂 wrote:
wangjiancc wrote:
H:\vnstudio\python.exe H:/vnstudio/Lib/site-packages/vnpy/app/ctastrategy/backtesting/run.py
2021-03-03 22:22:48,092 INFO: 注册日志事件监听
Exception in thread Thread-4:
Traceback (most recent call last):
File "H:\vnstudio\lib\threading.py", line 917, in _bootstrap_inner
self.run()
File "H:\vnstudio\lib\threading.py", line 865, in run
self._target(*self._args, **self._kwargs)
File "H:\vnstudio\Lib\site-packages\vnpy\app\cta_backtester\engine.py", line 168, in run_backtesting
setting
File "H:\vnstudio\lib\site-packages\vnpy\app\cta_strategy\backtesting.py", line 211, in add_strategy
self, strategy_class.name, self.vt_symbol, setting
File "H:\vnstudio\lib\site-packages\vnpy\app\cta_strategy\strategies\KxMonitor.py", line 82, in init
self.even_engine = cta_engine.main_engine.event_engine
AttributeError: 'BacktestingEngine' object has no attribute 'main_engine'Process finished with exit code 0
大佬,请问下这是什么错误?
看报错就是你没有传main_engine进去啊
策略中可以考虑回测的情况,另外回测是不允许显示K线图的,如果非要这么干,在策略中要判断是否运行在实盘或者回测环境,这可以通过获得engine的类型是否为
if self.engine.engine_type == EngineType.BACKTESTING:
xxxxxx 放弃窗口K线图表
else:
xxxxxx 创建窗口K线图表
的方法来进行规避错误的发生。
在使用vnpy中的DataRecorder进行tick情录制的时候,本人发现郑商所的tick中时间在一秒内会收到两个tick,但是同一秒内的tick的时间是相同的。这会给后端的CTA策略、价差交易策略登录模块在合成K的时带来不必要的困扰,因为通常合成K线的BarGenerator通常对tick的时间进行有效性判断的时候,会把时间重复的tick做抛弃处理,这会再次郑商所的品种的tick被使用的量减半。如果重复的tick也用的话,这样倒是不会减半,可是在防止垃圾数据时又无法杜绝旧的数据在网关断网或者重新再次连接时,接口回再次推送最后的tick数据,而这个tick数据之前已经被实时推送给客户段了。所有在ctp_gateway中对郑商所行情数据中时间秒以下的部分进行补充是非常必要的!
本修改只需要对vnpy_ctp\ctp_gateway.py进行修改就可以了,修改步骤如下:
def __init__(self, gateway: CtpGateway) -> None:
"""构造函数"""
super().__init__()
self.gateway: CtpGateway = gateway
self.gateway_name: str = gateway.gateway_name
self.reqid: int = 0
self.connect_status: bool = False
self.login_status: bool = False
self.subscribed: Set = set()
self.userid: str = ""
self.password: str = ""
self.brokerid: str = ""
self.current_date: str = datetime.now().strftime("%Y%m%d")
self.czce_last_times:Dict[str,datetime] = {} # 郑商所最新tick时间字典 hxxjava add
def onRtnDepthMarketData(self, data: dict) -> None:
"""行情数据推送"""
# 过滤没有时间戳的异常行情数据
if not data["UpdateTime"]:
return
# 过滤还没有收到合约数据前的行情推送
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map.get(symbol, None)
if not contract:
return
# 对大商所的交易日字段取本地日期
if not data["ActionDay"] or contract.exchange == Exchange.DCE:
date_str: str = self.current_date
else:
date_str: str = data["ActionDay"]
# 这里错了,按照这样的字符串"%Y%m%d %H:%M:%S.%f",会错把tick.datetime的微秒当成0.1秒计数
# timestamp: str = f"{date_str} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}"
timestamp: str = f"{date_str} {data['UpdateTime']}." + str(data['UpdateMillisec']*1000).zfill(6) # hxxjava edit
dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
dt: datetime = CHINA_TZ.localize(dt)
# hxxjava add start
if contract.exchange == Exchange.CZCE:
# 对郑商所配置的秒以下的部分进行特别处理
key = f"{symbol}.{contract.exchange.value}"
# 读取上次的tick时间
last_time:datetime = self.czce_last_times.get(key,None)
if last_time:
# 取得上次时间的整秒时间
dt1 = last_time - timedelta(microseconds = last_time.microsecond)
if dt == dt1:
# 如果有收到一次秒内的tick,计算应该添加到秒以下的部分
next_second = dt1 + timedelta(seconds=1)
delta = next_second - last_time
microsecond = (delta.seconds*1000000 + delta.microseconds) / 2
# 时间调整为上次tick时间到下一秒到中间值
dt = last_time + timedelta(microseconds = microsecond)
# 更新最新tick时间字典
self.czce_last_times[key] = dt
# hxxjava add end
tick: TickData = TickData(
symbol=symbol,
exchange=contract.exchange,
datetime=dt,
name=contract.name,
volume=data["Volume"],
turnover=data["Turnover"],
open_interest=data["OpenInterest"],
last_price=adjust_price(data["LastPrice"]),
limit_up=data["UpperLimitPrice"],
limit_down=data["LowerLimitPrice"],
open_price=adjust_price(data["OpenPrice"]),
high_price=adjust_price(data["HighestPrice"]),
low_price=adjust_price(data["LowestPrice"]),
pre_close=adjust_price(data["PreClosePrice"]),
bid_price_1=adjust_price(data["BidPrice1"]),
ask_price_1=adjust_price(data["AskPrice1"]),
bid_volume_1=data["BidVolume1"],
ask_volume_1=data["AskVolume1"],
gateway_name=self.gateway_name
)
if data["BidVolume2"] or data["AskVolume2"]:
tick.bid_price_2 = adjust_price(data["BidPrice2"])
tick.bid_price_3 = adjust_price(data["BidPrice3"])
tick.bid_price_4 = adjust_price(data["BidPrice4"])
tick.bid_price_5 = adjust_price(data["BidPrice5"])
tick.ask_price_2 = adjust_price(data["AskPrice2"])
tick.ask_price_3 = adjust_price(data["AskPrice3"])
tick.ask_price_4 = adjust_price(data["AskPrice4"])
tick.ask_price_5 = adjust_price(data["AskPrice5"])
tick.bid_volume_2 = data["BidVolume2"]
tick.bid_volume_3 = data["BidVolume3"]
tick.bid_volume_4 = data["BidVolume4"]
tick.bid_volume_5 = data["BidVolume5"]
tick.ask_volume_2 = data["AskVolume2"]
tick.ask_volume_3 = data["AskVolume3"]
tick.ask_volume_4 = data["AskVolume4"]
tick.ask_volume_5 = data["AskVolume5"]
self.gateway.on_tick(tick)
这是本人录制的郑商所合约TA301的结果:
都说国内期货的行情推送最多是2次,其实是错误的,可能超过2次 !看上图就知道,一秒3次一个经常存在的,所以不能够固定递增0.5秒,而应该向本文中的办法,每次加0.5,0.25,0.125......秒的比较好,既不重复有可以超过2次。