VeighNa量化社区
你的开源社区量化交易平台
hxxjava's Avatar
Member
离线
419 帖子
声望: 155

1. 升级到vnpy 2.5.0后,只要选择spread_trading功能,就报如下错误:

难道包含新版本去掉价差交易功能了吗?

(VN Studio) D:\ProgramFiles\vnstudio>python -m vnstation
qt.network.ssl: QSslSocket: cannot resolve SSL_CTX_set_ciphersuites
qt.network.ssl: QSslSocket: cannot resolve SSL_set_psk_use_session_callback
qt.network.ssl: QSslSocket: cannot call unresolved function SSL_set_psk_use_session_callback
qt.network.ssl: QSslSocket: cannot call unresolved function SSL_set_psk_use_session_callback
qt.network.ssl: QSslSocket: cannot call unresolved function SSL_set_psk_use_session_callback
qt.network.ssl: QSslSocket: cannot call unresolved function SSL_set_psk_use_session_callback

(VN Studio) D:\ProgramFiles\vnstudio>python -m vnstation
qt.network.ssl: QSslSocket: cannot resolve SSL_CTX_set_ciphersuites
qt.network.ssl: QSslSocket: cannot resolve SSL_set_psk_use_session_callback
qt.network.ssl: QSslSocket: cannot call unresolved function SSL_set_psk_use_session_callback
qt.network.ssl: QSslSocket: cannot call unresolved function SSL_set_psk_use_session_callback
qt.network.ssl: QSslSocket: cannot call unresolved function SSL_set_psk_use_session_callback
qt.network.ssl: QSslSocket: cannot call unresolved function SSL_set_psk_use_session_callback
Traceback (most recent call last):
  File "D:\ProgramFiles\vnstudio\lib\runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "D:\ProgramFiles\vnstudio\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "D:\ProgramFiles\vnstudio\lib\site-packages\vnstation\__main__.py", line 4, in <module>
    cli()
  File "D:\ProgramFiles\vnstudio\lib\site-packages\click\core.py", line 1137, in __call__
    return self.main(*args, **kwargs)
  File "D:\ProgramFiles\vnstudio\lib\site-packages\click\core.py", line 1062, in main
    rv = self.invoke(ctx)
  File "D:\ProgramFiles\vnstudio\lib\site-packages\click\core.py", line 1668, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "D:\ProgramFiles\vnstudio\lib\site-packages\click\core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "D:\ProgramFiles\vnstudio\lib\site-packages\click\core.py", line 763, in invoke
    return __callback(*args, **kwargs)
  File "D:\ProgramFiles\vnstudio\lib\site-packages\vnstation\cli.py", line 28, in runtrader
    run_trader(s)
  File "D:\ProgramFiles\vnstudio\lib\site-packages\vnstation\cli.py", line 89, in run_trader
    module = importlib.import_module(d["module"])
  File "D:\ProgramFiles\vnstudio\lib\importlib\__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1006, in _gcd_import
  File "<frozen importlib._bootstrap>", line 983, in _find_and_load
  File "<frozen importlib._bootstrap>", line 965, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'vnpy_spreadtrading'

2. 如何解决?

打开文件Lib\site-packages\vnstation\info.py,找到APP_INFO列表,做如下修改(见注释):

APP_INFO = [
    {
        "name": "CtaStrategy",
        "module": "vnpy_ctastrategy",
        "class": "CtaStrategyApp",
        "description": "CTA自动交易模块"
    },
    {
        "name": "CtaBacktester",
        "module": "vnpy_ctabacktester",
        "class": "CtaBacktesterApp",
        "description": "CTA回测研究模块"
    },
    {
        "name": "SpreadTrading",
        # "module": "vnpy_spreadtrading",                         # 目前的vnpy的python中并不存在vnpy_spreadtrading目录
        "module": "vnpy.app.spread_trading",                     # 把价差模块重新指向原来的目录
        "class": "SpreadTradingApp",
        "description": "多合约价差套利模块"
    },
    {
        "name": "AlgoTrading",
        "module": "vnpy.app.algo_trading",
        "class": "AlgoTradingApp",
        "description": "算法委托执行交易模块"
    },
    {
        "name": "OptionMaster",
        "module": "vnpy.app.option_master",
        "class": "OptionMasterApp",
        "description": "期权波动率交易模块"
    },
    {
        "name": "PortfolioStrategy",
        "module": "vnpy.app.portfolio_strategy",
        "class": "PortfolioStrategyApp",
        "description": "多合约组合策略模块"
    },
    {
        "name": "ScriptTrader",
        "module": "vnpy.app.script_trader",
        "class": "ScriptTraderApp",
        "description": "脚本策略交易模块"
    },
    {
        "name": "MarketRadar",
        "module": "vnpy.app.market_radar",
        "class": "MarketRadarApp",
        "description": "市场扫描雷达模块"
    },
    {
        "name": "ChartWizard",
        "module": "vnpy.app.chart_wizard",
        "class": "ChartWizardApp",
        "description": "实时K线图表模块"
    },
    {
        "name": "RpcService",
        "module": "vnpy.app.rpc_service",
        "class": "RpcServiceApp",
        "description": "RPC服务器模块"
    },
    {
        "name": "ExcelRtd",
        "module": "vnpy.app.excel_rtd",
        "class": "ExcelRtdApp",
        "description": "EXCEL RTD模块"
    },
    {
        "name": "DataManager",
        "module": "vnpy_datamanager",
        "class": "DataManagerApp",
        "description": "历史数据管理模块"
    },
    {
        "name": "DataRecorder",
        "module": "vnpy.app.data_recorder",
        "class": "DataRecorderApp",
        "description": "实盘行情记录模块"
    },
    {
        "name": "RiskManager",
        "module": "vnpy_riskmanager",
        "class": "RiskManagerApp",
        "description": "事前风险管理模块"
    },
    {
        "name": "WebTrader",
        "module": "vnpy_webtrader",
        "class": "WebTraderApp",
        "description": "Web服务器模块"
    },
    {
        "name": "PortfolioManager",
        "module": "vnpy.app.portfolio_manager",
        "class": "PortfolioManagerApp",
        "description": "投资组合管理模块"
    },
    {
        "name": "PaperAccount",
        "module": "vnpy.app.paper_account",
        "class": "PaperAccountApp",
        "description": "模拟交易账户模块"
    },
]

3. 这样修改正确吗?

按照2来修改Info.py,保存后,重新启动就再次选择spread_trading模块了,没有报错了。
不知道这个问题这样解决是否合适?存在两种可能性:

  1. 官方打算升级spread_trading模块,还没有完成,发布的时候疏漏造成的;
  2. 需要另外补充安装spread_trading模块,然后启动时才可以选择spread_trading模块,有意为之?

1. 本帖的目的

本帖旨在通过对CTP接口推送的tick行情数据进行过滤和分析,揭示利用未经有效性处理的tick驱动各种上层应用可能带来的错误,以及这些错误所带来的影响。
依据事实说话,讲道理,不是空谈!仔细阅读,耐心分析,你一定会用巨大收获!

2. 脏数据的获得方法

按照如何更有效地利用合约交易状态信息——拒绝CTP接口中的脏数据一文中的4.4节的方法对原始数据进行过滤,其中OmsEngine新增了函数process_origin_tick_event(),代码是这样的:

    def process_origin_tick_event(self,event: Event):#-> None:  # hxxjava debug     
        """ 原始tick数据处理 """
        tick = event.data
        status:StatusData = self.trade_status_manager.get_tick_status(tick)
        # hxxjava debug
        if not status:
            print(f"{datetime.now()} {tick.vt_symbol} 还没有收到交易状态")
            return

        # 有效交易状态
        if status.instrument_status in VALID_TRADE_STATUSES:
            # 这里是所有有效数据的发源地
            self.event_engine.put(Event(EVENT_TICK, tick))
            self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, tick))
        else:
            print(f"{datetime.now()} 特别交易状态={status} {tick}")

3. 脏数据展示

下面是实际运行python -m vnstation之后,对CTP接口推送的数据的过滤而得到的部分结果,暂时称之为脏数据。
之所以称之为脏数据,是因为它们和当前合约的交易状态不符。
其中包含了当时的本地实际、当前合约交易状态和tick数据库三部分。
因为打印出来的内容每行比较长,建议您把下面的内容复制粘贴到其他的文本编辑器(如vscode)中,方便对左右移动,也可以上下对齐。

2021-07-15 07:07:05.767769 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.NO_TRADING: '非交易'>, trading_segment_sn=15, enter_time='23:00:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 23, 0, 0, 500000), name='铁矿石2109', volume=121262, open_interest=435280.0, last_price=1206.0, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1215.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1206.0, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1206.5, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=51, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=33, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 10:14:59.837070 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.NO_TRADING: '非交易'>, trading_segment_sn=25, enter_time='10:15:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 10, 15), name='铁矿石2109', volume=225979, open_interest=448149.0, last_price=1230.5, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1233.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1230.0, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1231.0, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=33, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=68, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 10:14:59.994983 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.NO_TRADING: '非交易'>, trading_segment_sn=25, enter_time='10:15:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 10, 15, 0, 500000), name='铁矿石2109', volume=225984, open_interest=448151.0, last_price=1231.0, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1233.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1230.0, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1231.0, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=39, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=63, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 10:15:00.253865 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.NO_TRADING: '非交易'>, trading_segment_sn=25, enter_time='10:15:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 10, 15, 1), name='铁矿石2109', volume=225984, open_interest=448151.0, last_price=1231.0, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1233.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1230.0, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1231.0, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=7, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=64, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 11:29:59.852447 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.NO_TRADING: '非交易'>, trading_segment_sn=29, enter_time='11:30:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 11, 30), name='铁矿石2109', volume=283444, open_interest=465948.0, last_price=1230.0, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1240.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1229.5, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1230.0, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=23, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=29, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 11:29:59.923405 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.NO_TRADING: '非交易'>, trading_segment_sn=29, enter_time='11:30:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 11, 30, 0, 500000), name='铁矿石2109', volume=283453, open_interest=465947.0, last_price=1230.0, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1240.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1229.5, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1230.0, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=23, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=22, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 11:30:00.185258 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.NO_TRADING: '非交易'>, trading_segment_sn=29, enter_time='11:30:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 11, 30, 1), name='铁矿石2109', volume=283456, open_interest=465945.0, last_price=1230.0, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1240.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1229.5, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1230.0, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=21, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=30, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 11:30:02.015268 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.NO_TRADING: '非交易'>, trading_segment_sn=29, enter_time='11:30:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 11, 30, 1, 500000), name='铁矿石2109', volume=283457, open_interest=465944.0, last_price=1230.0, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1240.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1229.5, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1230.0, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=21, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=29, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 15:00:01.727185 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.CLOSE: '收盘'>, trading_segment_sn=99, enter_time='15:00:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 15, 0           ), name='铁矿石2109', volume=359871, open_interest=484883.0, last_price=1234.5, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1240.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1233.0, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1234.5, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=169, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=40, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 15:00:01.834127 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.CLOSE: '收盘'>, trading_segment_sn=99, enter_time='15:00:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 15, 0, 0, 500000), name='铁矿石2109', volume=359884, open_interest=484882.0, last_price=1234.0, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1240.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1233.0, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1234.0, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=167, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=13, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 15:00:01.927076 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.CLOSE: '收盘'>, trading_segment_sn=99, enter_time='15:00:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 15, 0, 1        ), name='铁矿石2109', volume=359885, open_interest=484883.0, last_price=1234.0, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1240.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1233.0, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1233.5, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=167, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=1, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 15:00:01.931069 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.CLOSE: '收盘'>, trading_segment_sn=99, enter_time='15:00:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 15, 0, 1, 500000), name='铁矿石2109', volume=359886, open_interest=484883.0, last_price=1234.0, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1240.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1233.0, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1234.0, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=172, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=12, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 15:01:04.571289 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.CLOSE: '收盘'>, trading_segment_sn=99, enter_time='15:00:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 15, 1, 5, 500000), name='铁矿石2109', volume=359886, open_interest=484883.0, last_price=1234.0, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1240.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1233.0, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1234.0, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=172, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=12, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 15:10:52.207334 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.CLOSE: '收盘'>, trading_segment_sn=99, enter_time='15:00:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 15, 10,53       ), name='铁矿石2109', volume=359886, open_interest=484883.0, last_price=1234.0, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1240.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1233.0, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1234.0, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=172, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=12, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)

4. 脏数据分类

为了方便描述,我们把上面的内容进行编号,编号的顺序为打印信息的本地时间,

1. 2021-07-15 07:07:05.767769
2. 2021-07-15 10:14:59.837070
3. 2021-07-15 10:14:59.994983
4. 2021-07-15 10:15:00.253865
5. 2021-07-15 11:29:59.852447
6. 2021-07-15 11:29:59.923405
7. 2021-07-15 11:30:00.185258
8. 2021-07-15 11:30:02.015268
9. 2021-07-15 15:00:01.727185
10. 2021-07-15 15:00:01.834127
11. 2021-07-15 15:00:01.927076
12. 2021-07-15 15:00:01.931069
13. 2021-07-15 15:01:04.571289
14. 2021-07-15 15:10:52.207334

下面是对这几条的分析:

  1. 再次连接CTP接口后,接口自动推送i2109合约昨天夜里23:00收盘的价格和盘口信息,

  2. 已经进入上午10:15休市时段,volume=225979, last_price=1230.5

  3. 已经进入上午10:15休市时段,volume=225984, last_price=1231.0 bid_volume_1=39 ask_volume_1=63
  4. 已经进入上午10:15休市时段,volume=225984, last_price=1231.0 bid_volume_1=7 ask_volume_1=64

  5. 已经进入上午11:30休市时段,volume=283444, open_interest=465948.0, last_price=1230.0,

  6. 已经进入上午11:30休市时段,volume=283453, open_interest=465947.0, last_price=1230.0,
  7. 已经进入上午11:30休市时段,volume=283456, open_interest=465945.0, last_price=1230.0,
  8. 已经进入上午11:30休市时段,volume=283457, open_interest=465944.0, last_price=1230.0,

  9. 已经进入下午15:00休市时段,volume=359871, open_interest=484883.0, last_price=1234.5,ask_price_1=1234.5,ask_volume_1=40,
    10.已经进入下午15:00休市时段,volume=359884, open_interest=484882.0, last_price=1234.0,ask_price_1=1234.0,ask_volume_1=13,
    11.已经进入下午15:00休市时段,volume=359885, open_interest=484883.0, last_price=1234.0,ask_price_1=1234.5,ask_volume_1=1,

12.已经进入下午15:00休市时段,open_interest=484883.0, last_price=1234.0
10.已经进入下午15:00休市时段,open_interest=484883.0, last_price=1234.0 —— 除了和时间戳与12不同之外,tick的其他字段完全相同
11.已经进入下午15:00休市时段,open_interest=484883.0, last_price=1234.0 —— 除了和时间戳与12不同之外,tick的其他字段完全相同

5. 如果不过滤这些tick数据,对各种应用的有何影响?

对于1中的tick,不过滤可以在刚启动的行情列表中显示,对启动应用后再连接接口的上层应用会产生多余的K线,过滤了更好

对于2,3,4中的tick,是进入休市后仍然有撮合, 不过滤可能造成上层应用会产生多余的K线(如5分钟K线),过滤了更好

对于5,6,7中的tick,是进入休市后仍然有撮合, 不过滤可能造成上层应用会产生多余的K线(如5分钟K线),过滤了更好

对于9,10,11中的tick,是进入收盘后仍然有撮合, 不过滤可能造成上层应用会产生多余的K线(如5分钟K线),过滤了影响跨收盘时刻K线的持仓兴趣、成交量等信息的统计
对于12,13,14中的tick,是进入收盘后已经没有撮合, 完全可以过滤掉

为什么说2-7中的tick过滤了更好?
因为这些tick是在非交易时间段延续的撮合,其价格、持仓兴趣、成交量可以忽略,它们采用下一个交易状态下tick中的价格、持仓兴趣、成交量更好!

如何更有效地利用合约交易状态信息——把集合竞价tick合并到开盘的K线中?

1. 合约集合竞价阶段的交易状态通知消息

1.1 合约集合竞价阶段

对于国内期货,通常,夜盘合约在20:55之后,日盘合约在8:55之后,合约进入集合竞价报单状态后,交易者就可以通过交易即可进行下单。下单后不会立即成交,而是经过交易所按照一定的规则进行匹配成交。集合竞价报单状态期间,可能会有tick推送给订阅的CTP客户端(如郑商所),但是此时的tick只有价格(last_price)没有成交量(volume=0);也可能没有任何tick推送给订阅的CTP客户端(如上期所)。
之后夜盘合约在20:59之后,日盘合约在8:59之后之后,合约进入集合竞价撮合阶段,交易所会通过行情接口推送1个tick到CTP客户端,之后会通过交易接口立即推送一次所有合约的连接的所有CTP客户端。此tick中包含的价格就是开盘价,成交量不为0。

1.2 集合竞价相关交易状态的提取

我已经在如何更有效地利用合约交易状态信息——交易状态信息管理器。一文中介绍了如果提取合约交易状态信息。这些合约交易状态信息就包含了与合约集合竞价相关的几个状态,包括:集合竞价报单、集合竞价平衡(本人一直没有检测到)和集合竞价撮合几个交易状态。

1.3 目前的vnpy对集合竞价的处理是不完善的

因为vnpy对集合竞价产生的这个tick没有特别处理,把它看成是一个普通的tick了,所以如果你的策略中使用到n分钟周期K线的话,很可能产生一个只包含此tick的5分钟K线。
这和人们的通常习惯不符,也和通常的主流软件都不相同,也不符合集合竞价的原来本意不符,它应该被合并到包含开盘时刻的K线之中。
经过对合约交易状态的研究,发现合约交易状态信息恰好可以帮助我们正确处理集合竞价产生的这个tick,思路大致如下:
不失一般性,以5分钟K线为例:

  • 集合竞价报单之后得到了开盘tick,按照正常逻辑产生新的5分钟K线
  • 紧接着收到了集合竞价撮合状态,得到下一个交易状态next_status,如果tick价格没有触及涨停价或者跌停价,通常next_status为应该为连续竞价状态
  • 不对当前K进行是否结束的判断,而立即把当前K线的时间戳修改为下一个交易状态的进入时间next_status.enter_time
  • 随后市场进入连续竞价状态,收到新的连续竞价的tick之后,进行判断时当然也是当前的5分钟K线没有结束。
  • 此新的5分钟K线就包含了该集合竞价产生的价格和成交量。

2. 缓存集合竞价阶段产生的tick等待开盘后再发送

2.1 缓存集合竞价阶段产生的开盘tick,等遇到连续竞价的一个tick再发送

修改vnpy\trader\engine.py中的OmsEngine的原始tick数据处理函数process_origin_tick_event()。
当收到集合竞价产生的开盘tick时,向tick中添加auction_tick属性并且赋值为True,缓存到self.ticks字典中,不转发该原始tick;收到其他连续竞价tick进行,检查self.ticks字典中是否存在该合约缓存的集合竞价产生的开盘tick,如果有则将缓存的开盘tick时间戳赋值为当前tick的时间戳减去1微秒,然后先发送开盘tick到消息队列,然后再发送当前tick到消息队列。代码如下:

代码如下:

    def process_origin_tick_event(self,event: Event):#-> None:  # hxxjava debug     
        """ 原始tick数据处理 """
        tick:TickData = event.data
        status:StatusData = self.trade_status_manager.get_tick_status(tick)
        # hxxjava debug
        if not status:
            print(f"{datetime.now()} {tick.vt_symbol} 还没有收到交易状态")
            return          

        # 有效交易状态
        if status.instrument_status in VALID_TRADE_STATUSES:
            # 这里是所有有效数据的发源地
            # 处理行情和合约交易状态不同步的问题
            relay_tick = False
            tick_time = tick.datetime.strftime("%H%M%S.%f")
            next_status = self.get_next_status(tick.vt_symbol)
            if status.instrument_status in AUCTION_STATUS:
                # 当前处在集合竞价阶段
                if tick.volume == 0:
                    # 集合竞价保单的tick,放弃
                    return
                if next_status.instrument_status == InstrumentStatus.CONTINOUS and \
                    tick_time >= next_status.enter_time:
                        # 连续竞价状态晚于其tick
                        relay_tick = True
                else:
                    # 集合竞价产生的开盘tick,打上标记,缓存不发送
                    tick.aution_tick = True
                    self.ticks[tick.vt_symbol] = tick
            else:
                # 当前处在连续竞价阶段,转发tick
                relay_tick = True

            if relay_tick:
                pre_tick = copy.copy(self.ticks.get(tick.vt_symbol,None)) # 保险起见
                if pre_tick and hasattr(pre_tick,"auction_tick"):
                    # 在第一个连续性竞价tick之前,转发缓冲的集合竞价产生的开盘tick
                    pre_tick.datetime = tick.datetime - timedelta(microseconds=1) 
                    self.event_engine.put(Event(EVENT_TICK, pre_tick))
                    self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, pre_tick))

                self.event_engine.put(Event(EVENT_TICK, tick))
                self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, tick))
            else:
                # 把集合竞价tick转发给订阅者
                self.event_engine.put(Event(EVENT_AUCTION_TICK, tick))
                self.event_engine.put(Event(EVENT_AUCTION_TICK + tick.vt_symbol, tick))
        # else:
        #     print(f"{datetime.now()} 特别交易状态={status} {tick}")

2.2 完整的OmsEngine代码

class OmsEngine(BaseEngine):
    """
    Provides order management system function for VN Trader.
    """

    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super(OmsEngine, self).__init__(main_engine, event_engine, "oms")

        # self.trade_status = TradeStatus(event_engine,6)   # hxxjava add

        self.ticks: Dict[str, TickData] = {}
        self.orders: Dict[str, OrderData] = {}
        self.trades: Dict[str, TradeData] = {}
        self.positions: Dict[str, PositionData] = {}
        self.accounts: Dict[str, AccountData] = {}
        self.contracts: Dict[str, ContractData] = {}
        self.active_orders: Dict[str, OrderData] = {}
        self.trade_status_manager = TradeStatusManager(event_engine,30)       # hxxjava debug

        self.add_function()
        self.register_event()

    def add_function(self) -> None:
        """Add query function to main engine."""
        self.main_engine.get_tick = self.get_tick
        self.main_engine.get_order = self.get_order
        self.main_engine.get_trade = self.get_trade
        self.main_engine.get_position = self.get_position
        self.main_engine.get_account = self.get_account
        self.main_engine.get_contract = self.get_contract
        self.main_engine.get_all_ticks = self.get_all_ticks
        self.main_engine.get_all_orders = self.get_all_orders
        self.main_engine.get_all_trades = self.get_all_trades
        self.main_engine.get_all_positions = self.get_all_positions
        self.main_engine.get_all_accounts = self.get_all_accounts
        self.main_engine.get_all_contracts = self.get_all_contracts
        self.main_engine.get_all_active_orders = self.get_all_active_orders
        self.main_engine.get_status = self.get_status                   # hxxjava debug
        self.main_engine.get_next_status = self.get_next_status         # hxxjava debug

    def register_event(self) -> None:
        """"""
        self.event_engine.register(EVENT_TICK, self.process_tick_event)
        self.event_engine.register(EVENT_ORDER, self.process_order_event)
        self.event_engine.register(EVENT_TRADE, self.process_trade_event)
        self.event_engine.register(EVENT_POSITION, self.process_position_event)
        self.event_engine.register(EVENT_ACCOUNT, self.process_account_event)
        self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)  
        self.event_engine.register(EVENT_STATUS, self.process_status_event)  # hxxjava debug
        self.event_engine.register(EVENT_ORIGIN_TICK, self.process_origin_tick_event)   # hxxjava debug

    def process_origin_tick_event(self,event: Event):#-> None:  # hxxjava debug     
        """ 原始tick数据处理 """
        tick:TickData = event.data
        status:StatusData = self.trade_status_manager.get_tick_status(tick)
        # hxxjava debug
        if not status:
            print(f"{datetime.now()} {tick.vt_symbol} 还没有收到交易状态")
            return          

        # 有效交易状态
        if status.instrument_status in VALID_TRADE_STATUSES:
            # 这里是所有有效数据的发源地
            # 处理行情和合约交易状态不同步的问题
            relay_tick = False
            tick_time = tick.datetime.strftime("%H%M%S.%f")
            next_status = self.get_next_status(tick.vt_symbol)
            if status.instrument_status in AUCTION_STATUS:
                # 当前处在集合竞价阶段
                if tick.volume == 0:
                    # 集合竞价保单的tick,放弃
                    return
                if next_status.instrument_status == InstrumentStatus.CONTINOUS and \
                    tick_time >= next_status.enter_time:
                        # 连续竞价状态晚于其tick
                        relay_tick = True
                else:
                    # 集合竞价产生的开盘tick,打上标记,缓存不发送
                    tick.aution_tick = True
                    self.ticks[tick.vt_symbol] = tick
            else:
                # 当前处在连续竞价阶段,转发tick
                relay_tick = True

            if relay_tick:
                pre_tick = copy.copy(self.ticks.get(tick.vt_symbol,None)) # 保险起见
                if pre_tick and hasattr(pre_tick,"auction_tick"):
                    # 在第一个连续性竞价tick之前,转发缓冲的集合竞价产生的开盘tick
                    pre_tick.datetime = tick.datetime - timedelta(microseconds=1) 
                    self.event_engine.put(Event(EVENT_TICK, pre_tick))
                    self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, pre_tick))

                self.event_engine.put(Event(EVENT_TICK, tick))
                self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, tick))
            else:
                # 把集合竞价tick转发给订阅者
                self.event_engine.put(Event(EVENT_AUCTION_TICK, tick))
                self.event_engine.put(Event(EVENT_AUCTION_TICK + tick.vt_symbol, tick))
        # else:
        #     print(f"{datetime.now()} 特别交易状态={status} {tick}")


    def process_tick_event(self, event: Event) -> None:
        """"""
        tick = event.data
        self.ticks[tick.vt_symbol] = tick

        status = self.get_status(tick.vt_symbol)
        if status.instrument_status in AUCTION_STATUS:
            next_status = self.get_next_status(tick.vt_symbol)
            print(f"{datetime.now()} 集合竞价状态: \n\t{status}\n\t{next_status}\n\t{tick}")

    def process_order_event(self, event: Event) -> None:
        """"""
        order = event.data
        self.orders[order.vt_orderid] = order

        # If order is active, then update data in dict.
        if order.is_active():
            self.active_orders[order.vt_orderid] = order
        # Otherwise, pop inactive order from in dict
        elif order.vt_orderid in self.active_orders:
            self.active_orders.pop(order.vt_orderid)

    def process_trade_event(self, event: Event) -> None:
        """"""
        trade = event.data
        self.trades[trade.vt_tradeid] = trade

    def process_position_event(self, event: Event) -> None:
        """"""
        position = event.data
        self.positions[position.vt_positionid] = position

    def process_account_event(self, event: Event) -> None:
        """"""
        account = event.data
        self.accounts[account.vt_accountid] = account

    def process_contract_event(self, event: Event) -> None:
        """"""
        contract = event.data
        self.contracts[contract.vt_symbol] = contract

    def process_status_event(self, event: Event) -> None:   # hxxjava debug
        """ 交易状态通知消息处理 """
        status:StatusData = event.data
        self.trade_status_manager.save_status(status)
        print(f"【{datetime.now()} {status}】")

    def get_tick(self, vt_symbol: str) -> Optional[TickData]:
        """
        Get latest market tick data by vt_symbol.
        """
        return self.ticks.get(vt_symbol, None)

    def get_order(self, vt_orderid: str) -> Optional[OrderData]:
        """
        Get latest order data by vt_orderid.
        """
        return self.orders.get(vt_orderid, None)

    def get_trade(self, vt_tradeid: str) -> Optional[TradeData]:
        """
        Get trade data by vt_tradeid.
        """
        return self.trades.get(vt_tradeid, None)

    def get_position(self, vt_positionid: str) -> Optional[PositionData]:
        """
        Get latest position data by vt_positionid.
        """
        return self.positions.get(vt_positionid, None)

    def get_account(self, vt_accountid: str) -> Optional[AccountData]:
        """
        Get latest account data by vt_accountid.
        """
        return self.accounts.get(vt_accountid, None)

    def get_contract(self, vt_symbol: str) -> Optional[ContractData]:
        """
        Get contract data by vt_symbol.
        """
        return self.contracts.get(vt_symbol, None)

    def get_all_ticks(self) -> List[TickData]:
        """
        Get all tick data.
        """
        return list(self.ticks.values())

    def get_all_orders(self) -> List[OrderData]:
        """
        Get all order data.
        """
        return list(self.orders.values())

    def get_all_trades(self) -> List[TradeData]:
        """
        Get all trade data.
        """
        return list(self.trades.values())

    def get_all_positions(self) -> List[PositionData]:
        """
        Get all position data.
        """
        return list(self.positions.values())

    def get_all_accounts(self) -> List[AccountData]:
        """
        Get all account data.
        """
        return list(self.accounts.values())

    def get_all_contracts(self) -> List[ContractData]:
        """
        Get all contract data.
        """
        return list(self.contracts.values())

    def get_status(self,vt_symbol:str) -> Optional[StatusData]:     # hxxjava debug
        """
        Get the vt_symbol's status data.
        """
        return self.trade_status_manager.get_status(vt_symbol)

    def get_next_status(self,vt_symbol:str) -> Optional[StatusData]:     # hxxjava debug
        """
        Get the vt_symbol's status data.
        """
        return self.trade_status_manager.get_next_status(vt_symbol)

    def get_all_active_orders(self, vt_symbol: str = "") -> List[OrderData]:
        """
        Get all active orders by vt_symbol.

        If vt_symbol is empty, return all active orders.
        """
        if not vt_symbol:
            return list(self.active_orders.values())
        else:
            active_orders = [
                order
                for order in self.active_orders.values()
                if order.vt_symbol == vt_symbol
            ]
            return active_orders

3. 对竞价竞价tick的特别转发

3.1 各种交易状态下处理方法的分析

按照上面的处理,我们就把从CTP接口推送来的tick进行来过滤。

  1. 对非交易状态的下的tick进行丢弃处理
  2. 对集合保单状态下的tick(有的市场有推送,如CZCE,有的则没有推送)也进行丢弃处理。因为它的成交量为0,所有只有价格可以使用,其他信息无法使用。如果你想观看集合竞价报道阶段的价格变化情况,可以另外添加集合竞价消息类型对其进行特别包装处理发送。
  3. 对集合竞价撮合状态产生的开盘tick进行缓存和延后处理,恰好满足了我们把该tick合并到连续竞价时段中,这样我们的K线就可以包含该tick的各个价格和成交量等信息了。但是可能在20:59分钟的时候,可能暂时看不到该tick的信息。如果想知道该信息,可以另外添加集合竞价消息类型对其进行特别包装处理发送。

3.2 增加集合竞价消息类型

3.2.1 增加集合竞价消息类型

在vnpy\trader\event.py文件中添加集合竞价tick数据消息

EVENT_AUCTION_TICK = "eAuctionTick."             # 集合竞价tick数据消息 hxxjava debug

3.2.2 修改主界面行情列表控件

修改vnpy\trader\ui\widget.py中的BaseMonitor的消息订阅函数,使得一个控件可以订阅多个消息类型,条件是数据是相同格式的:

    def register_event(self) -> None:
        """
        Register event handler into event engine.
        """
        if self.event_type:
            self.signal.connect(self.process_event)
            if type(self.event_type) == list: # hxxjava debug
                for ev in self.event_type:
                    self.event_engine.register(ev, self.signal.emit)
                    # print(f"已经订阅 {ev} 消息")
            else:
                self.event_engine.register(self.event_type, self.signal.emit)

修改vnpy\trader\ui\widget.py中的TickMonitor的消息类型:

class TickMonitor(BaseMonitor):
    """
    Monitor for tick data.
    """
    # event_type = EVENT_TICK
    event_type = [EVENT_TICK,EVENT_AUCTION_TICK]   # 这里的消息类型不再是一种,而是一个列表
    data_key = "vt_symbol"
    sorting = True

    headers = {
        "symbol": {"display": "代码", "cell": BaseCell, "update": False},
        "exchange": {"display": "交易所", "cell": EnumCell, "update": False},
        "name": {"display": "名称", "cell": BaseCell, "update": True},
        "last_price": {"display": "最新价", "cell": BaseCell, "update": True},
        "volume": {"display": "成交量", "cell": BaseCell, "update": True},
        "open_price": {"display": "开盘价", "cell": BaseCell, "update": True},
        "high_price": {"display": "最高价", "cell": BaseCell, "update": True},
        "low_price": {"display": "最低价", "cell": BaseCell, "update": True},
        "bid_price_1": {"display": "买1价", "cell": BidCell, "update": True},
        "bid_volume_1": {"display": "买1量", "cell": BidCell, "update": True},
        "ask_price_1": {"display": "卖1价", "cell": AskCell, "update": True},
        "ask_volume_1": {"display": "卖1量", "cell": AskCell, "update": True},
        "datetime": {"display": "时间", "cell": TimeCell, "update": True},
        "gateway_name": {"display": "接口", "cell": BaseCell, "update": False},
    }

至此,已经完成因为集合竞价过滤对行情显示控件的兼容了!!!

4. “如何更有效地利用合约交易状态信息”系列已经完成

经过近2个月的编写、调试、观察、再调试,“如何更有效地利用合约交易状态信息”系列已经基本完成,
也基本完成了我当时对CTP接口中合约交易状态通知接口可能的几大用功能的实现。

  • 可以24小时开机,无需担脏数据的干扰和威胁,防止你的策略误动作。
  • 同时它还可以帮助您产生正确的K线。
  • 尽可能地防止你的策略在非连续交易时段中交易。

我想这对任何程序化交易者都是个好消息!如果您也想这么做,请翻阅我这个系列的帖子,希望能够帮到您!

如何更有效地利用合约交易状态信息——拒绝CTP接口中的脏数据。

1. 任何接口中都会有脏数据,CTP接口也不例外

不知道您是否发现这些情况:

  1. 明明已经休市或者收盘了,可是行情列表中的合约数据还在疯狂地涌来,如果此时你的策略还在运行状态,那会发生什么?
  2. 市场中许多人使用数据记录器(DataRecorder)进行合约数据的录制,经常发现录制到的结果里有许多莫名其妙的数据!
  3. 于是我们会说,尽量要靠近开盘再启动程序或者策略,一旦收盘就关闭vnpy系统,这样就可以避免被接口中的脏数据干扰和影响。

这种情况不只是使用CTP接口的交易者会遇到,其他接口也是一样。

2. CTP接口本身已经有完善的机制

为了防止客户端接收到脏数据,CTP接口会在某个交易时间端的开始结束时,在公共流中播发所有品种的合约品种的交易状态通知,只是vnpy系统没有使用。
交易状态通知的格式见帖子 如何更有效地利用合约交易状态信息——交易状态信息管理器

3. 拒绝CTP接口中的脏数据的方法

系统连接接口后,订阅的行情数据就会从CTP网关的MdApi接口中推送给客户端。目前vnpy在收到tick数据时没有任何有效性判断,就直接推送给了各种订阅者了,而这正是脏数据的来由!
正确的做法是:客户端就在收到tick数据的时候,根据合约品种的交易状态判断该数据是是否为有效数据,如果合约的状态是开盘前、非交易或者是收盘状态,则将该tick丢弃。
这样就可以杜绝脏数据对交易者、各种应用策略或数据记录器的影响。

4. 代码实现

4.1 实现交易状态管理器

交易状态管理器的功能及实现代码详见 如何更有效地利用合约交易状态信息——交易状态信息管理器 ,这里就不再提供了。

4.2 添加EVENT_ORIGIN_TICK消息类型

在vnpy\trader\event.py文件中增加下面的消息定义:

EVENT_ORIGIN_TICK = "eOriginTick."              # hxxjava debug

4.3 修改网关Gateway的on_tick()函数

在vnpy\trader\gateway.py文件中,将所有网关的父类Gateway的on_tick()函数做如下修改:

    def on_tick(self, tick: TickData) -> None:
        """
        Tick event push.
        Tick event of a specific vt_symbol is also pushed.
        """
        # self.on_event(EVENT_TICK, tick)
        # self.on_event(EVENT_TICK + tick.vt_symbol, tick)
        self.on_event(EVENT_ORIGIN_TICK, tick)

4.4 修改OmsManager

class OmsEngine(BaseEngine):
    """
    Provides order management system function for VN Trader.
    """

    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super(OmsEngine, self).__init__(main_engine, event_engine, "oms")

        # self.trade_status = TradeStatus(event_engine,6)   # hxxjava add

        self.ticks: Dict[str, TickData] = {}
        self.orders: Dict[str, OrderData] = {}
        self.trades: Dict[str, TradeData] = {}
        self.positions: Dict[str, PositionData] = {}
        self.accounts: Dict[str, AccountData] = {}
        self.contracts: Dict[str, ContractData] = {}
        self.active_orders: Dict[str, OrderData] = {}
        self.trade_status_manager = TradeStatusManager(event_engine,30)       # 创建交易状态管理器

        self.add_function()
        self.register_event()

    def add_function(self) -> None:
        """Add query function to main engine."""
        self.main_engine.get_tick = self.get_tick
        self.main_engine.get_order = self.get_order
        self.main_engine.get_trade = self.get_trade
        self.main_engine.get_position = self.get_position
        self.main_engine.get_account = self.get_account
        self.main_engine.get_contract = self.get_contract
        self.main_engine.get_all_ticks = self.get_all_ticks
        self.main_engine.get_all_orders = self.get_all_orders
        self.main_engine.get_all_trades = self.get_all_trades
        self.main_engine.get_all_positions = self.get_all_positions
        self.main_engine.get_all_accounts = self.get_all_accounts
        self.main_engine.get_all_contracts = self.get_all_contracts
        self.main_engine.get_all_active_orders = self.get_all_active_orders
        self.main_engine.get_status = self.get_status                   # hxxjava debug

    def register_event(self) -> None:
        """"""
        self.event_engine.register(EVENT_TICK, self.process_tick_event)
        self.event_engine.register(EVENT_ORDER, self.process_order_event)
        self.event_engine.register(EVENT_TRADE, self.process_trade_event)
        self.event_engine.register(EVENT_POSITION, self.process_position_event)
        self.event_engine.register(EVENT_ACCOUNT, self.process_account_event)
        self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)  
        self.event_engine.register(EVENT_STATUS, self.process_status_event)  # 订阅合约交易状态数据
        self.event_engine.register(EVENT_ORIGIN_TICK, self.process_origin_tick_event)   # 订阅原始行情数据

    def process_origin_tick_event(self,event: Event):#-> None:  # 处理原始行情数据     
        """ 对原始tick数据进行有效性判断和处理 """
        tick = event.data
        status:StatusData = self.trade_status_manager.get_tick_status(tick)

        if not status:
            print(f"{datetime.now()} {tick.vt_symbol} 还没有收到交易状态")
            return

        # 有效交易状态
        valid_statuses = [
            InstrumentStatus.CONTINOUS,
            InstrumentStatus.AUCTION_ORDERING,
            InstrumentStatus.AUCTION_BALANCE,
            InstrumentStatus.AUCTION_MATCH
        ]
        if status.instrument_status in valid_statuses:
            # 这里是所有有效数据的发源地
            self.event_engine.put(Event(EVENT_TICK, tick))
            self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, tick))
        else:
            print(f"{datetime.now()} 特别交易状态={status} {tick}")


    def process_tick_event(self, event: Event) -> None:
        """"""
        tick = event.data
        self.ticks[tick.vt_symbol] = tick

    def process_order_event(self, event: Event) -> None:
        """"""
        order = event.data
        self.orders[order.vt_orderid] = order

        # If order is active, then update data in dict.
        if order.is_active():
            self.active_orders[order.vt_orderid] = order
        # Otherwise, pop inactive order from in dict
        elif order.vt_orderid in self.active_orders:
            self.active_orders.pop(order.vt_orderid)

    def process_trade_event(self, event: Event) -> None:
        """"""
        trade = event.data
        self.trades[trade.vt_tradeid] = trade

    def process_position_event(self, event: Event) -> None:
        """"""
        position = event.data
        self.positions[position.vt_positionid] = position

    def process_account_event(self, event: Event) -> None:
        """"""
        account = event.data
        self.accounts[account.vt_accountid] = account

    def process_contract_event(self, event: Event) -> None:
        """"""
        contract = event.data
        self.contracts[contract.vt_symbol] = contract

    def process_status_event(self, event: Event) -> None:   # 处理交易状态信息
        """"""
        status = event.data
        # print(f"【{datetime.now()} {status}】")
        self.trade_status_manager.save_status(status)

    def get_tick(self, vt_symbol: str) -> Optional[TickData]:
        """
        Get latest market tick data by vt_symbol.
        """
        return self.ticks.get(vt_symbol, None)

    def get_order(self, vt_orderid: str) -> Optional[OrderData]:
        """
        Get latest order data by vt_orderid.
        """
        return self.orders.get(vt_orderid, None)

    def get_trade(self, vt_tradeid: str) -> Optional[TradeData]:
        """
        Get trade data by vt_tradeid.
        """
        return self.trades.get(vt_tradeid, None)

    def get_position(self, vt_positionid: str) -> Optional[PositionData]:
        """
        Get latest position data by vt_positionid.
        """
        return self.positions.get(vt_positionid, None)

    def get_account(self, vt_accountid: str) -> Optional[AccountData]:
        """
        Get latest account data by vt_accountid.
        """
        return self.accounts.get(vt_accountid, None)

    def get_contract(self, vt_symbol: str) -> Optional[ContractData]:
        """
        Get contract data by vt_symbol.
        """
        return self.contracts.get(vt_symbol, None)

    def get_all_ticks(self) -> List[TickData]:
        """
        Get all tick data.
        """
        return list(self.ticks.values())

    def get_all_orders(self) -> List[OrderData]:
        """
        Get all order data.
        """
        return list(self.orders.values())

    def get_all_trades(self) -> List[TradeData]:
        """
        Get all trade data.
        """
        return list(self.trades.values())

    def get_all_positions(self) -> List[PositionData]:
        """
        Get all position data.
        """
        return list(self.positions.values())

    def get_all_accounts(self) -> List[AccountData]:
        """
        Get all account data.
        """
        return list(self.accounts.values())

    def get_all_contracts(self) -> List[ContractData]:
        """
        Get all contract data.
        """
        return list(self.contracts.values())

    def get_status(self,vt_symbol:str) -> List[StatusData]:     # hxxjava debug
        """
        Get the vt_symbol's status data.
        """
        return self.trade_status_manager.get_status(vt_symbol)

    def get_all_active_orders(self, vt_symbol: str = "") -> List[OrderData]:
        """
        Get all active orders by vt_symbol.

        If vt_symbol is empty, return all active orders.
        """
        if not vt_symbol:
            return list(self.active_orders.values())
        else:
            active_orders = [
                order
                for order in self.active_orders.values()
                if order.vt_symbol == vt_symbol
            ]
            return active_orders

5. 采用这样过滤脏数据的方法的好处

采用这样过滤脏数据的方法,可以从源头一次性地过滤掉脏数据,避免后面的各种应用中分别地对推送的行情数据进行过滤,应用策略无需再考虑脏数据的影响,逻辑简单,效率高。现在再回过头来看帖子 如何更有效地利用合约交易状态信息——交易状态信息管理器 里的方法,就有点效率低下了!

6. 其他接口怎么办?

本文只提供了CTP接口修改方法,没有涉及到其他类型的接口。可是作为一个完善的交易接口,通常都应该提供类似CTP接口中的类似合约交易状态通知的信息。采用的类似的方法也是可以办到的。

注明:
本贴是继如何解决CTP接口中集合竞价后立即下单被拒的问题 !之后,对交易状态信息进行更为深入地研究,实现了交易状态信息管理器,它可以更为有效地利用合约交易状态信息。

1. 交易状态信息管理器

1.1 功能介绍

主要功能:
1)集中保存所有品种的交易状态到字典
2)保存所有品种的交易状态到字典到json文件
3)从json文件加载已经保存的交易状态到字典
4)定时同步交易状态到字典到json文件
5)提取交易合约的在某个时刻的交易状态

1.2 代码实现

class TradeStatusManager(object):
    """ 
    交易状态管理器。
    主要功能:
    1)集中保存所有品种的交易状态到字典
    2)保存所有品种的交易状态到字典到json文件
    3)从json文件加载已经保存的交易状态到字典
    4)定时同步交易状态到字典到json文件
    5)提取交易合约的在某个时刻的交易状态
    """
    json_file = "trade_status.json"

    def __init__(self,event_engine:EventEngine,interval:int)-> None:
        self.event_engine = event_engine
        self.interval = interval
        self.count = 0
        # {vt_symbol:{trade_segment_sn:StatusData}}
        self.trade_status_map:Dict[str:Dict] = self.load_from_file()
        self.change = False

        self.register_event()

    def register_event(self):
        """ 注册消息事件 """
        self.event_engine.register(EVENT_TIMER,self.on_time_event)

    def load_from_file(self):
        """ 
        从json文件读取交易状态映射
        """
        filepath = get_file_path(self.json_file)
        # print(f"filepath = {filepath}")
        if filepath.exists():
            with open(filepath, mode="r", encoding="UTF-8") as f:
                data = json.load(fp=f,cls=TradeStatusDecoder)
            return data
        else:
            save_json(self.json_file, {})
            return {}

    def sync_to_file(self):
        """ 
        保存交易状态映射到json文件 
        """
        filepath = get_file_path(self.json_file)
        with open(filepath, mode="w+", encoding="UTF-8") as f:
            json.dump(
                self.trade_status_map,fp=f,
                cls=TradeStatusEncoder,
                indent=4,
                ensure_ascii=False
            )

    def on_time_event(self,event:Event):
        self.count += 1
        if self.count > self.interval:
            self.count = 0
            if self.change:
                self.sync_to_file()
                self.change = False
                print(f"已经保存交易状态映射到交易状态映射!")

    def save_status(self,status:StatusData):
        """ 
        保存交易状态映射到交易状态映射 
        """
        vt_symbol = status.vt_symbol
        if vt_symbol not in self.trade_status_map:
            self.trade_status_map[vt_symbol] = {}
        segment_sn_str = f"{status.trading_segment_sn}".rjust(2,'0')
        self.trade_status_map[vt_symbol].update({"current":segment_sn_str})
        self.trade_status_map[vt_symbol].update({segment_sn_str:status})
        next_segment_sn = self._get_next_segment_sn(status.vt_symbol)
        self.trade_status_map[vt_symbol].update({"next":next_segment_sn})
        self.change = True

    def get_status(self,vt_symbol:str):
        """ 
        获得一个datetime所处的交易状态 
        """
        # 构造
        symbol,exchange = extract_vt_symbol(vt_symbol)
        instrument = left_alphas(symbol)
        vt_symbol1 = f"{instrument}.{exchange.value}"

        ret_status = None
        # dt_time = dt.time()
        if vt_symbol1 in self.trade_status_map:
            current = self.trade_status_map[vt_symbol1]['current']
            # for enter_time_str,status in self.trade_status_map[vt_symbol1].items():
            #     enter_time = datetime.strptime(enter_time_str,"%H:%M:%S").time()
            #     if dt_time >= enter_time:
            #         ret_status = status 
            ret_status = self.trade_status_map[vt_symbol1][current]

        return ret_status

    def _get_next_segment_sn(self,vt_symbol:str):
        """ 得到合约的下一交易状态编号 """
        next_key:str = ""
        if vt_symbol in self.trade_status_map:
            instrment_dict = self.trade_status_map[vt_symbol] 
            current_key = instrment_dict['current']
            keys = [key for key in sorted(instrment_dict.keys()) if key not in ["current","next"]]
            if current_key == keys[-1]:
                next_key = keys[0]
            else:
                index = 0
                for key in keys:
                    index += 1
                    if key == current_key:
                        break
                next_key = keys[index]

        return next_key

    def get_current_status(self,vt_symbol:str):
        """ 得到合约的当前交易状态 """
        status:StatusData = None

        symbol,exchange = extract_vt_symbol(vt_symbol)
        instrument = left_alphas(symbol)
        vt_symbol1 = f"{instrument}.{exchange.value}"

        if vt_symbol1 in self.trade_status_map:
            status_dict = self.trade_status_map[vt_symbol1]
            key = status_dict['current']
            status = status_dict[key]

        return status

    def get_next_status(self,vt_symbol:str):
        """ 得到合约的下一交易状态 """
        status:StatusData = None

        symbol,exchange = extract_vt_symbol(vt_symbol)
        instrument = left_alphas(symbol)
        vt_symbol1 = f"{instrument}.{exchange.value}"
        if vt_symbol1 in self.trade_status_map:
            status_dict = self.trade_status_map[vt_symbol1]
            key = status_dict['next']
            status = status_dict[key]

        return status

    def get_tick_status(self,tick:TickData):
        """  
        得到一个tick数据的合约所处交易状态
        """
        status:StatusData = None
        instrument = left_alphas(tick.symbol)
        tick_time = tick.datetime.strftime("%H:%M:%S")
        vt_symbol = f"{instrument}.{tick.exchange.value}"
        if vt_symbol in self.trade_status_map:
            status_dict = self.trade_status_map[vt_symbol]
            curr_key = status_dict["current"]
            next_key = status_dict["next"]
            curr_status:StatusData = status_dict[curr_key]
            next_status:StatusData = status_dict[next_key]
            if curr_status.enter_time < next_status.enter_time:
                if curr_status.enter_time <= tick_time < next_status.enter_time:
                    status = curr_status
                elif next_status.enter_time <= tick_time:
                    status = next_status
            else:
                if curr_status.enter_time <= tick_time:
                    status = curr_status
                elif next_status.enter_time <= tick_time:
                    status = next_status
        return status

2. 交易状态相关类型的编码器

因为TradeStatusManager的trade_status_map字典是一个复杂字典,对其进行json转换需要自定义一个编码器。实现如下:

class TradeStatusEncoder(json.JSONEncoder):
    """
    交易状态相关类型的编码器
    """
    def default(self, obj):
        d = {}
        d['__class__'] = obj.__class__.__name__
        if isinstance(obj,Exchange):
            d['_value_'] = obj.value
        elif isinstance(obj,InstrumentStatus):
            d['_value_'] = obj.value
        elif isinstance(obj,StatusEnterReason):
            d['_value_'] = obj.value
        elif isinstance(obj,StatusData):
            d.update(obj.__dict__)
        else:
            d['__module__'] = obj.__module__
            d.update(obj.__dict__)            
        return d

3. 交易状态相关类型的译码器

与2的原因相同,对把json转换到TradeStatusManager的trade_status_map,需要自定义一个译码器。实现如下:

class TradeStatusDecoder(json.JSONDecoder):
    """
    交易状态相关类型的译码器
    """
    def __init__(self):
        json.JSONDecoder.__init__(self, object_hook=self.dict2obj)

    def dict2obj(self, d):
        if '__class__' in d:               
            class_name = d.pop('__class__')
            if class_name == 'Exchange':
                value = d['_value_']
                instance = Exchange(value)
            elif class_name == 'InstrumentStatus':
                value = d['_value_']
                instance = InstrumentStatus(value)
            elif class_name == 'StatusEnterReason':
                value = d['_value_']
                instance = StatusEnterReason(value)
            elif class_name == 'StatusData':
                vt_symbol = d.pop('vt_symbol')
                symbol,exchange = extract_vt_symbol(vt_symbol)
                args = dict((key,value) for key, value in d.items())
                args.update({"symbol":symbol,"exchange":exchange})
                instance = StatusData(**args)
            else:           
                module_name = d.pop('__module__')
                # print(f"!!!! class_name:{class_name} module_name:{module_name}")
                module = __import__(module_name)
                class_ = getattr(module, class_name)
                args = dict((key,value) for key, value in d.items())
                instance = class_(**args)
        else:
            instance = d
        return instance

4. trade_status.json文件

4.1 trade_status.json保存在哪里?

这个文件通常保存在系统的用户目录下的.vntrader目录中。

4.2 trade_status.json的内容

4.2.1 有哪些内容?

这里仅仅展示一部分内容,如果您足够敏感的化,您应该能够立即领悟到这些信息的作用!
它是以合约品种为主键值,以进入时间为次键值的嵌套的字典,字典值为交易状态。
有了它你可以知道某个品种、合约在某个时刻之后处于什么状态。

这些状态包含:

  • NO_TRADING = "非交易"
  • BEFORE_TRADING = "开盘前"
  • AUCTION_ORDERING = "集合竞价报单"
  • AUCTION_MATCH = "集合竞价撮合"
  • AUCTION_BALANCE = "集合竞价价格平衡"
  • CONTINOUS = "连续交易"
  • CLOSE = "收盘"

4.2.2 这些信息有什么用?

有了这个状态信息,你可以:

  1. 过滤非交易接收到的无效数据
  2. 可以知道收到的tick是开盘tick,其包含的last_price就是开盘价
  3. 可以避免在在收到开盘tick时立即执行
  4. 因为这些信息每个交易日似乎重复播放的,你可以利用这些历史接收的交易状态信息,快速作出决策。

4.2.3 内容展示

{
    "TA.CZCE": {
        "19:44:24": {
            "__class__": "StatusData",
            "gateway_name": "CTP",
            "symbol": "TA",
            "exchange": {
                "__class__": "Exchange",
                "_value_": "CZCE"
            },
            "settlement_group_id": "00000001",
            "instrument_status": {
                "__class__": "InstrumentStatus",
                "_value_": "非交易"
            },
            "trading_segment_sn": 4,
            "enter_time": "19:44:24",
            "enter_reason": {
                "__class__": "StatusEnterReason",
                "_value_": "自动切换"
            },
            "exchange_inst_id": "TA",
            "vt_symbol": "TA.CZCE"
        },
        "20:04:29": {
            "__class__": "StatusData",
            "gateway_name": "CTP",
            "symbol": "TA",
            "exchange": {
                "__class__": "Exchange",
                "_value_": "CZCE"
            },
            "settlement_group_id": "00000001",
            "instrument_status": {
                "__class__": "InstrumentStatus",
                "_value_": "非交易"
            },
            "trading_segment_sn": 4,
            "enter_time": "20:04:29",
            "enter_reason": {
                "__class__": "StatusEnterReason",
                "_value_": "自动切换"
            },
            "exchange_inst_id": "TA",
            "vt_symbol": "TA.CZCE"
        },
        "20:55:00": {
            "__class__": "StatusData",
            "gateway_name": "CTP",
            "symbol": "TA",
            "exchange": {
                "__class__": "Exchange",
                "_value_": "CZCE"
            },
            "settlement_group_id": "00000001",
            "instrument_status": {
                "__class__": "InstrumentStatus",
                "_value_": "集合竞价报单"
            },
            "trading_segment_sn": 7,
            "enter_time": "20:55:00",
            "enter_reason": {
                "__class__": "StatusEnterReason",
                "_value_": "自动切换"
            },
            "exchange_inst_id": "TA",
            "vt_symbol": "TA.CZCE"
        },
        "20:59:00": {
            "__class__": "StatusData",
            "gateway_name": "CTP",
            "symbol": "TA",
            "exchange": {
                "__class__": "Exchange",
                "_value_": "CZCE"
            },
            "settlement_group_id": "00000001",
            "instrument_status": {
                "__class__": "InstrumentStatus",
                "_value_": "集合竞价撮合"
            },
            "trading_segment_sn": 9,
            "enter_time": "20:59:00",
            "enter_reason": {
                "__class__": "StatusEnterReason",
                "_value_": "自动切换"
            },
            "exchange_inst_id": "TA",
            "vt_symbol": "TA.CZCE"
        },
        "21:00:00": {
            "__class__": "StatusData",
            "gateway_name": "CTP",
            "symbol": "TA",
            "exchange": {
                "__class__": "Exchange",
                "_value_": "CZCE"
            },
            "settlement_group_id": "00000001",
            "instrument_status": {
                "__class__": "InstrumentStatus",
                "_value_": "连续交易"
            },
            "trading_segment_sn": 13,
            "enter_time": "21:00:00",
            "enter_reason": {
                "__class__": "StatusEnterReason",
                "_value_": "自动切换"
            },
            "exchange_inst_id": "TA",
            "vt_symbol": "TA.CZCE"
        },
        "23:00:00": {
            "__class__": "StatusData",
            "gateway_name": "CTP",
            "symbol": "TA",
            "exchange": {
                "__class__": "Exchange",
                "_value_": "CZCE"
            },
            "settlement_group_id": "00000001",
            "instrument_status": {
                "__class__": "InstrumentStatus",
                "_value_": "非交易"
            },
            "trading_segment_sn": 15,
            "enter_time": "23:00:00",
            "enter_reason": {
                "__class__": "StatusEnterReason",
                "_value_": "自动切换"
            },
            "exchange_inst_id": "TA",
            "vt_symbol": "TA.CZCE"
        },
        "current": "23:00:00"
    },
    "OI.CZCE": {
        "19:44:24": {
            "__class__": "StatusData",
            "gateway_name": "CTP",
            "symbol": "OI",
            "exchange": {
                "__class__": "Exchange",
                "_value_": "CZCE"
            },
            "settlement_group_id": "00000001",
            "instrument_status": {
                "__class__": "InstrumentStatus",
                "_value_": "非交易"
            },
            "trading_segment_sn": 4,
            "enter_time": "19:44:24",
            "enter_reason": {
                "__class__": "StatusEnterReason",
                "_value_": "自动切换"
            },
            "exchange_inst_id": "OI",
            "vt_symbol": "OI.CZCE"
        },
        "20:04:29": {
            "__class__": "StatusData",
            "gateway_name": "CTP",
            "symbol": "OI",
            "exchange": {
                "__class__": "Exchange",
                "_value_": "CZCE"
            },
            "settlement_group_id": "00000001",
            "instrument_status": {
                "__class__": "InstrumentStatus",
                "_value_": "非交易"
            },
            "trading_segment_sn": 4,
            "enter_time": "20:04:29",
            "enter_reason": {
                "__class__": "StatusEnterReason",
                "_value_": "自动切换"
            },
            "exchange_inst_id": "OI",
            "vt_symbol": "OI.CZCE"
        },
        "20:55:00": {
            "__class__": "StatusData",
            "gateway_name": "CTP",
            "symbol": "OI",
            "exchange": {
                "__class__": "Exchange",
                "_value_": "CZCE"
            },
            "settlement_group_id": "00000001",
            "instrument_status": {
                "__class__": "InstrumentStatus",
                "_value_": "集合竞价报单"
            },
            "trading_segment_sn": 7,
            "enter_time": "20:55:00",
            "enter_reason": {
                "__class__": "StatusEnterReason",
                "_value_": "自动切换"
            },
            "exchange_inst_id": "OI",
            "vt_symbol": "OI.CZCE"
        },
        "20:59:00": {
            "__class__": "StatusData",
            "gateway_name": "CTP",
            "symbol": "OI",
            "exchange": {
                "__class__": "Exchange",
                "_value_": "CZCE"
            },
            "settlement_group_id": "00000001",
            "instrument_status": {
                "__class__": "InstrumentStatus",
                "_value_": "集合竞价撮合"
            },
            "trading_segment_sn": 9,
            "enter_time": "20:59:00",
            "enter_reason": {
                "__class__": "StatusEnterReason",
                "_value_": "自动切换"
            },
            "exchange_inst_id": "OI",
            "vt_symbol": "OI.CZCE"
        },
        "21:00:00": {
            "__class__": "StatusData",
            "gateway_name": "CTP",
            "symbol": "OI",
            "exchange": {
                "__class__": "Exchange",
                "_value_": "CZCE"
            },
            "settlement_group_id": "00000001",
            "instrument_status": {
                "__class__": "InstrumentStatus",
                "_value_": "连续交易"
            },
            "trading_segment_sn": 13,
            "enter_time": "21:00:00",
            "enter_reason": {
                "__class__": "StatusEnterReason",
                "_value_": "自动切换"
            },
            "exchange_inst_id": "OI",
            "vt_symbol": "OI.CZCE"
        },
        "23:00:00": {
            "__class__": "StatusData",
            "gateway_name": "CTP",
            "symbol": "OI",
            "exchange": {
                "__class__": "Exchange",
                "_value_": "CZCE"
            },
            "settlement_group_id": "00000001",
            "instrument_status": {
                "__class__": "InstrumentStatus",
                "_value_": "非交易"
            },
            "trading_segment_sn": 15,
            "enter_time": "23:00:00",
            "enter_reason": {
                "__class__": "StatusEnterReason",
                "_value_": "自动切换"
            },
            "exchange_inst_id": "OI",
            "vt_symbol": "OI.CZCE"
        },
        "current": "23:00:00"
    },

    ... ... 内容太多,略去

}

5. 交易状态信息管理器的使用

未完待续 ... ...

说明:本帖只讨论了如何解决CTP接口中,集合竞价后立即下单被拒的问题
对集合竞价不太了解的童鞋们可以参阅 期货集合竞价时间期货集合竞价规则

1. 集合竞价之后开盘价往往是策略频繁出现信号的时候

做量化交易的都知道,在各个交易日的前都有集合竞价时段,集合竞价产生该交易日的开盘价。这个开盘价格是自上一个交易日结束至今所有市场因素对交易对象价格影响的集中体现,这些因素既包括基本面、技术面和消息面等因素!
行情也往往出现这些时候,尤其在经过一个交叉的节假日或者突发利空或者利多消息出现的时候更是如此!

2. 在收到集合竞价之后的开盘价下单会被拒

行情出现了,当然策略就会发出信号,进而下单交易。
可是当策略收到收到开盘价的时候,市场还处在集合竞价阶段,此时下单将会被拒(相信大家都遇到过),而这个问题目前vnpy并没有解决。
如果是手工操盘,当然没有问题,可是我们的通过使用策略或者算法来交易的,怎么可以不解决这个问题 ?!!!

2.1 解决因为开盘价下单会被拒问题的思路

此时下单会被拒主要是因为市场还处在集合竞价状态,我们应该选择在连续竞价状态时下单即可。
就这么简单,可是实现起来有点儿复杂。
你可以把下单的逻辑简单地规定为21:00~23:00,9:00~11:30和13:30~15:00这几个时段才可以下单,其他时段禁止下单。
这倒是也可以解决一部分问题,但这样总会顾头不顾尾,很多品种你是无法处理的!

3. 新版的CTP接口协议中增加了合约交易状态通知

3.1 合约交易状态通知推送接口函数

下面是合约交易状态通知推送接口函数和合约交易状态通知数据结构。
合约交易状态通知,主动推送。私有流回报。

◇ 1.函数原型virtual void OnRtnInstrumentStatus(CThostFtdcInstrumentStatusField *pInstrumentStatus) {};

回到顶部

◇ 2.参数pInstrumentStatus:合约状态

struct CThostFtdcInstrumentStatusField
{
    ///交易所代码
    TThostFtdcExchangeIDType ExchangeID;
    ///合约在交易所的代码
    TThostFtdcExchangeInstIDType ExchangeInstID;
    ///结算组代码
    TThostFtdcSettlementGroupIDType SettlementGroupID;
    ///合约代码
    TThostFtdcInstrumentIDType InstrumentID;
    ///合约交易状态
    TThostFtdcInstrumentStatusType InstrumentStatus;
    ///交易阶段编号
    TThostFtdcTradingSegmentSNType TradingSegmentSN;
    ///进入本状态时间
    TThostFtdcTimeType EnterTime;
    ///进入本状态原因
    TThostFtdcInstStatusEnterReasonType EnterReason;
};

◇ 3.返回无

3.2 下面合约当前所处的交易状态C语言类型和枚举值

/////////////////////////////////////////////////////////////////////////
///TFtdcInstrumentStatusType是一个合约交易状态类型
/////////////////////////////////////////////////////////////////////////
///开盘前
#define THOST_FTDC_IS_BeforeTrading '0'
///非交易
#define THOST_FTDC_IS_NoTrading '1'
///连续交易
#define THOST_FTDC_IS_Continous '2'
///集合竞价报单
#define THOST_FTDC_IS_AuctionOrdering '3'
///集合竞价价格平衡
#define THOST_FTDC_IS_AuctionBalance '4'
///集合竞价撮合
#define THOST_FTDC_IS_AuctionMatch '5'
///收盘
#define THOST_FTDC_IS_Closed '6'

typedef char TThostFtdcInstrumentStatusType;

4. 解决CTP接口中集合竞价后立即下单被拒问题的实现步骤

4.1 定义相关的常量和数据类

在vnpy\trader\constant.py中增加下面的合约交易状态InstrumentStatus常量类型定义:

class InstrumentStatus(Enum):
    """
    合约交易状态类型 hxxjava debug
    """
    BEFORE_TRADING = "开盘前"
    NO_TRADING = "非交易"
    CONTINOUS = "连续交易" 
    AUCTION_ORDERING = "集合竞价报单"
    AUCTION_BALANCE = "集合竞价价格平衡"
    AUCTION_MATCH = "集合竞价撮合"
    CLOSE = "收盘"

在vnpy\trader\object.py中增加下面的交易状态数据类StatusData:

@dataclass
class StatusData(BaseData):
    """
    合约品种状态信息 hxxjava debug
    """
    # 合约代码
    symbol:str       
    # 交易所代码                       
    exchange : Exchange    
    # 结算组代码                     
    settlement_group_id : str = ""  
    # 合约交易状态             
    instrument_status : InstrumentStatus = None   
    # 交易阶段编号
    trading_segment_sn : int = None 
    # 进入本状态时间
    enter_time : str = ""      
    # 进入本状态原因  
    enter_reason : str = ""  
    # 合约在交易所的代码       
    exchange_inst_id : str = ""     

    def __post_init__(self):
        """  """
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"

    def belongs_to(self,vt_symbol:str):
        """ 判断状态是否属于一个合约 """
        symbol,exchange_str = vt_symbol.split(".")
        instrument = left_alphas(symbol).upper()
        return (self.symbol.upper() == instrument) and (self.exchange.value == exchange_str)

在vnpy\trader\event.py中增加交易状态消息类型

EVENT_STATUS = "eStatus"                        # hxxjava debug

4.2 为gateway添加on_status()函数

在vnpy\trader\gateway.py中修改网关父类gateway,为其增加on_status()函数:

    def on_status(self, status: StatusData) -> None:    # hxxjava debug
        """
        Instrument Status event push.
        """
        self.on_event(EVENT_STATUS, status)

4.3 为CTP接口中的TdApi接口

在vnpy_ctp\gateway\ctp_gateway.py中为CtpTdApi增加交易状态通知推送接口函数onRtnInstrumentStatus():

    def onRtnInstrumentStatus(self,data:dict):
        """ 
        当接收到合约品种状态信息 # hxxjava debug 
        """
        if data:
            # print(f"【data={data}】")
            status =  StatusData(
                symbol = data["InstrumentID"],
                exchange = EXCHANGE_CTP2VT[data["ExchangeID"]],
                settlement_group_id = data["SettlementGroupID"],
                instrument_status = INSTRUMENTSTATUS_CTP2VT[data["InstrumentStatus"]],
                trading_segment_sn = data["TradingSegmentSN"],
                enter_time = data["EnterTime"],
                enter_reason = ENTERREASON_CTP2VT[data["EnterReason"]],
                exchange_inst_id = data["ExchangeInstID"],
                gateway_name=self.gateway_name
            )
            # print(f"status={status}")
            self.gateway.on_status(status)

4.3.1 别忘了其中用到的两个映射字典:

映射字典INSTRUMENTSTATUS_CTP2VT:

# 品种状态进入原因映射  hxxjava debug
INSTRUMENTSTATUS_CTP2VT: Dict[str, InstrumentStatus] = {
    "0": InstrumentStatus.BEFORE_TRADING,
    "1": InstrumentStatus.NO_TRADING,
    "2": InstrumentStatus.CONTINOUS,
    "3": InstrumentStatus.AUCTION_ORDERING,
    "4": InstrumentStatus.AUCTION_BALANCE,
    "5": InstrumentStatus.AUCTION_MATCH,
    "6": InstrumentStatus.CLOSE,
    "7": InstrumentStatus.CLOSE
}

映射字典ENTERREASON_CTP2VT:

# 品种状态进入原因映射  hxxjava debug
ENTERREASON_CTP2VT: Dict[str, StatusEnterReason] = {
    "1": StatusEnterReason.AUTOMATIC,
    "2": StatusEnterReason.MANUAL,
    "3": StatusEnterReason.FUSE
}

4.4 将交易状态通知保存在OmsEngine中

修改vnpy\trader\engine.py中的OmsEngine,修改如下:

class OmsEngine(BaseEngine):
    """
    Provides order management system function for VN Trader.
    """

    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super(OmsEngine, self).__init__(main_engine, event_engine, "oms")

        self.ticks: Dict[str, TickData] = {}
        self.orders: Dict[str, OrderData] = {}
        self.trades: Dict[str, TradeData] = {}
        self.positions: Dict[str, PositionData] = {}
        self.accounts: Dict[str, AccountData] = {}
        self.contracts: Dict[str, ContractData] = {}
        self.statuses: Dict[str, StatusData] = {}       # hxxjava debug
        self.active_orders: Dict[str, OrderData] = {}

        self.add_function()
        self.register_event()

    def add_function(self) -> None:
        """Add query function to main engine."""
        self.main_engine.get_tick = self.get_tick
        self.main_engine.get_order = self.get_order
        self.main_engine.get_trade = self.get_trade
        self.main_engine.get_position = self.get_position
        self.main_engine.get_account = self.get_account
        self.main_engine.get_contract = self.get_contract
        self.main_engine.get_all_ticks = self.get_all_ticks
        self.main_engine.get_all_orders = self.get_all_orders
        self.main_engine.get_all_trades = self.get_all_trades
        self.main_engine.get_all_positions = self.get_all_positions
        self.main_engine.get_all_accounts = self.get_all_accounts
        self.main_engine.get_all_contracts = self.get_all_contracts
        self.main_engine.get_all_active_orders = self.get_all_active_orders
        self.main_engine.get_all_statuses = self.get_all_statuses       # hxxjava debug
        self.main_engine.get_status = self.get_status                   # hxxjava debug

    def register_event(self) -> None:
        """"""
        self.event_engine.register(EVENT_TICK, self.process_tick_event)
        self.event_engine.register(EVENT_ORDER, self.process_order_event)
        self.event_engine.register(EVENT_TRADE, self.process_trade_event)
        self.event_engine.register(EVENT_POSITION, self.process_position_event)
        self.event_engine.register(EVENT_ACCOUNT, self.process_account_event)
        self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)  
        self.event_engine.register(EVENT_STATUS, self.process_status_event)  # hxxjava debug

    def process_tick_event(self, event: Event) -> None:
        """"""
        tick = event.data
        self.ticks[tick.vt_symbol] = tick

    def process_order_event(self, event: Event) -> None:
        """"""
        order = event.data
        self.orders[order.vt_orderid] = order

        # If order is active, then update data in dict.
        if order.is_active():
            self.active_orders[order.vt_orderid] = order
        # Otherwise, pop inactive order from in dict
        elif order.vt_orderid in self.active_orders:
            self.active_orders.pop(order.vt_orderid)

    def process_trade_event(self, event: Event) -> None:
        """"""
        trade = event.data
        self.trades[trade.vt_tradeid] = trade

    def process_position_event(self, event: Event) -> None:
        """"""
        position = event.data
        self.positions[position.vt_positionid] = position

    def process_account_event(self, event: Event) -> None:
        """"""
        account = event.data
        self.accounts[account.vt_accountid] = account

    def process_contract_event(self, event: Event) -> None:
        """"""
        contract = event.data
        self.contracts[contract.vt_symbol] = contract

    def process_status_event(self, event: Event) -> None:   # hxxjava debug
        """"""
        status = event.data
        # print(f"got a status = {status}")
        self.statuses[status.vt_symbol] = status

    def get_tick(self, vt_symbol: str) -> Optional[TickData]:
        """
        Get latest market tick data by vt_symbol.
        """
        return self.ticks.get(vt_symbol, None)

    def get_order(self, vt_orderid: str) -> Optional[OrderData]:
        """
        Get latest order data by vt_orderid.
        """
        return self.orders.get(vt_orderid, None)

    def get_trade(self, vt_tradeid: str) -> Optional[TradeData]:
        """
        Get trade data by vt_tradeid.
        """
        return self.trades.get(vt_tradeid, None)

    def get_position(self, vt_positionid: str) -> Optional[PositionData]:
        """
        Get latest position data by vt_positionid.
        """
        return self.positions.get(vt_positionid, None)

    def get_account(self, vt_accountid: str) -> Optional[AccountData]:
        """
        Get latest account data by vt_accountid.
        """
        return self.accounts.get(vt_accountid, None)

    def get_contract(self, vt_symbol: str) -> Optional[ContractData]:
        """
        Get contract data by vt_symbol.
        """
        return self.contracts.get(vt_symbol, None)

    def get_all_ticks(self) -> List[TickData]:
        """
        Get all tick data.
        """
        return list(self.ticks.values())

    def get_all_orders(self) -> List[OrderData]:
        """
        Get all order data.
        """
        return list(self.orders.values())

    def get_all_trades(self) -> List[TradeData]:
        """
        Get all trade data.
        """
        return list(self.trades.values())

    def get_all_positions(self) -> List[PositionData]:
        """
        Get all position data.
        """
        return list(self.positions.values())

    def get_all_accounts(self) -> List[AccountData]:
        """
        Get all account data.
        """
        return list(self.accounts.values())

    def get_all_contracts(self) -> List[ContractData]:
        """
        Get all contract data.
        """
        return list(self.contracts.values())

    def get_all_statuses(self) -> List[StatusData]:     # hxxjava debug
        """
        Get all status data.
        """
        return list(self.statuses.values())

    def get_status(self,vt_symbol:str) -> List[StatusData]:     # hxxjava debug
        """
        Get the vt_symbol's status data.
        """
        symbol,exchange_str = vt_symbol.split('.')
        instrment = left_alphas(symbol)
        instrment_vt_symbol = f"{instrment}.{exchange_str}"
        return self.statuses.get(instrment_vt_symbol,None)

    def get_all_active_orders(self, vt_symbol: str = "") -> List[OrderData]:
        """
        Get all active orders by vt_symbol.

        If vt_symbol is empty, return all active orders.
        """
        if not vt_symbol:
            return list(self.active_orders.values())
        else:
            active_orders = [
                order
                for order in self.active_orders.values()
                if order.vt_symbol == vt_symbol
            ]
            return active_orders

5. 如何使用交易状态通知相关函数?

5.1 修改CTA交易策略

5.1.1 修改交易模板CtaTemplate

修改vnpy\app\cta_strategy\template.py,增加得到交易合约当前交易状态的函数get_trade_status():

    def get_trade_status(self):
        """
        得到交易合约当前交易状态
        """
        main_engine = self.cta_engine.main_engine
        return main_engine.get_status(self.vt_symbol)

5.1.2 修改您的CTA策略下单控制

在你的策略中调用buy()、sell()、short()、cover()或者send_order()下单函数之前,使用类似这样的判断:
下面使用某个CTA策略的on_bar()函数的来说明对交易状态的使用。

    def on_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        self.cancel_all()

        am = self.am
        am.update_bar(bar)
        if not am.inited:
            return

        atr_array = am.atr(self.atr_length, array=True)
        self.atr_value = atr_array[-1]
        self.atr_ma = atr_array[-self.atr_ma_length:].mean()
        self.rsi_value = am.rsi(self.rsi_length)

        if self.get_trade_status() != InstrumentStatus.CONTINOUS:      
             # 注意:这里,如果当前合约不在连续竞价状态,放弃下面的信号计算和下单操作!!!
             self.put_event()
             return

        if self.pos == 0:
            self.intra_trade_high = bar.high_price
            self.intra_trade_low = bar.low_price

            if self.atr_value > self.atr_ma:
                if self.rsi_value > self.rsi_buy:
                    self.buy(bar.close_price + 5, self.fixed_size)
                elif self.rsi_value < self.rsi_sell:
                    self.short(bar.close_price - 5, self.fixed_size)

        elif self.pos > 0:
            self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
            self.intra_trade_low = bar.low_price

            long_stop = self.intra_trade_high * \
                (1 - self.trailing_percent / 100)
            self.sell(long_stop, abs(self.pos), stop=True)

        elif self.pos < 0:
            self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
            self.intra_trade_high = bar.high_price

            short_stop = self.intra_trade_low * \
                (1 + self.trailing_percent / 100)
            self.cover(short_stop, abs(self.pos), stop=True)

        self.put_event()

5.2 修改价差交易相关模板及价差交易策略

5.2.1 修改SpreadAlgoTemplate

修改vnpy\app\spread_trading\template.py中的SpreadAlgoTemplate模板,增加get_legs_status()读取价差算法的各个腿当前交易状态:

    def get_legs_status(self):
        """
        得到当前策略所交易的价差交易状态    # hxxjava add
        """
        legs_status:Dict[str:InstrumentStatus] = {}
        main_engine = self.algo_engine.main_engine
        for leg in self.spread.legs.values():
            astatus:StatusData = main_engine.get_status(leg.vt_symbol)
            legs_status[leg.vt_symbol] = astatus.instrument_status
        return legs_status

5.2.2 修改SpreadStrategyTemplate

修改vnpy\app\spread_trading\template.py中的SpreadStrategyTemplate模板,增加get_legs_status()读取价差交易策略的各个腿当前交易状态:

    def get_legs_status(self):
        """
        得到当前策略所交易的价差交易状态    # hxxjava add
        """
        legs_status:Dict[str:InstrumentStatus] = {}
        main_engine = self.strategy_engine.main_engine
        for leg in self.spread.legs.values():
            astatus:StatusData = main_engine.get_status(leg.vt_symbol)
            legs_status[leg.vt_symbol] = astatus.instrument_status
        return legs_status

5.2.3 修改您的价差交易策略下单控制

在你的价差交易策略start_long_algo()和start_short_algo()下单函数之前,使用类似这样的判断:
下面使用某个价差交易策略的on_spread_data()函数的来说明对交易状态的使用,交易指令部分无需纠结从哪里来,只是举例。

    def on_spread_data(self):
        """
        Callback when spread price is updated.
        """
        tick = self.get_spread_tick()

        self.bg.update_tick(tick)

        # 得到价差交易策略的各个腿的交易状态
        legs_status = list(self.get_legs_status().values())

        # 这里要求价差交易策略的每个腿的交易状态均为连续竞价状态
        is_continous_status = legs_status.count(InstrumentStatus.CONTINOUS) == len(self.spread.legs)

        if self.trading and is_continous_status:
            # 这里是交易指令,无需纠结代码从哪里来
            pc = PriceData(sponsor=self.strategy_name,price_name=self.spread_name,price=tick.last_price)
            self.event_engine.put(Event(EVENT_PRICE_CHANGE,data=pc))

5.3 其他模块如何修改?

如果您感兴趣,其他的模块,如算法交易模块algo、投资组合模块PortfolioStrategy等,在对交易状态信息的使用,可以参考上述的CTA策略米筐和价差交易模块的做法,应该不难。
也许您也和我一样饱受这个问题的困扰,希望本人的辛苦付出能够帮到你!

记住:分享是一种美德

1. 本地价数据差合成的过程

首先说明,这里所合成出来的价差K线只是简易价差K线,它和交易所发表的套利品种的K线相比还是有所差别的,主要的差别在最高价、最低价和开盘价,但收盘价是准确的。
description

description

2. 实现过程:

2.1 在app\spread_trading\base.py添加下面query_tick_from_rq()函数

# hxxjava debug spread_trading
def query_tick_from_rq(
    symbol: str, exchange: Exchange, start: datetime, end: datetime
):
    """
    Query tick data from RQData.
    """
    from vnpy.trader.rqdata import rqdata_client
    from vnpy.trader.object import HistoryRequest

    if not rqdata_client.inited:
        rqdata_client.init()

    req = HistoryRequest(
        symbol=symbol,
        exchange=exchange,
        interval=Interval.TICK,
        start=start,
        end=end
    )
    data = rqdata_client.query_tick_history(req)
    return data

2.2 在app\spread_trading\base.py修改load_tick_data()函数

有两种方式:一种是从米筐加载数据下载tick,另一种是从数据库中读取已经录制的该价差的tick数据。

@lru_cache(maxsize=999)
def load_tick_data(
    spread: SpreadData,
    start: datetime,
    end: datetime,
    pricetick: float = 0
):
    """"""
    # hxxjava debug spread_trading
    # 目前没有考虑反向合约的情况,以后解决
    spread_ticks: List[TickData] = []

    try:
        # 防止因为用户没有米筐tick数据权限而发生异常

        # Load tick data of each spread leg
        dt_legs: Dict[str, Dict] = {}   # datetime string : Dict[vt_symbol:tick]
        format_str = "%Y%m%d%H%M%S.%f"
        for vt_symbol in spread.legs.keys():
            symbol, exchange = extract_vt_symbol(vt_symbol)

            # hxxjava debug spread_trading
            tick_data = query_tick_from_rq(symbol=symbol, exchange=exchange,start=start,end=end)

            if tick_data:
                print(f"load from rqdatac {symbol}.{exchange} tick_data, len of = {len(tick_data)}")

            # save all the spread's legs tick into a dictionary by tick's datetime
            for tick in tick_data:
                dt_str = tick.datetime.strftime(format_str)
                if dt_str in dt_legs:
                    dt_legs[dt_str].update({vt_symbol:tick})
                else:
                    dt_legs[dt_str] = {vt_symbol:tick}

        # Calculate spread bar data
        # snapshot of all legs's ticks  
        snapshot:Dict[str,TickData] = {}
        spread_leg_count = len(spread.legs)

        for dt_str in sorted(dt_legs.keys()): 
            dt = datetime.strptime(dt_str,format_str).astimezone(LOCAL_TZ)
            # get each datetime  
            spread_price = 0
            spread_value = 0

            # get all legs's ticks dictionary at the datetime
            leg_ticks = dt_legs.get(dt_str)
            for vt_symbol,tick in leg_ticks.items():
                # save each tick into the snapshot
                snapshot.update({vt_symbol:tick})

            if len(snapshot) < spread_leg_count:
                # if not all legs tick saved in the snapshot
                continue

            # out_str = f"{dt_str} "
            # format_str1 = "%Y-%m-%d %H:%M:%S.%f "
            for vt_symbol,tick in snapshot.items():
                price_multiplier = spread.price_multipliers[vt_symbol]
                spread_price += price_multiplier * tick.last_price
                spread_value += abs(price_multiplier) * tick.last_price
                # out_str += f"[{vt_symbol} {tick.datetime.strftime(format_str1)} {tick.last_price}],"
            # print(out_str)

            if pricetick:
                spread_price = round_to(spread_price, pricetick)

            spread_tick = TickData(                
                symbol=spread.name,
                exchange=exchange.LOCAL,
                datetime=dt,
                open_price=spread_price,
                high_price=spread_price,
                low_price=spread_price,
                last_price=spread_price,
                gateway_name="SPREAD")

            spread_tick.value = spread_value
            spread_ticks.append(spread_tick)

        if spread_ticks:
            print(f"load {symbol}.{exchange}' ticks from rqdatac, len of = {len(tick_data)}")

    finally:
        if not spread_ticks:
            # 读取数据库中已经录制过的该价差的tick数据
            spread_ticks = database_manager.load_tick_data(spread.name, Exchange.LOCAL, start, end)

        return spread_ticks

3. 如何使用load_tick_data()?

3.1 修改价差策略的on_init()

只要在你的价差策略中on_init()中添加如下代码,就可以调用:

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("策略初始化")
        self.load_tick(days=3)

3.2 如何剔除节假日?

修改vnpy\app\spread_trading\engine.py

3.2.1 利用米筐接口函数剔除节假日

增加下面函数:

def get_previous_trading_date(dt:datetime,days:int): # hxxjava add
    """ 
    得到某个日期dt的去除了节假日的前days个交易日 
    """
    from vnpy.trader.rqdata import rqdata_client
    import rqdatac as rq

    if not rqdata_client.inited:
        rqdata_client.init()

    prev_td = rq.get_previous_trading_date(date=dt.date(),n=days)

    return prev_td

3.2.2 修改SpreadStrategyEngine的load_bar()和load_tick()

修改如下,修改后两个函数的days参数就代表交易日了。

    def load_bar(
        self, spread: SpreadData, days: int, interval: Interval, callback: Callable
    ):
        """"""
        end = datetime.now()
        # start = end - timedelta(days)
        start = get_previous_trading_date(dt = end,days=days)   # hxxjava change

        bars = load_bar_data(spread, interval, start, end)

        print(f"{spread.name} {start}-{end} len of bars = {len(bars)}") # hxxjava debug spead_trading
        for bar in bars:
            callback(bar)

    def load_tick(self, spread: SpreadData, days: int, callback: Callable):
        """"""
        end = datetime.now()
        # start = end - timedelta(days=days)
        start = get_previous_trading_date(dt = end,days=days) # hxxjava change

        ticks = load_tick_data(spread, start, end)

        for tick in ticks:
            callback(tick)

1. 价差策略加载历史数据加载方式有两种

价差策略目标中SpreadStrategyTemplate中有load_bar()和load_tick()两个历史数据加载方法,代码如下:

class SpreadStrategyTemplate:

    ... ... 其他略去

    def load_bar(
        self,
        days: int,
        interval: Interval = Interval.MINUTE,
        callback: Callable = None,
    ):
        """
        Load historical bar data for initializing strategy.
        """
        if not callback:
            callback = self.on_spread_bar

        self.strategy_engine.load_bar(self.spread, days, interval, callback)

    def load_tick(self, days: int):
        """
        Load historical tick data for initializing strategy.
        """
        self.strategy_engine.load_tick(self.spread, days, self.on_spread_tick)

2. 因此我们的价差策略可以有两种加载历史数据的方法

2.1 利用各腿的历史1分钟K线数据合成价差1分钟K线

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("策略初始化")

         # 加载各腿的历史1分钟K线数据来合成价差1分钟K线
        self.load_bar(days=10)

2.2 利用各腿的历史tick数据合成价差1分钟K线

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("策略初始化")

        # 各腿的历史tick数据合成价差1分钟K线,当然比load_bar()慢多了
        self.load_tick(days=10)

3. 两种方法应该得到的价差1分钟K线有什么区别?

  • 使用self.load_bar(days=10) 得到的价差1分钟K线,除了close_price和volume之外是正确的,open_price、high_price和low_price基本上没有什么意义,如果说它们是错误的也不为过。没有办法,如果你只有各腿的1分钟历史K线数据,也只能如此了。
  • 使用self.load_tick(days=10)应该可以得到的较为精确的价差1分钟K线(S),除了open_price,close_price和volume之外,high_price和low_price基本比较接近交易所中的值,不过与交易所推送的套利价差1分钟K线(Sj)的high_price和low_price相比,它们的关系永远是S.high_price<=Sj.high_price同时S.low_price>=Sj.low_price。
  • 就是说,无论利用各腿的历史1分钟K线数据还是历史tick数据来合成价差1分钟K线数据,你都无法得到准确的high_price和low_price,因为tick数据从本质上看只是一个周期更小的K线而已,它同意也是除了last_price和volume之外是正确的,其他的open_price、high_price和low_price也是无法用于计算价差K线的各种价格的。
  • 什么说open_price也是无法用于计算价差K线的各种价格的开盘价,因为两个tick的有开盘价的时间是不同时的,较后的一个开盘价找没有对应的同时价格,所以open_price也是无法用于计算价差K线的各种价格的开盘价。

在客户端是永远无法合成出与交易所中一样的价差数据的,具体原因我会抽时间在另外的帖子里专门讨论此问题。

4. self.load_tick(days=10)加载不到任何历史1分钟K线数据

实际运行self.load_tick(days=10),加载不到任何历史1分钟K线数据。

4.1 原来self.load_tick(days=10)调用的是SpreadDataEngine的load_tick()

代码如下:

    def load_tick(self, spread: SpreadData, days: int, callback: Callable):
        """"""
        end = datetime.now()
        start = end - timedelta(days)

        ticks = load_tick_data(spread, start, end)

        for tick in ticks:
            callback(tick)

而这里用到的load_tick_data()的代码是这样的:

@lru_cache(maxsize=999)
def load_tick_data(
    spread: SpreadData,
    start: datetime,
    end: datetime,
):
    """"""
   return database_manager.load_tick_data(
            spread.name, Exchange.LOCAL, start, end
        )

那意思是直接从本地数据库中读取价差的历史数据。注意是直接读取!

4.2 怪怪的价差策略模板函数load_tick()

我们知道价差策略中使用的价差都是自定义的本地价差,使用vnpy中的DataRecorder是无法录制本地价差组合的数据的。
但是我们可以使用vnpy中的DataRecorder是录制交易所中套利合约的价差数据,它和其他的普通合约没有是么差别,它的历史tick是可以使用load_tick_data()来读取加载的。
可是价差策略无法交易套利合约。

这是什么样的设计!!!

1. 价差数量类SpreadData的目前代码:

class SpreadData:
    """"""

    def __init__(
        self,
        name: str,
        legs: List[LegData],
        price_multipliers: Dict[str, int],
        trading_multipliers: Dict[str, int],
        active_symbol: str,
        inverse_contracts: Dict[str, bool],
        min_volume: float
    ):
        """"""
        self.name: str = name

        self.legs: Dict[str, LegData] = {}
        self.active_leg: LegData = None
        self.passive_legs: List[LegData] = []

        self.min_volume: float = min_volume
        self.pricetick: float = 0

        # For calculating spread price
        self.price_multipliers: Dict[str, int] = price_multipliers

        # For calculating spread pos and sending orders
        self.trading_multipliers: Dict[str, int] = trading_multipliers

        self.inverse_contracts: Dict[str, bool] = inverse_contracts

        self.price_formula: str = ""
        self.trading_formula: str = ""

        for leg in legs:
            self.legs[leg.vt_symbol] = leg
            if leg.vt_symbol == active_symbol:
                self.active_leg = leg
            else:
                self.passive_legs.append(leg)

            price_multiplier = self.price_multipliers[leg.vt_symbol]
            if price_multiplier > 0:
                self.price_formula += f"+{price_multiplier}*{leg.vt_symbol}"
            else:
                self.price_formula += f"{price_multiplier}*{leg.vt_symbol}"

            trading_multiplier = self.trading_multipliers[leg.vt_symbol]
            if trading_multiplier > 0:
                self.trading_formula += f"+{trading_multiplier}*{leg.vt_symbol}"
            else:
                self.trading_formula += f"{trading_multiplier}*{leg.vt_symbol}"

            if not self.pricetick:
                self.pricetick = leg.pricetick
            else:
                self.pricetick = min(self.pricetick, leg.pricetick)

        # Spread data
        self.bid_price: float = 0
        self.ask_price: float = 0
        self.bid_volume: float = 0
        self.ask_volume: float = 0

        self.net_pos: float = 0
        self.datetime: datetime = None

    def calculate_price(self):
        """"""
        self.clear_price()

        # Go through all legs to calculate price
        for n, leg in enumerate(self.legs.values()):
            # Filter not all leg price data has been received
            if not leg.bid_volume or not leg.ask_volume:
                self.clear_price()
                return

            # Calculate price
            price_multiplier = self.price_multipliers[leg.vt_symbol]
            if price_multiplier > 0:
                self.bid_price += leg.bid_price * price_multiplier
                self.ask_price += leg.ask_price * price_multiplier
            else:
                self.bid_price += leg.ask_price * price_multiplier
                self.ask_price += leg.bid_price * price_multiplier

            # Round price to pricetick
            if self.pricetick:
                self.bid_price = round_to(self.bid_price, self.pricetick)
                self.ask_price = round_to(self.ask_price, self.pricetick)

            # Calculate volume
            trading_multiplier = self.trading_multipliers[leg.vt_symbol]
            inverse_contract = self.inverse_contracts[leg.vt_symbol]

            if not inverse_contract:
                leg_bid_volume = leg.bid_volume
                leg_ask_volume = leg.ask_volume
            else:
                leg_bid_volume = calculate_inverse_volume(
                    leg.bid_volume, leg.bid_price, leg.size)
                leg_ask_volume = calculate_inverse_volume(
                    leg.ask_volume, leg.ask_price, leg.size)

            if trading_multiplier > 0:
                adjusted_bid_volume = floor_to(
                    leg_bid_volume / trading_multiplier,
                    self.min_volume
                )
                adjusted_ask_volume = floor_to(
                    leg_ask_volume / trading_multiplier,
                    self.min_volume
                )
            else:
                adjusted_bid_volume = floor_to(
                    leg_ask_volume / abs(trading_multiplier),
                    self.min_volume
                )
                adjusted_ask_volume = floor_to(
                    leg_bid_volume / abs(trading_multiplier),
                    self.min_volume
                )

            # For the first leg, just initialize
            if not n:
                self.bid_volume = adjusted_bid_volume
                self.ask_volume = adjusted_ask_volume
            # For following legs, use min value of each leg quoting volume
            else:
                self.bid_volume = min(self.bid_volume, adjusted_bid_volume)
                self.ask_volume = min(self.ask_volume, adjusted_ask_volume)

            # Update calculate time
            self.datetime = datetime.now(LOCAL_TZ)

    def calculate_pos(self):
        """"""
        long_pos = 0
        short_pos = 0

        for n, leg in enumerate(self.legs.values()):
            leg_long_pos = 0
            leg_short_pos = 0

            trading_multiplier = self.trading_multipliers[leg.vt_symbol]
            if not trading_multiplier:
                continue

            inverse_contract = self.inverse_contracts[leg.vt_symbol]
            if not inverse_contract:
                net_pos = leg.net_pos
            else:
                net_pos = calculate_inverse_volume(
                    leg.net_pos, leg.net_pos_price, leg.size)

            adjusted_net_pos = net_pos / trading_multiplier

            if adjusted_net_pos > 0:
                adjusted_net_pos = floor_to(adjusted_net_pos, self.min_volume)
                leg_long_pos = adjusted_net_pos
            else:
                adjusted_net_pos = ceil_to(adjusted_net_pos, self.min_volume)
                leg_short_pos = abs(adjusted_net_pos)

            if not n:
                long_pos = leg_long_pos
                short_pos = leg_short_pos
            else:
                long_pos = min(long_pos, leg_long_pos)
                short_pos = min(short_pos, leg_short_pos)

        if long_pos > 0:
            self.net_pos = long_pos
        else:
            self.net_pos = -short_pos

    def clear_price(self):
        """"""
        self.bid_price = 0
        self.ask_price = 0
        self.bid_volume = 0
        self.ask_volume = 0

    def calculate_leg_volume(self, vt_symbol: str, spread_volume: float) -> float:
        """"""
        leg = self.legs[vt_symbol]
        trading_multiplier = self.trading_multipliers[leg.vt_symbol]
        leg_volume = spread_volume * trading_multiplier
        return leg_volume

    def calculate_spread_volume(self, vt_symbol: str, leg_volume: float) -> float:
        """"""
        leg = self.legs[vt_symbol]
        trading_multiplier = self.trading_multipliers[leg.vt_symbol]
        spread_volume = leg_volume / trading_multiplier

        if spread_volume > 0:
            spread_volume = floor_to(spread_volume, self.min_volume)
        else:
            spread_volume = ceil_to(spread_volume, self.min_volume)

        return spread_volume

    def to_tick(self):
        """"""
        tick = TickData(
            symbol=self.name,
            exchange=Exchange.LOCAL,
            datetime=self.datetime,
            name=self.name,
            last_price=(self.bid_price + self.ask_price) / 2,
            bid_price_1=self.bid_price,
            ask_price_1=self.ask_price,
            bid_volume_1=self.bid_volume,
            ask_volume_1=self.ask_volume,
            gateway_name="SPREAD"
        )
        return tick

    def is_inverse(self, vt_symbol: str) -> bool:
        """"""
        inverse_contract = self.inverse_contracts[vt_symbol]
        return inverse_contract

    def get_leg_size(self, vt_symbol: str) -> float:
        """"""
        leg = self.legs[vt_symbol]
        return leg.size

2. 腿数据的LegData定义

class LegData:
    """"""

    def __init__(self, vt_symbol: str):
        """"""
        self.vt_symbol: str = vt_symbol

        # Price and position data
        self.bid_price: float = 0
        self.ask_price: float = 0
        self.bid_volume: float = 0
        self.ask_volume: float = 0

        self.long_pos: float = 0
        self.short_pos: float = 0
        self.net_pos: float = 0

        self.last_price: float = 0
        self.net_pos_price: float = 0       # Average entry price of net position

        # Tick data buf
        self.tick: TickData = None

        # Contract data
        self.size: float = 0
        self.net_position: bool = False
        self.min_volume: float = 0
        self.pricetick: float = 0

    def update_contract(self, contract: ContractData):
        """"""
        self.size = contract.size
        self.net_position = contract.net_position
        self.min_volume = contract.min_volume
        self.pricetick = contract.pricetick

    def update_tick(self, tick: TickData):
        """"""
        self.bid_price = tick.bid_price_1
        self.ask_price = tick.ask_price_1
        self.bid_volume = tick.bid_volume_1
        self.ask_volume = tick.ask_volume_1
        self.last_price = tick.last_price

        self.tick = tick

    def update_position(self, position: PositionData):
        """"""
        if position.direction == Direction.NET:
            self.net_pos = position.volume
            self.net_pos_price = position.price
        else:
            if position.direction == Direction.LONG:
                self.long_pos = position.volume
            else:
                self.short_pos = position.volume
            self.net_pos = self.long_pos - self.short_pos

    def update_trade(self, trade: TradeData):
        """"""
        # Only update net pos for contract with net position mode
        if self.net_position:
            trade_cost = trade.volume * trade.price
            old_cost = self.net_pos * self.net_pos_price

            if trade.direction == Direction.LONG:
                new_pos = self.net_pos + trade.volume

                if self.net_pos >= 0:
                    new_cost = old_cost + trade_cost
                    self.net_pos_price = new_cost / new_pos
                else:
                    # If all previous short position closed
                    if not new_pos:
                        self.net_pos_price = 0
                    # If only part short position closed
                    elif new_pos > 0:
                        self.net_pos_price = trade.price
            else:
                new_pos = self.net_pos - trade.volume

                if self.net_pos <= 0:
                    new_cost = old_cost - trade_cost
                    self.net_pos_price = new_cost / new_pos
                else:
                    # If all previous long position closed
                    if not new_pos:
                        self.net_pos_price = 0
                    # If only part long position closed
                    elif new_pos < 0:
                        self.net_pos_price = trade.price

            self.net_pos = new_pos
        else:
            if trade.direction == Direction.LONG:
                if trade.offset == Offset.OPEN:
                    self.long_pos += trade.volume
                else:
                    self.short_pos -= trade.volume
            else:
                if trade.offset == Offset.OPEN:
                    self.short_pos += trade.volume
                else:
                    self.long_pos -= trade.volume

            self.net_pos = self.long_pos - self.short_pos

3. SpreadData的to_tck()一定能够返回有效tick吗?

3.1 SpreadData的3个价格的计算

从上面的代码可以知道,SpreadData中包含若干个腿,它的tick数据应该是有各腿的tick合成的,可是我们看SpreadData的to_tck()的代码,看不是这样的!

    def to_tick(self):
        """"""
        tick = TickData(
            symbol=self.name,
            exchange=Exchange.LOCAL,
            datetime=self.datetime,
            name=self.name,
            last_price=(self.bid_price + self.ask_price) / 2,
            bid_price_1=self.bid_price,
            ask_price_1=self.ask_price,
            bid_volume_1=self.bid_volume,
            ask_volume_1=self.ask_volume,
            gateway_name="SPREAD"
        )
        return tick

举例吧:

假如价差(SpreadData)的实例S中包含两腿(LegData)L1和L2,L1、L2的价格乘数分别为1和-1,那么:
在任意时刻,当L1得到了最新tick1,L2得到最新tick2,
L1的
L1.last_price=tick1.last_price
L1.bid_price_1=tick1.bid_price_1
L1.ask_price_1=tick1.ask_price_1

L2的部分数据
L2.last_price=tick2.last_price
L2.bid_price_1=tick2.bid_price_1
L2.ask_price_1=tick2.ask_price_1

那么经过价差S的calculate_price()的计算后,
S.last_price=L1.last_price-L2.last_price
S.bid_price_1=L1.bid_price_1-L2.bid_price_1
S.ask_price_1=L1.ask_price_1-L2.ask_price_1

价差S的价格的有效性来自于腿L1和腿L2的价格的有效性!

问题来了:如果腿L1已经得到了有效数据,而腿L2还没有得到有效数据,那么价差S的价格将是无效的!

3.2 注意价差的calculate_price()函数中的判断条件

            if not leg.bid_volume or not leg.ask_volume:
                self.clear_price()
                return

这里的条件意思是说如果价差的某个腿中的数据是无意义的,那么就清空价差的所有价格,那么此时SpreadData的to_tick()得到的tick就不是一个有效的tick数据!

3.3 何时会发生这种情况?

只要价差的多个腿中有一个腿的数据没有使用实际的tick更新过,就会发生这种情况!

3.4 出现这种情况,只要价差的价格是多少?

全部是0,因为clear_price()的代码如下:

    def clear_price(self):
        """"""
        self.bid_price = 0
        self.ask_price = 0
        self.bid_volume = 0
        self.ask_volume = 0

4. 这样的数据应会被推送给价差和价差策略吗?

4.1 SpreadDataEngine是如何推送价差数据给价格和价差策略的?

我们知道价差的数据计算和更新是有SpreadDataEngine维护的,下面是SpreadDataEngine的process_tick_event():

    def process_tick_event(self, event: Event) -> None:
        """"""
        tick = event.data

        leg = self.legs.get(tick.vt_symbol, None)
        if not leg:
            return
        leg.update_tick(tick)

        for spread in self.symbol_spread_map[tick.vt_symbol]:
            spread.calculate_price()           # 这里并没有对价差的价格计算是否有效的判断
            self.put_data_event(spread)

    def put_data_event(self, spread: SpreadData) -> None:
        """"""
        event = Event(EVENT_SPREAD_DATA, spread)
        self.event_engine.put(event)

这里并没有对价差的价格计算是否有效的判断,就直接向价差发送了EVENT_SPREAD_DATA消息,这看引起价差和价差策略通过推送接口on_spread_data()得到错误的价差数据!!!

4.2 如何改正此错误?

4.2.1 修改价差SpreadData的calculate_price()函数,使其可以返回价差是否有效:

 def calculate_price(self)->bool:    # hxxjava change
        """"""
        self.clear_price()

        # Go through all legs to calculate price
        for n, leg in enumerate(self.legs.values()):
            # Filter not all leg price data has been received
            if not leg.bid_volume or not leg.ask_volume:
                self.clear_price()
                return False    # hxxjava add

            # Calculate price
            price_multiplier = self.price_multipliers[leg.vt_symbol]
            if price_multiplier > 0:
                self.bid_price += leg.bid_price * price_multiplier
                self.ask_price += leg.ask_price * price_multiplier
            else:
                self.bid_price += leg.ask_price * price_multiplier
                self.ask_price += leg.bid_price * price_multiplier

            # Round price to pricetick
            if self.pricetick:
                self.bid_price = round_to(self.bid_price, self.pricetick)
                self.ask_price = round_to(self.ask_price, self.pricetick)

            # Calculate volume
            trading_multiplier = self.trading_multipliers[leg.vt_symbol]
            inverse_contract = self.inverse_contracts[leg.vt_symbol]

            if not inverse_contract:
                leg_bid_volume = leg.bid_volume
                leg_ask_volume = leg.ask_volume
            else:
                leg_bid_volume = calculate_inverse_volume(
                    leg.bid_volume, leg.bid_price, leg.size)
                leg_ask_volume = calculate_inverse_volume(
                    leg.ask_volume, leg.ask_price, leg.size)

            if trading_multiplier > 0:
                adjusted_bid_volume = floor_to(
                    leg_bid_volume / trading_multiplier,
                    self.min_volume
                )
                adjusted_ask_volume = floor_to(
                    leg_ask_volume / trading_multiplier,
                    self.min_volume
                )
            else:
                adjusted_bid_volume = floor_to(
                    leg_ask_volume / abs(trading_multiplier),
                    self.min_volume
                )
                adjusted_ask_volume = floor_to(
                    leg_bid_volume / abs(trading_multiplier),
                    self.min_volume
                )

            # For the first leg, just initialize
            if not n:
                self.bid_volume = adjusted_bid_volume
                self.ask_volume = adjusted_ask_volume
            # For following legs, use min value of each leg quoting volume
            else:
                self.bid_volume = min(self.bid_volume, adjusted_bid_volume)
                self.ask_volume = min(self.ask_volume, adjusted_ask_volume)

            # Update calculate time
            self.datetime = datetime.now(LOCAL_TZ)

      return True # hxxjava add

4.2.2 修改灵活价差AdvancedSpreadData的calculate_price()函数,使其可以返回价差是否有效:

class AdvancedSpreadData(SpreadData):
    def calculate_price(self)->bool: # hxxjava change
        """"""
        self.clear_price()

        # Go through all legs to calculate price
        bid_data = {}
        ask_data = {}
        volume_inited = False

        for variable, leg in self.variable_legs.items():
            # Filter not all leg price data has been received
            if not leg.bid_volume or not leg.ask_volume:
                self.clear_price()
                return False    # hxxjava change

            # Generate price dict for calculating spread bid/ask
            variable_direction = self.variable_directions[variable]
            if variable_direction > 0:
                bid_data[variable] = leg.bid_price
                ask_data[variable] = leg.ask_price
            else:
                bid_data[variable] = leg.ask_price
                ask_data[variable] = leg.bid_price

            # Calculate volume
            trading_multiplier = self.trading_multipliers[leg.vt_symbol]
            if not trading_multiplier:
                continue

            inverse_contract = self.inverse_contracts[leg.vt_symbol]
            if not inverse_contract:
                leg_bid_volume = leg.bid_volume
                leg_ask_volume = leg.ask_volume
            else:
                leg_bid_volume = calculate_inverse_volume(
                    leg.bid_volume, leg.bid_price, leg.size)
                leg_ask_volume = calculate_inverse_volume(
                    leg.ask_volume, leg.ask_price, leg.size)

            if trading_multiplier > 0:
                adjusted_bid_volume = floor_to(
                    leg_bid_volume / trading_multiplier,
                    self.min_volume
                )
                adjusted_ask_volume = floor_to(
                    leg_ask_volume / trading_multiplier,
                    self.min_volume
                )
            else:
                adjusted_bid_volume = floor_to(
                    leg_ask_volume / abs(trading_multiplier),
                    self.min_volume
                )
                adjusted_ask_volume = floor_to(
                    leg_bid_volume / abs(trading_multiplier),
                    self.min_volume
                )

            # For the first leg, just initialize
            if not volume_inited:
                self.bid_volume = adjusted_bid_volume
                self.ask_volume = adjusted_ask_volume
                volume_inited = True
            # For following legs, use min value of each leg quoting volume
            else:
                self.bid_volume = min(self.bid_volume, adjusted_bid_volume)
                self.ask_volume = min(self.ask_volume, adjusted_ask_volume)

        # Calculate spread price
        self.bid_price = self.parse_formula(self.price_code, bid_data)
        self.ask_price = self.parse_formula(self.price_code, ask_data)

        # Round price to pricetick
        if self.pricetick:
            self.bid_price = round_to(self.bid_price, self.pricetick)
            self.ask_price = round_to(self.ask_price, self.pricetick)

        # Update calculate time
        self.datetime = datetime.now(LOCAL_TZ)

        return True # hxxjava add

4.2.3 修改SpreadDataEngine的process_tick_event()函数:

    def process_tick_event(self, event: Event) -> None:
        """"""
        tick = event.data

        leg = self.legs.get(tick.vt_symbol, None)
        if not leg:
            return
        leg.update_tick(tick)

        for spread in self.symbol_spread_map[tick.vt_symbol]:
            if spread.calculate_price():    # hxxjava change
                self.put_data_event(spread)

5. 如果不做上述改动会发生什么问题?

如果不做上述改动会,可能会出现策略在开盘的时间,由于价差没有收齐所有腿的tick,
导致价差的 lastest_price等数据为0,可是仍然被推价差数据,进而产生用错误的价差tick。
错误的价差tick会引发错误的价差交易信号,并且以错误的价格进行价差的开仓和平仓!!!

本人在实际的价差策略交易中已经发生过上述的错误!

呼吁vnpy官方尽快修改上述问题!!!

1. 价差策略模块中自带的策略StatisticalArbitrageStrategy

这个价差策略的大致意思是:

  • 当价差的数据发生变化是,特到价差的tick,然后利用BarGenerator来合成出1分钟价差K线;
  • 得到1分钟价差K线后再更新一个ArrayManager self.am,self.am初始化成功后就可以计算BOLL通道的中轨、上轨和下轨;
  • 当1分钟K线的收盘价与BOLL通道的中轨、上轨和下轨的关系来进行价差的开仓和平仓交易。

2. 价差策略StatisticalArbitrageStrategy的代码及错误

说明:错误的地方我已知注释了。

from vnpy.trader.utility import BarGenerator, ArrayManager
from vnpy.app.spread_trading import (
    SpreadStrategyTemplate,
    SpreadAlgoTemplate,
    SpreadData,
    OrderData,
    TradeData,
    TickData,
    BarData
)


class StatisticalArbitrageStrategy(SpreadStrategyTemplate):
    """"""

    author = "用Python的交易员"

    boll_window = 20
    boll_dev = 2
    max_pos = 10
    payup = 10
    interval = 5

    spread_pos = 0.0
    boll_up = 0.0
    boll_down = 0.0
    boll_mid = 0.0

    parameters = [
        "boll_window",
        "boll_dev",
        "max_pos",
        "payup",
        "interval"
    ]
    variables = [
        "spread_pos",
        "boll_up",
        "boll_down",
        "boll_mid"
    ]

    def __init__(
        self,
        strategy_engine,
        strategy_name: str,
        spread: SpreadData,
        setting: dict
    ):
        """"""
        super().__init__(
            strategy_engine, strategy_name, spread, setting
        )

        self.bg = BarGenerator(self.on_spread_bar)
        self.am = ArrayManager()

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("策略初始化")

        self.load_bar(10)

    def on_start(self):
        """
        Callback when strategy is started.
        """
        self.write_log("策略启动")

    def on_stop(self):
        """
        Callback when strategy is stopped.
        """
        self.write_log("策略停止")

        self.put_event()

    def on_spread_data(self):
        """
        Callback when spread price is updated.
        """
        tick = self.get_spread_tick()
        self.on_spread_tick(tick)

    def on_spread_tick(self, tick: TickData):
        """
        Callback when new spread tick data is generated.
        """
        self.bg.update_tick(tick)   # 这里有兼容性错误,BarGenerator处理不了最新价为0的tick

    def on_spread_bar(self, bar: BarData):
        """
        Callback when spread bar data is generated.
        """
        self.stop_all_algos()

        self.am.update_bar(bar)
        if not self.am.inited:
            return

        self.boll_mid = self.am.sma(self.boll_window)
        self.boll_up, self.boll_down = self.am.boll(
            self.boll_window, self.boll_dev)

        if not self.spread_pos:
            if bar.close_price >= self.boll_up:
                self.start_short_algo(
                    bar.close_price - 10,
                    self.max_pos,
                    payup=self.payup,
                    interval=self.interval
                )
            elif bar.close_price <= self.boll_down:
                self.start_long_algo(
                    bar.close_price + 10,
                    self.max_pos,
                    payup=self.payup,
                    interval=self.interval
                )
        elif self.spread_pos < 0:
            if bar.close_price <= self.boll_mid:
                self.start_long_algo(
                    bar.close_price + 10,
                    abs(self.spread_pos),
                    payup=self.payup,
                    interval=self.interval
                )
        else:
            if bar.close_price >= self.boll_mid:
                self.start_short_algo(
                    bar.close_price - 10,
                    abs(self.spread_pos),
                    payup=self.payup,
                    interval=self.interval
                )

        self.put_event()

    def on_spread_pos(self):
        """
        Callback when spread position is updated.
        """
        self.spread_pos = self.get_spread_pos()
        self.put_event()

    def on_spread_algo(self, algo: SpreadAlgoTemplate):
        """
        Callback when algo status is updated.
        """
        pass

    def on_order(self, order: OrderData):
        """
        Callback when order status is updated.
        """
        pass

    def on_trade(self, trade: TradeData):
        """
        Callback when new trade data is received.
        """
        pass

    def stop_open_algos(self):
        """"""
        if self.buy_algoid:
            self.stop_algo(self.buy_algoid)

        if self.short_algoid:
            self.stop_algo(self.short_algoid)

    def stop_close_algos(self):
        """"""
        if self.sell_algoid:    #  self.sell_algoid没有定义    
            self.stop_algo(self.sell_algoid)

        if self.cover_algoid:  #  self.cover_algoid没有定义
            self.stop_algo(self.cover_algoid)

3. 这里BarGenerator与价差存在兼容性错误:处理不了最新价为0的tick

下面是BarGenerator的update_tick()函数,错误的地方我已知注释了:

    def update_tick(self, tick: TickData) -> None:
        """
        Update new tick data into generator.
        """
        new_minute = False

        # Filter tick data with 0 last price
        if not tick.last_price:             # 这个过滤条件有点想当然了
            return

        # Filter tick data with older timestamp
        if self.last_tick and tick.datetime < self.last_tick.datetime:
            return

        if not self.bar:
            new_minute = True
        elif (
            (self.bar.datetime.minute != tick.datetime.minute)
            or (self.bar.datetime.hour != tick.datetime.hour)
        ):
            self.bar.datetime = self.bar.datetime.replace(
                second=0, microsecond=0
            )
            self.on_bar(self.bar)

            new_minute = True

        if new_minute:
            self.bar = BarData(
                symbol=tick.symbol,
                exchange=tick.exchange,
                interval=Interval.MINUTE,
                datetime=tick.datetime,
                gateway_name=tick.gateway_name,
                open_price=tick.last_price,
                high_price=tick.last_price,
                low_price=tick.last_price,
                close_price=tick.last_price,
                open_interest=tick.open_interest
            )
        else:
            self.bar.high_price = max(self.bar.high_price, tick.last_price)
            if tick.high_price > self.last_tick.high_price:
                self.bar.high_price = max(self.bar.high_price, tick.high_price)

            self.bar.low_price = min(self.bar.low_price, tick.last_price)
            if tick.low_price < self.last_tick.low_price:
                self.bar.low_price = min(self.bar.low_price, tick.low_price)

            self.bar.close_price = tick.last_price
            self.bar.open_interest = tick.open_interest
            self.bar.datetime = tick.datetime

        if self.last_tick:
            volume_change = tick.volume - self.last_tick.volume
            self.bar.volume += max(volume_change, 0)

        self.last_tick = tick

4. 如何修改此错误?

4.1 修改BarGenerator的update_tick()函数

把我标识的错误的过滤条件改成下面的代码,把它注释掉:

        # if not tick.last_price:             # 这个过滤条件有点想当然了
        #    return

4.2 修改价差策略StatisticalArbitrageStrategy

修改之处见代码中的注释:

from vnpy.trader.utility import BarGenerator, ArrayManager
from vnpy.app.spread_trading import (
    SpreadStrategyTemplate,
    SpreadAlgoTemplate,
    SpreadData,
    OrderData,
    TradeData,
    TickData,
    BarData
)


class StatisticalArbitrageStrategy(SpreadStrategyTemplate):
    """"""

    author = "用Python的交易员"

    boll_window = 20
    boll_dev = 2
    max_pos = 10
    payup = 10
    interval = 5

    spread_pos = 0.0
    boll_up = 0.0
    boll_down = 0.0
    boll_mid = 0.0

    parameters = [
        "boll_window",
        "boll_dev",
        "max_pos",
        "payup",
        "interval"
    ]
    variables = [
        "spread_pos",
        "boll_up",
        "boll_down",
        "boll_mid"
    ]

    def __init__(
        self,
        strategy_engine,
        strategy_name: str,
        spread: SpreadData,
        setting: dict
    ):
        """"""
        super().__init__(
            strategy_engine, strategy_name, spread, setting
        )

        self.bg = BarGenerator(self.on_spread_bar)
        self.am = ArrayManager()

        self.buy_algoid:str = ""    # hxxjava add
        self.short_algoid:str = ""  # hxxjava add
        self.sell_algoid = ""       # hxxjava add
        self.cover_algoid = ""      # hxxjava add


    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("策略初始化")

        self.load_bar(10)

    def on_start(self):
        """
        Callback when strategy is started.
        """
        self.write_log("策略启动")

    def on_stop(self):
        """
        Callback when strategy is stopped.
        """
        self.write_log("策略停止")

        self.put_event()

    def on_spread_data(self):
        """
        Callback when spread price is updated.
        """
        tick = self.get_spread_tick()
        self.on_spread_tick(tick)

    def on_spread_tick(self, tick: TickData):
        """
        Callback when new spread tick data is generated.
        """
        self.bg.update_tick(tick)

    def on_spread_bar(self, bar: BarData):
        """
        Callback when spread bar data is generated.
        """
        self.stop_all_algos()

        self.am.update_bar(bar)
        if not self.am.inited:
            return

        self.boll_mid = self.am.sma(self.boll_window)
        self.boll_up, self.boll_down = self.am.boll(
            self.boll_window, self.boll_dev)

        if not self.spread_pos:
            if bar.close_price >= self.boll_up:
                self.buy_algoid = self.start_short_algo( # hxxjava change
                    bar.close_price - 10,
                    self.max_pos,
                    payup=self.payup,
                    interval=self.interval
                )
            elif bar.close_price <= self.boll_down:
                self.short_algoid = self.start_long_algo( # hxxjava change
                    bar.close_price + 10,
                    self.max_pos,
                    payup=self.payup,
                    interval=self.interval
                )
        elif self.spread_pos < 0:
            if bar.close_price <= self.boll_mid:
                self.sell_algoid = self.start_long_algo( # hxxjava change
                    bar.close_price + 10,
                    abs(self.spread_pos),
                    payup=self.payup,
                    interval=self.interval
                )
        else:
            if bar.close_price >= self.boll_mid:
                self.cover_algoid = self.start_short_algo(  # hxxjava change
                    bar.close_price - 10,
                    abs(self.spread_pos),
                    payup=self.payup,
                    interval=self.interval
                )

        self.put_event()

    def on_spread_pos(self):
        """
        Callback when spread position is updated.
        """
        self.spread_pos = self.get_spread_pos()
        self.put_event()

    def on_spread_algo(self, algo: SpreadAlgoTemplate):
        """
        Callback when algo status is updated.
        """
        if not algo.is_active():    # hxxjava add
            if self.buy_algoid == algo.algoid:
                self.buy_algoid = ""
            elif self.sell_algoid == algo.algoid:
                self.sell_algoid = ""
            elif self.short_algoid == algo.algoid:
                self.short_algoid = ""
            else:
                self.cover_algoid = ""

    def on_order(self, order: OrderData):
        """
        Callback when order status is updated.
        """
        pass

    def on_trade(self, trade: TradeData):
        """
        Callback when new trade data is received.
        """
        pass

    def stop_open_algos(self):
        """"""
        if self.buy_algoid:
            self.stop_algo(self.buy_algoid)

        if self.short_algoid:
            self.stop_algo(self.short_algoid)

    def stop_close_algos(self):
        """"""
        if self.sell_algoid:
            self.stop_algo(self.sell_algoid)

        if self.cover_algoid:
            self.stop_algo(self.cover_algoid)

我在自己的价差策略on_order()中加了这样的代码

    def on_order(self, order: OrderData):
        """
        Callback when order status is updated.
        """
        print(f"{self.spread_name} {order}")

在自己的价差策略on_trade()中加了这样的代码

    def on_trade(self, trade: TradeData):
        """
        Callback when new trade data is received.
        """
        print(f"{self.spread_name} {trade}")

我的价差策略运行后,成功交易后,没有任何打印信息,为什么这样?

1. 不同的价差交易策略交易相同的价差会发生什么?

1.1 一般的理解:价差和单边合约一样是一个交易标的

CTA策略的交易标的是一个具体的单边合约。假如我们运行两个CTA策略A和B实例,它们交易的合约都是C。同时运行A和B,那么我们可以发现A和B可以独立地统计各自的持仓,也就是说它们的pos可能是不一样的,不会相互干扰。
而价差交易策略的交易标的是价差。假如我们运行两个价差交易策略SA和SB实例,它们交易的价差都是S1。同时运行SA和SB,SA和SB也应该可以独立地统计各自的持仓,也就是说它们的spread_pos也应该是不一样的,不应该相互干扰。
然而,我们可以发现SA对S1开仓成功后,SB并没有开仓过,可是我们发现SB的spread_pos已经变成和SA的spread_pos相同的数量!
有点迷糊,细想一下,是啊,谁让你交易了相同的价差标的呢?

1.2 不同价差交易策略交易不同名称价差(但是价差的各腿是相同的)

不行就改,咱们按照价差S1的设置再创建一个价差S2,但是给它取一个不同的名称,区别一下!
接下来吧价差交易策略SB的标的该出S2,再次运行价差交易策略SB。
奇怪的现象发生了:SB并没有开仓过,可是SB的spread_pos仍然变成和SA的spread_pos相同的数量!

2. 问题出在哪里?

查看一下委托单:

description

其中"来源"一栏中的内容为 “SpreadTrading_价差名称”,就是这里过于简单,导致委托单只关联了价差,而没有关联价差交易策略名称,
所以在价差交易的SpreadTradeEngine引擎无法按照价差策略来推送类似委托单order,成交单trade和价差持仓信息等。

价差交易策略一旦发出委托,调用了SpreadStrategyTemplate的start_long_algo()或者start_short_algo(),而这两个函数最终调用了SpreadStrategyEngine的start_algo()

    def start_algo(
        self,
        direction: Direction,
        price: float,
        volume: float,
        payup: int,
        interval: int,
        lock: bool,
        offset: Offset
    ) -> str:
        """"""
        if not self.trading:
            return ""

        algoid: str = self.strategy_engine.start_algo(
            self,
            self.spread_name,          # 这里只有价差名称,没有传递策略名称
            direction,
            offset,
            price,
            volume,
            payup,
            interval,
            lock
        )

        self.algoids.add(algoid)

        return algoid
  1. 从此不知道委托单到底是哪个价差交易策略发出来的了!
  2. 成交单虽然可以委托单号查询到是哪个委托单,但是1的原因,所以找不到是哪个价差交易策略的成交单了
  3. 于是成交单只能被被推送到价差

4. 应该怎么解决这个问题?

这里只讨论原则性问题:

  1. 价差交易策略发出问题时传递价差交易策略名称,而不是价差名称;
  2. 将改变价差价差交易策略名称写入OrderData的reference字段,建立与价差策略实例的关联;
  3. SpreadStrategyEngine在收到order,trade时,安照价差策略实例进行order,trade保存、统计和相关持仓计算,包括spread_pos的计算

当然,这样的改动是大了些,可是已经存在目前的问题,修改是必须的!

1. 每个价差策略都会在on_init()时调用load_bar()

1.1 价差策略的on_init()

例如下面的DemoStrategy价差策略的代码:

class DemoStrategy(SpreadStrategyTemplate):
    """
    利用BOLL通道进行套利的一种价差交易
    """

    author = "hxxjava"

    bar_window = 5      # K线周期
    boll_window = 26    # BOLL参数1
    boll_dev = 2        # BOLL参数2        
    target_pos = 1      # 开仓数量
    payup = 10          
    interval = 5

    spread_pos = 0.0
    boll_mid = None
    boll_up = None
    boll_down = None

    sk_algoid:str = ""
    bp_algoid:str = ""
    bk_algoid:str = ""
    sp_algoid:str = ""

    parameters = [
        "bar_window",
        "boll_window",
        "boll_dev",
        "target_pos",
        "payup",
        "interval"
    ]

    variables = [
        "spread_pos",
        "boll_mid",
        "boll_up",
        "boll_down",
        "sk_algoid",
        "bp_algoid",
        "bk_algoid",
        "sp_algoid"
    ]

    def __init__(
        self,
        strategy_engine,
        strategy_name: str,
        spread: SpreadData,
        setting: dict
    ):
        """"""
        super().__init__(
            strategy_engine, strategy_name, spread, setting
        )

        self.bg = BarGenerator(self.on_spread_bar,self.bar_window,self.on_xmin_spread_bar)
        self.am = ArrayManager()

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("策略初始化")

        self.load_bar(days=10,callback=self.on_spread_bar)

    def on_spread_bar(self,bar:BarData):
        """
        Callback when 1 min spread bar data is generated.
        """
        print(f"on_spread_bar bar={bar}")       # 看看价差策略的bar长的是什么样子
        self.bg.update_bar(bar)

    。。。其他省略

2. 假如没有下载过数据到本地数据库,load_bar(10)将加载不了1分钟价差K线数据!

2.1 发现问题的过程

  • 编写好了价差交易策略之后;
  • 启动vnpy,连接到CTP接口;
  • 进入价差交易模块;
  • 创建一个跨期价差:rb2109&rb2201,主动腿为rb2109.SHFE,被动腿为rb2201.SHFE。价格乘数分别为1,-1,交易乘数分别为1,-1,没有问题;
  • 用DemoStrategy创建一个价差策略实例:DS_rb2109&rb2201,也是成功的。
  • 初始化DS_rb2109&rb2201,除了策略的inited=True之外,没有任何反应,看不到有任何1分钟价差K线数据被打印出来!

2.2 为什么加载不到任何1分钟价差K线数据?

进去查找,原来问题出在这里:
DemoStrategy调用的load_bar()是从SpreadStrateyTemplate继承的,而SpreadStrateyTemplate是load_bar()又调用了strategy_engine的load_bar()。
strategy_engine的load_bar()的代码如下:

    def load_bar(
        self, spread: SpreadData, days: int, interval: Interval, callback: Callable
    ):
        """"""
        end = datetime.now()
        start = end - timedelta(days)

        bars = load_bar_data(spread, interval, start, end)

        for bar in bars:
            callback(bar)

load_bar_data()的代码:

@lru_cache(maxsize=999)
def load_bar_data(
    spread: SpreadData,
    interval: Interval,
    start: datetime,
    end: datetime,
    pricetick: float = 0
):
    """"""
    # Load bar data of each spread leg
    leg_bars: Dict[str, Dict] = {}

    for vt_symbol in spread.legs.keys():
        symbol, exchange = extract_vt_symbol(vt_symbol)

        bar_data: List[BarData] = database_manager.load_bar_data(
            symbol, exchange, interval, start, end
        )

        bars: Dict[datetime, BarData] = {bar.datetime: bar for bar in bar_data}
        leg_bars[vt_symbol] = bars

    # Calculate spread bar data
    spread_bars: List[BarData] = []

    for dt in bars.keys():
        spread_price = 0
        spread_value = 0
        spread_available = True

        for leg in spread.legs.values():
            leg_bar = leg_bars[leg.vt_symbol].get(dt, None)

            if leg_bar:
                price_multiplier = spread.price_multipliers[leg.vt_symbol]
                spread_price += price_multiplier * leg_bar.close_price
                spread_value += abs(price_multiplier) * leg_bar.close_price
            else:
                spread_available = False

        if spread_available:
            if pricetick:
                spread_price = round_to(spread_price, pricetick)

            spread_bar = BarData(
                symbol=spread.name,
                exchange=exchange.LOCAL,
                datetime=dt,
                interval=interval,
                open_price=spread_price,
                high_price=spread_price,
                low_price=spread_price,
                close_price=spread_price,
                gateway_name="SPREAD",
            )
            spread_bar.value = spread_value
            spread_bars.append(spread_bar)

    return spread_bars

原来load_bar_data()中只考虑了从本地数据库加载1分钟价差K线数据(当然是用价差组合中的多个合约的1分钟K线数据合成的)。
而我因为没有事先下载过rb2109.SHFE和rb2201.SHFE的合约的1分钟K线数据,所以加载不到这10天中的任何1分钟价差K线数据!

3. 为什么不优先从米筐接口rqdatac加载1分钟价差K线?

就算加载不到1分钟价差K线的原因已经找到,可是这样的设计仍然是有问题的:

  • 1 要求不停地下载价差策略相关的合约数据不合理,因为这很容易忘记
  • 2 就算你昨天已经下载了价差策略相关的合约数据,今天你没有下载最新的数据,重新启动了价差策略,策略加载的数据就会缺少最新的数据

4. 修改成优先从米筐接口rqdatac加载1分钟价差K线!

鉴于以上的分析,把app\spread_trading\base.py做如下修改:

4.1 加入从rqdatac读取历史数据的query_bar_from_rq()函数,

# hxxjava debug spread_trading
def query_bar_from_rq(
    symbol: str, exchange: Exchange, interval: Interval, start: datetime, end: datetime
):
    """
    Query bar data from RQData.
    """
    from vnpy.trader.rqdata import rqdata_client
    from vnpy.trader.object import HistoryRequest

    if not rqdata_client.inited:
        rqdata_client.init()

    req = HistoryRequest(
        symbol=symbol,
        exchange=exchange,
        interval=interval,
        start=start,
        end=end
    )
    data = rqdata_client.query_history(req)
    return data

4.2 修改价差K线的读取函数load_bar_data()的读取优先顺序,优先从米筐接口rqdatac加载1分钟价差K线,修改如下:

@lru_cache(maxsize=999)
def load_bar_data(
    spread: SpreadData,
    interval: Interval,
    start: datetime,
    end: datetime,
    pricetick: float = 0
):
    """"""
    # Load bar data of each spread leg
    leg_bars: Dict[str, Dict] = {}

    for vt_symbol in spread.legs.keys():
        symbol, exchange = extract_vt_symbol(vt_symbol)

        # hxxjava debug spread_trading
        bar_data = query_bar_from_rq(
            symbol=symbol, exchange=exchange,
            interval=interval,start=start,end=end
            )

        # if bar_data:
        #     print(f"load {symbol}.{exchange} {interval} bar_data, len of = {len(bar_data)}")

        if not bar_data:
            bar_data: List[BarData] = database_manager.load_bar_data(
                symbol, exchange, interval, start, end
            )

        bars: Dict[datetime, BarData] = {bar.datetime: bar for bar in bar_data}
        leg_bars[vt_symbol] = bars

    # Calculate spread bar data
    spread_bars: List[BarData] = []

    for dt in bars.keys():
        spread_price = 0
        spread_value = 0
        spread_available = True

        for leg in spread.legs.values():
            leg_bar = leg_bars[leg.vt_symbol].get(dt, None)

            if leg_bar:
                price_multiplier = spread.price_multipliers[leg.vt_symbol]
                spread_price += price_multiplier * leg_bar.close_price
                spread_value += abs(price_multiplier) * leg_bar.close_price
            else:
                spread_available = False

        if spread_available:
            if pricetick:
                spread_price = round_to(spread_price, pricetick)

            spread_bar = BarData(
                symbol=spread.name,
                exchange=exchange.LOCAL,
                datetime=dt,
                interval=interval,
                open_price=spread_price,
                high_price=spread_price,
                low_price=spread_price,
                close_price=spread_price,
                gateway_name="SPREAD",
            )
            spread_bar.value = spread_value
            spread_bars.append(spread_bar)

    return spread_bars

5. load_bar()从本地数据库加载的1分钟价差K线数据只有K线

看看 load_bar()从本地数据库加载的1分钟价差K线数据,如下所示:
你会发现其的成交量,volume=0,在使用过程从必须加以注意!
也就是说,需要成交量参与计算的指标是不可利用价差K线序列来计算的!

bar=BarData(gateway_name='SPREAD', symbol='rb2110&rb2201', exchange=<Exchange.LOCAL: 'LOCAL'>, datetime=datetime.datetime(2021, 6, 17, 11, 15, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), interval=<Interval.MINUTE: '1m'>, volume=0, open_interest=0, open_price=109.0, high_price=109.0, low_price=109.0, close_price=109.0)
bar=BarData(gateway_name='SPREAD', symbol='rb2110&rb2201', exchange=<Exchange.LOCAL: 'LOCAL'>, datetime=datetime.datetime(2021, 6, 17, 11, 16, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), interval=<Interval.MINUTE: '1m'>, volume=0, open_interest=0, open_price=108.0, high_price=108.0, low_price=108.0, close_price=108.0)
bar=BarData(gateway_name='SPREAD', symbol='rb2110&rb2201', exchange=<Exchange.LOCAL: 'LOCAL'>, datetime=datetime.datetime(2021, 6, 17, 11, 17, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), interval=<Interval.MINUTE: '1m'>, volume=0, open_interest=0, open_price=106.0, high_price=106.0, low_price=106.0, close_price=106.0)

建议在图中:

description
另外图中的⑧是创建错误的价差组合,软件运行删除,可是删除了之后还必须重新启动了之后才会消失,是否可以考虑直接有软件清除释放该价差组合?

1. PortfolioStrategy自带的策略PairTradingStrategy

这个组合策略实质上是价差交易。
我理解其本意是选择两个相关的合约进行价差交易,它有两个腿:leg1和leg2,将其leg1和leg2按照leg1_ratio和leg2_ratio设定的比例进行配比做出价差K线。
该组合的开仓信号为:
价差K线突破BOLL上轨或者上轨时,卖出leg1,买入leg2
价差K线突破BOLL上轨或者下轨时,买入leg1,卖出leg2
该组合的平仓信号为:
leg1有持多仓并且价差K线在BOLL中轨之上时,分别平仓leg1和leg2
leg1有持空仓并且价差K线在BOLL中轨之下时,分别平仓leg1和leg2

2. 当leg1_ratio和leg2_ratio不相等的时候就错误了:

当leg1_ratio和leg2_ratio相等的时候,一起都没有毛病。
当leg1_ratio和leg2_ratio不相等的时候,就有问题了。你会发现它们开仓的数量总是一样多,这是不对的。

比如leg1为i2109.DCE, leg2为rb2110.DCE。
我们知道i2109的合约乘数是100,atr=2.7,而rb2109的合约乘数是10,atr=7.7,
leg2_ratio/leg1_ratio =(2.7100)/(7.710) ≈ 7/2
那么我们应该配比leg1_ratio=2和leg2_ratio=7,这意味着i2109.DCE开仓2手,就需要反向开仓rb2110.DCE 开仓7手。
因为价差的计算是这样的:

self.current_spread = (
            leg1_bar.close_price * self.leg1_ratio - leg2_bar.close_price * self.leg2_ratio
        )

而实际开仓却是(1手,-1手)或(-1手,1手)的组合,与价差计算不符合。

3. 做如下修改:

代码如下,修改部分见注释:

from typing import List, Dict
from datetime import datetime

import numpy as np

from vnpy.app.portfolio_strategy import StrategyTemplate, StrategyEngine
from vnpy.trader.utility import BarGenerator
from vnpy.trader.object import TickData, BarData


class PairTradingStrategy(StrategyTemplate):
    """"""

    author = "用Python的交易员"

    price_add = 5
    boll_window = 20
    boll_dev = 2
    # fixed_size = 1  # 没有使用,去掉
    leg1_ratio = 1
    leg2_ratio = 1

    leg1_symbol = ""
    leg2_symbol = ""
    current_spread = 0.0
    boll_mid = 0.0
    boll_down = 0.0
    boll_up = 0.0

    parameters = [
        "price_add",
        "boll_window",
        "boll_dev",
        # "fixed_size",   # 没有使用,去掉
        "leg1_ratio",
        "leg2_ratio",
    ]
    variables = [
        "leg1_symbol",
        "leg2_symbol",
        "current_spread",
        "boll_mid",
        "boll_down",
        "boll_up",
    ]

    def __init__(
        self,
        strategy_engine: StrategyEngine,
        strategy_name: str,
        vt_symbols: List[str],
        setting: dict
    ):
        """"""
        super().__init__(strategy_engine, strategy_name, vt_symbols, setting)

        self.bgs: Dict[str, BarGenerator] = {}
        self.targets: Dict[str, int] = {}
        self.last_tick_time: datetime = None

        self.spread_count: int = 0
        self.spread_data: np.array = np.zeros(100)

        # Obtain contract info
        self.leg1_symbol, self.leg2_symbol = vt_symbols

        def on_bar(bar: BarData):
            """"""
            pass

        for vt_symbol in self.vt_symbols:
            self.targets[vt_symbol] = 0
            self.bgs[vt_symbol] = BarGenerator(on_bar)

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("策略初始化")

        self.load_bars(1)

    def on_start(self):
        """
        Callback when strategy is started.
        """
        self.write_log("策略启动")

    def on_stop(self):
        """
        Callback when strategy is stopped.
        """
        self.write_log("策略停止")

    def on_tick(self, tick: TickData):
        """
        Callback of new tick data update.
        """
        if (
            self.last_tick_time
            and self.last_tick_time.minute != tick.datetime.minute
        ):
            bars = {}
            for vt_symbol, bg in self.bgs.items():
                bars[vt_symbol] = bg.generate()
            self.on_bars(bars)

        bg: BarGenerator = self.bgs[tick.vt_symbol]
        bg.update_tick(tick)

        self.last_tick_time = tick.datetime

    def on_bars(self, bars: Dict[str, BarData]):
        """"""
        self.cancel_all()

        # Return if one leg data is missing
        if self.leg1_symbol not in bars or self.leg2_symbol not in bars:
            return

        # Calculate current spread
        leg1_bar = bars[self.leg1_symbol]
        leg2_bar = bars[self.leg2_symbol]

        # Filter time only run every 5 minutes
        if (leg1_bar.datetime.minute + 1) % 5:
            return

        self.current_spread = (
            leg1_bar.close_price * self.leg1_ratio - leg2_bar.close_price * self.leg2_ratio
        )

        # Update to spread array
        self.spread_data[:-1] = self.spread_data[1:]
        self.spread_data[-1] = self.current_spread

        self.spread_count += 1
        if self.spread_count <= self.boll_window:
            return

        # Calculate boll value
        buf: np.array = self.spread_data[-self.boll_window:]

        std = buf.std()
        self.boll_mid = buf.mean()
        self.boll_up = self.boll_mid + self.boll_dev * std
        self.boll_down = self.boll_mid - self.boll_dev * std

        # Calculate new target position
        leg1_pos = self.get_pos(self.leg1_symbol)

        if not leg1_pos:
            if self.current_spread >= self.boll_up:
                self.targets[self.leg1_symbol] = -1*self.leg1_ratio     # hxxjava add *self.leg1_ratio
                self.targets[self.leg2_symbol] = 1*self.leg2_ratio      # hxxjava add *self.leg2_ratio
            elif self.current_spread <= self.boll_down:
                self.targets[self.leg1_symbol] = 1*self.leg1_ratio      # hxxjava add *self.leg1_ratio
                self.targets[self.leg2_symbol] = -1*self.leg2_ratio     # hxxjava add *self.leg2_ratio
        elif leg1_pos > 0:
            if self.current_spread >= self.boll_mid:
                self.targets[self.leg1_symbol] = 0
                self.targets[self.leg2_symbol] = 0
        else:
            if self.current_spread <= self.boll_mid:
                self.targets[self.leg1_symbol] = 0
                self.targets[self.leg2_symbol] = 0

        # Execute orders
        for vt_symbol in self.vt_symbols:
            target_pos = self.targets[vt_symbol]
            current_pos = self.get_pos(vt_symbol)

            pos_diff = target_pos - current_pos
            volume = abs(pos_diff)
            bar = bars[vt_symbol]

            if pos_diff > 0:
                price = bar.close_price + self.price_add

                if current_pos < 0:
                    self.cover(vt_symbol, price, volume)
                else:
                    self.buy(vt_symbol, price, volume)
            elif pos_diff < 0:
                price = bar.close_price - self.price_add

                if current_pos > 0:
                    self.sell(vt_symbol, price, volume)
                else:
                    self.short(vt_symbol, price, volume)

        self.put_event()

4. 另外:如何设置正相关和负相关的配对?

当leg1与leg2正相关时,leg1_ratio和leg2_ratio同为正整数;
当leg1与leg2负相关时,leg1_ratio为正整数和leg2_ratio同为负整数。

CTP TradeApi接口的合约交易状态通知函数定义:

OnRtnInstrumentStatus

合约交易状态通知,主动推送。公有流回报。

各交易所的合约状态变化详见合约状态变化说明。

◇ 1.函数原型virtual void OnRtnInstrumentStatus(CThostFtdcInstrumentStatusField *pInstrumentStatus) {};

其参数pInstrumentStatus:合约状态定义:

struct CThostFtdcInstrumentStatusField
{
    ///交易所代码
    TThostFtdcExchangeIDType    ExchangeID;
    ///保留的无效字段
    TThostFtdcOldExchangeInstIDType reserve1;
    ///结算组代码
    TThostFtdcSettlementGroupIDType SettlementGroupID;
    ///保留的无效字段
    TThostFtdcOldInstrumentIDType   reserve2;
    ///合约交易状态
    TThostFtdcInstrumentStatusType  InstrumentStatus;
    ///交易阶段编号
    TThostFtdcTradingSegmentSNType  TradingSegmentSN;
    ///进入本状态时间
    TThostFtdcTimeType  EnterTime;
    ///进入本状态原因
    TThostFtdcInstStatusEnterReasonType EnterReason;
    ///合约在交易所的代码
    TThostFtdcExchangeInstIDType    ExchangeInstID;
    ///合约代码
    TThostFtdcInstrumentIDType  InstrumentID;
};
EnterTime:只有郑商所的时间戳是CTP的本地时间,其他交易所的是交易所时间

其中有一个字段“交易阶段编号”(TradingSegmentSN )是如何编号的?

1. CtaTemplate模版的成员trading是指示策略是否处于交易状态

1.1 CtaTemplate模版的成员trading的定义

class CtaTemplate(ABC):
    """"""

    author = ""
    parameters = []
    variables = []

    def __init__(
        self,
        cta_engine: Any,
        strategy_name: str,
        vt_symbol: str,
        setting: dict,
    ):
        """"""
        self.cta_engine = cta_engine
        self.strategy_name = strategy_name
        self.vt_symbol = vt_symbol

        self.inited = False
        self.trading = False       # 策略是否处于交易状态
        self.pos = 0

    ... ... 其他略去

你会发现self.trading除了初始化时赋值为False之外,并没有在CtaTemplate的任何其他地方被修改赋值。

1.2 CtaTemplate模版的成员trading是由CtaEngine来维护的

我们的CTA策略都是从CtaTemplate模版扩展而来。当我们的CTA策略被实例化为运行策略时,self.trading就被缓冲到以CTA策略实例名称为文件名的json文件中。之后随着CTA策略被初始化、启动、停止,self.trading的状态在False和True之间做相应的变化,并调用put_event()函数写入json文件,调用sync_data()函数从json文件中读出。这两个函数中都是调用了self.cta_engine的功能。

    def put_event(self):
        """
        Put an strategy data event for ui update.
        """
        if self.inited:
            self.cta_engine.put_strategy_event(self)
    def sync_data(self):
        """
        Sync strategy variables value into disk storage.
        """
        if self.trading:
            self.cta_engine.sync_strategy_data(self)

1.3 CtaEngine的put_strategy_event()函数

    def put_strategy_event(self, strategy: CtaTemplate):
        """
        Put an event to update strategy status.
        """
        data = strategy.get_data()                                       # 策略的变量,包含的trading
        event = Event(EVENT_CTA_STRATEGY, data)
        self.event_engine.put(event)                                   # 发送消息EVENT_CTA_STRATEGY给订阅者,通知策略变量变化了

1.4 CtaEngine的sync_strategy_data()函数

    def sync_strategy_data(self, strategy: CtaTemplate):
        """
        Sync strategy data into json file.
        """
        data = strategy.get_variables()

        # 下面的两句把inited和trading都剔除了
        data.pop("inited")      # Strategy status (inited, trading) should not be synced.
        data.pop("trading")    

        self.strategy_data[strategy.strategy_name] = data
        save_json(self.data_filename, self.strategy_data)   # 把其他策略变量写入json文件

2. CtaTemplate模版的成员trading不能告诉策略当前交易合约否处于交易状态

由上面的代码分析发现,策略的trading并没有考虑交易合约是否可以处在交易状态。
策略的trading是会影响到策略的委托行为的,下面是CtaTemplate的send_order()函数:

    def send_order(
        self,
        direction: Direction,
        offset: Offset,
        price: float,
        volume: float,
        stop: bool = False,
        lock: bool = False,
        net: bool = False
    ):
        """
        Send a new order.
        """
        if self.trading:                                                    # 只要self.trading==True就可以委托
            vt_orderids = self.cta_engine.send_order(
                self, direction, offset, price, volume, stop, lock, net
            )
            return vt_orderids
        else:
            return []

实际使用中会发现策略可能会在休市或者非交易时间,因为收到的错误数据而误动作,导致出现委托无法得到回应或者是拒单。
其原因也是因为作为委托条件的self.trading并没有考虑委托时刻,交易合约是否可以处在交易状态!

3. 怎么解决这个问题?

3.1 开放CTP网关接口的合约品种交易状态推送接口函数

合约品种交易状态推送接口函数是onRtnInstrumentStatus()。
具体方法我已经在再谈集合竞价 里详细地讨论过了,文章很长,希望有耐心看完,这里就不再赘述。

注:其他网关接口也应该类似。

3.2 委托执行条件 = 合约交易状态 + self.trading

3.2.1 合约有哪些交易状态

/////////////////////////////////////////////////////////////////////////
///TFtdcInstrumentStatusType是一个合约交易状态类型
/////////////////////////////////////////////////////////////////////////
///开盘前
#define THOST_FTDC_IS_BeforeTrading '0'
///非交易
#define THOST_FTDC_IS_NoTrading '1'
///连续交易
#define THOST_FTDC_IS_Continous '2'
///集合竞价报单
#define THOST_FTDC_IS_AuctionOrdering '3'
///集合竞价价格平衡
#define THOST_FTDC_IS_AuctionBalance '4'
///集合竞价撮合
#define THOST_FTDC_IS_AuctionMatch '5'
///收盘
#define THOST_FTDC_IS_Closed '6'

typedef char TThostFtdcInstrumentStatusType;

只有在交易合约在“连续交易”和 “集合竞价报单”这两种状态下,同时交易者也启动了策略(trading==True)情况下,策略才可以执行委托。
也就是说 :委托执行条件 = 合约交易状态 + self.trading。

3.2.3 实现方法

待续...

1. IF2107的集合竞价

1.1 修改策略的on_tick(),以便输出集合竞价tick数据

    def on_tick(self, tick: TickData):
        """
        Callback of new tick data update.
        """
        tick_time = tick.datetime.time()
        time1 = time(20,55,0)
        time2 = time(21,0,1)
        if time1 <= tick_time <= time2:
            print(f"【集合竞价数据 tick_time={tick_time} tick={tick}】")

        if ("IF" in tick.vt_symbol) and ("CFFEX" in tick.vt_symbol):
            time1 = time(9,25,0)
            time2 = time(9,30,1)
            if time1 <= tick_time <= time2:
                print(f"【集合竞价数据 tick_time={tick_time} tick={tick}】")

        super().on_tick(tick)

1.2 集合竞价tick数据

【集合竞价tick数据 
tick_time=09:29:00.500000 
tick=TickData(
    gateway_name='CTP', 
    symbol='IF2107', 
    exchange=<Exchange.CFFEX: 'CFFEX'>, 
    datetime=datetime.datetime(2021, 6, 9, 9, 29, 0, 500000, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), 
    name='沪深300指数2107', 
    volume=7, 
    open_interest=22276.0, 
    last_price=5183.8, 
    last_volume=0, 
    limit_up=5698.200000000001, 
    limit_down=4662.2, 
    open_price=5183.8, 
    high_price=5183.8, 
    low_price=5183.8, 
    pre_close=5184.0, 
    bid_price_1=5180.2, 
    bid_price_2=0, 
    bid_price_3=0, 
    bid_price_4=0, 
    bid_price_5=0, 
    ask_price_1=5184.0, 
    ask_price_2=0, 
    ask_price_3=0, 
    ask_price_4=0, 
    ask_price_5=0, 
    bid_volume_1=4, 
    bid_volume_2=0, 
    bid_volume_3=0, 
    bid_volume_4=0, 
    bid_volume_5=0, 
    ask_volume_1=3, 
    ask_volume_2=0, 
    ask_volume_3=0, 
    ask_volume_4=0, 
    ask_volume_5=0)】

1.3 开市后第一秒的tick数据

1.3.1 开市后第一秒的第一个tick数据

tick_time=09:30:00.500000 
tick=TickData(
    gateway_name='CTP', 
    symbol='IF2107', 
    exchange=<Exchange.CFFEX: 'CFFEX'>, 
    datetime=datetime.datetime(2021, 6, 9, 9, 30, 0, 500000, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), 
    name='沪深300指数2107', 
    volume=10, 
    open_interest=22274.0, 
    last_price=5180.2, 
    last_volume=0, 
    limit_up=5698.200000000001, 
    limit_down=4662.2, 
    open_price=5183.8, 
    high_price=5183.8, 
    low_price=5180.2, 
    pre_close=5184.0, 
    bid_price_1=5180.2, 
    bid_price_2=0, 
    bid_price_3=0, 
    bid_price_4=0, 
    bid_price_5=0, 
    ask_price_1=5180.4, 
    ask_price_2=0, 
    ask_price_3=0, 
    ask_price_4=0, 
    ask_price_5=0, 
    bid_volume_1=2, 
    bid_volume_2=0, 
    bid_volume_3=0, 
    bid_volume_4=0, 
    bid_volume_5=0, 
    ask_volume_1=1, 
    ask_volume_2=0, 
    ask_volume_3=0, 
    ask_volume_4=0, 
    ask_volume_5=0)】

1.3.2 开市后第一秒的第二个tick数据

tick_time=09:30:01 
tick=TickData(
    gateway_name='CTP', 
    symbol='IF2107', 
    exchange=<Exchange.CFFEX: 'CFFEX'>, 
    datetime=datetime.datetime(2021, 6, 9, 9, 30, 1, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>),
    name='沪深300 指数2107', 
    volume=13, 
    open_interest=22272.0, 
    last_price=5180.2, 
    last_volume=0, 
    limit_up=5698.200000000001, 
    limit_down=4662.2, 
    open_price=5183.8, 
    high_price=5183.8, 
    low_price=5180.2, 
    pre_close=5184.0, 
    bid_price_1=5177.4, 
    bid_price_2=0, 
    bid_price_3=0, 
    bid_price_4=0, 
    bid_price_5=0, 
    ask_price_1=5179.2, 
    ask_price_2=0, 
    ask_price_3=0, 
    ask_price_4=0, 
    ask_price_5=0, 
    bid_volume_1=5, 
    bid_volume_2=0, 
    bid_volume_3=0, 
    bid_volume_4=0, 
    bid_volume_5=0, 
    ask_volume_1=1, 
    ask_volume_2=0, 
    ask_volume_3=0, 
    ask_volume_4=0, 
    ask_volume_5=0)】

2. 深交所的股票的集合竞价——有意思!

2.1 白云机场的1分钟K线图:

以白云机场为例,它的集合竞价是每个交易日的14:56-14:59,但是它这三分钟却只形成1根1分钟K线!
下面是白云机场的1分钟K线图:
description

2.2 1分钟K线的代表的交易时间长度不一定是1分钟

由上图可知:
深交所的股票在集合竞价阶段1分钟K线就是3分钟时长,其它时段的又是1分钟;
上交所的股票在集合竞价阶段是没有1分钟K线的,它发生在9:25-9:29,却被合并到开市后的第一根1分钟K线中,也就是说这根K线代表的交易时长为5分钟。
国内期货日盘合约的集合竞价阶段发生在上午开市前5分钟的前4分钟内,却被合并到开市后的第一根1分钟K线中,也就是说这根K线代表的交易时长为5分钟。
国内期货夜盘合约的集合竞价阶段发生在前一交易日的夜间20:55-21:00的前4分钟内,却被合并到开市后的第一根1分钟K线中,也就是说这根K线代表的交易时长为5分钟。

3. 问题:vnpy的BarGenerator如何正确处理这些集合竞价的tick数据?

3.1 目前的BarGenerator无法正确处理这些集合竞价的tick数据。

这个问题我之前已经有文章讨论过了,需要修改是肯定的!

3.2 正确处理这些集合竞价的tick数据需要那些条件?

  • 交易时间段
  • 集合竞价时段
  • 集合竞价的数据应该合并到开市后第一根K线,还是独立存在?

4. 被vnpy忽视的CTP接口功能:合约交易状态

4.1 合约交易状态即进入原因定义

位于vnpy_ctp\api\include\ctp\ThostFtdcUserApiDataType.h

/////////////////////////////////////////////////////////////////////////
///TFtdcInstrumentStatusType是一个合约交易状态类型
/////////////////////////////////////////////////////////////////////////
///开盘前
#define THOST_FTDC_IS_BeforeTrading '0'
///非交易
#define THOST_FTDC_IS_NoTrading '1'
///连续交易
#define THOST_FTDC_IS_Continous '2'
///集合竞价报单
#define THOST_FTDC_IS_AuctionOrdering '3'
///集合竞价价格平衡
#define THOST_FTDC_IS_AuctionBalance '4'
///集合竞价撮合
#define THOST_FTDC_IS_AuctionMatch '5'
///收盘
#define THOST_FTDC_IS_Closed '6'

typedef char TThostFtdcInstrumentStatusType;

/////////////////////////////////////////////////////////////////////////
///TFtdcInstStatusEnterReasonType是一个品种进入交易状态原因类型
/////////////////////////////////////////////////////////////////////////
///自动切换
#define THOST_FTDC_IER_Automatic '1'
///手动切换
#define THOST_FTDC_IER_Manual '2'
///熔断
#define THOST_FTDC_IER_Fuse '3'

typedef char TThostFtdcInstStatusEnterReasonType;

4.2 让vnpy接收CTP接口合约交易状态信息

1)在trader\constant.py中定义合约交易状态类型

class InstrumentStatus(Enum):
    """
    合约交易状态类型 hxxjava debug
    """
    BEFORE_TRADING = "开盘前"
    NO_TRADING = "非交易"
    CONTINOUS = "连续交易" 
    AUCTION_ORDERING = "集合竞价报单"
    AUCTION_BALANCE = "集合竞价价格平衡"
    AUCTION_MATCH = "集合竞价撮合"
    CLOSE = "收盘"

2)在trader\object.py中定义合约状态类型StatusData

@dataclass
class StatusData(BaseData):
    """
    合约状态 hxxjava debug
    """
    # 合约代码
    symbol:str       
    # 交易所代码                       
    exchange : Exchange    
    # 结算组代码                     
    settlement_group_id : str = ""  
    # 合约交易状态             
    instrument_status : InstrumentStatus = None   
    # 交易阶段编号
    trading_segment_sn : int = None 
    # 进入本状态时间
    enter_time : str = ""      
    # 进入本状态原因  
    enter_reason : str = ""  
    # 合约在交易所的代码       
    exchange_inst_id : str = ""     

    def __post_init__(self):
        """  """
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"

3)在trader\event.py中定义合约状态消息EVENT_STATUS

EVENT_STATUS = "eStatus"                        # hxxjava debug

4)在trader\gateway.py中给gateway增加on_status()函数

    def on_status(self, status: StatusData) -> None:    # hxxjava debug
        """
        Instrument Status event push.
        """
        self.on_event(EVENT_STATUS, status)

5) 在vnpy_ctp\gateway\ctp_gateway.py文件中,为TdApi加入如下函数

    def onRtnInstrumentStatus(self,data:dict):
        """ 
        当接收到合约状态信息 # hxxjava debug 
        """
        if data:
            status =  StatusData(
                symbol = data["InstrumentID"],
                exchange = EXCHANGE_CTP2VT[data["ExchangeID"]],
                settlement_group_id = data["SettlementGroupID"],
                instrument_status = data["InstrumentStatus"],
                trading_segment_sn = data["TradingSegmentSN"],
                enter_time = data["EnterTime"],
                enter_reason = data["EnterReason"],
                exchange_inst_id = data["ExchangeInstID"],
                gateway_name=self.gateway_name
            )
            # print(f"status = {status}")
            self.gateway.on_status(status)

6) 运行vnpy,连接CTP接口,看看都收到了什么?

status=StatusData(gateway_name='CTP', symbol='nr', exchange=<Exchange.INE: 'INE'>, settlement_group_id='00000001', instrument_status='1', trading_segment_sn=4, enter_time='19:58:37', enter_reason='1', exchange_inst_id='nr')
status=StatusData(gateway_name='CTP', symbol='lu', exchange=<Exchange.INE: 'INE'>, settlement_group_id='00000001', instrument_status='1', trading_segment_sn=4, enter_time='19:58:37', enter_reason='1', exchange_inst_id='lu')
status=StatusData(gateway_name='CTP', symbol='sc', exchange=<Exchange.INE: 'INE'>, settlement_group_id='00000001', instrument_status='1', trading_segment_sn=4, enter_time='19:58:37', enter_reason='1', exchange_inst_id='sc')
... ...  相同略去 内容太多,包括了这个CTP接口中所有合约品种的合约状态信息,其他省略了

7) 收到了合约状态信息有什么用?

合约状态信息是有交易服务器推送到客户端的,其中包含如下的合约状态:
BEFORE_TRADING = "开盘前"
NO_TRADING = "非交易"
CONTINOUS = "连续交易"
AUCTION_ORDERING = "集合竞价报单"
AUCTION_BALANCE = "集合竞价价格平衡"
AUCTION_MATCH = "集合竞价撮合"
CLOSE = "收盘"
并且还有进入的时间和原因,这些信息正是我们解决BarGenerator在处理集合竞价时段的tick数据错误问题所需要的!

5. CTA策略如何使用使用合约状态信息?

5.1 在OmsEngine中收集当前市场的所有合约的合约状态信息

在vnpy\trader\engine.py文件中,为OmsEngine做如下的修改:

class OmsEngine(BaseEngine):
    """
    Provides order management system function for VN Trader.
    """

    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super(OmsEngine, self).__init__(main_engine, event_engine, "oms")

        self.ticks: Dict[str, TickData] = {}
        self.orders: Dict[str, OrderData] = {}
        self.trades: Dict[str, TradeData] = {}
        self.positions: Dict[str, PositionData] = {}
        self.accounts: Dict[str, AccountData] = {}
        self.contracts: Dict[str, ContractData] = {}
        self.statuses: Dict[str, StatusData] = {}       # hxxjava debug
        self.active_orders: Dict[str, OrderData] = {}

        self.add_function()
        self.register_event()

    def add_function(self) -> None:
        """Add query function to main engine."""
        self.main_engine.get_tick = self.get_tick
        self.main_engine.get_order = self.get_order
        self.main_engine.get_trade = self.get_trade
        self.main_engine.get_position = self.get_position
        self.main_engine.get_account = self.get_account
        self.main_engine.get_contract = self.get_contract
        self.main_engine.get_all_ticks = self.get_all_ticks
        self.main_engine.get_all_orders = self.get_all_orders
        self.main_engine.get_all_trades = self.get_all_trades
        self.main_engine.get_all_positions = self.get_all_positions
        self.main_engine.get_all_accounts = self.get_all_accounts
        self.main_engine.get_all_contracts = self.get_all_contracts
        self.main_engine.get_all_statuses = self.get_all_statuses       # hxxjava debug
        self.main_engine.get_all_active_orders = self.get_all_active_orders

    def register_event(self) -> None:
        """"""
        self.event_engine.register(EVENT_TICK, self.process_tick_event)
        self.event_engine.register(EVENT_ORDER, self.process_order_event)
        self.event_engine.register(EVENT_TRADE, self.process_trade_event)
        self.event_engine.register(EVENT_POSITION, self.process_position_event)
        self.event_engine.register(EVENT_ACCOUNT, self.process_account_event)
        self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)  
        self.event_engine.register(EVENT_STATUS, self.process_status_event)  # hxxjava debug

    ... ...  相同略去


    def process_status_event(self, event: Event) -> None:   # hxxjava debug
        """"""
        status = event.data
        # print(f"got a status = {status}")
        self.statuses[status.vt_symbol] = status

    ... ...  相同略去


    def get_all_statuses(self) -> List[StatusData]:     # hxxjava debug
        """
        Get all status data.
        """
        return list(self.statuses.values())


    ... ...  相同略去

注意:
这个步骤的目的:
把CTP接口接收到的所有合约品种的状态信息保存到self.statuses字典中。
为系统的主引擎main_engine提供访问所有合约品种的状态信息函数get_all_statuses()

5.2 把CTA策略模板CtaTemplate做如下修改

5.2.1 CTA策略引擎中,增加策略初始化时订阅EVENT_STATUS

修改app\cta_strategy\engine.py文件中的CtaEngine,下面只给出主要的代码修改部分:

class CtaEngine(BaseEngine):
    """"""

    ... ...  相同略去


    def register_event(self):
        """"""
        self.event_engine.register(EVENT_TICK, self.process_tick_event)
        self.event_engine.register(EVENT_ORDER, self.process_order_event)
        self.event_engine.register(EVENT_TRADE, self.process_trade_event)
        self.event_engine.register(EVENT_POSITION, self.process_position_event)
        self.event_engine.register(EVENT_STATUS,self.process_status_event)      # hxxjava debug

    ... ...  相同略去

    def process_status_event(self,event:Event): # hxxjava debug
        """ 分发合约某种状态信息到相应的策略 """
        status:StatusData = event.data

        for vt_symbol in self.symbol_strategy_map.keys():
            symbol,exchange = extract_vt_symbol(vt_symbol)
            instrument = left_alphas(symbol)
            if  (status.vt_symbol.upper() in [symbol.upper(),instrument.upper()]) and (status.exchange == exchange):
                # 分发合约某种状态信息到相应的所有策略中
                strategies = self.symbol_strategy_map[vt_symbol]
                for strategy in strategies:
                    self.call_strategy_func(strategy, strategy.on_status, status) 

    ... ...  相同略去

5.2.2 CtaTemplate增加 on_status()处理接口推送的合约状态信息

在app\cta_strategy\template.py文件中,为cta_template模版增加下面的接口:

    @virtual
    def on_status(self, status: StatusData): # hxxjava debug
        """
        Callback of status data
        """
        pass

5.2.3 为您自己CTA策略增加 on_status()处理接口推送的合约状态信息

简单举例如下:

    def on_status(self, status: StatusData):    # hxxjava debug
        print(f"strategy {self.strategy_name} got a status event {status}")

到这里,您可以让你的CTA策略在感知到策略正在交易的合约交易状态变化了。
具体如何使用合约交易状态信息, 这么做取决于您的需求了。举例如下:

  • 当在合约处在集合竞价完成阶段收到了一条tick信息,你就可以把它代入到开盘的第一个连续竞价时间段,把该tick合并到第一个1分钟K线中
  • 当最新收到的合约交易状态为非交易状态(既不是集合竞价状态,也不连续竞价状态),策略就可以不发出委托,以免被拒单
  • 目前vnpy的CTA策略中,使用BarGenerator生成1分钟K,在各个交易时间段的最后1分钟是不会生成1分钟K线的,直到收到下一个各个交易时间段的第一个tick,才会生成上个交易时间段的最后1分钟K线。现在我们就可以利用合约交易状态的变化(如合约品种转入了非交易状态或者收盘状态),可以让BarGenerator立即生成当前的1分钟K线。

论坛对交易理论的讨论有点少

大家总是在讨论编程,就没有人谈谈如何才能够盈利,如果不能够盈利,干嘛还要编程!

让K线图表说话

description

做多5手

description

后势怎么看?

如果不能够有效上破5070,rb2110还得跌!
一家之言,不喜勿喷。

1. 连接UFT接口后,报如下错误

选择恒生UFT网关,连接后就报如下错误:

KeyError: ('TradeDate',)

At:
  D:\ProgramFiles\VnStudio\lib\site-packages\vnpy\gateway\uft\uft_gateway.py(728): update_trade
  D:\ProgramFiles\VnStudio\lib\site-packages\vnpy\gateway\uft\uft_gateway.py(502): onRspQryTrade

2. 错误的位置

错误位置是gateway\uft\uft_gateway.py的728行:

    def update_trade(self, data: dict) -> None:
        """"""
        symbol = data["InstrumentID"]
        exchange = EXCHANGE_UFT2VT[data["ExchangeID"]]
        sessionid = data["SessionID"]
        order_ref = data["OrderRef"]
        orderid = f"{sessionid}_{order_ref}"

        order = self.orders.get(orderid, None)
        if order:
            order.traded += data["TradeVolume"]

            if order.traded < order.volume:
                order.status = Status.PARTTRADED
            else:
                order.status = Status.ALLTRADED

            self.gateway.on_order(order)

        trade_time = generate_time(data["TradeTime"])
        timestamp = f"{data['TradeDate']} {trade_time}"         # 就是这里的TradeDate出错,意思是没有该字典项目
        dt = datetime.strptime(timestamp, "%H:%M:%S")
        dt = CHINA_TZ.localize(dt)

        trade = TradeData(
            symbol=symbol,
            exchange=exchange,
            orderid=orderid,
            tradeid=data["TradeID"],
            direction=DIRECTION_UFT2VT[data["Direction"]],
            offset=OFFSET_UFT2VT[data["OffsetFlag"]],
            price=data["TradePrice"],
            volume=data["TradeVolume"],
            datetime=dt,
            gateway_name=self.gateway_name
        )
        self.gateway.on_trade(trade)

3. CHSTradeField中的交易日叫’TradingDay‘不叫’TradeDate‘

找到api\uft\include\HSStruct.h中的CHSTradeField,发现CHSTradeField结构中没有TradeDate字段,应该是TradingDay字段。

///成交信息
struct CHSTradeField
{
    /// 账号
    HSAccountID                   AccountID;
    /// 成交编码
    HSTradeID                     TradeID;
    /// 报单编码
    HSOrderSysID                  OrderSysID;
    /// 经纪公司报单编码
    HSBrokerOrderID               BrokerOrderID;
    /// 会话编码
    HSSessionID                   SessionID;
    /// 报单引用
    HSRef                         OrderRef;
    /// 交易所代码
    HSExchangeID                  ExchangeID;
    /// 合约代码
    HSInstrumentID                InstrumentID;
    /// 买卖方向
    HSDirection                   Direction;
    /// 开平标志
    HSOffsetFlag                  OffsetFlag;
    /// 投机/套保/备兑类型
    HSHedgeType                   HedgeType;
    /// 成交数量
    HSVolume                      TradeVolume;
    /// 成交价格
    HSPrice                       TradePrice;
    /// 交易日
    HSDate                        TradingDay;   # 这里是交易日 !!!
    /// 成交时间
    HSTime                        TradeTime;
    /// 期权对应的标的合约代码
    HSInstrumentID                UnderlyingInstrID;
};

4. 如何修改错误

打开gateway\uft\uft_gateway.py中UftTdApi的这个update_trade()函数做如下修改就OK了:

    def update_trade(self, data: dict) -> None:
        """"""
        symbol = data["InstrumentID"]
        exchange = EXCHANGE_UFT2VT[data["ExchangeID"]]
        sessionid = data["SessionID"]
        order_ref = data["OrderRef"]
        orderid = f"{sessionid}_{order_ref}"

        order = self.orders.get(orderid, None)
        if order:
            order.traded += data["TradeVolume"]

            if order.traded < order.volume:
                order.status = Status.PARTTRADED
            else:
                order.status = Status.ALLTRADED

            self.gateway.on_order(order)

        trade_time = generate_time(data["TradeTime"])
        # timestamp = f"{data['TradeDate']} {trade_time}"   # hxxjava debug 不是TradeDate
        timestamp = f"{data['TradingDay']} {trade_time}"    # hxxjava debug 是TradingDay
        # dt = datetime.strptime(timestamp, "%H:%M:%S")     # hxxjava 日期提取格式不对
        dt = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S") # hxxjava 日期提取格式 %Y%m%d %H:%M:%S
        dt = CHINA_TZ.localize(dt)

        trade = TradeData(
            symbol=symbol,
            exchange=exchange,
            orderid=orderid,
            tradeid=data["TradeID"],
            direction=DIRECTION_UFT2VT[data["Direction"]],
            offset=OFFSET_UFT2VT[data["OffsetFlag"]],
            price=data["TradePrice"],
            volume=data["TradeVolume"],
            datetime=dt,
            gateway_name=self.gateway_name
        )
        self.gateway.on_trade(trade)
© 2015-2022 微信 18391752892
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】