VeighNa量化社区
你的开源社区量化交易平台
hxxjava's Avatar
Member
离线
419 帖子
声望: 155

公告:SimNow系统切换更新通告

  1. SIMNOW将于下周一(5月17日)夜盘上线一套新的环境,该环境可以进行全市场全部期货合约和上期所全部期权合约的交易。同时,老环境将关闭。在新环境内将沿用之前注册的账号密码,重置资金和持仓,初始资金为2000万。

  2. 新环境采用新的前置地址,如下:

第一组:Trade Front:180.168.146.187:10201,Market Front:180.168.146.187:10211;【电信】(看穿式前置,使用监控中心生产秘钥)

第二组:Trade Front:180.168.146.187:10202,Market Front:180.168.146.187:10212;【电信】(看穿式前置,使用监控中心生产秘钥)

第三组:Trade Front:218.202.237.33:10203, Market Front:218.202.237.33:10213; 【移动】(看穿式前置,使用监控中心生产秘钥)

  1. 新的交易终端目前只提供快期,后续会增加更多终端软件,SIMNOW官网也将在同一时间更新,请在网站更新后到“终端下载”进$下载。simnow官网将更新可下载到的API,版本为v6.5.1_20200908。

  2. 7*24环境将于该新环境上线后两周上线。

  3. 网站更新后,入金逻辑不变,每日仍然可以入金三次,但是增加单次入金上限,提高至单次200万。

升级过程中给广大SimNow用户造成的不便,深表歉意!

发布日期:2021/05/13

1. 启动投资组合管理应用的界面

description

2. 投资组合管理应用不应该是可选启动项

设想一下今天用户的启动情况:

  • 20:55-23:35 启动了投资组合管理应用
  • 8:55-11:35 没有启动投资组合管理应用
  • 13:25-15:35 又启动了投资组合管理应用

9:00-11:30 期间进行的各个应用进行的操作就无法被PortfolioManager截获,当然也就无法被记录到下面的两个json文件中:

    portfolio_manager_data.json
    portfolio_manager_order.json

当我们再次打开投资组合管理应用的时,看到的组合列表和各个组合列表的成交记录就可能是有遗漏的!
那么这样的成交记录查询结果就会给用户有种莫名其妙的感觉。

因此,投资组合管理应用应该是自动启动应用。

3. 投资组合管理应用缺失的功能

  1. 只能显示当前交易日的成交记录,无法显示历史交易记录,也没有盈亏统计。这里显示的交易记录是调用的self.main_engine.get_all_trades()——不包含历史成交记录。
  2. 没有当前交易日的委托记录查询,也没有历史交易日的委托记录查询。
  3. 按照当前设计,取名“投资组合管理”有点名不副实,应该叫“成交记录查询”更合适些。
  4. 既然叫“投资组合管理”,那么应该可以有可以干预的功能在里面,比如:
    • 如果手动平掉的是策略自动开仓的仓位,可以在这里把它们关连起来。
    • 允许用户对记录进行删减或者修改,以便与当前系统各个应用的实际运行情况一致,而不是打开一个个json文件进行手工修改,那样对用户的要求高不说,也容易出错。

VNPY如何连接UFX对接恒生云接口?且随我一步一步往下做。

1. 注册恒生云期货模拟

在浏览器中输入http://ufx.hs.net,进入如下界面:
description

点击注册,进入恒生云测试平台的用户注册,①输入手机号,③密码,④密码确认和②验证码,勾选下面的同意,就可以注册恒生云测试平台的用户了。

description

2. 查询模拟期货账户及密码

完成了第1步之后,你已经有一个恒生云测试平台的用户,再次在浏览器中输入http://ufx.hs.net,输入您的手机号和密码,就可以登录进入下面的界面:
description
点击图中的申请期货、证券或者股票期权下的按钮,就可以申请你想要的模拟市场,其中不同模拟市场的账户或者密码可能不一样。如下图所示。
记住这里测试账号和密码,这是我们vnpy要登录的UFT网关中的用户名与密码!
点击图中的④接口下载,进入下一步。

3. 下载接口认证文件lisence.dat及行情和交易服务器地址

description
点击1下载,下载一个包含认证文件lisence.dat的rar文件,同时记录②站点ip和端口号,这是vnpy要登录的UFT网关中行情和交易服务器的ip和端口号。
记录④,⑤,分别为直连模式下的接入方ID和授权码,这是vnpy要登录的UFT网关中产品名称和授权码。

4. 登录vnpy

启动vnpy时,在配置VN Trader界面中,如图所示,勾选恒生UFT,记住图中的②运行目录,一定把步骤3中下载的认证文件lisence.dat复制到该目录。然后点击③启动VN Trader。
description

5. 连接UFX恒生云模拟接口

进入VN Trader主界面后,选择系统>连接UFT,进入如下的UFT连接参数设置。
其中①-⑦各项在第2步和第3步中都已经获得,只是注意图中的⑧,委托类型必须填7,这是恒生云客服告诉我的!
然后点击连接按钮就可以成功登录UFX恒生云模拟账户了。
description

6. 订阅行情

下图中显示的是成功登录UFX恒生云模拟账户后,可以看到①接口登录日志、②资金账户都已经显示正确信息。
接下来你可以在③交易模块中的输入交易所和代码,回车后就可以看到④行情模块中已经成功订阅了MA109.CZCE和rb2110.SHFE的行情了。
到此VNPY成功连接UFX对接恒生云模拟接口!
description

7. 交易测试

成功连接恒生云模拟接口,免不了要测试下能否交易,——结果见下图:

description

8. 这哪里是模拟接口,其实是个测试接口!

连接恒生云UFX,目前还问题多多!

  1. 经常连不上
  2. 连上了,报授权码无效或者过期
  3. 休市期间恒生云接口会重演行情数据,这个最让人不能理解!
  4. 休市期间重演行情数据的时候,还可以模拟交易!
  5. 感觉恒生云模拟接口的定位是测试而非模拟,他们的目标是为了调试程序接口是否正常,而不是为用户的交易策略的运行提供一个贴近实盘的交易环境吧,重演就说明了这个问题。

8.1 去掉休市行情重演和休市交易功能

目前,经过和恒生云急速API客服团队的沟通,要求他们把恒生云去掉休市行情重演和休市可以交易功能,意见已经被采纳了。
他们打算提供提两套接口,一个是模拟环境接口(无休市行情重演和交易),一个是测试接口(有休市行情重演和交易)。

8.2 目前已经和CTP一样,是一个没有休市重演和休市交易了

恒生云的还是快速做出调整,目前模拟接口已经没有了休市行情重演和休市交易功能,给一个大大的赞!

9. CTP接口又可以使用了!

鉴于恒生云接口目前问题不少,同时CTP模拟接口又可以用了,不愿意折腾恒生云接口的小伙伴们又可以重新使用CTP了。
只是需要重新修改下新的行情和交易服务端口号就可以了,沿用之前的账户和密码,不过以前的资金回复为2000万,同时以前交易记录也都清零了。
具体设置见 公告:SimNow系统切换更新通告

无论如何,小伙伴们现在已经可以有三个解决模拟的接口:1、CTP模拟交易;2、恒生模拟交易;3、PaperAccount。选择哪一个?自己决定吧!

10. 恒生UFT网关成交单查询应答错误的解决

如果细心的话,您也许会发现[7. 交易测试】的图中没有成交单,这是因为UFT网关成交单查询应答出错误了!不过本人经找把问题已到并且解决了。
详细见:恒生UFT网关成交单查询应答错误

1. 升级了,MACD不正常了!

很久没有升级vnpy了,前几天升级了,K线图表MACD,为什么?
差了下,MacdItem和ChartWighet,都没有修改,为什么不正常了呢?原来是pyqtgraph升级了,研究了半天也是一头雾水。

2. 修改了可以了,代码如下:

class MacdItem(ChartItem):
    """"""
    _values_ranges: Dict[Tuple[int, int], Tuple[float, float]] = {}

    last_range:Tuple[int, int] = (-1,-1)    # 最新显示K线索引范围

    def __init__(self, manager: BarManager):
        """"""
        super().__init__(manager)

        self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
        self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=1)
        self.red_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=1)
        self.green_pen: QtGui.QPen = pg.mkPen(color=(0, 255, 0), width=1)

        self.short_window = 12
        self.long_window = 26
        self.M = 9

        self.macd_data: Dict[int, Tuple[float,float,float]] = {}

    def get_macd_value(self, ix: int) -> Tuple[float,float,float]:
        """"""

        if ix < 0:
            return (0.0,0.0,0.0)

        # When initialize, calculate all macd value
        if not self.macd_data:
            bars = self._manager.get_all_bars()
            close_data = [bar.close_price for bar in bars]

            diffs,deas,macds = talib.MACD(np.array(close_data), 
                                    fastperiod=self.short_window, 
                                    slowperiod=self.long_window, 
                                    signalperiod=self.M)

            for n in range(0,len(diffs)):
                self.macd_data[n] = (diffs[n],deas[n],macds[n])

        # Return if already calcualted
        if ix in self.macd_data:
            return self.macd_data[ix]

        # Else calculate new value
        close_data = []
        for n in range(ix-self.long_window-self.M+1, ix + 1):
            bar = self._manager.get_bar(n)
            close_data.append(bar.close_price)

        diffs,deas,macds = talib.MACD(np.array(close_data), 
                                            fastperiod=self.short_window, 
                                            slowperiod=self.long_window, 
                                            signalperiod=self.M) 
        diff,dea,macd = diffs[-1],deas[-1],macds[-1]
        self.macd_data[ix] = (diff,dea,macd)

        return (diff,dea,macd)

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """"""
        macd_value = self.get_macd_value(ix)
        last_macd_value = self.get_macd_value(ix - 1)

        # # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        # # Draw macd lines
        if np.isnan(macd_value[0]) or np.isnan(last_macd_value[0]):
            # print("略过macd lines0")
            pass
        else:
            end_point0 = QtCore.QPointF(ix, macd_value[0])
            start_point0 = QtCore.QPointF(ix - 1, last_macd_value[0])
            painter.setPen(self.white_pen)
            painter.drawLine(start_point0, end_point0)

        if np.isnan(macd_value[1]) or np.isnan(last_macd_value[1]):
            # print("略过macd lines1")
            pass
        else:
            end_point1 = QtCore.QPointF(ix, macd_value[1])
            start_point1 = QtCore.QPointF(ix - 1, last_macd_value[1])
            painter.setPen(self.yellow_pen)
            painter.drawLine(start_point1, end_point1)

        if not np.isnan(macd_value[2]):
            if (macd_value[2]>0):
                painter.setPen(self.red_pen)
                painter.setBrush(pg.mkBrush(255,0,0))
            else:
                painter.setPen(self.green_pen)
                painter.setBrush(pg.mkBrush(0,255,0))
            painter.drawRect(QtCore.QRectF(ix-0.3,0,0.6,macd_value[2]))
        else:
            # print("略过macd lines2")
            pass

        painter.end()
        return picture

    def boundingRect(self) -> QtCore.QRectF:
        """"""
        min_y, max_y = self.get_y_range()
        rect = QtCore.QRectF(
            0,
            min_y,
            len(self._bar_picutures),
            max_y
        )
        return rect

    def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
        #   获得3个指标在y轴方向的范围   
        #   hxxjava 修改,2020-6-29
        #   当显示范围改变时,min_ix,max_ix的值不为None,当显示范围不变时,min_ix,max_ix的值不为None,

        offset = max(self.short_window,self.long_window) + self.M - 1

        if not self.macd_data or len(self.macd_data) < offset:
            # print(f'(min_ix,max_ix){(min_ix,max_ix)} offset={offset},len(self.macd_data)={len(self.macd_data)}')
            # hxxjava 修改,2021-5-8,因为升级vnpy,其依赖的pyqtgraph版本也升级了,原来为return 0,1
            return -100, 100

        # print("len of range dict:",len(self._values_ranges),",macd_data:",len(self.macd_data),(min_ix,max_ix))

        if min_ix != None:          # 调整最小K线索引
            min_ix = max(min_ix,offset)

        if max_ix != None:          # 调整最大K线索引
            max_ix = min(max_ix, len(self.macd_data)-1)

        last_range = (min_ix,max_ix)    # 请求的最新范围   

        if last_range == (None,None):   # 当显示范围不变时
            if self.last_range in self._values_ranges:  
                # 如果y方向范围已经保存
                # 读取y方向范围
                result = self._values_ranges[self.last_range]
                # print("1:",self.last_range,result)
                return adjust_range(result)
            else:
                # 如果y方向范围没有保存
                # 从macd_data重新计算y方向范围
                min_ix,max_ix = 0,len(self.macd_data)-1

                macd_list = list(self.macd_data.values())[min_ix:max_ix + 1]
                ndarray = np.array(macd_list)           
                max_price = np.nanmax(ndarray)
                min_price = np.nanmin(ndarray)

                # 保存y方向范围,同时返回结果
                result = (min_price, max_price)
                self.last_range = (min_ix,max_ix)
                self._values_ranges[self.last_range] = result
                # print("2:",self.last_range,result)
                return adjust_range(result)

        """ 以下为显示范围变化时 """

        if last_range in self._values_ranges:
            # 该范围已经保存过y方向范围
            # 取得y方向范围,返回结果
            result = self._values_ranges[last_range]
            # print("3:",last_range,result)
            return adjust_range(result)

        # 该范围没有保存过y方向范围
        # 从macd_data重新计算y方向范围
        macd_list = list(self.macd_data.values())[min_ix:max_ix + 1]
        ndarray = np.array(macd_list) 
        max_price = np.nanmax(ndarray)
        min_price = np.nanmin(ndarray)

        # 取得y方向范围,返回结果
        result = (min_price, max_price)

        self.last_range = last_range
        self._values_ranges[self.last_range] = result
        # print("4:",self.last_range,result)

        return adjust_range(result)


    def get_info_text(self, ix: int) -> str:
        # """"""
        if ix in self.macd_data:
            diff,dea,macd = self.macd_data[ix]
            words = [
                f"diff {diff:.3f}"," ",
                f"dea {dea:.3f}"," ",
                f"macd {macd:.3f}"
                ]
            text = "\n".join(words)
        else:
            text = "diff - \ndea - \nmacd -"

        return text

3. 修改后显示效果

description

期货集合竞价时间期货集合竞价规则
大多数投资者每天的交易都是从集合竞价开始的,并且很多投资者还会参与到其中来,那么期货集合竞价时间和集合竞价规则到是什么呢?

一、什么是期货的集合竞价

集合竞价指在每个期货交易日开市前的规定时间里,由期货交易者按照自己所能接受的价格进行买卖申报。

description

二、期货集合竞价的时间和规则

每一交易日开市前5分钟内,前4分钟为投资者对期货合约买、卖价格指令申报时间,后1分钟为集合竞价撮合时间,集合竞价产生的价格即为开盘价,随即在行情栏中显示。
这里需要注意: .
①集合竞价只能使用限价指令申报,不能使用市价指令。
②日盘品种(指无夜盘的品种)的集合竞价时间是8:55-8:59.夜盘品种的集合竞价时间是20:55-20:59.有夜盘的品种日盘不再进行集合竞价。
③集合竞价期间的申报价格盘面是不显示的,只能根据自己的预判进行申报,申报价格范围为上一交易日结算价的涨跌停板价。

description

                                                股指期货时间轴

description

                                                 国债期货时间轴

三、集合竞价产生的原理

国内期货交易所均采用计算机撮合成交方式,指交易所的计算机交易系统对交易双方的交易指令进行配对的过程,按价格优先、时间优先的原则进行配对,以此价格成交能够
得到最大成交量。
首先,交易系统分别对所有有效的买入申报按申报价由高到低的顺序排列,申报价相同的按照进入系统的时间先后排列;所有有效的卖出申报按申报价由低到高的顺序排列,申报价相同的按照进入系统的时间先后排列。
接下来,交易系统依此逐步将排在前面的买入申报和卖出申报配对成交,直到不能成交为止。

四、集合竞价举例详解

假定,某合约申报排序如下表所示(最小变动价为1元)。

description

集合竞价申报顺序表
配对情况为:
①排序为1的买进价高于排序为1的卖出价,成交30手;
②排序为1的买进价还有20手没成交,由于比排序为2的卖出价高,继续成交20手;
③排序为2的卖出价还有40手没成交,由于比排序为2的买进价低,继续成交40手;
④排序为2的买进价还有50手没成交,由于比排序为3的卖出价高,成交50手;
⑤排序为3的卖出价还有70手没成交,由于比排序为3的买进价高,显然无法成交。
这样,最终的开盘价就是2588元,在此价位上成交140手(30+20+40+50=140.如按双边计算则是280手)。
申报指令的撮合成交原理为:
①高于集合竞价产生的价格的买入申报全部成交;
②低于集合竞价产生的价格的卖出申报全部成交;
③等于集合竞价产生的价格的买入或卖出申报,根据买入申报量和卖出申报量的多少,按少的一方的申报量成交。
拿玉米C1901来举例:

description

玉米合约申报价格列表
假设集合竞价最后产生的开盘价是1460.
则对于买单,根据“高于集合竞价产生的价格的买入申报全部成交”原则,1461和1462的买入单都成交;
对于卖单,根据“低于集合竞价产生的价格的卖出申报全部成交”原则,1459和1458的卖出单都成交;
而等于1460的买入或卖出单,原则是:根据买入申报量和卖出申报量的多少,按少的一方的申报量成交。该例子中,1460买入 是20手,卖出是30手,故撮合成交20手,卖出单
有10手没有撮合成交。
集合竞价撮合成交期间的未成交申报单在开市后自动参与连续竞价交易,当天若未成交,结算时则会自动撤单。

五、另附:A股的竞价规则

A股的竞价分为集合竞价和连续竞价两个部分,集合竞价是决定开盘和深市收盘两个价格的重要部分,正因为如此,根据《上市公司股东、董监高减持股份的若干规定》,持股5%以上的大股东进行减持的,三个月内通过集合竞价交易减持的不得超过公司总股本的1%、半年2%,一年不得超过4%,这是一个量的规定。

5.1 准确的交易时间

沪市:上午9:15-9:25集合竞价,上午9:30-11:30和下午13:00-14:57连续交易时间,14:57-15:00为收盘集合竞价时间;

深市:上午9:15-9:25集合竞价,上午9:30-11:30和下午13:00-14:57连续交易时间,14:57-15:00为收盘集合竞价时间。

5.2 在A股市场,开盘价是由集合竞价确定,具体的竞价时间是开盘前的:

1.隔夜委托22:00-9:15
交易日前一晚上22:00-9:15之间的委托叫隔夜委托,即第一个交易日晚上规定时间之后就可以进行第二交易日的委托,委托会暂存在证券公司系统内,在第二天早上9:15分时报送到交易所主机,是由券商统一在集合竞价时间上报到交易所,这个上报的顺序理论上是不分时间,只管价格,但如果股票是涨跌停的,尤其是连续涨跌停的股票,比如新股和被执行风险警示的股票,价格是确定的,总还是有个时间的优先次序,这个时候股票委托按照“时间优先、数量优先、价格优先、大单先成交”的原则,所以如果第二天无论如何都要出来或者进去的,建议隔夜委托,特别是对于确定涨停的股票,在券商交易结算后,一般是22:00以后就下单委托。

2.竞价委托9:15-9:20
这个竞价时间是可以委托也是可以撤单的,其中隔夜委托的单子在这个时间也是可以撤掉的,但时间也就是这五分钟,所以要特别留意。

3.集合竞价9:20-9:25
这个竞价时间只能委托,但不能撤单了,截止到9:25分00秒,确定一个成交量最大的一个价格,就是开盘价,开盘价影响一天的价格,会记录在K线上面,所以非常重要。

4.封闭竞价9:25-9:30/11:30-13:00
这个时间表面上是可以委托和撤单的,但实际上你的所有委托都是暂时保存在券商系统里面,交易所在9:30统一接收,所以这个时间是封闭竞价时间,最终的开市价格由委托的单子撮合成交得出,所以开盘价格有两个:一个是集合竞价确定的开盘价,一个是撮合成交得出的开市价。而下午13:00的开市价格也是这么来的。

5.收盘竞价14:57-15:00
沪市和深市的开盘和收盘前都有一个竞价时间,以同一价格成交量最大化为确定原则。

很久没有升级了,今天升级了。发现很多新的东西,不知道有什么有,问问官方QuoteRequest有什么用?

  • list text here在vnpy>trader>object.py中增加了下面的代码:

    @dataclass
    class QuoteRequest:
      """
      Request sending to specific gateway for creating a new quote.
      """
    
      symbol: str
      exchange: Exchange
      bid_price: float
      bid_volume: int
      ask_price: float
      ask_volume: int
      bid_offset: Offset = Offset.NONE
      ask_offset: Offset = Offset.NONE
      reference: str = ""
    
      def __post_init__(self):
          """"""
          self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
    
      def create_quote_data(self, quoteid: str, gateway_name: str) -> QuoteData:
          """
          Create quote data from request.
          """
          quote = QuoteData(
              symbol=self.symbol,
              exchange=self.exchange,
              quoteid=self.quoteid,
              bid_price=self.bid_price,
              bid_volume=self.bid_volume,
              ask_price=self.ask_price,
              ask_volume=self.ask_volume,
              bid_offset=self.bid_offset,
              ask_offset=self.ask_offset,
              reference=self.reference,
              gateway_name=gateway_name,
          )
          return quote
  • 在vnpy>trader>gateway.py中的BaseGateway类型总增加了下面的函数:

      def send_quote(self, req: QuoteRequest) -> str:
          """
          Send a new two-sided quote to server.
    
          implementation should finish the tasks blow:
          * create an QuoteData from req using QuoteRequest.create_quote_data
          * assign a unique(gateway instance scope) id to QuoteData.quoteid
          * send request to server
              * if request is sent, QuoteData.status should be set to Status.SUBMITTING
              * if request is failed to sent, QuoteData.status should be set to Status.REJECTED
          * response on_quote:
          * return vt_quoteid
    
          :return str vt_quoteid for created QuoteData
          """
          return ""   # 返回了空字符串,

    返回了空字符串,其实什么用也没有,一个接口类函数。貌似是让其他的派生的特定网关来实现这个函数。
    可是我查询了下,CTP、XTP等网关,没有一个实现了该函数,系统中也没有任何一个地方直接或者间接低调用过该send_quote()函数。

  • 从注释来看,好像是用来产生一个引用的作用?请问vnpy官方:这个东西有用吗?

1 集合竞价时段

股票、期货合约都有集合竞价时段,如国内期货合约为前一交易日的21:00前5分钟,股指期货合约为当前交易日的9:30前5分钟,股票合约的为当前交易日的9:30之前。
集合竞价完成之后通常在开市前1分钟提供CTP接口推送第一个tick,注意:这个tick的时间戳为开市前1分钟,而不在各个交易时间段内!
如:
国内期货合约如果第一个交易段为21:00,那么这个tick的时间戳为20:59
国内股指期货合约为当前交易日的9:30,那么这个tick的时间戳为9:29

2 vnpy当前的BarGenerator无法正确处理该tick

实盘中每个合约都会有集合竞价时段,这时候采用自带的BarGenerator来合成1分钟bar就会有问题,因为20:59:00的那个tick既不属于上一交易日的1分钟bar(15:59)的,也不属于21:00的那一个1分钟bar,当然就只能够孤独地自成一个1bar啦!由此带来的问题是21:00的那一个1分钟bar的开盘价和成交量(这个成交量可能是很大)都可能是错了。
当然由此导致的5分钟、10分钟、15分钟、30分钟乃至1日的bar都可能是有问题的,因为它们都是1分钟bar合成的。

3 解决方法:

  • 从合约信息中提前交易时间段信息入手,通常某个交易日的第一个交易时间段前1分钟收到的tick应该归入该交易日的第一个1分钟bar,
  • 如国内期货的为前一交易日的21:00的前1分钟收到的tick应该归入21:00的那一个1分钟bar,
  • 股指期货的为当前交易日的9:30的前1分钟收到的tick应该归入9:30的那一个1分钟bar。
  • 在正确处理了交易日首个1分钟bar合成的基础上,其他的n分钟bar的合成才可能是正确的。

4 顺便谈谈没有办法合成90分钟K线的问题

你现在用BarGenerator没有办法合成90分钟K线,不信你试试看!挺搞笑的一个问题!

在此呼吁vnpy官方,重视CTP接口中的合约信息的作用,重构BarGenerator,作出一个解决当前问题众多的bar合成器。

天下苦此BarGenerator久矣!

一、现象描述

最近编写好了自己的CTA策略,上线运行,发现一个奇怪的现象:
1、设置好策略
2、初始化策略
3、运行策略
策略已经运行起来,一切都表现的正常——可以开仓、平仓。
可是当我想把策略停止下来,我发现交易停止了,可是on_bar()函数中的一些调试信息(为了观察策略的活动而增加的)仍然还在继续工作!
怪了,明明策略研究停止了的,为什么on_bar()函数仍然在执行?

二、原因分析

原来策略中涉及到的合约与CTA策略是被添加到在CtaEngine中的symbol_strategy_map字典中了,而消息EVENT_TICK和CtaEngine的process_tick_event()被注册到消息引擎中,这样子只要接口接受到tick数据就会调用CtaEngine的process_tick_event()。

class CtaEngine(BaseEngine):
    ... 
    ...

    def init_engine(self):
        self.init_rqdata()
        self.load_strategy_class()
        self.load_strategy_setting()
        self.load_strategy_data()
        self.register_event()
        self.write_log("CTA策略引擎初始化成功")

    def register_event(self):
        """"""
        self.event_engine.register(EVENT_TICK, self.process_tick_event)
        self.event_engine.register(EVENT_ORDER, self.process_order_event)
        self.event_engine.register(EVENT_TRADE, self.process_trade_event)
        self.event_engine.register(EVENT_POSITION, self.process_position_event)

    def process_tick_event(self, event: Event):
        """"""
        tick = event.data

        strategies = self.symbol_strategy_map[tick.vt_symbol]
        if not strategies:
            return

        self.check_stop_order(tick)

        for strategy in strategies:
            if strategy.inited:
                self.call_strategy_func(strategy, strategy.on_tick, tick)

而CtaEngine的process_tick_event()中使用tick的vt_symbol找到symbol_strategy_map字典中对应所有策略,然后把tick逐个推送给策略的on_tick()函数,而这里只是判断每个策略的状态是否为inited==True。

三、如果把已经运行的策略停止会怎样?

如果说策略已经初始化了,那么不管是否被主动停止(trading==False),策略的on_tick()函数都会被执行!进而策略的on_bar()也会被执行,只是所有的交易函数均不会执行而已。
这样对停止策略的处理是否有点文不对题?既然已经停止策略,就应该把它停止下来,可是实际的情况是策略处理交易动作被限制之外,其他的处理逻辑和流程仍然再执行,并没有停止下来!
下面是CtaTemplate类中的几个交易相关函数的代码,其中就有策略的trading状态的限制条件:

    def send_order(
        self,
        direction: Direction,
        offset: Offset,
        price: float,
        volume: float,
        stop: bool = False,
        lock: bool = False
    ):
        """
        Send a new order.
        """
        if self.trading:
            vt_orderids = self.cta_engine.send_order(
                self, direction, offset, price, volume, stop, lock
            )
            return vt_orderids
        else:
            return []

    def cancel_order(self, vt_orderid: str):
        """
        Cancel an existing order.
        """
        if self.trading:
            self.cta_engine.cancel_order(self, vt_orderid)

    def cancel_all(self):
        """
        Cancel all orders sent by strategy.
        """
        if self.trading:
            self.cta_engine.cancel_all(self)

只有日K数据可以进行CTA策略回测吗?

  1. 首先vnpy的回测是基于tick数据或者1分钟K线数据的。通常每日使用米筐的数据,根据的你的用户身份,你可能是可以获得某个合约的历史tick数据(通常年费是2-3万多甚至更多)或者1分钟K线数据(通常年费是3千多), vnpy的回测引擎会把这些tick数或者1分钟K线数据合成为你的策略里指标需要的N分钟K数据。
  2. 你的指标应该是基于这个N分钟K线的几个或者成交量计算得到的。由此计算出开仓和平仓信号,同时也确定来开仓和平仓的价格。也就是说开仓和平仓的价格本质上来自于tick数据和1分钟K线数据。
  3. 如果你从其他的渠道只获得来某个合约的日K数据,而不是tick数据或者1分钟K线数据,那么以vnpy目前的回测机制是无法回测的,至少需要自己有针对性地修改。因为没有tick数据或者1分钟K线数据,你的策略的on_tick()是由tick数据推动的,on_bar()函数是有1分钟K线数据推动的,这个两种数据你都没有,策略的on_day_bar()【假设你就是这样命名的】是无法被执行的,因此是无法回测了。
  4. 就算你自己有针对性地修改了回测引擎,也可以on_day_bar()函数了,你的开仓价格和平仓价格因为没有tick数据和1分钟K线数据,只能够使用日K线的开、高、收、低之类的价格而使得测试因为太过粗糙而变得毫无意义。相当于你没有历史行情的细节,怎么能够进行精细的回测呢?可能对于股票之类无杠杆的合约还勉强算的上回测,而对于期货这样的高杠杆的合约,这样的回测几乎没有任何意义!
  5. 对于一根含有较大价格跳空的日K线,单从日K线本身是无法看出来的,跳空区间是无法开仓和平仓的,如果你没有tick数据或1分钟K线数据,想当然地认为你可以在日K线内部的某个价格进行交易,那可能会回测出错得离谱的结果!
  6. 建议vnpy初学者还是先注册个米筐的账户,先试用几个月,然后再购买。别把心思放在可以省钱的免费数据源上,因为麻烦的数据清洗和转换工作,让你的策略开发和调试变成无穷无尽的烦恼。

1 目前的CTA策略只有一个合约

class CtaTemplate(ABC):
    ... ...
    def __init__(
        self,
        cta_engine: Any,
        strategy_name: str,   
        vt_symbol: str,             # 合约代码
        setting: dict,
    ):

用户策略启动后,CTA策略引擎帮助订阅代码为vt_symbol合约的行情,用户策略收到tick推送之后,合成1分钟bar、N分钟bar乃至小时日线的bar。
策略在on_xmin_bar()函数中实现交易信号的计算并且完成交易逻辑。
我们可以知道,策略使用vt_symbol合约的行情指导了对vt_symbol合约的交易。

2 CTA策略的应该包含两个合约参数:指标合约和交易合约

2.1 何为交易合约

交易合约是指CTA策略要交易的合约,目前CTA策略里的合约参数就是交易合约,如果是期货的话,我们一般都选择主力合约来交易。

2.2 何为指标合约

指标合约是指CTA策略用来进行各种指标计算的合约,它可以和要交易的合约相同,也可以不同。可以是交易合约所在品种或者板块的指数,也可以是交易合约所在品种的指数或者主连合约,也可以选择如rqdata中的经过主力合约如rb888,rb999之类的主连合约,这样合约是通过行主力合约间的升水和贴水的方式得到的,没有主连合约那样的在主力合约去换时产生的巨大跳空,更加合理。

2.3 增加指标合约的好处

  1. 我们一般都选择主力合约,因为流动性好,在品种合约中占据主导地位。
  2. 主力合约会变化,到期之后需要即时换约
  3. 主力合约的K线通常较短,通常较短,长的7,8个月,短的2,3个月,不利于指标分析。一个期货合约上线通常不足一年,它成为主力合约的时间更短,它在非主力合约时期和成为主力合约之后的行情节奏有时候会判若两人!
  4. 主连合约的时间通常较长,可以长达数年,便于指标分析,但是不可以交易!
  5. 合约指数的时间通常也较长,可以长达数年而且连续平滑,便于指标分析,也是不可以交易!
  6. 如果增加了指标合约,分型行情的时候,使用指数或者主连合约来分析,交易的时候才有主力合约来交易,就可以实现自动换约,及时跟踪到最新的主力合约,有可以避免新近主力合约K线数量太少的问题,一举两得!

3 如何修改?

要做到这一点,需要修改的类包括:

3.1 修改CtaEngine:

对send_order等需要需要区分对策略的交易合约发送委托单,
策略初始化时需要同时订阅指标合约和交易合约的行情。

3.2 修改CtaTemplate:

加指标合约参数vt_symbol1,
指标计算时时有指标合约的行情数据进行进行
交易时使用交易合约进行交易

3.3 修改CTA的StrategyManager

使得在CTA策略在创建时可以输入指标合约

在CTP接口的交易接口的SPI中,有OnRtnOrder和onRtnTrade两个数据接口:

1 OnRtnOrder推送的数据结构CThostFtdcOrderField

struct CThostFtdcOrderField
{
    ///经纪公司代码
    TThostFtdcBrokerIDType BrokerID;
    ///投资者代码
    TThostFtdcInvestorIDType InvestorID;
    ///合约代码
    TThostFtdcInstrumentIDType InstrumentID;
    ///报单引用
    TThostFtdcOrderRefType OrderRef;
    ///用户代码
    TThostFtdcUserIDType UserID;
    ///报单价格条件
    TThostFtdcOrderPriceTypeType OrderPriceType;
    ///买卖方向
    TThostFtdcDirectionType Direction;
    ///组合开平标志
    TThostFtdcCombOffsetFlagType CombOffsetFlag;
    ///组合投机套保标志
    TThostFtdcCombHedgeFlagType CombHedgeFlag;
    ///价格
    TThostFtdcPriceType LimitPrice;
    ///数量
    TThostFtdcVolumeType VolumeTotalOriginal;
    ///有效期类型
    TThostFtdcTimeConditionType TimeCondition;
    ///GTD日期
    TThostFtdcDateType GTDDate;
    ///成交量类型
    TThostFtdcVolumeConditionType VolumeCondition;
    ///最小成交量
    TThostFtdcVolumeType MinVolume;
    ///触发条件
    TThostFtdcContingentConditionType ContingentCondition;
    ///止损价
    TThostFtdcPriceType StopPrice;
    ///强平原因
    TThostFtdcForceCloseReasonType ForceCloseReason;
    ///自动挂起标志
    TThostFtdcBoolType IsAutoSuspend;
    ///业务单元
    TThostFtdcBusinessUnitType BusinessUnit;
    ///请求编号
    TThostFtdcRequestIDType RequestID;
    ///本地报单编号
    TThostFtdcOrderLocalIDType OrderLocalID;
    ///交易所代码
    TThostFtdcExchangeIDType ExchangeID;
    ///会员代码
    TThostFtdcParticipantIDType ParticipantID;
    ///客户代码
    TThostFtdcClientIDType ClientID;
    ///合约在交易所的代码
    TThostFtdcExchangeInstIDType ExchangeInstID;
    ///交易所交易员代码
    TThostFtdcTraderIDType TraderID;
    ///安装编号
    TThostFtdcInstallIDType InstallID;
    ///报单提交状态
    TThostFtdcOrderSubmitStatusType OrderSubmitStatus;
    ///报单提示序号
    TThostFtdcSequenceNoType NotifySequence;
    ///交易日
    TThostFtdcDateType TradingDay;
    ///结算编号
    TThostFtdcSettlementIDType SettlementID;
    ///报单编号
    TThostFtdcOrderSysIDType OrderSysID;
    ///报单来源
    TThostFtdcOrderSourceType OrderSource;
    ///报单状态
    TThostFtdcOrderStatusType OrderStatus;
    ///报单类型
    TThostFtdcOrderTypeType OrderType;
    ///今成交数量
    TThostFtdcVolumeType VolumeTraded;
    ///剩余数量
    TThostFtdcVolumeType VolumeTotal;
    ///报单日期
    TThostFtdcDateType InsertDate;
    ///委托时间
    TThostFtdcTimeType InsertTime;
    ///激活时间
    TThostFtdcTimeType ActiveTime;
    ///挂起时间
    TThostFtdcTimeType SuspendTime;
    ///最后修改时间
    TThostFtdcTimeType UpdateTime;
    ///撤销时间
    TThostFtdcTimeType CancelTime;
    ///最后修改交易所交易员代码
    TThostFtdcTraderIDType ActiveTraderID;
    ///结算会员编号
    TThostFtdcParticipantIDType ClearingPartID;
    ///序号
    TThostFtdcSequenceNoType SequenceNo;
    ///前置编号
    TThostFtdcFrontIDType FrontID;
    ///会话编号
    TThostFtdcSessionIDType SessionID;
    ///用户端产品信息
    TThostFtdcProductInfoType UserProductInfo;
    ///状态信息
    TThostFtdcErrorMsgType StatusMsg;
    ///用户强评标志
    TThostFtdcBoolType UserForceClose;
    ///操作用户代码
    TThostFtdcUserIDType ActiveUserID;
    ///经纪公司报单编号
    TThostFtdcSequenceNoType BrokerOrderSeq;
    ///相关报单
    TThostFtdcOrderSysIDType RelativeOrderSysID;
    ///郑商所成交数量
    TThostFtdcVolumeType ZCETotalTradedVolume;
    ///互换单标志
    TThostFtdcBoolType IsSwapOrder;
    ///营业部编号
    TThostFtdcBranchIDType BranchID;
    ///投资单元代码
    TThostFtdcInvestUnitIDType InvestUnitID;
    ///资金账号
    TThostFtdcAccountIDType AccountID;
    ///币种代码
    TThostFtdcCurrencyIDType CurrencyID;
    ///IP地址
    TThostFtdcIPAddressType IPAddress;
    ///Mac地址
    TThostFtdcMacAddressType MacAddress;
};

2 OnRtnTrade推送的数据结构CThostFtdcTradeField

struct CThostFtdcTradeField
{
    ///经纪公司代码
    TThostFtdcBrokerIDType BrokerID;
    ///投资者代码
    TThostFtdcInvestorIDType InvestorID;
    ///合约代码
    TThostFtdcInstrumentIDType InstrumentID;
    ///报单引用
    TThostFtdcOrderRefType OrderRef;
    ///用户代码
    TThostFtdcUserIDType UserID;
    ///交易所代码
    TThostFtdcExchangeIDType ExchangeID;
    ///成交编号
    TThostFtdcTradeIDType TradeID;
    ///买卖方向
    TThostFtdcDirectionType Direction;
    ///报单编号
    TThostFtdcOrderSysIDType OrderSysID;
    ///会员代码
    TThostFtdcParticipantIDType ParticipantID;
    ///客户代码
    TThostFtdcClientIDType ClientID;
    ///交易角色
    TThostFtdcTradingRoleType TradingRole;
    ///合约在交易所的代码
    TThostFtdcExchangeInstIDType ExchangeInstID;
    ///开平标志
    TThostFtdcOffsetFlagType OffsetFlag;
    ///投机套保标志
    TThostFtdcHedgeFlagType HedgeFlag;
    ///价格
    TThostFtdcPriceType Price;
    ///数量
    TThostFtdcVolumeType Volume;
    ///成交时期
    TThostFtdcDateType TradeDate;
    ///成交时间
    TThostFtdcTimeType TradeTime;
    ///成交类型
    TThostFtdcTradeTypeType TradeType;
    ///成交价来源
    TThostFtdcPriceSourceType PriceSource;
    ///交易所交易员代码
    TThostFtdcTraderIDType TraderID;
    ///本地报单编号
    TThostFtdcOrderLocalIDType OrderLocalID;
    ///结算会员编号
    TThostFtdcParticipantIDType ClearingPartID;
    ///业务单元
    TThostFtdcBusinessUnitType BusinessUnit;
    ///序号
    TThostFtdcSequenceNoType SequenceNo;
    ///交易日
    TThostFtdcDateType TradingDay;
    ///结算编号
    TThostFtdcSettlementIDType SettlementID;
    ///经纪公司报单编号
    TThostFtdcSequenceNoType BrokerOrderSeq;
    ///成交来源
    TThostFtdcTradeSourceType TradeSource;
    ///投资单元代码
    TThostFtdcInvestUnitIDType InvestUnitID;
};

3 问题现象:

3.1 委托单报单日期错误

CThostFtdcOrderField中的有InsertDate(报单日期)字段和TradingDay(交易日),可是中信建投期货把InsertDate填入交易日,而不是委托单报单之时的日期。

3.2 成交单成交日期错误

CThostFtdcTradeField中的有TradeDate(成交日期)字段和TradingDay(交易日),可是中信建投期货把TradeDate填入交易日,而不是成交单成交之时的日期。

3.3 持仓信息在不同的官方软件显示错误

中信建投期货推荐的软件有数据客户端“金建投”和“快期V2”。
本人使用vnpy进行交易,分别使用金建投和快期V2,登录后看到的持仓信息,同一时间、同一品种、看到的持仓均价和浮动盈亏是不一样的,表现为:
1 金建投软件显示的持仓均价和浮动盈亏是对的
2 快期V2软件显示的持仓均价和浮动盈亏是错误
3 vnpy软件显示的持仓均价和浮动盈亏是错误,而且与快期V2显示结果相同

4 与中信建投技术人员反应问题

调试发现,a2101合约,只要是在21:00之后的夜盘交易的记录,委托单的InsertDate和成交单中的TradeDate,均被加上1日或者3日。
现在经过4天左右与中信建投技术人员的交流,他们拒绝承认自己的委托单的InsertDate和成交单中的TradeDate字段错了。
我把从CTP接收到的OrderData和TradeData数据打印给他们,中信建投技术人员说这是第三方表达,不认可vnpy。并且说InsertDate和TradeDate在夜盘的时候就应该是交易日,应该+1或者+3。
现在问题一直没有得到解决。

5 希望有关类似遭遇的朋友,积极发表看法。

请问是快期v2和vnpy在CTP接口处理错误了吗,还是中信建投搞错了?

例子代码

from dataclasses import dataclass,field
from typing import List,Dict,Tuple
from copy import copy,deepcopy

@dataclass
class Item:
    x: int
    y: int
    z: int = field(init=False) # 为生成字段

    def __post_init__(self):
        self.z = self.x * self.y

@dataclass
class Item1:
    x: int
    y: int
    z: int = field(init=False,compare=False)    # z不参加比较

    def __post_init__(self):
        self.z = self.x * self.y

@dataclass
class DC:
    a: int
    b: int
    c: int = field(init=False)

    items: List[Item] = field(default_factory=list)

    def __post_init__(self):
        self.c = self.a + self.b

    def cur_item(self):
        return deepcopy(self.items[-1]) if self.items else None



if __name__ == "__main__":
    # dataclasses 的类型演示
    item1 = Item(1,2)
    item2 = Item(10,20)
    item3 = Item(100,200)

    c1 = DC(3,4)

    c1.items += [item1,item2,item3]

    cur_item = c1.cur_item()

    c2 = deepcopy(c1)
    print(f"\nc1==c2 = {c1==c2}")

    print(f"\nitem1={item1},item2={item2},item3={item3}")
    print(f"c1={c1}")
    print(f"cur_item={cur_item}")
    print(f"item3==cur_item = {item3==cur_item}")
    cur_item.x = 66
    print(f"\nitem3==cur_item={item3==cur_item}")
    print(f"cur_item={cur_item}")
    print(f"c1={c1}")
    print(f"item1={item1},item2={item2},item3={item3}")


    # z参加比较的演示
    # 创建a,b两个Item类
    a = Item(1,2)
    b = Item(1,2)

    print(f"z参加比较的演示")
    print(f"a={a},b={b} a==b = {a==b}")
    b.z = 10    # 字段z也是可以被改变的
    print(f"a={a},b={b} a==b = {a==b}")
    b.__post_init__()
    print(f"a={a},b={b} a==b = {a==b}")

    # z不参加比较的演示
    # 创建a,b两个Item类
    a = Item1(1,2)
    b = Item1(1,2)
    print(f"z不参加比较的演示")
    print(f"a={a},b={b} a==b = {a==b}")
    b.z = 10
    print(f"a={a},b={b} a==b = {a==b}")
    b.__post_init__()
    print(f"a={a},b={b} a==b = {a==b}")

运行结果

c1==c2 = True

item1=Item(x=1, y=2, z=2),item2=Item(x=10, y=20, z=200),item3=Item(x=100, y=200, z=20000)
c1=DC(a=3, b=4, c=7, items=[Item(x=1, y=2, z=2), Item(x=10, y=20, z=200), Item(x=100, y=200, z=20000)])
cur_item=Item(x=100, y=200, z=20000)
item3==cur_item = True

item3==cur_item=False
cur_item=Item(x=66, y=200, z=20000)
c1=DC(a=3, b=4, c=7, items=[Item(x=1, y=2, z=2), Item(x=10, y=20, z=200), Item(x=100, y=200, z=20000)])
item1=Item(x=1, y=2, z=2),item2=Item(x=10, y=20, z=200),item3=Item(x=100, y=200, z=20000)
z参加比较的演示
a=Item(x=1, y=2, z=2),b=Item(x=1, y=2, z=2) a==b = True
a=Item(x=1, y=2, z=2),b=Item(x=1, y=2, z=10) a==b = False
a=Item(x=1, y=2, z=2),b=Item(x=1, y=2, z=2) a==b = True
z不参加比较的演示
a=Item1(x=1, y=2, z=2),b=Item1(x=1, y=2, z=2) a==b = True
a=Item1(x=1, y=2, z=2),b=Item1(x=1, y=2, z=10) a==b = True
a=Item1(x=1, y=2, z=2),b=Item1(x=1, y=2, z=2) a==b = True

购买了米筐的数据后,开始正式实盘。如果此时再想同时启动例外一台PC去回测或者调试,会提示rqdata连接出问题。
如何利用RpcService实现米筐数据共享,可行吗 ?请有经验的分享下经验吧,先谢谢了!!!

1 自己计算当前持仓的保证金及当日盈亏

你可以自己在vnpy中本地会持仓进行逐日盯市计算。方法是保存每日的持仓历史,根据合约信息,计算每日持仓的进行保证金及当日盈亏结算。
可是计算结果和你开户的柜台逐日盯市计算进行比较,你会发现总是不一样,虽然差别不太大! 原因在哪里呢?经过排除,原来问题出于使用了合约的当日收盘价上,而正确的应该是使用期货结算价来计算当前持仓的保证金及当日盈亏。

2 期货结算价和收盘价的区别

2.1 期货结算价

结算价是当天交易结束后,对未平仓合约进行当日交易保证金及当日盈亏结算的基准价。
我国郑州商品交易所、大连商品交易所和上海期货交易所规定:当日结算价取某一期货合约当日成交价格按照成交量的加权平均价;当日无成交价格的,以上一交易日的结算价作为当日结算价。
中国金融期货交易所规定:当日结算价是指某一期货合约最后一小时成交价格按照成交量的加权平均价。 交收日按最后两小时的算术平均价计结算价。

2.2 期货收盘价

期货收盘价是一天交易的最后一个价格,它是由收盘前1分钟所有买卖盘集中撮合而成。

1. 怎么发现的 ?

分析一下盘中启动CTA策略带来的第一根合成K线错误

2. 运行策略的结果:

【first bar time = 2020-10-16 14:52:00+08:00 history bar time = 2020-10-16 14:59:00+08:00,bars count=555】合成的第一个bar的时间比米筐的最新bar的时间也是晚了7分钟
【tick.datetime = 2020-10-16 14:53:00+08:00 is_first_bar=False】 此时实际时间已经15:01了,tick的时间晚了7分钟
【tick.datetime = 2020-10-16 14:54:00+08:00 is_first_bar=False】
【tick.datetime = 2020-10-16 14:55:00+08:00 is_first_bar=False】
【tick.datetime = 2020-10-16 14:56:00+08:00 is_first_bar=False】
【tick.datetime = 2020-10-16 14:57:00+08:00 is_first_bar=False】
【tick.datetime = 2020-10-16 14:58:00+08:00 is_first_bar=False】
【tick.datetime = 2020-10-16 14:59:00+08:00 is_first_bar=False】
【tick.datetime = 2020-10-16 15:00:00+08:00 is_first_bar=False】 此时实际时间已经15:10了,tick的时间晚了约10分钟

一般15:00国内的期货交易所就休市了,应该立即就没有tick推送了,可是就连vnpy上仍然可以见到ag2012.SHFE的tick在不断变化,和Demo_Strategy策略调试输出的信息也是吻合的。很显然tick时间比滞后实际时间大概7~10分钟

3. 问问各位先知,是否知道原因?

说明一下,本贴中的bar数据和K线数据其实是一回事,有时候是方便读代码就按照代码说,有时候为了尊崇人们的习惯来说,不必在意。

1 通常CTA策略都是读取和合成K线的

1.1 从一个有代表性的策略DemoStrategy谈起

代码是这样的:

from typing import Any

from vnpy.app.cta_strategy import (
    CtaTemplate,
    BarGenerator,
    ArrayManager
)

from vnpy.trader.object import (
    BarData,
    TickData
)

from vnpy.trader.constant import Interval

class DemoStrategy(CtaTemplate):
    """ 一个演示策略 """
    author = "hxxjava"

    fast_window = 10
    slow_window = 20

    fast_ma0 = 0
    fast_ma1 = 0
    slow_ma0 = 0
    slow_ma1 = 0

    parameters = [
        "fast_window",
        "slow_window"
    ]

    variables = [
        "fast_ma0",
        "fast_ma1",
        "slow_ma0",
        "slow_ma1",
    ]

    def __init__(
        self,
        cta_engine: Any,
        strategy_name: str,
        vt_symbol: str,
        setting: dict 
    ):
        """构造函数"""
        super().__init__(cta_engine,strategy_name,vt_symbol,setting)

        self.bg = BarGenerator(
            on_bar=self.on_bar,
            window=7,
            on_window_bar=on_7min_bar,
            interval=Interval.Minute)

        self.am = ArrayManager()


    def on_init(self):
        """"""
        self.write_log("策略初始化")
        # account_data = self.cta_engine.get_account()
        self.load_bar(10)

    def on_start(self):
        """策略启动"""
        self.write_log("策略启动")

    def on_stop(self):
        """ 策略停止 """
        self.write_log(" 策略停止 ")

    def on_tick(self,tick:TickData):
        """ Tick更新 """
        self.bg.update_tick(tick) 

    def on_bar(self, bar: BarData):
        """K线更新"""
        self.bg.update_bar(bar)

    def on_7min_bar(self, bar: BarData):
        """K线更新"""
        am = self.am
        am.update_bar(bar)
        if not am.inited:
            return

        """ 计算均线 """
        fast_ma = am.sma(self.fast_window,True)
        self.fast_ma0 = fast_ma[-1]
        self.fast_ma1 = fast_ma[-2]

        slow_ma = am.sma(self.slow_window,True)
        self.slow_ma0 = slow_ma[-1]
        self.slow_ma1 = slow_ma[-2]

        """ 定义金叉和死叉 """

        cross_over = (self.fast_ma0>= self.fast_ma1 and
                      self.slow_ma0<self.slow_ma1)  

        cross_below = (self.slow_ma0>self.slow_ma1 and 
                      self.slow_ma0<=self.slow_ma1)

        if cross_over:
            price = bar.close_price + 5

            if not self.pos:
                self.buy(price,1)
            elif self.pos < 0:
                self.cover(price,1)
                self.buy(price,1)
        elif cross_below:
            price = bar.close_price - 5

            if not self.pos:
                self.short(price,1)
            elif self.pos>0:
                self.sell(price,1)
                self.short(price,1)

        # 更新图形界面 
        self.put_event()

这个策略是演示如何利用1分钟K线合成7分钟K线,然后在on_7min_bar()里面利用7分钟K线计算快慢两根移动均线,
然后更加快慢移动均线的金叉和死叉信号来进行多空的开仓和平仓操作,如此实现一个自动策略买卖交易。

1.2 策略工作的过程是这样的:

1.2.1 首先执行构造函数init()

在构造函数init()中创建BarGenerator类型self.bg和管理bar的ArrayManager类型的self.am

1.2.2 然后执行on_init()函数

这里的重点是self.load_bar(10),该函数是策略的父类CtaTemplate的函数,代码是这样的:

    def load_bar(
        self,
        days: int,
        interval: Interval = Interval.MINUTE,
        callback: Callable = None,
        use_database: bool = False
    ):
        """
        Load historical bar data for initializing strategy.
        """
        if not callback:
            callback = self.on_bar

        self.cta_engine.load_bar(
            self.vt_symbol,
            days,
            interval,
            callback,
            use_database
        )

self.cta_engine.load_bar()位于vnpy\app\cta_strategy.py中的CtaEngine类中,代码是这样的:

    def load_bar(
        self,
        vt_symbol: str,
        days: int,
        interval: Interval,
        callback: Callable[[BarData], None],load_bar
        use_database: bool
    ):
        """"""
        symbol, exchange = extract_vt_symbol(vt_symbol)
        end = datetime.now(get_localzone())
        start = end - timedelta(days)
        bars = []

        # Pass gateway and RQData if use_database set to True
        if not use_database:
            # Query bars from gateway if available
            contract = self.main_engine.get_contract(vt_symbol)

            if contract and contract.history_data:
                req = HistoryRequest(
                    symbol=symbol,
                    exchange=exchange,
                    interval=interval,
                    start=start,
                    end=end
                )
                bars = self.main_engine.query_history(req, contract.gateway_name)

            # Try to query bars from RQData, if not found, load from database.
            else:
                bars = self.query_bar_from_rq(symbol, exchange, interval, start, end)

        if not bars:
            bars = database_manager.load_bar_data(
                symbol=symbol,
                exchange=exchange,
                interval=interval,
                start=start,
                end=end,
            )

        for bar in bars:
            callback(bar)

因为在策略中使用这样的语句self.load_bar(10),所以use_database参数为默认值False,可是我们知道目前CTP接口是不支持历史数据查询的,所以contract and contract.history_data的条件为假,导致bars 为空, 最终执行了:

bars = self.query_bar_from_rq(symbol, exchange, interval, start, end)

而self.query_bar_from_rq的代码是这样的:

    def query_bar_from_rq(
        self, symbol: str, exchange: Exchange, interval: Interval, start: datetime, end: datetime
    ):
        """
        Query bar data from RQData.
        """
        req = HistoryRequest(
            symbol=symbol,
            exchange=exchange,
            interval=interval,
            start=start,
            end=end
        )
        data = rqdata_client.query_history(req)
        return data

再看看rqdata_client.query_history(req)的代码,它把产生req的symbol,interval,start 和end各字段,转换成米筐接口可以接受的rq_symbol,rq_interval ,interval,start 和end等4个变量中,然后把end加上1天的时间【注意:这是非常重要的一个技巧,不然无法取出截止到当前交易时刻的1分钟bar!】,最后执行米筐接口函数rqdata_get_price()读取所有的10天多的bar数据,注意:是10天多的bar,而不是整10天的bar!

def query_history(self, req: HistoryRequest) -> Optional[List[BarData]]:
        """
        Query history bar data from RQData.
        """
        if self.symbols is None:
            return None

        symbol = req.symbol
        exchange = req.exchange
        interval = req.interval
        start = req.start
        end = req.end

        rq_symbol = self.to_rq_symbol(symbol, exchange)
        if rq_symbol not in self.symbols:
            return None

        rq_interval = INTERVAL_VT2RQ.get(interval)
        if not rq_interval:
            return None

        # For adjust timestamp from bar close point (RQData) to open point (VN Trader)
        adjustment = INTERVAL_ADJUSTMENT_MAP[interval]

        # For querying night trading period data
        end += timedelta(1)

        # Only query open interest for futures contract
        fields = ["open", "high", "low", "close", "volume"]
        if not symbol.isdigit():
            fields.append("open_interest")

        df = rqdata_get_price(
            rq_symbol,
            frequency=rq_interval,
            fields=fields,
            start_date=start,
            end_date=end,
            adjust_type="none"
        )

        data: List[BarData] = []

        if df is not None:
            for ix, row in df.iterrows():
                dt = row.name.to_pydatetime() - adjustment
                dt = CHINA_TZ.localize(dt)

                bar = BarData(
                    symbol=symbol,
                    exchange=exchange,
                    interval=interval,
                    datetime=dt,
                    open_price=row["open"],
                    high_price=row["high"],
                    low_price=row["low"],
                    close_price=row["close"],
                    volume=row["volume"],
                    open_interest=row.get("open_interest", 0),
                    gateway_name="RQ"
                )

                data.append(bar)

        return data

同时可以知道interval的默认值为Interval.MINUTE。
至此我们可以看出,self.load_bar(10)其实就是从米筐接口获取的1分钟历史数据。

1.2.3 当策略启动后,接收到tick数据推送时执行on_tick()

这里执行了

self.bg.update_tick(tick)

这是在调用策略的K线合成器self.bg的update_tick() 函数,这个函数是用来把tick数据按照1分钟为间隔来产生1分钟bar的,当1分钟bar合成之时再次调用策略的on_bar()。
BarGenerator的update_tick()的函数代码如下:

    def update_tick(self, tick: TickData) -> None:
        """
        Update new tick data into generator.
        """
        new_minute = False

        # Filter tick data with 0 last price
        if not tick.last_price:
            return

        # Filter tick data with less intraday trading volume (i.e. older timestamp)
        if self.last_tick and tick.volume and tick.volume < self.last_tick.volume:
            return

        if not self.bar:
            new_minute = True
        elif self.bar.datetime.minute != tick.datetime.minute:
            self.bar.datetime = self.bar.datetime.replace(
                second=0, microsecond=0
            )
            self.on_bar(self.bar)

            new_minute = True

        if new_minute:
            self.bar = BarData(
                symbol=tick.symbol,
                exchange=tick.exchange,
                interval=Interval.MINUTE,
                datetime=tick.datetime,
                gateway_name=tick.gateway_name,
                open_price=tick.last_price,
                high_price=tick.last_price,
                low_price=tick.last_price,
                close_price=tick.last_price,
                open_interest=tick.open_interest
            )
        else:
            self.bar.high_price = max(self.bar.high_price, tick.last_price)
            self.bar.low_price = min(self.bar.low_price, tick.last_price)
            self.bar.close_price = tick.last_price
            self.bar.open_interest = tick.open_interest
            self.bar.datetime = tick.datetime

        if self.last_tick:
            volume_change = tick.volume - self.last_tick.volume
            self.bar.volume += max(volume_change, 0)

        self.last_tick = tick

分析得知它开始生成self.bar的条件是:

 if not self.bar:
    new_minute = True
   ....

 if new_minute:
            self.bar = BarData(
                symbol=tick.symbol,
                exchange=tick.exchange,
                interval=Interval.MINUTE,
                datetime=tick.datetime,
                gateway_name=tick.gateway_name,
                open_price=tick.last_price,
                high_price=tick.last_price,
                low_price=tick.last_price,
                close_price=tick.last_price,
                open_interest=tick.open_interest
            )

也就是说只要刚刚启动策略,就会立即生成一根新bar,而没有寻求对齐整分钟,这样会造成首个bar的合成非常可能是不完整的!

1.2.4 策略的on_bar()的执行:

self.bg.update_bar(bar)

这个函数是用1分钟bar来合成7分钟bar的,当7分钟bar合成完成后,它会以7分钟bar为参数调用策略的on_7min_bar()。

1.2.5 策略的on_7min_bar()的执行

        am = self.am
        am.update_bar(bar)
        if not am.inited:
            return

        """ 计算均线 """
        fast_ma = am.sma(self.fast_window,True)
        self.fast_ma0 = fast_ma[-1]
        self.fast_ma1 = fast_ma[-2]

        slow_ma = am.sma(self.slow_window,True)
        self.slow_ma0 = slow_ma[-1]
        self.slow_ma1 = slow_ma[-2]
        后面的代码就省略了

姑且不论策略是否可以赚钱,因为后面还要针对特定合约进行优化,这不是本帖讨论的重点!
从代码来看,一切都是那么自然,一个完美的例子!

2 如果你是在盘中启动将带来第一根K线错误

这里分析的重点是假如我们在盘中启动策略的话,会发生什么问题,请看图:

description

2.1 第一根合成1分钟K线的丢失部分

如上图中所示:

  1. 灰色的部分为策略利用self.load_bar(10)从米筐读取从启动之时起10日内历史1分钟bar,这就是1.2.2节中描述的那部分bar;
  2. 绿色的部分为策略利用接收到tick合成的1分钟bar,这就是1.2.3节和1.2.4节中描述的那部分bar;
  3. 黄色的部分为第一根合成1分钟K线的丢失部分,这是产生问题的主要原因!

我们知道从米筐接口读取的只有整分的K线数据,它不会提供没有走完的1分钟bar,所以如果你没有在整分钟结束的那一刻启动策略的话(做到这一点的概率太低了!),那么就一定会产生黄色的丢失部分。

2.2 第一根合成1分钟K线的丢失部分的影响

因为第一根合成1分钟K线出现丢失部分,导致第一根合成1分钟K线的开、高、收、低、成交量和开仓兴趣都可能是错误的,进而导致利用1分钟K线合成的7分钟K线也是错误的,这可以说是连锁反应,当然也就会导致利用7分钟K线进行信号计算和交易查询问题!
也许你会说,有那么夸张吗?我不知道!不过这个丢失部分的时间长度在0~59.99秒之间,再说了就算是只有3秒的丢失,也可能是这1分钟中几乎全部的成交量,创新高、创新低都是有可能的,它的缺失也可能是让7分钟K线严重失真的重要原因,谁知道呢!我们这里分析目前的代码就是这样的,从原理上讲它确实是会出错的!

3 怎么解决问题?

解决方法:

  1. 尽量不要在盘中启动策略,在盘前启动好要交易的策略,但这个方法仍然没有解决策略软件的问题。
  2. 在策略中增加是否盘中启动的判断,如果是盘中启动,则在第一根1分钟K线合成之时,抛弃不要,立即从米筐取得前1分钟的K线数据,这样就可以替换掉这个不完整的第一根合成1分钟K线,那么也就解决了第一根7分钟K线错误的问题,完美地解决问题。
  3. 那么解决该问题就需要知道启动策略的时刻是否在交易合约的交易时间段内,那么就需要知道合约的交易时间段信息。米筐接口时提供合约的交易时间段信息的,函数如下:
         get_trading_dates() # 合约的所有的交易日
         get_trading_hours() # 合约的所有的交易时间段
    如果策略启动后最后一个历史1分钟bar与第一个tick数据在一个交易时间段(如9:00-10:15)中, 那么就可以判断出第一个1分钟K线出现了数据丢失,在这个第一个1分钟K线走完之时,就应该从米筐接口立即读取这个刚刚生成的历史1分钟bar,替换掉策略合成的第一个1分钟K线,其他的处理逻辑继续执行就可以了。
  4. 另外一个简单解决方法是: 修改BarGenerator的update_tick(),当其返回合成第一个1分钟bar时,直接从米筐读取这个历史1分钟bar,以此替代之,后续的处理逻辑与目前的代码相同即可。这种方法的好处是不要根据合约的交易时间段来判断,简单,但是可能回因为读取米筐接口需要时间,会否影响tick数据的处理还有待编写代码来测试。
  5. 问题已经发现了,怎么实现代码还在思考中,应该不难。这个问题就难在你可能根本没有意识到它可能有问题!

4 一种解决第一根合成1分钟K线的方法:

按照第3节中的4的方法,修改BarGenerator,代码如下,可以解决问题:

class BarGenerator:
    """
    For:
    1. generating 1 minute bar data from tick data
    2. generateing x minute bar/x hour bar data from 1 minute data

    Notice:
    1. for x minute bar, x must be able to divide 60: 2, 3, 5, 6, 10, 15, 20, 30
    2. for x hour bar, x can be any number
    """

    def __init__(
        self,
        on_bar: Callable,
        window: int = 0,
        on_window_bar: Callable = None,
        interval: Interval = Interval.MINUTE
    ):
        """Constructor"""
        self.bar: BarData = None
        self.on_bar: Callable = on_bar

        self.interval: Interval = interval
        self.interval_count: int = 0

        self.window: int = window
        self.window_bar: BarData = None
        self.on_window_bar: Callable = on_window_bar

        self.last_tick: TickData = None
        self.last_bar: BarData = None
        self.is_first_bar = True            # hxxjava add

    def update_tick(self, tick: TickData) -> None:
        """
        Update new tick data into generator.
        """
        from vnpy.trader.rqdata import rqdata_client    # hxxjava add
        from vnpy.trader.object import HistoryRequest   # hxxjava add

        new_minute = False

        # Filter tick data with 0 last price
        if not tick.last_price:
            return False

        # Filter tick data with less intraday trading volume (i.e. older timestamp)
        if self.last_tick and tick.volume and tick.volume < self.last_tick.volume:
            return False

        if not self.bar:
            new_minute = True
        elif self.bar.datetime.minute != tick.datetime.minute:
            self.bar.datetime = self.bar.datetime.replace(
                second=0, microsecond=0
            )

            # hxxjava add start
            if self.is_first_bar:   
                self.is_first_bar = False

                symbol,exchange = extract_vt_symbol(self.bar.vt_symbol)
                bar_datetime = self.bar.datetime
                req = HistoryRequest(
                    symbol=symbol,
                    exchange=exchange,
                    start = bar_datetime,
                    end=bar_datetime,
                    interval=Interval.MINUTE
                )
                bars = rqdata_client.query_history(req)
                self.bar = bars[-1]
                print(f"【first bar time = {bar_datetime} history bar time = {self.bar.datetime},bars count={len(bars)}】")
            # hxxjava add end

            self.on_bar(self.bar)

            new_minute = True

        if new_minute:
            print(f"【tick.datetime = {tick.datetime} is_first_bar={self.is_first_bar}】")
            self.bar = BarData(
                symbol=tick.symbol,
                exchange=tick.exchange,
                interval=Interval.MINUTE,
                datetime=tick.datetime,
                gateway_name=tick.gateway_name,
                open_price=tick.last_price,
                high_price=tick.last_price,
                low_price=tick.last_price,
                close_price=tick.last_price,
                open_interest=tick.open_interest
            )
        else:
            self.bar.high_price = max(self.bar.high_price, tick.last_price)
            self.bar.low_price = min(self.bar.low_price, tick.last_price)
            self.bar.close_price = tick.last_price
            self.bar.open_interest = tick.open_interest
            self.bar.datetime = tick.datetime

        if self.last_tick:
            volume_change = tick.volume - self.last_tick.volume
            self.bar.volume += max(volume_change, 0)

        self.last_tick = tick

    def update_bar(self, bar: BarData) -> None:
        """
        Update 1 minute bar into generator
        """
        # If not inited, creaate window bar object
        if not self.window_bar:
            # Generate timestamp for bar data
            if self.interval == Interval.MINUTE:
                dt = bar.datetime.replace(second=0, microsecond=0)
            else:
                dt = bar.datetime.replace(minute=0, second=0, microsecond=0)

            self.window_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                datetime=dt,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price
            )
        # Otherwise, update high/low price into window bar
        else:
            self.window_bar.high_price = max(
                self.window_bar.high_price, bar.high_price)
            self.window_bar.low_price = min(
                self.window_bar.low_price, bar.low_price)

        # Update close price/volume into window bar
        self.window_bar.close_price = bar.close_price
        self.window_bar.volume += int(bar.volume)
        self.window_bar.open_interest = bar.open_interest

        # Check if window bar completed
        finished = False

        if self.interval == Interval.MINUTE:
            # x-minute bar
            if not (bar.datetime.minute + 1) % self.window:
                finished = True
        elif self.interval == Interval.HOUR:
            if self.last_bar and bar.datetime.hour != self.last_bar.datetime.hour:
                # 1-hour bar
                if self.window == 1:
                    finished = True
                # x-hour bar
                else:
                    self.interval_count += 1

                    if not self.interval_count % self.window:
                        finished = True
                        self.interval_count = 0

        if finished:
            self.on_window_bar(self.window_bar)
            self.window_bar = None

        # Cache last bar object
        self.last_bar = bar

    def generate(self) -> Optional[BarData]:
        """
        Generate the bar data and call callback immediately.
        """
        bar = self.bar

        if self.bar:
            bar.datetime = bar.datetime.replace(second=0, microsecond=0)
            self.on_bar(bar)

        self.bar = None
        return bar

1. 先说明下这里所说的“登录期货账户时间”

这里所说的登录期货账户时间是指:启动软件到连接CTP接口界面,已经输入正确的期货账户和密码,从点击确定开始计时,经过CTP接口的用户身份认证登录行情和交易服务器,直到客户端接收服务器行情推送的当前市场在交易合约的合约信息、已经接受的当前用户的委托单、成交单、持仓和资金账户等信息的推送为止。

2. 各个软件、不同账户的登录期货账户时间测试

注明:每种都是测试5次,取平均值

  1. 使用vn.py软件的CTP网关,登录SimNow模拟期货账户时间:5秒左右
  2. 使用vn.py软件的CTP网关,登录中信建投期货账户时间:25左右,最慢32秒
  3. 使用快期V2软件的CTP版本,登录中信建投期货账户时间:17秒左右,最慢22秒

3. 问题:是什么原因导致这么大差别?

测试1和2的区别是:都是vn.py 2.1.6软件,都是CTP网关,只是使用的账户类型不同,1是模拟期货账户,2是实际期货账户;
测试2和3的区别是:都是实际期货账户,都是CTP网关,只是使用的软件不同,2是模拟vn.py 2.1.6软件,3是快期V2软件的CTP版本;
为什么三个登录期货账户时间的差别这么大呢,对使用?
期望知道这种差别的原因是什么吗,解答一下,先谢谢啦!!!

1 假如你需要扩展CtaTemplate,扩展后的模板叫MyCtaTemplate

下面的代码位于vnpy\usertools\my_cta_template.py中

from typing import Any,List,Dict,Tuple
import copy

from vnpy.app.cta_strategy import (
    CtaTemplate,
    BarGenerator,
    ArrayManager,
    StopOrder,
    Direction
)

from vnpy.trader.engine import MainEngine,EventEngine
from vnpy.app.cta_strategy.engine import CtaEngine
from vnpy.event.engine import Event

from vnpy.trader.object import (
    LogData,
    TickData,
    BarData,
    TradeData,
    OrderData,
)

from vnpy.app.cta_strategy import StopOrder
from vnpy.app.cta_strategy.base import EngineType
from vnpy.trader.constant import Interval

from vnpy.app.cta_strategy.base import (
    APP_NAME,
    EVENT_CTA_LOG,
    EVENT_CTA_TICK,
    EVENT_CTA_HISTORY_BAR,
    EVENT_CTA_BAR,
    EVENT_CTA_ORDER,
    EVENT_CTA_TRADE,    
    EVENT_CTA_STOPORDER,
    EVENT_CTA_STRATEGY,
)

from vnpy.usertools.kx_chart import (   
    NewChartWidget,
    CandleItem,
    VolumeItem, 
    LineItem,
    SmaItem,
    RsiItem,
    MacdItem,
)

from vnpy.usertools.my_strategy_tool import FixedBarGenerator
from vnpy.trader.engine import SamEngine

class MyCtaTemplate(CtaTemplate):
    """ 
    一个包含可视化K线图表和策略账户的策略模板 
    """
    init_money:float = 100000.0 # 初始资金
    kx_interval:int = 5
    show_chart = False  # 显示K线图表 

    kx_count:int = 0

    def __init__(
        self,
        cta_engine: Any,
        strategy_name: str,
        vt_symbol: str,
        setting: dict,
    ):
        super().__init__(cta_engine,strategy_name,vt_symbol,setting)       

        self.bg = FixedBarGenerator(self.on_bar,self.kx_interval,self.on_Nmin_bar,vt_symbol=self.vt_symbol)

        self.am = ArrayManager()

        cta_engine:CtaEngine = self.cta_engine
        self.engine_type = cta_engine.engine_type
        self.even_engine = cta_engine.main_engine.event_engine

        # 必须在这里声明,因为它们是实例变量
        self.all_bars:List[BarData] = [] 
        self.cur_window_bar:[BarData] = None
        self.bar_updated = False

        # 策略账户引擎
        self.sam_engine:SamEngine = cta_engine.main_engine.get_engine('sam')

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        # 创建或者获得策略账户
        if self.sam_engine.create_strategy_account(strategy_name=self.strategy_name,vt_symbols=[self.vt_symbol],
                            init_money=self.init_money,pickup_position=True):
            self.sam_engine.notify_strategy_ui(self.strategy_name)
            self.write_log(f"策略账户{self.strategy_name}创建成功!") 

        account_info = self.sam_engine.get_strategy_total_money(self.strategy_name)
        self.write_log(f"策略账户{account_info}")

    def on_tick(self, tick: TickData):
        """
        Callback of new tick data update.
        """
        self.bar_updated = False
        self.current_tick = tick    # 记录最新tick 

        # 再更新tick,产生1分钟K线乃至N 分钟线
        self.bg.update_tick(tick)

        if self.inited:     
            # 先产生当前临时K线
            self.cur_window_bar = self.get_cur_window_bar()  
            if self.cur_window_bar:
                # 发送当前临时K线更新消息
                self.send_event(EVENT_CTA_BAR,self.cur_window_bar)           

            self.send_event(EVENT_CTA_TICK,tick)  

    def on_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        if self.inited:   
            self.write_log(f"I got a 1min BarData at {bar.datetime}")

        self.bg.update_bar(bar)
        self.bar_updated = True

    def on_Nmin_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        self.all_bars.append(bar)
        self.kx_count = len(self.all_bars)

        if self.inited:
            self.write_log(f"I got a {self.kx_interval}min BarData at {bar.datetime}")
            self.send_event(EVENT_CTA_BAR,bar)

        self.put_event()

    def on_trade(self, trade: TradeData):
        """
        Callback of new trade data update.
        """
        self.send_event(EVENT_CTA_TRADE,trade)

    def on_order(self, order: OrderData):
        """
        Callback of new order data update.
        """
        self.send_event(EVENT_CTA_ORDER,order)

    def on_stop_order(self, stop_order: StopOrder):
        """
        Callback of stop order update.
        """
        self.send_event(EVENT_CTA_STOPORDER,stop_order)

    def get_cur_window_bar(self):
        window_bar = copy.deepcopy(self.bg.window_bar)
        bar = self.bg.bar

        if not(window_bar): # 刚产生过window_bar
            return None

        if self.bar_updated: # 刚更新过window_bar
            return window_bar

        # 上一分钟window_bar和当前bar合成出临时window bar
        window_bar.high_price = max(window_bar.high_price, bar.high_price)
        window_bar.low_price = min(window_bar.low_price, bar.low_price)

        # Update close price/volume into window bar
        window_bar.close_price = bar.close_price
        window_bar.volume += int(bar.volume)
        window_bar.open_interest = bar.open_interest
        return window_bar

    def send_event(self,event_type:str,data:Any):   
        """
        只在实盘引擎并且配置为显示K线图表的情况下发送小线
        """     
        if self.engine_type==EngineType.LIVE and self.show_chart:     # "如果显示K线图表"
            self.even_engine.put(Event(event_type,(self.strategy_name,data)))

    def init_kx_chart(self,kx_chart:NewChartWidget=None):    # 提供给外部调用
        if kx_chart:
            kx_chart.add_plot("candle", hide_x_axis=True)
            kx_chart.add_plot("volume", maximum_height=150)
            kx_chart.add_item(CandleItem, "candle", "candle")
            kx_chart.add_item(VolumeItem, "volume", "volume")

2 假如MyCtaTemplate需要参数化和变量输出

MyCtaTemplate的这3个成员需要参数化:

    init_money:float = 100000.0 # 初始资金
    kx_interval:int = 5
    show_chart = False  # 显示K线图表

MyCtaTemplate的这个变量需要输出:

    kx_count:int = 0

3 创建一个策略ChartStrategy,这样做就可以把MyCtaTemplate参数化了

保存下面的代码到 [用户目录]\strategies\chart_strategy.py

from typing import Any,List,Dict,Tuple

from vnpy.usertools.my_cta_template import MyCtaTemplate
from vnpy.app.cta_strategy.base import (
    APP_NAME,
    EVENT_CTA_LOG,
    EVENT_CTA_TICK,
    EVENT_CTA_HISTORY_BAR,
    EVENT_CTA_BAR,
    EVENT_CTA_ORDER,
    EVENT_CTA_TRADE,    
    EVENT_CTA_STOPORDER,
    EVENT_CTA_STRATEGY,
)
from vnpy.usertools.kx_chart import (   
    NewChartWidget,
    ChartItem,
    CandleItem,
    VolumeItem, 
    LineItem,
    SmaItem,
    RsiItem,
    MacdItem,
)

import pyqtgraph as pg
from PyQt5 import QtGui

class ChartStrategy(MyCtaTemplate):

    author = "hxxjava"

    atr_window = 20
    atr_value = 0.0

    parameters = [
        "init_money",
        "kx_interval",
        "show_chart",
        "atr_window",
    ]

    variables = [
        "kx_count",
        "atr_value",
    ]

    def __init__(
        self,
        cta_engine: Any,
        strategy_name: str,
        vt_symbol: str,
        setting: dict,
    ):
        super().__init__(cta_engine,strategy_name,vt_symbol,setting)

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        super().on_init()

        self.load_bar(20)

        if len(self.all_bars)>0 and self.show_chart:
            self.send_event(EVENT_CTA_HISTORY_BAR,self.all_bars)

    def on_start(self):
        """ """
        self.write_log("已开始")

    def on_stop(self):
        """"""
        self.write_log("_kx_strategy 已停止")

    def init_kx_chart(self,kx_chart:NewChartWidget=None):    # 提供给外部调用
        # self.write_log("init_kx_chart executed !!!")
        super().init_kx_chart(kx_chart)
        if kx_chart:
            kx_chart.add_plot("rsi", maximum_height=150)
            kx_chart.add_plot("macd", maximum_height=150)

            kx_chart.add_item(LineItem, "line", "candle")
            kx_chart.add_item(SmaItem, "sma1", "candle")
            kx_chart.add_item(SmaItem, "sma2", "candle")

            sma1:SmaItem = kx_chart.get_item("sma1")
            sma2:SmaItem = kx_chart.get_item("sma2")

            blue_pen:QtGui.QPen = pg.mkPen(color=(100, 100, 255), width=2)
            red_pen:QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=2)
            sma1.set_pen(blue_pen)
            sma1.set_sma_window(5)

            sma2.set_pen(red_pen)
            sma2.set_sma_window(20)        

            kx_chart.add_item(RsiItem, "rsi", "rsi")
            kx_chart.add_item(MacdItem, "macd", "macd")
            kx_chart.add_last_price_line()
            kx_chart.add_cursor()

注意上面的策略类的成员parameters和variables的赋值内容,这是本贴的重点!!!

4 创建ChartStrategy的实例,如图所示:

description

5 总结:

对CtaTemplate的扩展策略模板MyCtaTemplate,必须在继承MyCtaTemplate的最终策略中完成参数化和变量输出。

我的vn.py登录后,社区论坛界面没有办法缩放,看起来非常不舒服,求指教!

description

1 vnpy中CTA回测是一个没有保证金参数的回测

1.1 看看BacktesterEngine的run_backtesting()函数

这其中没有保证金参数,这会导致回测可以开得了的仓位,实盘却不一定可以!

   def run_backtesting(
        self,
        class_name: str,
        vt_symbol: str,
        interval: str,
        start: datetime,
        end: datetime,
        rate: float,
        slippage: float,
        size: int,
        pricetick: float,
        capital: int,
        inverse: bool,
        setting: dict
    ):
        """"""
        self.result_df = None
        self.result_statistics = None

        engine = self.backtesting_engine
        engine.clear_data()

        engine.set_parameters(
            vt_symbol=vt_symbol,
            interval=interval,
            start=start,
            end=end,
            rate=rate,
            slippage=slippage,
            size=size,
            pricetick=pricetick,
            capital=capital,
            inverse=inverse
        )

        strategy_class = self.classes[class_name]
        engine.add_strategy(
            strategy_class,
            setting
        )

        engine.load_data()

        try:
            engine.run_backtesting()
        except Exception:
            msg = f"策略回测失败,触发异常:\n{traceback.format_exc()}"
            self.write_log(msg)

            self.thread = None
            return

        self.result_df = engine.calculate_result()
        self.result_statistics = engine.calculate_statistics(output=False)

        # Clear thread object handler.
        self.thread = None

        # Put backtesting done event
        event = Event(EVENT_BACKTESTER_BACKTESTING_FINISHED)
        self.event_engine.put(event)

2 vnpy中CTA回测认为你想买就一定能够买到

2.1 先看看BacktestingEngine的限价委托单的撮合函数cross_limit_order()

    def cross_limit_order(self):
        """
        Cross limit order with last bar/tick data.
        """
        if self.mode == BacktestingMode.BAR:
            long_cross_price = self.bar.low_price
            short_cross_price = self.bar.high_price
            long_best_price = self.bar.open_price
            short_best_price = self.bar.open_price
        else:
            long_cross_price = self.tick.ask_price_1
            short_cross_price = self.tick.bid_price_1
            long_best_price = long_cross_price
            short_best_price = short_cross_price

        for order in list(self.active_limit_orders.values()):
            # Push order update with status "not traded" (pending).
            if order.status == Status.SUBMITTING:
                order.status = Status.NOTTRADED
                self.strategy.on_order(order)

            # Check whether limit orders can be filled.
            long_cross = (
                order.direction == Direction.LONG
                and order.price >= long_cross_price
                and long_cross_price > 0
            )

            short_cross = (
                order.direction == Direction.SHORT
                and order.price <= short_cross_price
                and short_cross_price > 0
            )

            if not long_cross and not short_cross:
                continue

            # Push order udpate with status "all traded" (filled).
            order.traded = order.volume
            order.status = Status.ALLTRADED
            self.strategy.on_order(order)

            self.active_limit_orders.pop(order.vt_orderid)

            # Push trade update
            self.trade_count += 1

            if long_cross:
                trade_price = min(order.price, long_best_price)
                pos_change = order.volume
            else:
                trade_price = max(order.price, short_best_price)
                pos_change = -order.volume

            trade = TradeData(
                symbol=order.symbol,
                exchange=order.exchange,
                orderid=order.orderid,
                tradeid=str(self.trade_count),
                direction=order.direction,
                offset=order.offset,
                price=trade_price,
                volume=order.volume,
                datetime=self.datetime,
                gateway_name=self.gateway_name,
            )

            self.strategy.pos += pos_change
            self.strategy.on_trade(trade)

            self.trades[trade.vt_tradeid] = trade

2.2 再看看BacktestingEngine的停止单撮合函数cross_stop_order()

    def cross_stop_order(self):
        """
        Cross stop order with last bar/tick data.
        """
        if self.mode == BacktestingMode.BAR:
            long_cross_price = self.bar.high_price
            short_cross_price = self.bar.low_price
            long_best_price = self.bar.open_price
            short_best_price = self.bar.open_price
        else:
            long_cross_price = self.tick.last_price
            short_cross_price = self.tick.last_price
            long_best_price = long_cross_price
            short_best_price = short_cross_price

        for stop_order in list(self.active_stop_orders.values()):
            # Check whether stop order can be triggered.
            long_cross = (
                stop_order.direction == Direction.LONG
                and stop_order.price <= long_cross_price
            )

            short_cross = (
                stop_order.direction == Direction.SHORT
                and stop_order.price >= short_cross_price
            )

            if not long_cross and not short_cross:
                continue

            # Create order data.
            self.limit_order_count += 1

            order = OrderData(
                symbol=self.symbol,
                exchange=self.exchange,
                orderid=str(self.limit_order_count),
                direction=stop_order.direction,
                offset=stop_order.offset,
                price=stop_order.price,
                volume=stop_order.volume,
                status=Status.ALLTRADED,
                gateway_name=self.gateway_name,
                datetime=self.datetime
            )

            self.limit_orders[order.vt_orderid] = order

            # Create trade data.
            if long_cross:
                trade_price = max(stop_order.price, long_best_price)
                pos_change = order.volume
            else:
                trade_price = min(stop_order.price, short_best_price)
                pos_change = -order.volume

            self.trade_count += 1

            trade = TradeData(
                symbol=order.symbol,
                exchange=order.exchange,
                orderid=order.orderid,
                tradeid=str(self.trade_count),
                direction=order.direction,
                offset=order.offset,
                price=trade_price,
                volume=order.volume,
                datetime=self.datetime,
                gateway_name=self.gateway_name,
            )

            self.trades[trade.vt_tradeid] = trade

            # Update stop order.
            stop_order.vt_orderids.append(order.vt_orderid)
            stop_order.status = StopOrderStatus.TRIGGERED

            if stop_order.stop_orderid in self.active_stop_orders:
                self.active_stop_orders.pop(stop_order.stop_orderid)

            # Push update to strategy.
            self.strategy.on_stop_order(stop_order)
            self.strategy.on_order(order)

            self.strategy.pos += pos_change
            self.strategy.on_trade(trade)

结论:它们的撮合机制都是只考虑了价格,没有考虑盘口是否允许

3 vnpy中CTA回测假定认为你永远都有100万的资金

如果你的策略是动态开仓的,你策略中能够引用的参考资金就是回测开始界面中的100万(如果你修改了,或者是其他的值)。因为回测引擎没有把自回测开始的交易中盈亏统计结果回馈给用户策略,因为现在也没有提供这样的接口。
这在实盘中是不可能不考虑的,你投入交易的资金一定是有限的,随着策略的运行,资金权益是会变动的。如果赚钱了,当然不会影响策略运行,只是资金的利用效率不够高而已;如果亏钱了,就可能会影响策略的下单,都快亏光了的资金,还能够按照你原来的资金下单吗?答案是:不可能的!

© 2015-2022 微信 18391752892
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】