VeighNa量化社区
你的开源社区量化交易平台
hxxjava's Avatar
Member
离线
419 帖子
声望: 155

1、参与集中竞价的资格问题

详细的请见中国人大网2000年12月17日发布的进入证券交易所参与集中竞价交易必须具备什么资格?

由此可以知道:

  • 所谓集中竞价是指在交易所的证券买卖的有形市场内,普通投资者无权直接进人交易所,只能委托在交易所里拥有席位的经纪人或者证券公司代理买卖。
  • 进人证券交易所参与集中竞价交易的,必须是具有证券交易所会员资格的证券公司。也就是说,只有具备证券交易所会员资格的证券公司,才可以进人证券交易所进行证券交易。

2 、普通投资者期货集合竞价时间集合竞价时间可以委托下单吗?

答案是肯定的,可以。

2.1 合竞价时间

  1. 期货集合竞价时间
    集合竞价时间是每个交易09:25--09:30..其8:25-09:92是指令申报时间,09:29--09:30是指令撮合时间
  2. 商品期货:
    白天盘品种集合竞价时间是每个交易日08:55--09:00.其中08:55--08:59是指令申报时间,08:59--09:00是指令 撮合时间:
    夜盘品种集合竞价时间是每个交易日20:55--21:00.其中20:55--20: 59是指令 申报时间,20:59--21:00是指令 撮合时间,夜盘品种白天不再进行集合竞价。

  3. 国债期货:
    集合竞价时间是每个交易日09:10--09:15.其中09:10--09: 14是指令申报时间,09:14--09: 15是指令撮合时间。
    这里要提醒大家如果您委托单子,但是系统显示的是“没开盘”,那么这是因为您是在非集合竞价报单时间段委托。

3、vnpy中如何实现集合竞价委托

作为普通投资者,只要合法连接了CTP接口,您就同时登录了行您选择的经纪商的行情服务器和交易服务器,注意:经纪商的而不是交易所的,也就是您开户并且做了CTP入网认证的那个证券公司的。
行情服务器会实时通知各个合约的交易状态信息。 当您所交易的合约的交易状态为集合竞价状态时,您就可以同手动或者策略自动使用CtpGateway的send_order(),cancel_order()函数,调用交易接口(CtpTdApi)进行委托下单和委托撤单,但是委托下单的结果必须等到集合竞价撮合之后才会得到成交与否。
但是与连续竞价期间的委托不同,在集合竞价撮合结束前,是没有任何成交结果推送给客户端的,而绝大部分策略是依靠行情变化来生成交易信号的,这是最主要的困难!
当然非行情驱动的策略可以在没有行情变化到情况下,自动实现在集合竞价阶段实现委托下单,但是这样做的意义不大。因为由基本面信息、黑天鹅、自然灾害、战争等突发事件等非行情输入驱动的策略,通常是长线策略或者是需要由行情来证实的策略,无需急迫地在集合竞价阶段来参与,为什么不在今天的集合竞价结果出来之后在做打算?

当然,在解决了交易信号生成的情况下,用户策略是可以在集合竞价状态实现自动委托下单的,至于此时如何生成交易信号是另外一个话题了。

MTF wrote:

感谢分享!没看出来这种计算方法的额外信息,这个XMA主要的思路是什么呢?

没看出吗?XMA(x,N),总比MA(x,N) 提前 N//2 个周期,如果N是5,那么提前2天,如果是6提前3天,如果是7提前3天得到均值,虽然不是最终值,但是趋向最终值,它会不断地修正。

leo-2a6111b0fda7498e wrote:

大神,问下 是否可以再回测中, 进行subscribe的方式订阅合约?。我的逻辑上是开盘后计算找到合适的合约在做买入卖出的计算

不可以,因为回测中是不连接CTP网关接口的,它的数据全部来自历史数据,此时是无法大约数据的。

1. 被冤枉的通达信函数“未来函数”XMA

提到通达信函数XMA,人们最常见到的词汇是“未来函数”、“欺骗”、“陷阱”、“坑”......等等不好的字眼,仿佛XMA函数是个捉摸不定的未来函数,是你亏损的根源!
其实大家对这个函数不了解,如果你了解了它的实现机理,它的优点和不得已的缺点,扬长避短,是完全可以使用的。

1.1 MA与XMA的区别

未来叙述的方便,先指标一个供MA与XMA计算的数组:
位置:[ A B C D E F H I J K L M N O P Q R S T U ]
数据:[ 1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18. 19. 20.]
分析周期N=5

1.1.1 滞后的N日MA均价

位置:[ A B C D E F H I J K L M N O P Q R S T U ]
MA5 [nan nan nan nan 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18.]

  • A = nan
  • B = nan
  • C = nan
  • D = nan
  • E = (1+2+3+4+5)/5
  • F = (2+3+4+5+6)/5
  • G = (3+4+5+6+7)/5
  • ... ...
  • U = (16+17+18+19+20)/5

1.1.2 XMA的N日均价

位置:[ A B C D E F H I J K L M N O P Q R S T U ]
XMA5:[ 2. 2.5 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18. 18.5 19. ]

  • A = (1+2+3)/3=2
  • B = (1+2+3+4)/4=2.5
  • C = (1+2+3+4+5)/5=3
  • D = (2+3+4+5+6)/5=4
  • E = (3+4+5+6+7)/5=5
  • F = (4+5+6+7+8)/5=6
  • ... ...
  • S = (16+17+18+19+20)/5
  • T = (17+18+19+20)/4
  • U = (18+19+20)/3

1.1.3 人们期望的N日均价

位置:[ A B C D E F H I J K L M N O P Q R S T U W(21) X(22)]
XMA5:[ nan nan nan nan 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18. 19. 20. ]

1.2 MA与XMA的相同与区别

它们的相同点是均值算法都是一样的,都是N个数的算术平均值,但是均值代表的位置是不同的。
N日MA均值是必须满足N个数据,不足时为nan,其所代表的位置是第N日的均值;
N日XMA均值是必须满足N//2+1个数据,不足时为没有意义,其所代表的位置是第N//2+1日的均值,它的计算是用前N//2个数数据、当前日和后N//2个数数据。当数据个数多于N//2个但不够N数的时候,是有几个数算几个数的均线。
以N=5为XMA例:

  • A位置时它取A,B,C位置的数,求3个算术均值,
  • 位置时它取A,B,C,D位置的数,求4个算术均值,
  • 到了C位置时它取A,B,C,D,E位置的数,求5个数的算术均值。
  • 到了S位置时它取Q、R、S、T、U位置的数,求5个数的算术均值。
  • 到了T位置时它取R、S、T、U位置的数,求4个数的算术均值。
  • 到了U位置时它取S、T、U位置的数,求3个数的算术均值。

MA具有确定性,不会受到下一个数据的变化的影响,而其计算出来的结果一定是滞后的。
从这里我们可以看到,XMA其实中历史数据个数大于N的时候,它的结果能够代表人们通常理解的均值,但是因为它用的数据包含当前位置之后N//2日的数据,因为使用预测的数据,XMA随着计算位置的不同,其计算的数据范围发生了变化。

注意:XMA并没有用到预测的数据,它只是在不满足N个计算数据时缩短了N!随着新的数据的到来,它后面N//2个数是会发生变化的,XMA明确而且简单,不神秘!

2. XMA的python实现

以下为用python实现XMA函数代码,已经与通达信的自带函数XMA函数做了严格的验证,数据相同的情况下,XMA的均值是完全相同的,也就是说是可靠移植的,可以信赖的!
如果有怀疑,可以自行验证的。

2.1 XMA()函数

def XMA(src:np.ndarray,N:int,array=False) -> np.ndarray:
    """ XMA函数的python实现 """
    data_len = len(src)
    half_len : int = (N // 2)+(1 if N % 2 else 0)
    if data_len < half_len:
        out = np.array([np.nan for i in range(data_len)],dtype=float)
        if array:
            return out
        return out[-half_len:] if len(out) else np.array([],dtype=float)

    head = np.array([talib.MA(src[0:ilen],ilen)[-1] for ilen in range(half_len,N)])  
    out = head
    if data_len >= N:
        body = talib.MA(src,N)[N-1:]  
        out = np.append(out,body)
        tail = np.array([talib.MA(src[-ilen:],ilen)[-1] for ilen in range(N-1,half_len-1,-1)]) 
        out = np.append(out,tail)

    if array:
        return out

    return out[-half_len:]

2.2 为vnpy的数组管理器增加xma()成员函数


class DynaArrayManager(ArrayManager):
    """ 
    DynaArrayManager是对ArrayManager的扩展,解决它无法用于临时K线的指标计算问题。 
    作者:hxxjava
    """
    def __init__(self, size: int = 100) -> None:
        super().__init__(size)

        self.bar_datetimes:List[datetime] = []

    def update_bar(self, bar: BarData) -> None:
        if not self.bar_datetimes or self.bar_datetimes[-1] < bar.datetime:
            self.bar_datetimes.append(bar.datetime)
            super().update_bar(bar)        

        else:
            """
            Only Update all arrays in array manager with temporary bar data.
            """
            self.open_array[-1] = bar.open_price
            self.high_array[-1] = bar.high_price
            self.low_array[-1] = bar.low_price
            self.close_array[-1] = bar.close_price
            self.volume_array[-1] = bar.volume
            self.turnover_array[-1] = bar.turnover
            self.open_interest_array[-1] = bar.open_interest

    # ... ... 其他无关XMA部分略去

    def xma(self,select:str="C",N:int=3,array: bool = False) -> np.ndarray:
        """ XMA函数的 """
        if   select.upper() == "C":
            src = self.close
        elif select.upper() == "O":
            src = self.open
        elif select.upper() == "H":
            src = self.high
        elif select.upper() == "L":
            src = self.low
        else:
            assert(False) 

        data_len = len(src)
        half_len : int = (N // 2)+(1 if N % 2 else 0)
        if data_len < half_len:
            out = np.array([np.nan for i in range(data_len)],dtype=float)
            if array:
                return out
            return out[-half_len:] if len(out) else np.array([],dtype=float)

        head = np.array([talib.MA(src[0:ilen],ilen)[-1] for ilen in range(half_len,N)])  
        out = head
        if data_len >= N:
            body = talib.MA(src,N)[N-1:]  
            out = np.append(out,body)
            tail = np.array([talib.MA(src[-ilen:],ilen)[-1] for ilen in range(N-1,half_len-1,-1)]) 
            out = np.append(out,tail)

        if array:
            return out

        return out[-half_len:]

3. 在vnpy实现显示XMA的指标组件XmaItem

3.1 XmaItem指标组件实现的难点

可以这么说:如果会写XMA的曲线的vnpy显示组件,已经可以写作任何vnpy显示组件了,它不是一般的难!

  1. 考虑到XMA的后N//2的均值,会随随着新的数据的到来,后N//2个数是会发生变化的特点,XmaItem需要在每次计算新的XMA的值之后,会得到N//2+1个均值结果。我们需要将这个N//2+1个均值替换之前的XMA结果值。
  2. 还是因为后N//2个数是会不停发生变化,XmaItem指标显示组件,需要不同刷新后N//2+1个K线的显示曲线。否则会看到一个断裂带尾部曲线。

3.2 XmaItem指标显示组件的实现代码

class XmaItem(CandleItem):
    """"""
    def __init__(self, manager: BarManager,xma_window:int=10):
        """"""
        super().__init__(manager)

        self.line_pen: QtGui.QPen = pg.mkPen(color=(100, 100, 255), width=2)

        self.xma_window = xma_window
        self.dyna_am = DynaArrayManager(xma_window)
        self.xma_data: Dict[int, float] = {}

    def update_history(self, history: List[BarData]) -> None:
        """ reimpliment of update_history """
        for bar in history:
            self.dyna_am.update_bar(bar)
        super().update_history(history)

    def update_bar(self, bar: BarData) -> None:
        """ reimpliment of update_bar """
        self.dyna_am.update_bar(bar)
        super().update_bar(bar)

    def _get_xma_value(self, ix: int) -> float:
        """ """
        max_ix = self._manager.get_count() - 1
        if ix < 0 or ix > max_ix:
            return np.nan

        # When initialize, calculate all rsi value
        if not self.xma_data:
            bars = self._manager.get_all_bars()
            close_data = [bar.close_price for bar in bars]
            sma_array = XMA(np.array(close_data), self.xma_window,array=True)

            for n, value in enumerate(sma_array):
                self.xma_data[n] = value

        # Return if already calcualted

        if ix != max_ix and ix in self.xma_data:
            return self.xma_data[ix]

        if self.dyna_am.inited:
            values = self.dyna_am.xma(select='C',N=self.xma_window)
            vlen = len(values)
            # print(f"vlen={vlen},values={list(values)}")
            start_ix = ix-vlen+1
            tail_idxs = [(start_ix+i,values[i]) for i in range(vlen)]
            for idx,xma in tail_idxs:
                self.xma_data[idx] = xma

            return self.xma_data[ix]

        return np.nan

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """ """
        # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        # Draw XMA Line
        xma_value = self._get_xma_value(ix)
        last_xma_value = self._get_xma_value(ix - 1)

        if not (np.isnan(xma_value) or np.isnan(last_xma_value)):
            # Set painter color
            painter.setPen(self.line_pen)
            # draw XMA line
            start_point = QtCore.QPointF(ix-1, last_xma_value)
            end_point = QtCore.QPointF(ix, xma_value)
            painter.drawLine(start_point, end_point)

        # Finish
        painter.end()
        return picture

    def paint(self, painter: QtGui.QPainter, opt: QtWidgets.QStyleOptionGraphicsItem, w: QtWidgets.QWidget) -> None:
        """
        Reimplement the paint method of parent class.
        This function is called by external QGraphicsView.
        """
        # return super().paint(painter, opt, w)
        rect = opt.exposedRect
        half = (self.xma_window // 2)
        min_ix: int = int(rect.left()) - half 
        min_ix: int = max(min_ix,0)
        max_ix: int = int(rect.right())
        max_ix: int = min(max_ix, len(self._bar_picutures))

        rect_area: tuple = (min_ix, max_ix)
        if (
            self._to_update
            or rect_area != self._rect_area
            or not self._item_picuture
        ):
            self._to_update = False
            self._rect_area = rect_area
            # print(f"XmaItem _draw_item_picture:{min_ix}--{max_ix}")
            self._draw_item_picture(min_ix, max_ix)

        self._item_picuture.play(painter)

    def _draw_item_picture(self, min_ix: int, max_ix: int) -> None:
        """
        Draw the picture of item in specific range.
        """
        self._item_picuture = QtGui.QPicture()
        painter: QtGui.QPainter = QtGui.QPainter(self._item_picuture)

        for ix in range(min_ix, max_ix):
            bar_picture: QtGui.QPicture = self._bar_picutures[ix]

            # if bar_picture is None:
            bar: BarData = self._manager.get_bar(ix)
            bar_picture = self._draw_bar_picture(ix, bar)
            self._bar_picutures[ix] = bar_picture

            bar_picture.play(painter)

        painter.end()

    def get_info_text(self, ix: int) -> str:
        """"""
        if ix in self.xma_data:
            xma_value = self.xma_data[ix]
            text = f"XMA({self.xma_window}): {xma_value:.2f}"
        else:
            text = "XMA({self.xma_window}) -"

        return text

3.3 XMA指标显示的效果

尾部不断变化的XMA曲线:
description

绘制K线图表时有两种选择,一种是不显示未结束的临时K线,另外一种是显示未结束的临时K线。
不知道您有没有发现,当您选择显示未结束的临时K线时,麻烦也就来了,计算机显得非常的慢!
什么原因?是因为python不适合做此类的图表显示吗?还是指标添加的太多?
非也,是因为我们图表控件的写作思路出了问题!
答案是:主要的原因处在临时K线的指标的计算上。让我们仔细地分析下......

1. 计算临时K线指标与历史K线指标有什么不同?

无论什么指标,历史K线的指标的计算是一次的,即每根K线只计算一次,这是一定的!
但是计算临时K线的指标却不同,原理上是每个tick都可能改变K线的开、高、低、收和成交量等的值,那么收到一个tick都需要重新计算一下该临时K线的指标,收到的tick越快,显示的指标越多,计算的次数就越多,计算量就越大,这也是一定的!
目前我们的指标在计算历史K线的指标时,是把历史数据一次性的调用talib的相关函数, 把一个巨大数组进行计算一次性地出来,然后把每个指标对应地保存到一个按K线索引为键值对字典中,方便绘制图形的时候查询的。
目前vnpy就没有考虑计算临时K线的指标显示,更不用说考虑如何高效地计算计算临时K线的指标了。我探索了临时K线的显示了,做法是对于没有计算过的K线,查询self._manager中的bar的,将最后的若干bar的需要的属性值查询出来,然后用talib的相关函数来计算出此时临时K线的指标,然后保存到指标字典的对应键值下,方便绘制图形的时候查询的。不过正是这个做法,让临时K线的指标的计算效率极低!

2. 如何提升计算临时K线指标的速度

临时K线的指标的计算效率低,与反复不断地查询和复制self._manager中的bar的属性有关,其实准备数据的时间比调用talib函数的时间长多了!
怎么办?把准备数据的时间分散到平时K线更新的函数中,用一个数组管理器来管理这些数据。也就是用存储空间换取一次性准备大量数据的时间。这样就可以提升临时K线的指标的计算效率了。

3. 实现方法

3.1 扩展vnpy的ArrayManager成为数组数字管理器

vnpy的ArrayManager很好,计算效率也不错。可是它不是为动态变化的临时K线设计的。因为每次调用update_bar它都会整体把数据平移一个位置,这对历史K线是可以的,但对应临时K线就不可以。我们希望它在更新临时K线时不要移动位置,只更新尾部(索引为-1)就OK了。
为此我们需要对其进行扩展,设计一个可以适应临时K线的动态数组管理器,我把它叫做DynaArrayManager。
DynaArrayManager的代码如下:

class DynaArrayManager(ArrayManager):
    """ 
    DynaArrayManager是对ArrayManager的扩展,解决它无法用于临时K线的指标计算问题。 
    作者:hxxjava
    """
    def __init__(self, size: int = 100) -> None:
        super().__init__(size)

        self.bar_datetimes:List[datetime] = []

    def update_bar(self, bar: BarData) -> None:
        if not self.bar_datetimes or self.bar_datetimes[-1] < bar.datetime:
            self.bar_datetimes.append(bar.datetime)
            super().update_bar(bar)        

        else:
            """
            Only Update all arrays in array manager with temporary bar data.
            """
            self.open_array[-1] = bar.open_price
            self.high_array[-1] = bar.high_price
            self.low_array[-1] = bar.low_price
            self.close_array[-1] = bar.close_price
            self.volume_array[-1] = bar.volume
            self.turnover_array[-1] = bar.turnover
            self.open_interest_array[-1] = bar.open_interest

    def dmi(self,N:int=14,M:int=7,array: bool = False)-> Union[Tuple[np.ndarray, np.ndarray, np.ndarray,np.ndarray], Tuple[float, float, float,float]]:
        """ 
        Directional Movement Indicator:
        TR := SUM(MAX(MAX(HIGH-LOW,ABS(HIGH-REF(CLOSE,1))),ABS(LOW-REF(CLOSE,1))),N);
        HD := HIGH-REF(HIGH,1); // 创新高量
        LD := REF(LOW,1)-LOW;   // 创新低量
        DMP:= SUM(IFELSE(HD>0 && HD>LD,HD,0),N); // N日创新高量累计,赢家通吃——做多力量
        DMM:= SUM(IFELSE(LD>0 && LD>HD,LD,0),N); // N日创新低量累计,赢家通吃——做空力量
        PDI: DMP*100/TR,LINETHICK2; //做多力量
        MDI: DMM*100/TR,LINETHICK2; //做空力量
        ADX: MA(ABS(MDI-PDI)/(MDI+PDI)*100,M),LINETHICK2; // ADX:多空力量差值M日平滑
        ADXR:(ADX+REF(ADX,M))/2,LINETHICK2;  // ADR:ADX与M日前ADX的均值        
        """
        TR = SUM(talib.TRANGE(self.high,self.low,self.close),N)
        # TR = SUM(MAX(MAX(self.high-self.low,ABS(self.high-REF(self.close,1))),ABS(self.low-REF(self.close,1))),N)
        HD = self.high - REF(self.high,1)
        LD = REF(self.low,1) - self.low 

        DMP = SUM(IIF((HD>0)&(HD>LD),HD,0),N)
        DMM = SUM(IIF((LD>0)&(LD>HD),LD,0),N)

        PDI = DMP*100/TR;
        MDI = DMM*100/TR;

        ADX = talib.MA(np.abs(MDI-PDI)/(MDI+PDI)*100,M)  # ADX:多空力量差值M日平滑
        ADXR = (ADX+REF(ADX,M))/2                         # ADR:ADX与M日前ADX的均值     

        if array:
            return PDI,MDI,ADX,ADXR

        return PDI[-1],MDI[-1],ADX[-1],ADXR[-1]

    def macd3(self, fast_period: int, slow_period: int, signal_period: int, array: bool = False) -> Union[Tuple[np.ndarray, np.ndarray, np.ndarray,np.ndarray], Tuple[float, float, float,float]]:
        """
        MACD having three lines:(diff,dea,slow_dea) and macd histgram
        """
        diff, dea, macd = talib.MACD(
            self.close, fast_period, slow_period, signal_period
        )
        slow_dea = talib.EMA(dea,signal_period)

        if array:
            return diff, dea, macd, slow_dea
        return diff[-1], dea[-1], macd[-1],slow_dea[-1]

3.2 改变CandleItem和ChartItem的实现机制

下面是两个典型的ChartItem的实现方法,其中都用到了DynaArrayManager,并且都重新实现了ChartItem的update_history()和update_bar()两个方法,这两个方法就是为了用存储空间换取一次性准备大量数据的时间。
另外还要注意_get_macd_value()和_get_dmi_value()的后半段,它们的作用就是为了适应临时K线的指标计算的特点而特别这样写的。

3.2.1 MacdItem的修改方法

class Macd3Item(ChartItem):
    """ 三根线的MACD """
    def __init__(self, manager: BarManager,short_window:int=12,long_window:int=26,M:int=9):
        """"""
        super().__init__(manager)

        self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
        self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=1)
        self.red_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=1)
        self.green_pen: QtGui.QPen = pg.mkPen(color=(0, 255, 0), width=1)
        self.magetan_pen: QtGui.QPen = pg.mkPen(color=(255, 0,255),width=1,style = QtCore.Qt.DashLine)

        self._values_ranges: Dict[Tuple[int, int], Tuple[float, float]] = {}

        self.short_window = short_window
        self.long_window = long_window
        self.M = M

        self.dyna_am = DynaArrayManager(max(short_window,long_window) + 2*(M-1))
        self.macd_data: Dict[int, List[float,float,float]] = {}

    def update_history(self, history: List[BarData]) -> None:
        """ reimpliment of update_history """
        for bar in history:
            self.dyna_am.update_bar(bar)
        super().update_history(history)

    def update_bar(self, bar: BarData) -> None:
        """ reimpliment of update_bar """
        self.dyna_am.update_bar(bar)
        super().update_bar(bar)

    def _get_macd_value(self, ix: int) -> List:
        """"""
        max_ix = self._manager.get_count() - 1
        invalid_value = [np.nan,np.nan,np.nan,np.nan]
        if ix < 0 or ix > max_ix:
            return invalid_value

        # When initialize, calculate all macd value
        if not self.macd_data:
            bars:List[BarData] = self._manager.get_all_bars()
            close_prices = [bar.close_price for bar in bars]

            diffs,deas,macds = talib.MACD(np.array(close_prices), 
                                    fastperiod=self.short_window, 
                                    slowperiod=self.long_window, 
                                    signalperiod=self.M)
            slow_deas = talib.EMA(deas,self.M)

            for n in range(0,len(diffs)):
                self.macd_data[n] = [diffs[n],deas[n],macds[n],slow_deas[n]]

        # Return if already calcualted
        if ix != max_ix and ix in self.macd_data:
            return self.macd_data[ix]

        if self.dyna_am.inited:
            diff,dea,macd,slow_dea = self.dyna_am.macd3(self.short_window,self.long_window,self.M)
            self.macd_data[ix] = [diff,dea,macd,slow_dea]
            return [diff,dea,macd,slow_dea]

        return [np.nan,np.nan,np.nan,np.nan]

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """"""
        # # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        if ix < 1:
            return picture

        macd_value = self._get_macd_value(ix)
        last_macd_value = self._get_macd_value(ix - 1)

        # # Draw macd lines
        if np.isnan(macd_value[0]) or np.isnan(last_macd_value[0]):
            # print("略过macd lines0")
            pass
        else:
            end_point0 = QtCore.QPointF(ix, macd_value[0])
            start_point0 = QtCore.QPointF(ix - 1, last_macd_value[0])
            painter.setPen(self.white_pen)
            painter.drawLine(start_point0, end_point0)

        if np.isnan(macd_value[1]) or np.isnan(last_macd_value[1]):
            # print("略过macd lines1")
            pass
        else:
            end_point1 = QtCore.QPointF(ix, macd_value[1])
            start_point1 = QtCore.QPointF(ix - 1, last_macd_value[1])
            painter.setPen(self.yellow_pen)
            painter.drawLine(start_point1, end_point1)

        if np.isnan(macd_value[3]) or np.isnan(last_macd_value[3]):
            pass
        else:
            end_point2 = QtCore.QPointF(ix, macd_value[3])
            start_point2 = QtCore.QPointF(ix - 1, last_macd_value[3])
            painter.setPen(self.magetan_pen)
            painter.drawLine(start_point2, end_point2)

        if not np.isnan(macd_value[2]):
            if (macd_value[2]>0):
                painter.setPen(self.red_pen)
                painter.setBrush(pg.mkBrush(255,0,0))
            else:
                painter.setPen(self.green_pen)
                painter.setBrush(pg.mkBrush(0,255,0))
            painter.drawRect(QtCore.QRectF(ix-0.3,0,0.6,macd_value[2]))
        else:
            # print("略过macd lines2")
            pass

        painter.end()
        return picture

    def boundingRect(self) -> QtCore.QRectF:
        """"""
        min_y, max_y = self.get_y_range()
        # print(f"{self.short_window,self.long_window,self.M} min_y, max_y={min_y, max_y}")
        rect = QtCore.QRectF(
            0,
            min_y,
            len(self._bar_picutures),
            max_y
        )
        return rect

    def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
        """ 
        获得4个指标在y轴方向的范围 
        hxxjava 修改,2022-12-14
        当显示范围改变时,min_ix,max_ix的值不为None,当显示范围不变时,min_ix,max_ix的值不为None,
        """  
        if not self.macd_data:
            # 如果Ofcd数据没有计算过
            return (-100,100)

        this_range = (min_ix,max_ix)       
        if this_range == (None,None):
            # 如果是查询全范围
            min_ix,max_ix = self._rect_area                 # 显示索引范围
            max_ix = min(max_ix,len(self.macd_data)) - 1    # 数据索引范围
            this_range = min_ix,max_ix

        if this_range in self._values_ranges:
            # 查询范围已经存在,直接返回已经计算过的y范围值
            result = self._values_ranges[this_range]
            return result

        # 查询范围不存在,重新计算y范围值
        macd_list = list(self.macd_data.values())[min_ix:max_ix + 1]
        ndarray = np.array(macd_list)  
        if ndarray.shape[0] == 0:
            return (-100,100)

        # 求值范围内的的MACD值的最大和最小值
        max_price = np.nanmax(ndarray)
        min_price = np.nanmin(ndarray)
        if np.isnan(max_price) or np.isnan(max_price):
            return (-100,100)

        # 保存y方向范围,同时返回结果
        result = (min_price, max_price)
        self._values_ranges[this_range] = result
        return result

    def get_info_text(self, ix: int) -> str:
        """ """
        barscount = len(self._manager._bars) # hxxjava debug
        if ix in self.macd_data:
            [diff,dea,macd,slow_dea] = self.macd_data[ix]
            words = [
                f"Macd3{(self.short_window,self.long_window,self.M)}:",
                f"diff {diff:.3f}",
                f"dea {dea:.3f}",
                f"slow_dea={slow_dea:.3f}",
                f"macd {macd:.3f}",
                ]
            text = "\n".join(words)
        else:
            text = "diff - \ndea - \nslow_dea - \nmacd -"

        return text

3.2.1 DmiItem的修改方法

class DmiItem(ChartItem): 
    """  """
    def __init__(self, manager: BarManager,N:int=14,M:int=7):
        """"""
        super().__init__(manager)

        self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
        self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=1)
        self.magenta_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 255), width=1)
        self.red_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=1)
        self.green_pen: QtGui.QPen = pg.mkPen(color=(0, 255, 0), width=1)
        self.ref_pen: QtGui.QPen = pg.mkPen(color=(127, 127, 127,127), width=1, style = QtCore.Qt.DashLine)

        self._values_ranges: Dict[Tuple[int, int], Tuple[float, float]] = {}

        self.N = N
        self.M = M

        self.dyna_am = DynaArrayManager(2*max(N,M))

        self.dmi_data: Dict[int, Tuple[float,float,float,float]] = {} # (PDI,MDI,ADX,ADXR)

    def update_history(self, history: List[BarData]) -> None:
        """ reimpliment of update_history """
        for bar in history:
            self.dyna_am.update_bar(bar)
        super().update_history(history)

    def update_bar(self, bar: BarData) -> None:
        """ reimpliment of update_bar """
        self.dyna_am.update_bar(bar)
        super().update_bar(bar)

    def _get_dmi_value(self, ix: int) -> Tuple[float,float,float,float]:
        """"""
        max_ix = self._manager.get_count()-1
        invalid_data = (np.nan,np.nan,np.nan,np.nan)
        if ix < 0 or ix > max_ix:
            print(f"DmiItem{ix},invalid_data1")
            return invalid_data

        # When initialize, calculate all macd value
        if not self.dmi_data:
            bars:List[BarData] = self._manager.get_all_bars()
            highs = np.array([bar.high_price for bar in bars])
            lows = np.array([bar.low_price for bar in bars])
            closes = np.array([bar.close_price for bar in bars])

            pdi,mdi,adx,adxr = DMI(high=highs,low=lows,close=closes,N=self.N,M=self.M,array=True)

            for n in range(0,len(adx)):
                self.dmi_data[n] = (pdi[n],mdi[n],adx[n],adxr[n])


        # Return if already calcualted
        if ix != max_ix and ix in self.dmi_data:
            return self.dmi_data[ix]

        if self.dyna_am.inited:
            pdi,mdi,adx,adxr = self.dyna_am.dmi(N=self.N,M=self.M)
            self.dmi_data[ix] = [pdi,mdi,adx,adxr]
            return [pdi,mdi,adx,adxr]

        return invalid_data

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """"""
        # # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        if ix > self.N + self.M:
            # 画参考线
            painter.setPen(self.ref_pen)
            for ref in [20.0,50,80]:
                painter.drawLine(QtCore.QPointF(ix-0.5,ref),QtCore.QPointF(ix+0.5,ref))  

            # 画4根线
            dmi_value = self._get_dmi_value(ix)
            last_dmi_value = self._get_dmi_value(ix - 1)
            pens = [self.white_pen,self.yellow_pen,self.magenta_pen,self.green_pen]
            for i in range(4):
                end_point0 = QtCore.QPointF(ix, dmi_value[i])
                start_point0 = QtCore.QPointF(ix - 1, last_dmi_value[i])
                painter.setPen(pens[i])
                painter.drawLine(start_point0, end_point0)        

            # 多空颜色标示
            pdi,mdi = dmi_value[0],dmi_value[1] 
            if not(np.isnan(pdi) or np.isnan(mdi)):
                if abs(pdi - mdi) > 1e-2:
                    painter.setPen(pg.mkPen(color=(168, 0, 0) if pdi > mdi else (0, 168, 0),width=3))
                    painter.drawLine(QtCore.QPointF(ix,pdi),QtCore.QPointF(ix,mdi))  

        painter.end()
        return picture

    def boundingRect(self) -> QtCore.QRectF:
        """"""
        min_y, max_y = self.get_y_range()
        rect = QtCore.QRectF(
            0,
            min_y,
            len(self._bar_picutures),
            max_y
        )
        return rect

    def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
        """  """
        return (0.0,100.0)  

    def get_info_text(self, ix: int) -> str:
        """ """
        if ix in self.dmi_data:
            [pdi,mdi,adx,adxr] = self.dmi_data.get(ix,None)
            words = [
                f"DMI{self.N,self.M}:",
                f"PDI {pdi:.2f}",
                f"MDI {mdi:.2f}",
                f"ADX {adx:.2f}",
                f"ADXR {adxr:.2f}",
                ]
            text = "\n".join(words)
        else:
            text = f"DMI{self.N,self.M}:-"

        return text

4. 快了,果然快了

这样写作的,再加载您的K线图表时,你会发现即使显示是需要显示临时K线指标,显示的速度也会快多了!!!

1. tick数据模拟器

tick数据模拟器可以从数据库读取历史tick数据,以一定的速度将其按时间顺序放入消息队列中,以此实现历史行情回放。
它可以用来复盘或者调试策略,加快策略的开发进度。

1.1 功能:

  • 可以启动、暂停、恢复和停止回放tick数据。
  • 速度可调,根据需要决定合适的速度
  • 模拟的接口选,默认CTP
  • 时间范围可变

1.2 tick数据来源

tick数据模拟器回放的tick数据不是凭空生造的,它是真实的历史行情。通常先用vnpy中的DataRecorder录制您感兴趣的tick数据,它会把tick数据记录到vnpy数据库中,
然后就可以利用tick数据模拟器在休市时间进行回放了。

2. tick数据模拟器的实现

"""
tick数据模拟器
作者:hxxjava
时间:2022-12-10
功能:从数据库读取历史tick数据,以一定的速度将其按时间顺序放入消息队列中,以此实现历史行情回放。
"""
from typing import List,Dict
from datetime import datetime,timedelta
from time import sleep 
from vnpy.event.engine import Event,EventEngine
from vnpy.trader.engine import MainEngine,BaseEngine
from vnpy.trader.constant import InstrumentStatus
from vnpy.trader.object import TickData,StatusData
from vnpy.trader.event import EVENT_TICK,EVENT_STATUS
from vnpy.trader.utility import extract_vt_symbol,CHINA_TZ

from vnpy.trader.database import get_database

from vnpy.trader.app import BaseApp
from threading import Thread

EVENT_TICKSIM_START = "eTickSimStart."
EVENT_TICKSIM_PAUSE = "eTickSimPause."
EVENT_TICKSIM_STOP = "eTickSimStop."

EVENT_TICKSIM_FINISHED = "eTickSimFinished."

TICK_SIM_ENGINE_NAME = "tick_sim"

class TickSimEngine(BaseEngine):
    """ tick数据模拟器 """
    def __init__(self, main_engine: MainEngine, event_engine: EventEngine) -> None:
        """ constructor """
        super(TickSimEngine, self).__init__(main_engine, event_engine, TICK_SIM_ENGINE_NAME)

        self.ticks:List[TickData] = []

        self.db = get_database()
        self.speed = 1.0
        self._active = False
        self._pause = False
        self.gateway_name = "CTP"

        self.register_event()

        print(f"tick数据模拟器已经安装。")

    def register_event(self) -> None:
        """ """
        # self.event_engine.register(EVENT_TICK,self.process_tick_event)
        self.event_engine.register(EVENT_TICKSIM_START,self.process_ticksim_start_event) 
        self.event_engine.register(EVENT_TICKSIM_PAUSE,self.process_ticksim_pause_event) 
        self.event_engine.register(EVENT_TICKSIM_STOP,self.process_ticksim_stop_event) 

        self.event_engine.register(EVENT_TICKSIM_FINISHED,self.process_ticksim_finished)     

    def load_ticks(self) -> None:
        """  """
        symbol,exchange = extract_vt_symbol(self.vt_symbol)
        self.ticks = self.db.load_tick_data(symbol=symbol,exchange=exchange,start=self.start_time,end=self.end_time)
        print(f"TickSimulator load {len(self.ticks)} ticks")

    def process_tick_event(self,event:Event) -> None:
        """ """
        tick:Dict = event.data
        print(f"recieve:{tick}")

    def process_ticksim_start_event(self,event:Event) -> None:
        """ """
        data:Dict = event.data
        print(f"I got EVENT_TICKSIM_START:{data}")
        self.start_simulator(**data)

    def process_ticksim_pause_event(self,event:Event) -> None:
        """ """
        self._pause = not self._pause
        print(f"I got EVENT_TICKSIM_PAUSE:{data}")

    def process_ticksim_stop_event(self,event:Event) -> None:
        """ """
        self._active = True
        print(f"I got EVENT_TICKSIM_STOP:{data}")

    def process_ticksim_finished(self,event:Event) -> None:
        """ """
        # 送出收盘交易状态
        tick:TickData = event.data

        next_second:datetime = tick.datetime + timedelta(seconds=1)
        next_second.replace(second=0,microsecond=0)
        status = StatusData(
            gateway_name=self.gateway_name,
            symbol=tick.symbol,
            exchange=tick.exchange,
            settlement_group_id="100",
            instrument_status=InstrumentStatus.CLOSE,
            trading_segment_sn = 100,
            enter_time = next_second.strftime("%H:%M"),
        )
        self.event_engine.put(Event(EVENT_STATUS,data=status))

        self.stop()

        print(f"finish_time:{tick.datetime}")

    def start(self) -> None:
        """ """
        self._active = True
        self.thread = Thread(target=self.run)
        self.thread.start()

    def stop(self) -> None:
        """ """
        self._active = False
        self._pause = False
        self.speed = 1.0
        self.ticks.clear()

    def run(self) -> None:
        """ 仿真线程函数 """
        while self._active:
            # 遍历发送
            if not self._pause:
                if self.ticks:
                    tick = self.ticks.pop(0)
                    tick.gateway_name = self.gateway_name

                    event = Event(EVENT_TICK,tick)

                    # print(f"EVENT_TICK event data = {tick}")
                    self.event_engine.put(event)

                    if len(self.ticks) == 0:
                        print(f"all ticks are send out !")
                        self.event_engine.put(Event(EVENT_TICKSIM_FINISHED,data=tick))

            sleep(0.5/self.speed)

        print(f"TickSimulator is stoped !")

    def start_simulator(self,vt_symbol:str,start_time:datetime,end_time:datetime,speed:float=1.0,gateway_name:str='CTP') -> None:
        """ 启动tick模拟器 """
        self.vt_symbol = vt_symbol
        self.start_time = start_time
        self.end_time = end_time
        self.speed = speed
        self.gateway_name = gateway_name
        self.load_ticks()
        self.start()

3. tick数据模拟器使用

这里给出最简单的使用方法,具体您可以发挥想象力。

3.1 加载tick数据模器到MainEgine

    """ """
    event_engine = EventEngine()
    main_engine = MainEngine(event_engine=event_engine)

    main_engine.add_engine(TickSimEngine)

3.2 启动回放方法1

    tick_sim_engine:TickSimEngine = main_engine.get_engine(TICK_SIM_ENGINE_NAME)
    end_time = datetime.now().replace(tzinfo=CHINA_TZ)
    start_time = datetime(2022,12,9,22,50,0).replace(tzinfo=CHINA_TZ)
    tick_sim_engine.start_simulator(
             vt_symbol='p2301.DCE',  # 回放合约
             start_time=start_time,      # 开始时间
             end_time=end_time,        # 结束时间
             speed=10.0                      # 回放速度
    )

3.3 启动回放方法2

    data = {
        "vt_symbol":'p2301.DCE',                   
        "start_time":datetime(2022,12,9,22,50,0).replace(tzinfo=CHINA_TZ),
        "end_time": datetime.now().replace(tzinfo=CHINA_TZ),
        "speed":10,
        "gateway_name":"CTP"
    }  # 意义同上
    event_engine.put(Event(EVENT_TICKSIM_START,data))

3.4 暂停回放方法

    event_engine.put(Event(EVENT_TICKSIM_PAUSE))

3.5 停止回放方法

    event_engine.put(Event(EVENT_TICKSIM_STOP))

注释:

无论3.2~3.5中的代码在哪里,event_engine必须和3.1加载tick数据模器时是同一个消息引擎。

逆鳞 wrote:

请问是否需要在object加上StatusData呢?运行的时候说StatusData找不到

寻找我之前关于合约交易状态信息的帖子,一路读下来,就自然能够找到StatusData在
vnpy.trader.object中的定义,在CTP gateway中的接收,分发和各种用途了。

1. 鼠标双击隐藏/显示K线图表全部或部分副图的好处

K线图表通常有主图和多个副图组成,它是我们观察合约 K线和指标的工具。但有时往往因为要显示的指标副图太多,去观察效果受到太大的影响。我们可以通过关闭全部或者是一个富途的方式来让显示变得更为简洁和细致。最方便的方式是使用鼠标来进行操作,而很多市面上的第三方软件也多采用此方法。可是看看维恩派的 K线图表因为没有考虑用户使用时候的这个需求,想实现这个功能还是需要费一番心思的。

那么怎么做呢?初步设想是:

  • 当用户用鼠标双击主图时,把所有的附图全部关闭;当用户再次双击主图时,把所有的富途再次显示出来。
  • 当用户用鼠标双击其中一个副图时,将其余副图全部隐藏起来,当用户再次双击该副图时,再把之前隐藏的其余副图显示出来。

2. 具体的实现方法:

修改文件vnpy\chart\widget.py

2.1 扩展PlotItem使之可以捕获鼠标双击事件

在引用部分加入下面内容:

from typing import Callable # hxxjava add

PlotItem的扩展类MyPlotItem:

class MyPlotItem(pg.PlotItem):
    """ 
    MyPlotItem:re-implement of PlotItem.
    hxxjava add 
    """
    def __init__(self,name:str,on_mouseDblClicked:Callable=None,*args,**kwargs) -> None:
        """ add name attribute and on_mouseDblClicked callback for PlotItem """
        super().__init__(*args,**kwargs)

        self.name = name
        self.on_mouseDblClicked = on_mouseDblClicked

    def mouseDoubleClickEvent(self, event: QtGui.QMouseEvent) -> None:
        """ re-implement PlotItem's mouseDoubleClickEvent """
        super().mouseDoubleClickEvent(event)

        if self.on_mouseDblClicked:
            self.on_mouseDblClicked(self)

2.2 修改 ChartWidget使之可以具有处理鼠标双击事件

为class ChartWidget添加下面的鼠标双击的下面的回调函数。需要说明的是这里有个约定:容纳主图的PlotItem名称必须为"candle",其他的名称认为是副图,无需纠结是不是太过特别订制了。

    def on_mouseDblClicked(self,plot:pg.PlotItem):
        """ 
        所有内含图表被双击的回调函数。 hxxjava add 
        """
        if plot.name == "candle":
            # 选择所有副图
            others = [pi for pi in self._plots.values() if pi.name != plot.name]

        else:
            # 选择其他副图
            others = [pi for pi in self._plots.values() if pi.name not in [plot.name,'candle']]

            # 求当前幅图的高度变化量
            delta_h = 0
            for pi in others:
                delta_h += pi.height()*(1 if pi.isVisible() else -1)
            plot.setFixedHeight(plot.height()+delta_h)

        # 隐藏/显示未变双击的图表
        for pi in others:
            pi.setVisible(not pi.isVisible())

修改class ChartWidget的add_plot()函数,代码如下:

    def add_plot(
        self,
        plot_name: str,
        minimum_height: int = 80,
        maximum_height: int = None,
        hide_x_axis: bool = False
    ) -> None:
        """
        Add plot area.
        """
        # Create plot object
        # plot: pg.PlotItem = pg.PlotItem(axisItems={'bottom': self._get_new_x_axis()})
        plot: pg.PlotItem = MyPlotItem(axisItems={'bottom': self._get_new_x_axis()},name=plot_name,on_mouseDblClicked=self.on_mouseDblClicked)
        plot.setMenuEnabled(False)
        plot.setClipToView(True)
        plot.hideAxis('left')
        plot.showAxis('right')
        plot.setDownsampling(mode='peak')
        plot.setRange(xRange=(0, 1), yRange=(0, 1))
        plot.hideButtons()
        plot.setMinimumHeight(minimum_height)

        if maximum_height:
            plot.setMaximumHeight(maximum_height)

        if hide_x_axis:
            plot.hideAxis("bottom")

        if not self._first_plot:
            self._first_plot = plot

        # Connect view change signal to update y range function
        view: pg.ViewBox = plot.getViewBox()
        view.sigXRangeChanged.connect(self._update_y_range)
        view.setMouseEnabled(x=True, y=True)   # hxxjava change,old---view.setMouseEnabled(x=True, y=False)

        # Set right axis
        right_axis: pg.AxisItem = plot.getAxis('right')
        right_axis.setWidth(60)
        right_axis.tickFont = NORMAL_FONT

        # Connect x-axis link
        if self._plots:
            first_plot: pg.PlotItem = list(self._plots.values())[0]
            plot.setXLink(first_plot)

        # Store plot object in dict
        self._plots[plot_name] = plot

        # Add plot onto the layout
        self._layout.nextRow()
        self._layout.addItem(plot)

3. 实现的效果

完成了第2部分的代码后,您就可以使用ChartWidget像以往一样创建你的K线图表了。

3.1 一个较为复杂的、完整的K线图表窗口

description

3.2 双击主图后的K线图表窗口

description

3.3 双击其中一个副图后的K线图表窗口

双击其中的MACD副图后,K线图表窗口MACD副图会占据其他副图的显示区域,再次双击该MACD副图,K线图表窗口中的其他副图就再次显示出来了。
description
双击其中的的delta副图后,K线图表窗口delta副图会占据其他副图的显示区域,再次双击该delta副图,K线图表窗口中的其他副图就再次显示出来了。
description

是不是觉得这样显示会给您看图带来一些希望的好处!!!

1. 问题的由来

实盘中,如果你用K线图表对你的CTA策略生成的K线进行显示,你会发现明明已经休市了或者收盘了,最新的日K线、30分钟、5分钟K线本应该能够有了,可是迟迟见不到这些K线的生成。
以国内期货举例:

  • 10:10~10:15的那根5分钟K线在10:15:00到10:30:00的休市时间段内是出不来的;
  • 11:25:00到11:30:00的那根5分钟K线在11:30:00到13:30:00的休市时间段内是出不来的;
  • 还有日K线,已经到15:00:00收盘了,可是因为没有收到下一秒tick而不能够及时生成出来。如果您不关机,等到下一个交易日的第一个tick收到的时候才能生成这根日K线。
  • 另外如果你是个细心的人,注意下策略在加载历史数据的时候,最后一根日K线、30分钟、5分钟K线乃至最后一根1分钟K线也是见不到的。

目前vnpy的BarGenerator没有出错,但出现这种情况有违常理!

为什么会出现情况?原因是因为你的BarGenerator的生成K线的机制导致的,因为这些大K线是由1分钟K线合成的,就是说它们依赖1分钟K线的生成。
目前1分钟K线只是单纯由tick数据推动的,当收到下一分钟tick才判断当前1分钟K线的结束。如果遇到了中间休市时间或者收盘时间,网关接口就不再有新的tick推送,这样最后1分钟K线也就一直呆在BarGenerator中,无法推动5分钟、30分钟、日等大K线的生成,这就是目前BarGenerator的问题所在。

2. 合约交易状态可以解决这个问题。

交易所用合约交易状态通知交易客户端交易合约的状态已经发生了变化。它表明交易合约当前的交易时间段,在每个交易时间的开始和结束时推送,时间为分钟的开始。这个信息正好可以用于BarGenerator结束各个休市和收盘前1分钟的K线生成,进而一举解决比1分钟大的K线的生成。

3. 实现这个问题一共分成5步

套用一句宋丹丹的话,要把大象关进冰箱总共分三步:第一步把冰箱门打开,第二步把冰箱塞进冰箱中,第三步把冰箱门关上!
让BarGenerator在休市和收市时及时生成K线分五步:

  • 第一步扩展CTA策略引擎CtaEngine
  • 第二步修改CTA策略模板CtaTemplate
  • 第三步扩展K线生成器BarGenerator
  • 第四步修改CTA策略
  • 第五步启用扩展的MyCtaEngine

3.1 第一步扩展CTA策略引擎CtaEngine

在vnpy_ctastrategy\engine.py中增加下面的内容:

class MyCtaEngine(CtaEngine):
    """ 
    CTA策略引擎,对CtaEngine的功能进行扩展。 
    功能:
    1. 订阅集合竞价tick数据,并且转发给各个已经初始化的CTA策略;
    2. 订阅交易状态消息数据,并且转发给各个已经初始化的CTA策略;
    3. 条件单的功能:包括发送、监视、更新和取消条件单的功能。
    """

    condition_filename = "condition_order.json"     # 历史条件单存储文件


    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super().__init__(main_engine,event_engine)

        self.condition_orders:Dict[str,ConditionOrder] = {}         # strategy_name: ConditionOrder

        self.triggered_condition_orders:List[ConditionOrder] = []   # 已经触发点条件单,为流控设计

    def load_active_condtion_orders(self):
        """  """
        return {}

    def register_event(self):
        """"""
        super().register_event()
        self.event_engine.register(EVENT_AUCTION_TICK, self.process_auction_tick_event)
        self.event_engine.register(EVENT_STATUS, self.process_status_event)

    def process_auction_tick_event(self,event:Event):
        """ 集合竞价消息处理 hxxjava add """

        tick:TickData = event.data
        strategies:List[CtaTemplate] = self.symbol_strategy_map[tick.vt_symbol]
        if not strategies:
            return

        for strategy in strategies:
            if strategy.inited:
                # 执行策略的集合竞价消息处理
                self.call_strategy_func(strategy, strategy.on_auction_tick, tick)

    def process_status_event(self,event:Event):
        """ 交易状态消息处理 hxxjava add """
        status:StatusData = event.data
        strategies:List[CtaTemplate] = []

        # step1: find strategies related to this status data 
        vt_instrument0 = get_vt_instrument(status.vt_symbol)
        if vt_instrument0 == status.vt_symbol:
            # 交易品种的交易状态
            for vt_symbol in self.symbol_strategy_map.keys():
                vt_instrument = get_vt_instrument(vt_symbol)
                if vt_instrument == vt_instrument0:
                    # 交易品种的交易状态属于策略交易的合约
                    strategies.extend(self.symbol_strategy_map[vt_symbol]) 

        else:
            # 单独合约的交易状态
            strategies.extend(self.symbol_strategy_map.get(status.vt_symbol,[]))

        if not strategies:
            return

        # step 2: push status data to all relate strategies
        for strategy in strategies:
            if strategy.inited:
                # 执行策略的集合竞价消息处理
                self.call_strategy_func(strategy, strategy.on_status, status)

    def process_tick_event(self,event:Event):
        """ 用tick的价格检查条件单 """
        super().process_tick_event(event)

        tick:TickData = event.data
        all_condition_orders = [order for order in self.condition_orders.values() \
            if order.vt_symbol == tick.vt_symbol and order.status == CondOrderStatus.WAITING]
        for order in all_condition_orders:
            # 检查条件单是否满足条件
            self.check_condition_order(order,tick)

    def check_condition_order(self,order:ConditionOrder,tick:TickData):
        """ 检查条件单是否满足条件 """       
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        price = tick.last_price

        is_be = order.condition == Condition.BE and price >= order.price
        is_le = order.condition == Condition.LE and price <= order.price
        is_bt = order.condition == Condition.BT and price > order.price
        is_lt = order.condition == Condition.LT and price < order.price

        if is_be or is_le or is_bt or is_lt:
            # 满足触发条件
            if order.execute_price == ExecutePrice.MARKET:
                # 取市场价
                price = tick.last_price
            elif order.execute_price == ExecutePrice.EXTREME:
                # 取极限价
                price = tick.limit_up if order.direction == Direction.LONG else tick.limit_down
            else:
                # 取设定价
                price = order.price

            # 执行委托
            order_ids = strategy.send_order(
                    direction = order.direction,
                    offset=order.offset,
                    price=price,
                    volume=order.volume 
                )

            if order_ids:
                order.trigger_time = tick.datetime
                order.status = CondOrderStatus.TRIGGERED
                order.vt_orderids = order_ids

                self.call_strategy_func(strategy,strategy.on_condition_order,order)
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))

    def find_condition_order(self,vt_orderid:str):
        """ 根据委托单号查询所属条件单 """
        corder:ConditionOrder = None
        for order in self.condition_orders.values():
            if vt_orderid in order.vt_orderids:
                corder = order
                break

        return corder

    def process_trade_event(self, event: Event):
        """ 委托单推送处理 """
        super().process_trade_event(event)
        trade:TradeData = event.data
        vt_orderid = trade.vt_orderid

        corder = self.find_condition_order(vt_orderid)
        if corder:
            # 该成交单属于某个条件单
            strategy = self.strategies.get(corder.strategy_name,None)
            if strategy and strategy.trading:
                # 找到了该条件单属实策略实例且正在交易中

                # 累计条件单的成交量
                corder.traded += trade.volume
                # 推送该条件单给策略
                self.call_strategy_func(strategy,strategy.on_condition_order,corder)
                # 刷新条件单列表控件
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,corder))

    def send_condition_order(self,order:ConditionOrder):
        """  """
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        if order.cond_orderid not in self.condition_orders:
            self.condition_orders[order.cond_orderid] = order
            self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
            return True

        return False

    def cancel_condition_order(self,cond_orderid:str):
        """  """
        order:ConditionOrder = self.condition_orders.get(cond_orderid,None)
        if not order:
            return False

        order.status = CondOrderStatus.CANCELLED
        self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
        return True

    def cancel_all_condition_orders(self,strategy_name:str):
        """  """
        for order in self.condition_orders.values():
            if order.strategy_name == strategy_name and order.status == CondOrderStatus.WAITING:
                order.status = CondOrderStatus.CANCELLED
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))

        return True

3.2 第二步修改CTA策略模板CtaTemplate

在vnpy_ctastrategy\template.py中增加一个on_status()虚函数,它是接收交易状态信息数据推送的接口,on_status()代码如下:


class CtaTemplate(ABC):
    """"""

    author: str = ""
    parameters: list = []
    variables: list = []

    def __init__(
        self,
        cta_engine: Any,
        strategy_name: str,
        vt_symbol: str,
        setting: dict,
    ) -> None:
        """"""
        self.cta_engine: Any = cta_engine
        self.strategy_name: str = strategy_name
        self.vt_symbol: str = vt_symbol

        self.inited: bool = False
        self.trading: bool = False
        self.pos: int = 0

        # Copy a new variables list here to avoid duplicate insert when multiple
        # strategy instances are created with the same strategy class.
        self.variables = copy(self.variables)
        self.variables.insert(0, "inited")
        self.variables.insert(1, "trading")
        self.variables.insert(2, "pos")

        self.update_setting(setting)

    def update_setting(self, setting: dict) -> None:
        """
        Update strategy parameter wtih value in setting dict.
        """
        for name in self.parameters:
            if name in setting:
                setattr(self, name, setting[name])

    @classmethod
    def get_class_parameters(cls) -> dict:
        """
        Get default parameters dict of strategy class.
        """
        class_parameters: dict = {}
        for name in cls.parameters:
            class_parameters[name] = getattr(cls, name)
        return class_parameters

    def get_parameters(self) -> dict:
        """
        Get strategy parameters dict.
        """
        strategy_parameters: dict = {}
        for name in self.parameters:
            strategy_parameters[name] = getattr(self, name)
        return strategy_parameters

    def get_variables(self) -> dict:
        """
        Get strategy variables dict.
        """
        strategy_variables: dict = {}
        for name in self.variables:
            strategy_variables[name] = getattr(self, name)
        return strategy_variables

    def get_data(self) -> dict:
        """
        Get strategy data.
        """
        strategy_data: dict = {
            "strategy_name": self.strategy_name,
            "vt_symbol": self.vt_symbol,
            "class_name": self.__class__.__name__,
            "author": self.author,
            "parameters": self.get_parameters(),
            "variables": self.get_variables(),
        }
        return strategy_data

    @virtual
    def on_init(self) -> None:
        """
        Callback when strategy is inited.
        """
        pass

    @virtual
    def on_inited(self):    # hxxjava add
        """
        Callback when strategy is inited.
        """
        pass

    @virtual
    def on_start(self):
        """
        Callback when strategy is started.
        """
        pass

    @virtual
    def on_stop(self) -> None:
        """
        Callback when strategy is stopped.
        """
        pass

    @virtual
    def on_auction_tick(self, tick: TickData):
        """
        Callback of new tick data update.   # hxxjava add for auction tick
        """
        pass

    @virtual
    def on_status(self, status: StatusData=None):
        """
        Callback of new status data update.   # hxxjava add for trading status
        """
        pass

    @virtual
    def on_tick(self, tick: TickData) -> None:
        """
        Callback of new tick data update.
        """
        pass

    @virtual
    def on_bar(self, bar: BarData) -> None:
        """
        Callback of new bar data update.
        """
        pass

    @virtual
    def on_trade(self, trade: TradeData) -> None:
        """
        Callback of new trade data update.
        """
        pass

    @virtual
    def on_order(self, order: OrderData) -> None:
        """
        Callback of new order data update.
        """
        pass

    @virtual
    def on_stop_order(self, stop_order: StopOrder) -> None:
        """
        Callback of stop order update.
        """
        pass

    def buy(
        self,
        price: float,
        volume: float,
        stop: bool = False,
        lock: bool = False,
        net: bool = False
    ) -> list:
        """
        Send buy order to open a long position.
        """
        return self.send_order(
            Direction.LONG,
            Offset.OPEN,
            price,
            volume,
            stop,
            lock,
            net
        )

    def sell(
        self,
        price: float,
        volume: float,
        stop: bool = False,
        lock: bool = False,
        net: bool = False
    ) -> list:
        """
        Send sell order to close a long position.
        """
        return self.send_order(
            Direction.SHORT,
            Offset.CLOSE,
            price,
            volume,
            stop,
            lock,
            net
        )

    def short(
        self,
        price: float,
        volume: float,
        stop: bool = False,
        lock: bool = False,
        net: bool = False
    ) -> list:
        """
        Send short order to open as short position.
        """
        return self.send_order(
            Direction.SHORT,
            Offset.OPEN,
            price,
            volume,
            stop,
            lock,
            net
        )

    def cover(
        self,
        price: float,
        volume: float,
        stop: bool = False,
        lock: bool = False,
        net: bool = False
    ) -> list:
        """
        Send cover order to close a short position.
        """
        return self.send_order(
            Direction.LONG,
            Offset.CLOSE,
            price,
            volume,
            stop,
            lock,
            net
        )

    def send_order(
        self,
        direction: Direction,
        offset: Offset,
        price: float,
        volume: float,
        stop: bool = False,
        lock: bool = False,
        net: bool = False
    ) -> list:
        """
        Send a new order.
        """
        if self.trading:
            vt_orderids: list = self.cta_engine.send_order(
                self, direction, offset, price, volume, stop, lock, net
            )
            return vt_orderids
        else:
            return []

    def cancel_order(self, vt_orderid: str) -> None:
        """
        Cancel an existing order.
        """
        if self.trading:
            self.cta_engine.cancel_order(self, vt_orderid)

    def cancel_all(self) -> None:
        """
        Cancel all orders sent by strategy.
        """
        if self.trading:
            self.cta_engine.cancel_all(self)

    def write_log(self, msg: str) -> None:
        """
        Write a log message.
        """
        self.cta_engine.write_log(msg, self)

    def get_engine_type(self) -> EngineType:
        """
        Return whether the cta_engine is backtesting or live trading.
        """
        return self.cta_engine.get_engine_type()

    def get_pricetick(self) -> float:
        """
        Return pricetick data of trading contract.
        """
        return self.cta_engine.get_pricetick(self)

    def load_bar(
        self,
        days: int,
        interval: Interval = Interval.MINUTE,
        callback: Callable = None,
        use_database: bool = False
    ) -> None:
        """
        Load historical bar data for initializing strategy.
        """
        if not callback:
            callback: Callable = self.on_bar

        bars: List[BarData] = self.cta_engine.load_bar(
            self.vt_symbol,
            days,
            interval,
            callback,
            use_database
        )

        for bar in bars:
            callback(bar)

    def load_tick(self, days: int) -> None:
        """
        Load historical tick data for initializing strategy.
        """
        ticks: List[TickData] = self.cta_engine.load_tick(self.vt_symbol, days, self.on_tick)

        for tick in ticks:
            self.on_tick(tick)

    def put_event(self) -> None:
        """
        Put an strategy data event for ui update.
        """
        if self.inited:
            self.cta_engine.put_strategy_event(self)

    def send_email(self, msg) -> None:
        """
        Send email to default receiver.
        """
        if self.inited:
            self.cta_engine.send_email(msg, self)

    def sync_data(self) -> None:
        """
        Sync strategy variables value into disk storage.
        """
        if self.trading:
            self.cta_engine.sync_strategy_data(self)

    def get_trading_hours(self):
        """
        Return trading_hours of trading hours.      # hxxjava add
        """
        return self.cta_engine.get_trading_hours(self)

    def get_actual_days(self,last_time:datetime,days:int):  # hxxjava add
        """
        得到从last_time开始往前days天的实际天数。
        """
        if days <= 0:
            return 0

        th = TradingHours(self.get_trading_hours())

        # 找到有效的最后时间
        till_time = last_time
        while not th.get_trade_hours(till_time):
            till_time = timedelta(minutes=1)

        availble_days = 0
        #找到有些多开始时间
        from_time = till_time
        while availble_days <= days:
            from_time -= timedelta(days=1)
            if th.get_trade_hours(from_time):
                availble_days += 1

        actual_days = (last_time-from_time).days
        # print(f"till_time={till_time},from_time{from_time},days={days},acutal_days={actual_days}")
        return actual_days

    def get_contract(self):
        """
        Return trading_hours of trading contract.   # hxxjava add
        """
        return self.cta_engine.get_contract(self)

    @virtual
    def on_condition_order(self, cond_order: ConditionOrder):
        """
        Callback of condition order update.
        """
        pass

    def send_condition_order(self,order:ConditionOrder):    # hxxjava add
        """ """
        if not self.trading:
            return False
        return self.cta_engine.send_condition_order(order)

    def cancel_condition_order(self,cond_orderid:str):      # hxxjava add
        """ """
        return self.cta_engine.cancel_condition_order(cond_orderid)

    def cancel_all_condition_orders(self):                  # hxxjava add
        """ """
        return self.cta_engine.cancel_all_condition_orders(self)

    def send_margin_ratio_request(self):   # hxxjava add
        """ """
        main_engine = self.cta_engine.main_engine
        contract:ContractData = self.get_contract()
        symbol,exchange = extract_vt_symbol(self.vt_symbol)
        req = MarginRequest(symbol=symbol,exchange=exchange)
        main_engine.send_margin_ratio_request(req,contract.gateway_name)

    def send_commission_request(self):   # hxxjava add
        """ """
        main_engine = self.cta_engine.main_engine
        contract:ContractData = self.get_contract()
        symbol,exchange = extract_vt_symbol(self.vt_symbol)
        req = CommissionRequest(symbol=symbol,exchange=exchange)
        main_engine.send_commission_request(req,contract.gateway_name)

3.3 第三步修改K线生成器BarGenerator

修改vnpy\trader\utility.py中的BarGenerator,为其添加下面的update_status()函数:

class BarGenerator:
    ... ...
    def update_status(self,status:StatusData=None):
        """  """
        # if status:
        #     hh,mm = status.enter_time.split(':')
        #     st_time = datetime.now().replace(hour=int(hh),minute=int(mm),second=0,microsecond=0,tzinfo=CHINA_TZ)

        if self.bar:
            # 只要接收到交易状态信息,一定是整分钟,立即推送当前分钟bar
            self.on_bar(self.bar)
            self.bar = None

3.4 第四步修改Cta策略

在您的CTA策略中增加一个on_status()函数,这里只给出一个CTA策略与本文主题相关部分的代码,其他部分用省略号表示,一般的代码如下:

class XxxStrategy(CtaTemplate):
    """ XXX交易策略 """
    self.window = 30

    ... ...

    def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
        """"""
        super(GsjyStrategy, self).__init__(cta_engine, strategy_name, vt_symbol, setting)
        ... ...
        self.bg : BarGenerator:BarGenerator(self.on_bar,self.window,self.on_xmin_bar)
        ... ...

    def on_inited(self):
         """ 
        Callback after strategy is inited. 
        """
        self.bg.update_status()   # 用来强制生成最后的1分钟K线
        self.write_log("策略初始化结束。")

    def on_auction_tick(self, tick: TickData):
        """
        Callback of auction tick data update.
        """
        self.bg.update_auction_tick(tick)

    def on_status(self,status:StatusData=None):  # 
        """ 
        收到合约交易状态信息时,更新所有的K线生成器 。
        注意:合约交易状态信息推送只会在策略初始化后才执行!
        """
        self.bg.update_status(status=status)

    def on_tick(self, tick: TickData):
        """
        Callback of new tick data update.
        """
        self.bg.update_tick(tick)

    def on_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        self.bg.update_bar(bar)

    def on_xmin_bar(self, bar: BarData):
        """  """
        # 这里应该是利用大周期bar进行出入场交易信号计算和委托交易指令发出的代码
        pass

    ... ...

3.5 第五步启用扩展的MyCtaEngine

修改vnpy_ctastrategy__init__.py中的CtaStrategyApp,将其engine_class更改为扩展的MyCtaEngine:

class CtaStrategyApp(BaseApp):
    """"""

    app_name = APP_NAME
    app_module = __module__
    app_path = Path(__file__).parent
    display_name = "CTA策略"
    # engine_class = CtaEngine
    engine_class = MyCtaEngine    # hxxjava add
    widget_name = "CtaManager"
    icon_name = str(app_path.joinpath("ui", "cta.ico"))

在ChartWidget基础上创建的指标是不能改变主图或副图的参数的,如何做才能给让指标传递不同参数?
以MacdItem为例,步骤如下:

1. MacdItem是一个有参数的显示控件

MacdItem的实现见:为K线图表添砖加瓦——解决MACD绘图部件显示范围BUG,这里就不再贴代码。

2. 修改ChartWidget的add_item()方法,使得它可以接受指标参数

    def add_item(
        self,
        item_class: Type[ChartItem],
        item_name: str,
        plot_name: str,     
        **kwargs     # hxxjava add
    ) -> None:
        """
        Add chart item.
        """
        # item: ChartItem = item_class(self._manager) 
        item: ChartItem = item_class(self._manager) if not kwargs else item_class(self._manager,**kwargs)   # hxxjava change

        self._items[item_name] = item

        plot: pg.PlotItem = self._plots.get(plot_name)
        plot.addItem(item)

        self._item_plot_map[item] = plot

3. MyChartWidget是一个包含ChartWidget的窗口

class MyChartWidget(QtWidgets.QFrame):
    """
    订单流K线图表控件,
    """
    def __init__(self,title:str="K线图表"):
        """"""
        super().__init__()
        self.init_ui()
        self.setWindowTitle(title)

    def init_ui(self):
        """"""
        self.resize(1400, 800)

        # Create chart widget
        self.chart = ChartWidget()
        self.chart.add_plot("candle", hide_x_axis=True)
        self.chart.add_plot("fast_macd", maximum_height=100,hide_x_axis=True)
        self.chart.add_plot("slow_macd", maximum_height=100,hide_x_axis=True)
        self.chart.add_plot("volume", maximum_height=100,hide_x_axis=False)

        self.chart.add_item(CandleItem, "candle", "candle")  
        self.chart.add_item(MacdItem, "fast_macd", "fast_macd",short_window=6,long_window=19,M=9)   # 相当于MACD(6,19,9)
        self.chart.add_item(MacdItem, "slow_macd", "slow_macd",short_window=19,long_window=39,M=9)  # 相当于MACD(19,39,9)
        self.chart.add_item(VolumeItem, "volume", "volume")

        self.chart.add_cursor()

最近经常有朋友问MacdItem绘图部件显示范围有问题,因为老是有任务在手上,一直没有时间修改,现修改过了,测试没有问题,代码如下:

class MacdItem(ChartItem):
    """"""
    def __init__(self, manager: BarManager,short_window:int=12,long_window:int=26,M:int=9):
        """"""
        super().__init__(manager)

        self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
        self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=1)
        self.red_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=1)
        self.green_pen: QtGui.QPen = pg.mkPen(color=(0, 255, 0), width=1)

        self._values_ranges: Dict[Tuple[int, int], Tuple[float, float]] = {}
        self.last_range:Tuple[int, int] = (-1,-1)    # 最新显示K线索引范围

        self.short_window = short_window
        self.long_window = long_window
        self.M = M

        self.macd_data: Dict[int, List[float,float,float]] = {}


    def get_macd_value(self, ix: int) -> List:
        """"""
        if ix < 0:
            return (0.0,0.0,0.0)

        # When initialize, calculate all macd value
        if not self.macd_data:
            bars = self._manager.get_all_bars()
            close_data = [bar.close_price for bar in bars]

            diffs,deas,macds = talib.MACD(np.array(close_data), 
                                    fastperiod=self.short_window, 
                                    slowperiod=self.long_window, 
                                    signalperiod=self.M)

            for n in range(0,len(diffs)):
                self.macd_data[n] = [diffs[n],deas[n],macds[n]]

        # Return if already calcualted
        if ix in self.macd_data:
            return self.macd_data[ix]

        # Else calculate new value
        close_data = []
        for n in range(ix-self.long_window-self.M+1, ix + 1):
            bar = self._manager.get_bar(n)
            close_data.append(bar.close_price)

        diffs,deas,macds = talib.MACD(np.array(close_data), 
                                            fastperiod=self.short_window, 
                                            slowperiod=self.long_window, 
                                            signalperiod=self.M) 
        diff,dea,macd = diffs[-1],deas[-1],macds[-1]
        self.macd_data[ix] = [diff,dea,macd]

        return [diff,dea,macd]

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """"""
        macd_value = self.get_macd_value(ix)
        last_macd_value = self.get_macd_value(ix - 1)

        # # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        # # Draw macd lines
        if np.isnan(macd_value[0]) or np.isnan(last_macd_value[0]):
            # print("略过macd lines0")
            pass
        else:
            end_point0 = QtCore.QPointF(ix, macd_value[0])
            start_point0 = QtCore.QPointF(ix - 1, last_macd_value[0])
            painter.setPen(self.white_pen)
            painter.drawLine(start_point0, end_point0)

        if np.isnan(macd_value[1]) or np.isnan(last_macd_value[1]):
            # print("略过macd lines1")
            pass
        else:
            end_point1 = QtCore.QPointF(ix, macd_value[1])
            start_point1 = QtCore.QPointF(ix - 1, last_macd_value[1])
            painter.setPen(self.yellow_pen)
            painter.drawLine(start_point1, end_point1)

        if not np.isnan(macd_value[2]):
            if (macd_value[2]>0):
                painter.setPen(self.red_pen)
                painter.setBrush(pg.mkBrush(255,0,0))
            else:
                painter.setPen(self.green_pen)
                painter.setBrush(pg.mkBrush(0,255,0))
            painter.drawRect(QtCore.QRectF(ix-0.3,0,0.6,macd_value[2]))
        else:
            # print("略过macd lines2")
            pass

        painter.end()
        return picture

    def boundingRect(self) -> QtCore.QRectF:
        """"""
        min_y, max_y = self.get_y_range()
        rect = QtCore.QRectF(
            0,
            min_y,
            len(self._bar_picutures),
            max_y
        )
        return rect

    def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
        """ 
        获得3个指标在y轴方向的范围 
        hxxjava 修改,2022-11-15
        当显示范围改变时,min_ix,max_ix的值不为None,当显示范围不变时,min_ix,max_ix的值不为None,
        """  
        if not self.macd_data:
            # 如果MACD三个数据没有计算过
            return (-100,100)

        last_range = (min_ix,max_ix)
        if last_range == (None,None):   
            # 本次索引范围未改变           
            if self.last_range in self._values_ranges:  
                # 如果y方向范围已经保存,读取y方向范围
                result = self._values_ranges[self.last_range]
                return adjust_range(result)

            else:
                # 如果y方向范围没有保存,从macd_data重新计算y方向范围
                min_ix,max_ix = 0,len(self.macd_data)-1

                macd_list = list(self.macd_data.values())[min_ix:max_ix + 1]
                ndarray = np.array(macd_list)  

                max_price = np.nanmax(ndarray)
                min_price = np.nanmin(ndarray)

                # 保存y方向范围,同时返回结果
                result = (min_price, max_price)
                if result == (np.nan,np.nan):
                    # 前面的未计算出有效值会出现这种情况
                    return (-100,100)

                self.last_range = (min_ix,max_ix)
                self._values_ranges[self.last_range] = result
                return adjust_range(result)

        """ 以下为显示范围变化时 """
        if last_range in self._values_ranges:
            # 该范围已经保存过y方向范围
            # 取得y方向范围,返回结果
            self.last_range = last_range
            result = self._values_ranges[last_range]
            return adjust_range(result)

        # 该范围没有保存过y方向范围,从macd_data重新计算y方向范围
        min_ix = max(0,min_ix)
        max_ix = min(max_ix,len(self.macd_data)-1)

        macd_list = list(self.macd_data.values())[min_ix:max_ix+1]
        ndarray = np.array(macd_list) 

        max_price = np.nanmax(ndarray)
        min_price = np.nanmin(ndarray)

        # 取得y方向范围,返回结果
        result = (min_price, max_price)

        if result == (np.nan,np.nan):
            # 前面的未计算出有效值会出现这种情况
            return (-100,100)

        # print(f"result1={result}")
        self.last_range = (min_ix,max_ix)
        self._values_ranges[self.last_range] = result

        self.min_ix,self.max_ix = last_range
        return adjust_range(result)


    def get_info_text(self, ix: int) -> str:
        """ """
        barscount = len(self._manager._bars) # hxxjava debug
        if ix in self.macd_data:
            [diff,dea,macd] = self.macd_data[ix]
            words = [
                f"diff {diff:.3f}"," ",
                f"dea {dea:.3f}"," ",
                f"macd {macd:.3f}",
                f"barscount={ix,barscount}"
                ]
            text = "\n".join(words)
        else:
            text = "diff - \ndea - \nmacd -"

        return text

duke wrote:

谢谢大神,问题解决了,另外发现一个BUG,if bar.datetime< self.exit_time: 应该改为if bar.datetime.time() < self.exit_time:

你的修改是错误的,bar.datetime.time()的类型是time类型,self.exit_time的类型也是datetime。

一、问题是怎么发现的

本人在对郑商所行情数据中时间秒以下的部分进行补充的修改一文中,给出来对郑商所行情数据中时间秒以下的部分进行了补充,ctp_gateway也已经按照预想的那样工作了。之后启动DataRecorder对多个合约进行tick数据的录制。可是在价差tick数据的时候却发现对有夜盘的合约进行录制的时间,如果在23:00以后还有tick数据推送的话,会出现日期错误,具体的表现是:2022-10-19 23:00:00以后的tick数据中的datetime字段的日期部分居然是2022-10-20!也就是说我昨天晚上就录到了今天晚上23:00:00以后tick数据(当然是未来时间),这是错误的!

二、证据

打开mysql数据库(我用的是mysql数据库):

输入下面的SQL查询语句:

select symbol,datetime from dbtickdata  where time(datetime) > "22:59:59.5";

得到的结果:

+--------+-------------------------+
| symbol | datetime                |
+--------+-------------------------+
| i2301  | 2022-10-19 22:59:59.522 |
| i2301  | 2022-10-19 23:00:00.013 |
| i2301  | 2022-10-19 23:00:00.038 |
| i2301  | 2022-10-20 23:00:00.038 | ---- 日期错误,应该是2022-10-19 
| p2301  | 2022-10-19 23:00:00.025 |
| p2301  | 2022-10-20 23:00:00.025 | ---- 日期错误,应该是2022-10-19 
| rb2301 | 2022-10-19 22:59:59.500 |
| rb2301 | 2022-10-19 23:00:00.000 |
| TA301  | 2022-10-19 22:59:59.500 |
| TA301  | 2022-10-19 22:59:59.750 |
| TA301  | 2022-10-19 22:59:59.875 |
+--------+-------------------------+

注意:我发现此问题的时间是2022-10-20 16:00!

三、原因找到了!

经过仔细研读ctp_gateway.py的CtpMdApi的onRtnDepthMarketData(),具体的实现见下面的代码,发现对大商所合约tick的datetime错误导致的。

3.1 大商所合约的深度行情中没有日期字段

按照ctp接口规范文档知道,大商所合约的深度行情中没有日期字段,需要客户端收到后自行用本地日期去替代。
CtpMdApi中用定时器维护了一个self.current_date的本地日期成员,其主要作用就是做为了补齐大商所合约的深度行情日期和时间之用的。

3.2 何时会出现这个错误?

因为CtpMdApi每次重新连接之后,再次订阅一些合约的行情的时候,必然会推送一条该合约在交易所中最后更新的tick时间给订阅方。只要在每天8:59(不含)之前重新连接大商所的行情服务器,就一定会收到一个日期为当日,时间为该合约最后一次推送的tick,而其中的补足日期是不对的!

请看看上面的证据中,发生日期错误的全部是大商所的合约,分析正确 !

   def onRtnDepthMarketData(self, data: dict) -> None:
        """行情数据推送"""
        # 过滤没有时间戳的异常行情数据
        if not data["UpdateTime"]:
            return

        # 过滤还没有收到合约数据前的行情推送
        symbol: str = data["InstrumentID"]
        contract: ContractData = symbol_contract_map.get(symbol, None)
        if not contract:
            return

        # 对大商所的交易日字段取本地日期
        if not data["ActionDay"] or contract.exchange == Exchange.DCE:
            date_str: str = self.current_date

        else:
            date_str: str = data["ActionDay"]

        # 这里错了,按照这样的字符串"%Y%m%d %H:%M:%S.%f",会错把tick.datetime的微秒当成0.1秒计数
        # timestamp: str = f"{date_str} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}"        
        timestamp: str = f"{date_str} {data['UpdateTime']}." + str(data['UpdateMillisec']*1000).zfill(6) # hxxjava edit
        dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
        dt: datetime = CHINA_TZ.localize(dt)

        # hxxjava add start
        if contract.exchange == Exchange.CZCE:
            # 对郑商所配置的秒以下的部分进行特别处理
            key = f"{symbol}.{contract.exchange.value}"
            # 读取上次的tick时间
            last_time:datetime = self.czce_last_times.get(key,None)
            if last_time:
                # 取得上次时间的整秒时间
                dt1 = last_time - timedelta(microseconds = last_time.microsecond)
                if dt == dt1:
                    # 如果有收到一次秒内的tick,计算应该添加到秒以下的部分
                    next_second = dt1 + timedelta(seconds=1)
                    delta = next_second - last_time
                    microsecond = (delta.seconds*1000000 + delta.microseconds) / 2
                    # 时间调整为上次tick时间到下一秒到中间值
                    dt = last_time + timedelta(microseconds = microsecond)

            # 更新最新tick时间字典   
            self.czce_last_times[key] = dt
        # hxxjava add end

        tick: TickData = TickData(
            symbol=symbol,
            exchange=contract.exchange,
            datetime=dt,
            name=contract.name,
            volume=data["Volume"],
            turnover=data["Turnover"],
            open_interest=data["OpenInterest"],
            last_price=adjust_price(data["LastPrice"]),
            limit_up=data["UpperLimitPrice"],
            limit_down=data["LowerLimitPrice"],
            open_price=adjust_price(data["OpenPrice"]),
            high_price=adjust_price(data["HighestPrice"]),
            low_price=adjust_price(data["LowestPrice"]),
            pre_close=adjust_price(data["PreClosePrice"]),
            bid_price_1=adjust_price(data["BidPrice1"]),
            ask_price_1=adjust_price(data["AskPrice1"]),
            bid_volume_1=data["BidVolume1"],
            ask_volume_1=data["AskVolume1"],
            gateway_name=self.gateway_name
        )

        if data["BidVolume2"] or data["AskVolume2"]:
            tick.bid_price_2 = adjust_price(data["BidPrice2"])
            tick.bid_price_3 = adjust_price(data["BidPrice3"])
            tick.bid_price_4 = adjust_price(data["BidPrice4"])
            tick.bid_price_5 = adjust_price(data["BidPrice5"])

            tick.ask_price_2 = adjust_price(data["AskPrice2"])
            tick.ask_price_3 = adjust_price(data["AskPrice3"])
            tick.ask_price_4 = adjust_price(data["AskPrice4"])
            tick.ask_price_5 = adjust_price(data["AskPrice5"])

            tick.bid_volume_2 = data["BidVolume2"]
            tick.bid_volume_3 = data["BidVolume3"]
            tick.bid_volume_4 = data["BidVolume4"]
            tick.bid_volume_5 = data["BidVolume5"]

            tick.ask_volume_2 = data["AskVolume2"]
            tick.ask_volume_3 = data["AskVolume3"]
            tick.ask_volume_4 = data["AskVolume4"]
            tick.ask_volume_5 = data["AskVolume5"]

        self.gateway.on_tick(tick)

3.3 怎么解决?

方法1. 问题已经找到,怎么保证这个日期一定是正确的,可不是一件简单的事情,因为它牵涉到交易时间段和节假日的计算问题!
方法2. 有一个简单的办法,既然正确日期不好计算,可以对大商所的tick与当前本地时间进行比较,如果tick的datetime是未来时间,直接该tick抛弃掉,这样也不会有太大影响。

3.4 采用方法2的实现方法

打开vnpy_ctpgateway.py文件,对class CtpMdApi进行如下修改,具体的修改请查找 '# hxxjava'就可以找到修改的语句了。
1、CtpMdApi中增加self.current_time,用来记录本地时间

class CtpMdApi(MdApi):
    """"""

    def __init__(self, gateway: CtpGateway) -> None:
        """构造函数"""
        super().__init__()

        self.gateway: CtpGateway = gateway
        self.gateway_name: str = gateway.gateway_name

        self.reqid: int = 0

        self.connect_status: bool = False
        self.login_status: bool = False
        self.subscribed: Set = set()  

        self.userid: str = ""
        self.password: str = ""
        self.brokerid: str = ""

        # self.current_date: str = datetime.now().strftime("%Y%m%d")    # hxxjava comments
        self.current_time: datetime = datetime.now()                    # hxxjava adds
        self.current_date: str = self.current_time.strftime("%Y%m%d")   # hxxjava adds

        self.czce_last_times:Dict[str,datetime] = {}    # hxxjava add

2、修改CtpMdApi的update_date()函数

    def update_date(self) -> None:
        """更新当前日期"""
        # self.current_date = datetime.now().strftime("%Y%m%d") # hxxjava comments

        self.current_time = datetime.now()                      # hxxjava adds
        self.current_date = self.current_time.strftime("%Y%m%d")   # hxxjava adds

3、修改CtpMdApi的深度行情推送函数onRtnDepthMarketData()

    def onRtnDepthMarketData(self, data: dict) -> None:
        """行情数据推送"""
        # 过滤没有时间戳的异常行情数据
        if not data["UpdateTime"]:
            return

        # 过滤还没有收到合约数据前的行情推送
        symbol: str = data["InstrumentID"]
        contract: ContractData = symbol_contract_map.get(symbol, None)
        if not contract:
            return

        # 对大商所的交易日字段取本地日期
        if not data["ActionDay"] or contract.exchange == Exchange.DCE:
            date_str: str = self.current_date

        else:
            date_str: str = data["ActionDay"]

        # 这里错了,按照这样的字符串"%Y%m%d %H:%M:%S.%f",会错把tick.datetime的微秒当成0.1秒计数
        # timestamp: str = f"{date_str} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}"        
        timestamp: str = f"{date_str} {data['UpdateTime']}." + str(data['UpdateMillisec']*1000).zfill(6) # hxxjava edit
        dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
        dt: datetime = CHINA_TZ.localize(dt)

        # hxxjava add start
        if contract.exchange == Exchange.CZCE:
            # 对郑商所配置的秒以下的部分进行特别处理
            key = f"{symbol}.{contract.exchange.value}"
            # 读取上次的tick时间
            last_time:datetime = self.czce_last_times.get(key,None)
            if last_time:
                # 取得上次时间的整秒时间
                dt1 = last_time - timedelta(microseconds = last_time.microsecond)
                if dt == dt1:
                    # 如果有收到一次秒内的tick,计算应该添加到秒以下的部分
                    next_second = dt1 + timedelta(seconds=1)
                    delta = next_second - last_time
                    microsecond = (delta.seconds*1000000 + delta.microseconds) / 2
                    # 时间调整为上次tick时间到下一秒到中间值
                    dt = last_time + timedelta(microseconds = microsecond)

            # 更新最新tick时间字典   
            self.czce_last_times[key] = dt

        elif contract.exchange == Exchange.DCE:
            # 对郑商所配置的秒以下的部分进行特别处理
            if dt > CHINA_TZ.localize(self.current_time) + timedelta(seconds = 10):
                # 如果大商所的补足本地日期的dt超前每2秒更新一次的本地时间2秒,判断为无数的行情数据,做丢弃处理。
                # 但是为了降低对本地时间与交易所时间同步要求,放宽为10秒或者更多都可以
                return
        # hxxjava add end

        tick: TickData = TickData(
            symbol=symbol,
            exchange=contract.exchange,
            datetime=dt,
            name=contract.name,
            volume=data["Volume"],
            turnover=data["Turnover"],
            open_interest=data["OpenInterest"],
            last_price=adjust_price(data["LastPrice"]),
            limit_up=data["UpperLimitPrice"],
            limit_down=data["LowerLimitPrice"],
            open_price=adjust_price(data["OpenPrice"]),
            high_price=adjust_price(data["HighestPrice"]),
            low_price=adjust_price(data["LowestPrice"]),
            pre_close=adjust_price(data["PreClosePrice"]),
            bid_price_1=adjust_price(data["BidPrice1"]),
            ask_price_1=adjust_price(data["AskPrice1"]),
            bid_volume_1=data["BidVolume1"],
            ask_volume_1=data["AskVolume1"],
            gateway_name=self.gateway_name
        )

        if data["BidVolume2"] or data["AskVolume2"]:
            tick.bid_price_2 = adjust_price(data["BidPrice2"])
            tick.bid_price_3 = adjust_price(data["BidPrice3"])
            tick.bid_price_4 = adjust_price(data["BidPrice4"])
            tick.bid_price_5 = adjust_price(data["BidPrice5"])

            tick.ask_price_2 = adjust_price(data["AskPrice2"])
            tick.ask_price_3 = adjust_price(data["AskPrice3"])
            tick.ask_price_4 = adjust_price(data["AskPrice4"])
            tick.ask_price_5 = adjust_price(data["AskPrice5"])

            tick.bid_volume_2 = data["BidVolume2"]
            tick.bid_volume_3 = data["BidVolume3"]
            tick.bid_volume_4 = data["BidVolume4"]
            tick.bid_volume_5 = data["BidVolume5"]

            tick.ask_volume_2 = data["AskVolume2"]
            tick.ask_volume_3 = data["AskVolume3"]
            tick.ask_volume_4 = data["AskVolume4"]
            tick.ask_volume_5 = data["AskVolume5"]

        self.gateway.on_tick(tick)

七月雪 wrote:

Teller wrote:

description
我研究了一下,这两段代码好像是跟时区有关的,先全都注了就能用。
但不知道后续会有什么影响,坐等大佬答疑。
红框圈的是毫秒,不是时区。接口每0.5秒推一次数据

不是0.5秒,而是秒内收到tick,第1个算整秒,第2个算0.5秒,第3个算0.75秒,一次类推... ...。本意是为了上层应用使用这些tick时的方便。

Teller wrote:

description
感谢大佬回复!
大佬,修改后我遇到这样的报错,是不是我少import了什么包?
我自己尝试import了pytz包,但是没成功。

答复:
ctp_gateway.py的引用部分有

import pytz

下面是常量定义:

CHINA_TZ = pytz.timezone("Asia/Shanghai")       # 中国时区

然后就可以使用了,目的是把CTP接口中的所有行情、委托单和成绩单这涉及到时间的字段都同一到中国时区。
难道你的ctp_gateway.py文件中没有这些内容?

Teller wrote:

description
感谢大佬分享!
我想按您说的修改一下代码,但发现有一行代码不同,用原代码还是用您的代码呀,还是说两行代码是一个意思。
我学python没多久,很多地方看不懂,在这里跪谢大佬。

答复:
去掉前面的,保留后一句。

守望长城2020-6-11-艾瑞巴蒂 wrote:

大佬, 还有一个问题, K线图运行久了会报错, 我绘制的是Tick 图, 报错如下, 不知道咋解决
description

看了你的vnpy的目录构成,貌似您用的版本还是很久以前的老版本了,至少不是最新版本。

"H:\vnstudio\lib\site-packages\vnpy\app\cta_strategy\backtesting.py"

如果是新的版本,应该是这样的:

"H:\vnstudio\lib\site-packages\vnpy_ctastrategy\backtesting.py"

由此推断你的pyqtgraph的版本也是老的版本。
随着vnpy的版本升级,pyqtgraph的版本也会采用新的版本,你升级到3.0及以后版本再试试看。

守望长城2020-6-11-艾瑞巴蒂 wrote:

wangjiancc wrote:

H:\vnstudio\python.exe H:/vnstudio/Lib/site-packages/vnpy/app/ctastrategy/backtesting/run.py
2021-03-03 22:22:48,092 INFO: 注册日志事件监听
Exception in thread Thread-4:
Traceback (most recent call last):
File "H:\vnstudio\lib\threading.py", line 917, in _bootstrap_inner
self.run()
File "H:\vnstudio\lib\threading.py", line 865, in run
self._target(*self._args, **self._kwargs)
File "H:\vnstudio\Lib\site-packages\vnpy\app\cta_backtester\engine.py", line 168, in run_backtesting
setting
File "H:\vnstudio\lib\site-packages\vnpy\app\cta_strategy\backtesting.py", line 211, in add_strategy
self, strategy_class.name, self.vt_symbol, setting
File "H:\vnstudio\lib\site-packages\vnpy\app\cta_strategy\strategies\KxMonitor.py", line 82, in init
self.even_engine = cta_engine.main_engine.event_engine
AttributeError: 'BacktestingEngine' object has no attribute 'main_engine'

Process finished with exit code 0

大佬,请问下这是什么错误?
看报错就是你没有传main_engine进去啊

策略中可以考虑回测的情况,另外回测是不允许显示K线图的,如果非要这么干,在策略中要判断是否运行在实盘或者回测环境,这可以通过获得engine的类型是否为

if self.engine.engine_type == EngineType.BACKTESTING:
   xxxxxx  放弃窗口K线图表
else:
   xxxxxx  创建窗口K线图表

的方法来进行规避错误的发生。

1. 问题的由来

在使用vnpy中的DataRecorder进行tick情录制的时候,本人发现郑商所的tick中时间在一秒内会收到两个tick,但是同一秒内的tick的时间是相同的。这会给后端的CTA策略、价差交易策略登录模块在合成K的时带来不必要的困扰,因为通常合成K线的BarGenerator通常对tick的时间进行有效性判断的时候,会把时间重复的tick做抛弃处理,这会再次郑商所的品种的tick被使用的量减半。如果重复的tick也用的话,这样倒是不会减半,可是在防止垃圾数据时又无法杜绝旧的数据在网关断网或者重新再次连接时,接口回再次推送最后的tick数据,而这个tick数据之前已经被实时推送给客户段了。所有在ctp_gateway中对郑商所行情数据中时间秒以下的部分进行补充是非常必要的!

2. 修改思路

  1. 在ctp_gateway中对郑商所tick行情数据进行特别处理,思路是每次接收到郑商所tick时t1,与上次的tick的时间t2进行比较,如果二者是同一秒的tick,那么将新的tick时间t1 = t2 +(t2到下一秒微秒数量÷2);如果t1,t2不是同一秒或t2不存在,这t1无需调整。
  2. 记录该t1时间到字典中,供下一次使用

3. 修改代码

本修改只需要对vnpy_ctp\ctp_gateway.py进行修改就可以了,修改步骤如下:

3.1 在CtpMdApi的init()增加郑商所最新tick时间字典:

    def __init__(self, gateway: CtpGateway) -> None:
        """构造函数"""
        super().__init__()

        self.gateway: CtpGateway = gateway
        self.gateway_name: str = gateway.gateway_name

        self.reqid: int = 0

        self.connect_status: bool = False
        self.login_status: bool = False
        self.subscribed: Set = set()  

        self.userid: str = ""
        self.password: str = ""
        self.brokerid: str = ""

        self.current_date: str = datetime.now().strftime("%Y%m%d")

        self.czce_last_times:Dict[str,datetime] = {}    # 郑商所最新tick时间字典 hxxjava add

3.2 修改CtpMdApi行情推送函数onRtnDepthMarketData():

    def onRtnDepthMarketData(self, data: dict) -> None:
        """行情数据推送"""
        # 过滤没有时间戳的异常行情数据
        if not data["UpdateTime"]:
            return

        # 过滤还没有收到合约数据前的行情推送
        symbol: str = data["InstrumentID"]
        contract: ContractData = symbol_contract_map.get(symbol, None)
        if not contract:
            return

        # 对大商所的交易日字段取本地日期
        if not data["ActionDay"] or contract.exchange == Exchange.DCE:
            date_str: str = self.current_date

        else:
            date_str: str = data["ActionDay"]

        # 这里错了,按照这样的字符串"%Y%m%d %H:%M:%S.%f",会错把tick.datetime的微秒当成0.1秒计数
        # timestamp: str = f"{date_str} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}"        
        timestamp: str = f"{date_str} {data['UpdateTime']}." + str(data['UpdateMillisec']*1000).zfill(6) # hxxjava edit
        dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
        dt: datetime = CHINA_TZ.localize(dt)

        # hxxjava add start
        if contract.exchange == Exchange.CZCE:
            # 对郑商所配置的秒以下的部分进行特别处理
            key = f"{symbol}.{contract.exchange.value}"
            # 读取上次的tick时间
            last_time:datetime = self.czce_last_times.get(key,None)
            if last_time:
                # 取得上次时间的整秒时间
                dt1 = last_time - timedelta(microseconds = last_time.microsecond)
                if dt == dt1:
                    # 如果有收到一次秒内的tick,计算应该添加到秒以下的部分
                    next_second = dt1 + timedelta(seconds=1)
                    delta = next_second - last_time
                    microsecond = (delta.seconds*1000000 + delta.microseconds) / 2
                    # 时间调整为上次tick时间到下一秒到中间值
                    dt = last_time + timedelta(microseconds = microsecond)

            # 更新最新tick时间字典   
            self.czce_last_times[key] = dt
        # hxxjava add end

        tick: TickData = TickData(
            symbol=symbol,
            exchange=contract.exchange,
            datetime=dt,
            name=contract.name,
            volume=data["Volume"],
            turnover=data["Turnover"],
            open_interest=data["OpenInterest"],
            last_price=adjust_price(data["LastPrice"]),
            limit_up=data["UpperLimitPrice"],
            limit_down=data["LowerLimitPrice"],
            open_price=adjust_price(data["OpenPrice"]),
            high_price=adjust_price(data["HighestPrice"]),
            low_price=adjust_price(data["LowestPrice"]),
            pre_close=adjust_price(data["PreClosePrice"]),
            bid_price_1=adjust_price(data["BidPrice1"]),
            ask_price_1=adjust_price(data["AskPrice1"]),
            bid_volume_1=data["BidVolume1"],
            ask_volume_1=data["AskVolume1"],
            gateway_name=self.gateway_name
        )

        if data["BidVolume2"] or data["AskVolume2"]:
            tick.bid_price_2 = adjust_price(data["BidPrice2"])
            tick.bid_price_3 = adjust_price(data["BidPrice3"])
            tick.bid_price_4 = adjust_price(data["BidPrice4"])
            tick.bid_price_5 = adjust_price(data["BidPrice5"])

            tick.ask_price_2 = adjust_price(data["AskPrice2"])
            tick.ask_price_3 = adjust_price(data["AskPrice3"])
            tick.ask_price_4 = adjust_price(data["AskPrice4"])
            tick.ask_price_5 = adjust_price(data["AskPrice5"])

            tick.bid_volume_2 = data["BidVolume2"]
            tick.bid_volume_3 = data["BidVolume3"]
            tick.bid_volume_4 = data["BidVolume4"]
            tick.bid_volume_5 = data["BidVolume5"]

            tick.ask_volume_2 = data["AskVolume2"]
            tick.ask_volume_3 = data["AskVolume3"]
            tick.ask_volume_4 = data["AskVolume4"]
            tick.ask_volume_5 = data["AskVolume5"]

        self.gateway.on_tick(tick)

4. 问题解决了

这是本人录制的郑商所合约TA301的结果:

description

description
都说国内期货的行情推送最多是2次,其实是错误的,可能超过2次 !看上图就知道,一秒3次一个经常存在的,所以不能够固定递增0.5秒,而应该向本文中的办法,每次加0.5,0.25,0.125......秒的比较好,既不重复有可以超过2次。

至此就完美地解决了郑商所合约行情秒内tick时间重复的问题了!

© 2015-2022 微信 18391752892
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】