用Python的交易员 wrote:
pip install spread_trading安装即可,这个模块我们和VNStudio一起发布了的
1) vnpy是卸载干净原来的系统之后重新安装的;
2)执行pip install spread_trading显示没有这样的安装包(您是否是记错了?);
3)执行pip install vnpy_spreadtrading是可以安装(我猜测的,估计应该是对的),之后把文件Lib\site-packages\vnstation\info.py中的APP_INFO列表改回去,启动就OK了。
4)这样做了之后,在Lib\site-packages\vnpy_spreadtrading和Lib\site-packages\vnpy\app\spread_trading两个目录下,存在两套有关价差交易的代码。我把Lib\site-packages\vnpy\app\spread_trading目录完全删除了,也没有报错。这说明只使用Lib\site-packages\vnpy_spreadtrading下的代码就OK了。
难道包含新版本去掉价差交易功能了吗?
(VN Studio) D:\ProgramFiles\vnstudio>python -m vnstation
qt.network.ssl: QSslSocket: cannot resolve SSL_CTX_set_ciphersuites
qt.network.ssl: QSslSocket: cannot resolve SSL_set_psk_use_session_callback
qt.network.ssl: QSslSocket: cannot call unresolved function SSL_set_psk_use_session_callback
qt.network.ssl: QSslSocket: cannot call unresolved function SSL_set_psk_use_session_callback
qt.network.ssl: QSslSocket: cannot call unresolved function SSL_set_psk_use_session_callback
qt.network.ssl: QSslSocket: cannot call unresolved function SSL_set_psk_use_session_callback
(VN Studio) D:\ProgramFiles\vnstudio>python -m vnstation
qt.network.ssl: QSslSocket: cannot resolve SSL_CTX_set_ciphersuites
qt.network.ssl: QSslSocket: cannot resolve SSL_set_psk_use_session_callback
qt.network.ssl: QSslSocket: cannot call unresolved function SSL_set_psk_use_session_callback
qt.network.ssl: QSslSocket: cannot call unresolved function SSL_set_psk_use_session_callback
qt.network.ssl: QSslSocket: cannot call unresolved function SSL_set_psk_use_session_callback
qt.network.ssl: QSslSocket: cannot call unresolved function SSL_set_psk_use_session_callback
Traceback (most recent call last):
File "D:\ProgramFiles\vnstudio\lib\runpy.py", line 193, in _run_module_as_main
"__main__", mod_spec)
File "D:\ProgramFiles\vnstudio\lib\runpy.py", line 85, in _run_code
exec(code, run_globals)
File "D:\ProgramFiles\vnstudio\lib\site-packages\vnstation\__main__.py", line 4, in <module>
cli()
File "D:\ProgramFiles\vnstudio\lib\site-packages\click\core.py", line 1137, in __call__
return self.main(*args, **kwargs)
File "D:\ProgramFiles\vnstudio\lib\site-packages\click\core.py", line 1062, in main
rv = self.invoke(ctx)
File "D:\ProgramFiles\vnstudio\lib\site-packages\click\core.py", line 1668, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "D:\ProgramFiles\vnstudio\lib\site-packages\click\core.py", line 1404, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "D:\ProgramFiles\vnstudio\lib\site-packages\click\core.py", line 763, in invoke
return __callback(*args, **kwargs)
File "D:\ProgramFiles\vnstudio\lib\site-packages\vnstation\cli.py", line 28, in runtrader
run_trader(s)
File "D:\ProgramFiles\vnstudio\lib\site-packages\vnstation\cli.py", line 89, in run_trader
module = importlib.import_module(d["module"])
File "D:\ProgramFiles\vnstudio\lib\importlib\__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1006, in _gcd_import
File "<frozen importlib._bootstrap>", line 983, in _find_and_load
File "<frozen importlib._bootstrap>", line 965, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'vnpy_spreadtrading'
打开文件Lib\site-packages\vnstation\info.py,找到APP_INFO列表,做如下修改(见注释):
APP_INFO = [
{
"name": "CtaStrategy",
"module": "vnpy_ctastrategy",
"class": "CtaStrategyApp",
"description": "CTA自动交易模块"
},
{
"name": "CtaBacktester",
"module": "vnpy_ctabacktester",
"class": "CtaBacktesterApp",
"description": "CTA回测研究模块"
},
{
"name": "SpreadTrading",
# "module": "vnpy_spreadtrading", # 目前的vnpy的python中并不存在vnpy_spreadtrading目录
"module": "vnpy.app.spread_trading", # 把价差模块重新指向原来的目录
"class": "SpreadTradingApp",
"description": "多合约价差套利模块"
},
{
"name": "AlgoTrading",
"module": "vnpy.app.algo_trading",
"class": "AlgoTradingApp",
"description": "算法委托执行交易模块"
},
{
"name": "OptionMaster",
"module": "vnpy.app.option_master",
"class": "OptionMasterApp",
"description": "期权波动率交易模块"
},
{
"name": "PortfolioStrategy",
"module": "vnpy.app.portfolio_strategy",
"class": "PortfolioStrategyApp",
"description": "多合约组合策略模块"
},
{
"name": "ScriptTrader",
"module": "vnpy.app.script_trader",
"class": "ScriptTraderApp",
"description": "脚本策略交易模块"
},
{
"name": "MarketRadar",
"module": "vnpy.app.market_radar",
"class": "MarketRadarApp",
"description": "市场扫描雷达模块"
},
{
"name": "ChartWizard",
"module": "vnpy.app.chart_wizard",
"class": "ChartWizardApp",
"description": "实时K线图表模块"
},
{
"name": "RpcService",
"module": "vnpy.app.rpc_service",
"class": "RpcServiceApp",
"description": "RPC服务器模块"
},
{
"name": "ExcelRtd",
"module": "vnpy.app.excel_rtd",
"class": "ExcelRtdApp",
"description": "EXCEL RTD模块"
},
{
"name": "DataManager",
"module": "vnpy_datamanager",
"class": "DataManagerApp",
"description": "历史数据管理模块"
},
{
"name": "DataRecorder",
"module": "vnpy.app.data_recorder",
"class": "DataRecorderApp",
"description": "实盘行情记录模块"
},
{
"name": "RiskManager",
"module": "vnpy_riskmanager",
"class": "RiskManagerApp",
"description": "事前风险管理模块"
},
{
"name": "WebTrader",
"module": "vnpy_webtrader",
"class": "WebTraderApp",
"description": "Web服务器模块"
},
{
"name": "PortfolioManager",
"module": "vnpy.app.portfolio_manager",
"class": "PortfolioManagerApp",
"description": "投资组合管理模块"
},
{
"name": "PaperAccount",
"module": "vnpy.app.paper_account",
"class": "PaperAccountApp",
"description": "模拟交易账户模块"
},
]
按照2来修改Info.py,保存后,重新启动就再次选择spread_trading模块了,没有报错了。
不知道这个问题这样解决是否合适?存在两种可能性:
你的笔模型里应该有从哪个K线到哪个K线——可以决定是起止的X值,两个顶点价格可以决定Y值,有了起止的X和Y值就可以画线了,不是就可以画出笔了吗?
盘整期 RSI 是在高出开空仓,趋势下 RSI 是在高开多仓。
本帖旨在通过对CTP接口推送的tick行情数据进行过滤和分析,揭示利用未经有效性处理的tick驱动各种上层应用可能带来的错误,以及这些错误所带来的影响。
依据事实说话,讲道理,不是空谈!仔细阅读,耐心分析,你一定会用巨大收获!
按照如何更有效地利用合约交易状态信息——拒绝CTP接口中的脏数据一文中的4.4节的方法对原始数据进行过滤,其中OmsEngine新增了函数process_origin_tick_event(),代码是这样的:
def process_origin_tick_event(self,event: Event):#-> None: # hxxjava debug
""" 原始tick数据处理 """
tick = event.data
status:StatusData = self.trade_status_manager.get_tick_status(tick)
# hxxjava debug
if not status:
print(f"{datetime.now()} {tick.vt_symbol} 还没有收到交易状态")
return
# 有效交易状态
if status.instrument_status in VALID_TRADE_STATUSES:
# 这里是所有有效数据的发源地
self.event_engine.put(Event(EVENT_TICK, tick))
self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, tick))
else:
print(f"{datetime.now()} 特别交易状态={status} {tick}")
下面是实际运行python -m vnstation之后,对CTP接口推送的数据的过滤而得到的部分结果,暂时称之为脏数据。
之所以称之为脏数据,是因为它们和当前合约的交易状态不符。
其中包含了当时的本地实际、当前合约交易状态和tick数据库三部分。
因为打印出来的内容每行比较长,建议您把下面的内容复制粘贴到其他的文本编辑器(如vscode)中,方便对左右移动,也可以上下对齐。
2021-07-15 07:07:05.767769 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.NO_TRADING: '非交易'>, trading_segment_sn=15, enter_time='23:00:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 23, 0, 0, 500000), name='铁矿石2109', volume=121262, open_interest=435280.0, last_price=1206.0, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1215.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1206.0, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1206.5, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=51, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=33, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 10:14:59.837070 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.NO_TRADING: '非交易'>, trading_segment_sn=25, enter_time='10:15:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 10, 15), name='铁矿石2109', volume=225979, open_interest=448149.0, last_price=1230.5, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1233.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1230.0, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1231.0, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=33, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=68, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 10:14:59.994983 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.NO_TRADING: '非交易'>, trading_segment_sn=25, enter_time='10:15:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 10, 15, 0, 500000), name='铁矿石2109', volume=225984, open_interest=448151.0, last_price=1231.0, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1233.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1230.0, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1231.0, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=39, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=63, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 10:15:00.253865 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.NO_TRADING: '非交易'>, trading_segment_sn=25, enter_time='10:15:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 10, 15, 1), name='铁矿石2109', volume=225984, open_interest=448151.0, last_price=1231.0, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1233.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1230.0, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1231.0, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=7, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=64, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 11:29:59.852447 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.NO_TRADING: '非交易'>, trading_segment_sn=29, enter_time='11:30:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 11, 30), name='铁矿石2109', volume=283444, open_interest=465948.0, last_price=1230.0, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1240.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1229.5, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1230.0, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=23, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=29, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 11:29:59.923405 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.NO_TRADING: '非交易'>, trading_segment_sn=29, enter_time='11:30:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 11, 30, 0, 500000), name='铁矿石2109', volume=283453, open_interest=465947.0, last_price=1230.0, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1240.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1229.5, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1230.0, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=23, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=22, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 11:30:00.185258 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.NO_TRADING: '非交易'>, trading_segment_sn=29, enter_time='11:30:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 11, 30, 1), name='铁矿石2109', volume=283456, open_interest=465945.0, last_price=1230.0, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1240.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1229.5, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1230.0, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=21, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=30, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 11:30:02.015268 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.NO_TRADING: '非交易'>, trading_segment_sn=29, enter_time='11:30:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 11, 30, 1, 500000), name='铁矿石2109', volume=283457, open_interest=465944.0, last_price=1230.0, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1240.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1229.5, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1230.0, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=21, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=29, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 15:00:01.727185 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.CLOSE: '收盘'>, trading_segment_sn=99, enter_time='15:00:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 15, 0 ), name='铁矿石2109', volume=359871, open_interest=484883.0, last_price=1234.5, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1240.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1233.0, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1234.5, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=169, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=40, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 15:00:01.834127 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.CLOSE: '收盘'>, trading_segment_sn=99, enter_time='15:00:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 15, 0, 0, 500000), name='铁矿石2109', volume=359884, open_interest=484882.0, last_price=1234.0, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1240.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1233.0, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1234.0, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=167, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=13, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 15:00:01.927076 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.CLOSE: '收盘'>, trading_segment_sn=99, enter_time='15:00:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 15, 0, 1 ), name='铁矿石2109', volume=359885, open_interest=484883.0, last_price=1234.0, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1240.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1233.0, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1233.5, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=167, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=1, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 15:00:01.931069 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.CLOSE: '收盘'>, trading_segment_sn=99, enter_time='15:00:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 15, 0, 1, 500000), name='铁矿石2109', volume=359886, open_interest=484883.0, last_price=1234.0, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1240.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1233.0, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1234.0, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=172, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=12, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 15:01:04.571289 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.CLOSE: '收盘'>, trading_segment_sn=99, enter_time='15:00:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 15, 1, 5, 500000), name='铁矿石2109', volume=359886, open_interest=484883.0, last_price=1234.0, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1240.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1233.0, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1234.0, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=172, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=12, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
2021-07-15 15:10:52.207334 特别交易状态=StatusData(gateway_name='CTP', symbol='i', exchange=<Exchange.DCE: 'DCE'>, settlement_group_id='00000001', instrument_status=<InstrumentStatus.CLOSE: '收盘'>, trading_segment_sn=99, enter_time='15:00:00', enter_reason=<StatusEnterReason.AUTOMATIC: '自动切换'>, exchange_inst_id='i') TickData(gateway_name='CTP', symbol='i2109', exchange=<Exchange.DCE: 'DCE'>, datetime=datetime.datetime(2021, 7, 15, 15, 10,53 ), name='铁矿石2109', volume=359886, open_interest=484883.0, last_price=1234.0, last_volume=0, limit_up=1335.5, limit_down=1093.5, open_price=1208.0, high_price=1240.0, low_price=1196.0, pre_close=1219.5, bid_price_1=1233.0, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=1234.0, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=172, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=12, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0)
为了方便描述,我们把上面的内容进行编号,编号的顺序为打印信息的本地时间,
1. 2021-07-15 07:07:05.767769
2. 2021-07-15 10:14:59.837070
3. 2021-07-15 10:14:59.994983
4. 2021-07-15 10:15:00.253865
5. 2021-07-15 11:29:59.852447
6. 2021-07-15 11:29:59.923405
7. 2021-07-15 11:30:00.185258
8. 2021-07-15 11:30:02.015268
9. 2021-07-15 15:00:01.727185
10. 2021-07-15 15:00:01.834127
11. 2021-07-15 15:00:01.927076
12. 2021-07-15 15:00:01.931069
13. 2021-07-15 15:01:04.571289
14. 2021-07-15 15:10:52.207334
下面是对这几条的分析:
再次连接CTP接口后,接口自动推送i2109合约昨天夜里23:00收盘的价格和盘口信息,
已经进入上午10:15休市时段,volume=225979, last_price=1230.5
已经进入上午10:15休市时段,volume=225984, last_price=1231.0 bid_volume_1=7 ask_volume_1=64
已经进入上午11:30休市时段,volume=283444, open_interest=465948.0, last_price=1230.0,
已经进入上午11:30休市时段,volume=283457, open_interest=465944.0, last_price=1230.0,
已经进入下午15:00休市时段,volume=359871, open_interest=484883.0, last_price=1234.5,ask_price_1=1234.5,ask_volume_1=40,
10.已经进入下午15:00休市时段,volume=359884, open_interest=484882.0, last_price=1234.0,ask_price_1=1234.0,ask_volume_1=13,
11.已经进入下午15:00休市时段,volume=359885, open_interest=484883.0, last_price=1234.0,ask_price_1=1234.5,ask_volume_1=1,
12.已经进入下午15:00休市时段,open_interest=484883.0, last_price=1234.0
10.已经进入下午15:00休市时段,open_interest=484883.0, last_price=1234.0 —— 除了和时间戳与12不同之外,tick的其他字段完全相同
11.已经进入下午15:00休市时段,open_interest=484883.0, last_price=1234.0 —— 除了和时间戳与12不同之外,tick的其他字段完全相同
对于1中的tick,不过滤可以在刚启动的行情列表中显示,对启动应用后再连接接口的上层应用会产生多余的K线,过滤了更好
对于2,3,4中的tick,是进入休市后仍然有撮合, 不过滤可能造成上层应用会产生多余的K线(如5分钟K线),过滤了更好
对于5,6,7中的tick,是进入休市后仍然有撮合, 不过滤可能造成上层应用会产生多余的K线(如5分钟K线),过滤了更好
对于9,10,11中的tick,是进入收盘后仍然有撮合, 不过滤可能造成上层应用会产生多余的K线(如5分钟K线),过滤了影响跨收盘时刻K线的持仓兴趣、成交量等信息的统计
对于12,13,14中的tick,是进入收盘后已经没有撮合, 完全可以过滤掉
为什么说2-7中的tick过滤了更好?
因为这些tick是在非交易时间段延续的撮合,其价格、持仓兴趣、成交量可以忽略,它们采用下一个交易状态下tick中的价格、持仓兴趣、成交量更好!
如何更有效地利用合约交易状态信息——把集合竞价tick合并到开盘的K线中?
对于国内期货,通常,夜盘合约在20:55之后,日盘合约在8:55之后,合约进入集合竞价报单状态后,交易者就可以通过交易即可进行下单。下单后不会立即成交,而是经过交易所按照一定的规则进行匹配成交。集合竞价报单状态期间,可能会有tick推送给订阅的CTP客户端(如郑商所),但是此时的tick只有价格(last_price)没有成交量(volume=0);也可能没有任何tick推送给订阅的CTP客户端(如上期所)。
之后夜盘合约在20:59之后,日盘合约在8:59之后之后,合约进入集合竞价撮合阶段,交易所会通过行情接口推送1个tick到CTP客户端,之后会通过交易接口立即推送一次所有合约的连接的所有CTP客户端。此tick中包含的价格就是开盘价,成交量不为0。
我已经在如何更有效地利用合约交易状态信息——交易状态信息管理器。一文中介绍了如果提取合约交易状态信息。这些合约交易状态信息就包含了与合约集合竞价相关的几个状态,包括:集合竞价报单、集合竞价平衡(本人一直没有检测到)和集合竞价撮合几个交易状态。
因为vnpy对集合竞价产生的这个tick没有特别处理,把它看成是一个普通的tick了,所以如果你的策略中使用到n分钟周期K线的话,很可能产生一个只包含此tick的5分钟K线。
这和人们的通常习惯不符,也和通常的主流软件都不相同,也不符合集合竞价的原来本意不符,它应该被合并到包含开盘时刻的K线之中。
经过对合约交易状态的研究,发现合约交易状态信息恰好可以帮助我们正确处理集合竞价产生的这个tick,思路大致如下:
不失一般性,以5分钟K线为例:
修改vnpy\trader\engine.py中的OmsEngine的原始tick数据处理函数process_origin_tick_event()。
当收到集合竞价产生的开盘tick时,向tick中添加auction_tick属性并且赋值为True,缓存到self.ticks字典中,不转发该原始tick;收到其他连续竞价tick进行,检查self.ticks字典中是否存在该合约缓存的集合竞价产生的开盘tick,如果有则将缓存的开盘tick时间戳赋值为当前tick的时间戳减去1微秒,然后先发送开盘tick到消息队列,然后再发送当前tick到消息队列。代码如下:
代码如下:
def process_origin_tick_event(self,event: Event):#-> None: # hxxjava debug
""" 原始tick数据处理 """
tick:TickData = event.data
status:StatusData = self.trade_status_manager.get_tick_status(tick)
# hxxjava debug
if not status:
print(f"{datetime.now()} {tick.vt_symbol} 还没有收到交易状态")
return
# 有效交易状态
if status.instrument_status in VALID_TRADE_STATUSES:
# 这里是所有有效数据的发源地
# 处理行情和合约交易状态不同步的问题
relay_tick = False
tick_time = tick.datetime.strftime("%H%M%S.%f")
next_status = self.get_next_status(tick.vt_symbol)
if status.instrument_status in AUCTION_STATUS:
# 当前处在集合竞价阶段
if tick.volume == 0:
# 集合竞价保单的tick,放弃
return
if next_status.instrument_status == InstrumentStatus.CONTINOUS and \
tick_time >= next_status.enter_time:
# 连续竞价状态晚于其tick
relay_tick = True
else:
# 集合竞价产生的开盘tick,打上标记,缓存不发送
tick.aution_tick = True
self.ticks[tick.vt_symbol] = tick
else:
# 当前处在连续竞价阶段,转发tick
relay_tick = True
if relay_tick:
pre_tick = copy.copy(self.ticks.get(tick.vt_symbol,None)) # 保险起见
if pre_tick and hasattr(pre_tick,"auction_tick"):
# 在第一个连续性竞价tick之前,转发缓冲的集合竞价产生的开盘tick
pre_tick.datetime = tick.datetime - timedelta(microseconds=1)
self.event_engine.put(Event(EVENT_TICK, pre_tick))
self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, pre_tick))
self.event_engine.put(Event(EVENT_TICK, tick))
self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, tick))
else:
# 把集合竞价tick转发给订阅者
self.event_engine.put(Event(EVENT_AUCTION_TICK, tick))
self.event_engine.put(Event(EVENT_AUCTION_TICK + tick.vt_symbol, tick))
# else:
# print(f"{datetime.now()} 特别交易状态={status} {tick}")
class OmsEngine(BaseEngine):
"""
Provides order management system function for VN Trader.
"""
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super(OmsEngine, self).__init__(main_engine, event_engine, "oms")
# self.trade_status = TradeStatus(event_engine,6) # hxxjava add
self.ticks: Dict[str, TickData] = {}
self.orders: Dict[str, OrderData] = {}
self.trades: Dict[str, TradeData] = {}
self.positions: Dict[str, PositionData] = {}
self.accounts: Dict[str, AccountData] = {}
self.contracts: Dict[str, ContractData] = {}
self.active_orders: Dict[str, OrderData] = {}
self.trade_status_manager = TradeStatusManager(event_engine,30) # hxxjava debug
self.add_function()
self.register_event()
def add_function(self) -> None:
"""Add query function to main engine."""
self.main_engine.get_tick = self.get_tick
self.main_engine.get_order = self.get_order
self.main_engine.get_trade = self.get_trade
self.main_engine.get_position = self.get_position
self.main_engine.get_account = self.get_account
self.main_engine.get_contract = self.get_contract
self.main_engine.get_all_ticks = self.get_all_ticks
self.main_engine.get_all_orders = self.get_all_orders
self.main_engine.get_all_trades = self.get_all_trades
self.main_engine.get_all_positions = self.get_all_positions
self.main_engine.get_all_accounts = self.get_all_accounts
self.main_engine.get_all_contracts = self.get_all_contracts
self.main_engine.get_all_active_orders = self.get_all_active_orders
self.main_engine.get_status = self.get_status # hxxjava debug
self.main_engine.get_next_status = self.get_next_status # hxxjava debug
def register_event(self) -> None:
""""""
self.event_engine.register(EVENT_TICK, self.process_tick_event)
self.event_engine.register(EVENT_ORDER, self.process_order_event)
self.event_engine.register(EVENT_TRADE, self.process_trade_event)
self.event_engine.register(EVENT_POSITION, self.process_position_event)
self.event_engine.register(EVENT_ACCOUNT, self.process_account_event)
self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)
self.event_engine.register(EVENT_STATUS, self.process_status_event) # hxxjava debug
self.event_engine.register(EVENT_ORIGIN_TICK, self.process_origin_tick_event) # hxxjava debug
def process_origin_tick_event(self,event: Event):#-> None: # hxxjava debug
""" 原始tick数据处理 """
tick:TickData = event.data
status:StatusData = self.trade_status_manager.get_tick_status(tick)
# hxxjava debug
if not status:
print(f"{datetime.now()} {tick.vt_symbol} 还没有收到交易状态")
return
# 有效交易状态
if status.instrument_status in VALID_TRADE_STATUSES:
# 这里是所有有效数据的发源地
# 处理行情和合约交易状态不同步的问题
relay_tick = False
tick_time = tick.datetime.strftime("%H%M%S.%f")
next_status = self.get_next_status(tick.vt_symbol)
if status.instrument_status in AUCTION_STATUS:
# 当前处在集合竞价阶段
if tick.volume == 0:
# 集合竞价保单的tick,放弃
return
if next_status.instrument_status == InstrumentStatus.CONTINOUS and \
tick_time >= next_status.enter_time:
# 连续竞价状态晚于其tick
relay_tick = True
else:
# 集合竞价产生的开盘tick,打上标记,缓存不发送
tick.aution_tick = True
self.ticks[tick.vt_symbol] = tick
else:
# 当前处在连续竞价阶段,转发tick
relay_tick = True
if relay_tick:
pre_tick = copy.copy(self.ticks.get(tick.vt_symbol,None)) # 保险起见
if pre_tick and hasattr(pre_tick,"auction_tick"):
# 在第一个连续性竞价tick之前,转发缓冲的集合竞价产生的开盘tick
pre_tick.datetime = tick.datetime - timedelta(microseconds=1)
self.event_engine.put(Event(EVENT_TICK, pre_tick))
self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, pre_tick))
self.event_engine.put(Event(EVENT_TICK, tick))
self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, tick))
else:
# 把集合竞价tick转发给订阅者
self.event_engine.put(Event(EVENT_AUCTION_TICK, tick))
self.event_engine.put(Event(EVENT_AUCTION_TICK + tick.vt_symbol, tick))
# else:
# print(f"{datetime.now()} 特别交易状态={status} {tick}")
def process_tick_event(self, event: Event) -> None:
""""""
tick = event.data
self.ticks[tick.vt_symbol] = tick
status = self.get_status(tick.vt_symbol)
if status.instrument_status in AUCTION_STATUS:
next_status = self.get_next_status(tick.vt_symbol)
print(f"{datetime.now()} 集合竞价状态: \n\t{status}\n\t{next_status}\n\t{tick}")
def process_order_event(self, event: Event) -> None:
""""""
order = event.data
self.orders[order.vt_orderid] = order
# If order is active, then update data in dict.
if order.is_active():
self.active_orders[order.vt_orderid] = order
# Otherwise, pop inactive order from in dict
elif order.vt_orderid in self.active_orders:
self.active_orders.pop(order.vt_orderid)
def process_trade_event(self, event: Event) -> None:
""""""
trade = event.data
self.trades[trade.vt_tradeid] = trade
def process_position_event(self, event: Event) -> None:
""""""
position = event.data
self.positions[position.vt_positionid] = position
def process_account_event(self, event: Event) -> None:
""""""
account = event.data
self.accounts[account.vt_accountid] = account
def process_contract_event(self, event: Event) -> None:
""""""
contract = event.data
self.contracts[contract.vt_symbol] = contract
def process_status_event(self, event: Event) -> None: # hxxjava debug
""" 交易状态通知消息处理 """
status:StatusData = event.data
self.trade_status_manager.save_status(status)
print(f"【{datetime.now()} {status}】")
def get_tick(self, vt_symbol: str) -> Optional[TickData]:
"""
Get latest market tick data by vt_symbol.
"""
return self.ticks.get(vt_symbol, None)
def get_order(self, vt_orderid: str) -> Optional[OrderData]:
"""
Get latest order data by vt_orderid.
"""
return self.orders.get(vt_orderid, None)
def get_trade(self, vt_tradeid: str) -> Optional[TradeData]:
"""
Get trade data by vt_tradeid.
"""
return self.trades.get(vt_tradeid, None)
def get_position(self, vt_positionid: str) -> Optional[PositionData]:
"""
Get latest position data by vt_positionid.
"""
return self.positions.get(vt_positionid, None)
def get_account(self, vt_accountid: str) -> Optional[AccountData]:
"""
Get latest account data by vt_accountid.
"""
return self.accounts.get(vt_accountid, None)
def get_contract(self, vt_symbol: str) -> Optional[ContractData]:
"""
Get contract data by vt_symbol.
"""
return self.contracts.get(vt_symbol, None)
def get_all_ticks(self) -> List[TickData]:
"""
Get all tick data.
"""
return list(self.ticks.values())
def get_all_orders(self) -> List[OrderData]:
"""
Get all order data.
"""
return list(self.orders.values())
def get_all_trades(self) -> List[TradeData]:
"""
Get all trade data.
"""
return list(self.trades.values())
def get_all_positions(self) -> List[PositionData]:
"""
Get all position data.
"""
return list(self.positions.values())
def get_all_accounts(self) -> List[AccountData]:
"""
Get all account data.
"""
return list(self.accounts.values())
def get_all_contracts(self) -> List[ContractData]:
"""
Get all contract data.
"""
return list(self.contracts.values())
def get_status(self,vt_symbol:str) -> Optional[StatusData]: # hxxjava debug
"""
Get the vt_symbol's status data.
"""
return self.trade_status_manager.get_status(vt_symbol)
def get_next_status(self,vt_symbol:str) -> Optional[StatusData]: # hxxjava debug
"""
Get the vt_symbol's status data.
"""
return self.trade_status_manager.get_next_status(vt_symbol)
def get_all_active_orders(self, vt_symbol: str = "") -> List[OrderData]:
"""
Get all active orders by vt_symbol.
If vt_symbol is empty, return all active orders.
"""
if not vt_symbol:
return list(self.active_orders.values())
else:
active_orders = [
order
for order in self.active_orders.values()
if order.vt_symbol == vt_symbol
]
return active_orders
按照上面的处理,我们就把从CTP接口推送来的tick进行来过滤。
在vnpy\trader\event.py文件中添加集合竞价tick数据消息
EVENT_AUCTION_TICK = "eAuctionTick." # 集合竞价tick数据消息 hxxjava debug
修改vnpy\trader\ui\widget.py中的BaseMonitor的消息订阅函数,使得一个控件可以订阅多个消息类型,条件是数据是相同格式的:
def register_event(self) -> None:
"""
Register event handler into event engine.
"""
if self.event_type:
self.signal.connect(self.process_event)
if type(self.event_type) == list: # hxxjava debug
for ev in self.event_type:
self.event_engine.register(ev, self.signal.emit)
# print(f"已经订阅 {ev} 消息")
else:
self.event_engine.register(self.event_type, self.signal.emit)
修改vnpy\trader\ui\widget.py中的TickMonitor的消息类型:
class TickMonitor(BaseMonitor):
"""
Monitor for tick data.
"""
# event_type = EVENT_TICK
event_type = [EVENT_TICK,EVENT_AUCTION_TICK] # 这里的消息类型不再是一种,而是一个列表
data_key = "vt_symbol"
sorting = True
headers = {
"symbol": {"display": "代码", "cell": BaseCell, "update": False},
"exchange": {"display": "交易所", "cell": EnumCell, "update": False},
"name": {"display": "名称", "cell": BaseCell, "update": True},
"last_price": {"display": "最新价", "cell": BaseCell, "update": True},
"volume": {"display": "成交量", "cell": BaseCell, "update": True},
"open_price": {"display": "开盘价", "cell": BaseCell, "update": True},
"high_price": {"display": "最高价", "cell": BaseCell, "update": True},
"low_price": {"display": "最低价", "cell": BaseCell, "update": True},
"bid_price_1": {"display": "买1价", "cell": BidCell, "update": True},
"bid_volume_1": {"display": "买1量", "cell": BidCell, "update": True},
"ask_price_1": {"display": "卖1价", "cell": AskCell, "update": True},
"ask_volume_1": {"display": "卖1量", "cell": AskCell, "update": True},
"datetime": {"display": "时间", "cell": TimeCell, "update": True},
"gateway_name": {"display": "接口", "cell": BaseCell, "update": False},
}
经过近2个月的编写、调试、观察、再调试,“如何更有效地利用合约交易状态信息”系列已经基本完成,
也基本完成了我当时对CTP接口中合约交易状态通知接口可能的几大用功能的实现。
我想这对任何程序化交易者都是个好消息!如果您也想这么做,请翻阅我这个系列的帖子,希望能够帮到您!
如何更有效地利用合约交易状态信息——拒绝CTP接口中的脏数据。
不知道您是否发现这些情况:
这种情况不只是使用CTP接口的交易者会遇到,其他接口也是一样。
为了防止客户端接收到脏数据,CTP接口会在某个交易时间端的开始结束时,在公共流中播发所有品种的合约品种的交易状态通知,只是vnpy系统没有使用。
交易状态通知的格式见帖子 如何更有效地利用合约交易状态信息——交易状态信息管理器。
系统连接接口后,订阅的行情数据就会从CTP网关的MdApi接口中推送给客户端。目前vnpy在收到tick数据时没有任何有效性判断,就直接推送给了各种订阅者了,而这正是脏数据的来由!
正确的做法是:客户端就在收到tick数据的时候,根据合约品种的交易状态判断该数据是是否为有效数据,如果合约的状态是开盘前、非交易或者是收盘状态,则将该tick丢弃。
这样就可以杜绝脏数据对交易者、各种应用策略或数据记录器的影响。
交易状态管理器的功能及实现代码详见 如何更有效地利用合约交易状态信息——交易状态信息管理器 ,这里就不再提供了。
在vnpy\trader\event.py文件中增加下面的消息定义:
EVENT_ORIGIN_TICK = "eOriginTick." # hxxjava debug
在vnpy\trader\gateway.py文件中,将所有网关的父类Gateway的on_tick()函数做如下修改:
def on_tick(self, tick: TickData) -> None:
"""
Tick event push.
Tick event of a specific vt_symbol is also pushed.
"""
# self.on_event(EVENT_TICK, tick)
# self.on_event(EVENT_TICK + tick.vt_symbol, tick)
self.on_event(EVENT_ORIGIN_TICK, tick)
class OmsEngine(BaseEngine):
"""
Provides order management system function for VN Trader.
"""
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super(OmsEngine, self).__init__(main_engine, event_engine, "oms")
# self.trade_status = TradeStatus(event_engine,6) # hxxjava add
self.ticks: Dict[str, TickData] = {}
self.orders: Dict[str, OrderData] = {}
self.trades: Dict[str, TradeData] = {}
self.positions: Dict[str, PositionData] = {}
self.accounts: Dict[str, AccountData] = {}
self.contracts: Dict[str, ContractData] = {}
self.active_orders: Dict[str, OrderData] = {}
self.trade_status_manager = TradeStatusManager(event_engine,30) # 创建交易状态管理器
self.add_function()
self.register_event()
def add_function(self) -> None:
"""Add query function to main engine."""
self.main_engine.get_tick = self.get_tick
self.main_engine.get_order = self.get_order
self.main_engine.get_trade = self.get_trade
self.main_engine.get_position = self.get_position
self.main_engine.get_account = self.get_account
self.main_engine.get_contract = self.get_contract
self.main_engine.get_all_ticks = self.get_all_ticks
self.main_engine.get_all_orders = self.get_all_orders
self.main_engine.get_all_trades = self.get_all_trades
self.main_engine.get_all_positions = self.get_all_positions
self.main_engine.get_all_accounts = self.get_all_accounts
self.main_engine.get_all_contracts = self.get_all_contracts
self.main_engine.get_all_active_orders = self.get_all_active_orders
self.main_engine.get_status = self.get_status # hxxjava debug
def register_event(self) -> None:
""""""
self.event_engine.register(EVENT_TICK, self.process_tick_event)
self.event_engine.register(EVENT_ORDER, self.process_order_event)
self.event_engine.register(EVENT_TRADE, self.process_trade_event)
self.event_engine.register(EVENT_POSITION, self.process_position_event)
self.event_engine.register(EVENT_ACCOUNT, self.process_account_event)
self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)
self.event_engine.register(EVENT_STATUS, self.process_status_event) # 订阅合约交易状态数据
self.event_engine.register(EVENT_ORIGIN_TICK, self.process_origin_tick_event) # 订阅原始行情数据
def process_origin_tick_event(self,event: Event):#-> None: # 处理原始行情数据
""" 对原始tick数据进行有效性判断和处理 """
tick = event.data
status:StatusData = self.trade_status_manager.get_tick_status(tick)
if not status:
print(f"{datetime.now()} {tick.vt_symbol} 还没有收到交易状态")
return
# 有效交易状态
valid_statuses = [
InstrumentStatus.CONTINOUS,
InstrumentStatus.AUCTION_ORDERING,
InstrumentStatus.AUCTION_BALANCE,
InstrumentStatus.AUCTION_MATCH
]
if status.instrument_status in valid_statuses:
# 这里是所有有效数据的发源地
self.event_engine.put(Event(EVENT_TICK, tick))
self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, tick))
else:
print(f"{datetime.now()} 特别交易状态={status} {tick}")
def process_tick_event(self, event: Event) -> None:
""""""
tick = event.data
self.ticks[tick.vt_symbol] = tick
def process_order_event(self, event: Event) -> None:
""""""
order = event.data
self.orders[order.vt_orderid] = order
# If order is active, then update data in dict.
if order.is_active():
self.active_orders[order.vt_orderid] = order
# Otherwise, pop inactive order from in dict
elif order.vt_orderid in self.active_orders:
self.active_orders.pop(order.vt_orderid)
def process_trade_event(self, event: Event) -> None:
""""""
trade = event.data
self.trades[trade.vt_tradeid] = trade
def process_position_event(self, event: Event) -> None:
""""""
position = event.data
self.positions[position.vt_positionid] = position
def process_account_event(self, event: Event) -> None:
""""""
account = event.data
self.accounts[account.vt_accountid] = account
def process_contract_event(self, event: Event) -> None:
""""""
contract = event.data
self.contracts[contract.vt_symbol] = contract
def process_status_event(self, event: Event) -> None: # 处理交易状态信息
""""""
status = event.data
# print(f"【{datetime.now()} {status}】")
self.trade_status_manager.save_status(status)
def get_tick(self, vt_symbol: str) -> Optional[TickData]:
"""
Get latest market tick data by vt_symbol.
"""
return self.ticks.get(vt_symbol, None)
def get_order(self, vt_orderid: str) -> Optional[OrderData]:
"""
Get latest order data by vt_orderid.
"""
return self.orders.get(vt_orderid, None)
def get_trade(self, vt_tradeid: str) -> Optional[TradeData]:
"""
Get trade data by vt_tradeid.
"""
return self.trades.get(vt_tradeid, None)
def get_position(self, vt_positionid: str) -> Optional[PositionData]:
"""
Get latest position data by vt_positionid.
"""
return self.positions.get(vt_positionid, None)
def get_account(self, vt_accountid: str) -> Optional[AccountData]:
"""
Get latest account data by vt_accountid.
"""
return self.accounts.get(vt_accountid, None)
def get_contract(self, vt_symbol: str) -> Optional[ContractData]:
"""
Get contract data by vt_symbol.
"""
return self.contracts.get(vt_symbol, None)
def get_all_ticks(self) -> List[TickData]:
"""
Get all tick data.
"""
return list(self.ticks.values())
def get_all_orders(self) -> List[OrderData]:
"""
Get all order data.
"""
return list(self.orders.values())
def get_all_trades(self) -> List[TradeData]:
"""
Get all trade data.
"""
return list(self.trades.values())
def get_all_positions(self) -> List[PositionData]:
"""
Get all position data.
"""
return list(self.positions.values())
def get_all_accounts(self) -> List[AccountData]:
"""
Get all account data.
"""
return list(self.accounts.values())
def get_all_contracts(self) -> List[ContractData]:
"""
Get all contract data.
"""
return list(self.contracts.values())
def get_status(self,vt_symbol:str) -> List[StatusData]: # hxxjava debug
"""
Get the vt_symbol's status data.
"""
return self.trade_status_manager.get_status(vt_symbol)
def get_all_active_orders(self, vt_symbol: str = "") -> List[OrderData]:
"""
Get all active orders by vt_symbol.
If vt_symbol is empty, return all active orders.
"""
if not vt_symbol:
return list(self.active_orders.values())
else:
active_orders = [
order
for order in self.active_orders.values()
if order.vt_symbol == vt_symbol
]
return active_orders
采用这样过滤脏数据的方法,可以从源头一次性地过滤掉脏数据,避免后面的各种应用中分别地对推送的行情数据进行过滤,应用策略无需再考虑脏数据的影响,逻辑简单,效率高。现在再回过头来看帖子 如何更有效地利用合约交易状态信息——交易状态信息管理器 里的方法,就有点效率低下了!
本文只提供了CTP接口修改方法,没有涉及到其他类型的接口。可是作为一个完善的交易接口,通常都应该提供类似CTP接口中的类似合约交易状态通知的信息。采用的类似的方法也是可以办到的。
注明:
本贴是继如何解决CTP接口中集合竞价后立即下单被拒的问题 !之后,对交易状态信息进行更为深入地研究,实现了交易状态信息管理器,它可以更为有效地利用合约交易状态信息。
主要功能:
1)集中保存所有品种的交易状态到字典
2)保存所有品种的交易状态到字典到json文件
3)从json文件加载已经保存的交易状态到字典
4)定时同步交易状态到字典到json文件
5)提取交易合约的在某个时刻的交易状态
class TradeStatusManager(object):
"""
交易状态管理器。
主要功能:
1)集中保存所有品种的交易状态到字典
2)保存所有品种的交易状态到字典到json文件
3)从json文件加载已经保存的交易状态到字典
4)定时同步交易状态到字典到json文件
5)提取交易合约的在某个时刻的交易状态
"""
json_file = "trade_status.json"
def __init__(self,event_engine:EventEngine,interval:int)-> None:
self.event_engine = event_engine
self.interval = interval
self.count = 0
# {vt_symbol:{trade_segment_sn:StatusData}}
self.trade_status_map:Dict[str:Dict] = self.load_from_file()
self.change = False
self.register_event()
def register_event(self):
""" 注册消息事件 """
self.event_engine.register(EVENT_TIMER,self.on_time_event)
def load_from_file(self):
"""
从json文件读取交易状态映射
"""
filepath = get_file_path(self.json_file)
# print(f"filepath = {filepath}")
if filepath.exists():
with open(filepath, mode="r", encoding="UTF-8") as f:
data = json.load(fp=f,cls=TradeStatusDecoder)
return data
else:
save_json(self.json_file, {})
return {}
def sync_to_file(self):
"""
保存交易状态映射到json文件
"""
filepath = get_file_path(self.json_file)
with open(filepath, mode="w+", encoding="UTF-8") as f:
json.dump(
self.trade_status_map,fp=f,
cls=TradeStatusEncoder,
indent=4,
ensure_ascii=False
)
def on_time_event(self,event:Event):
self.count += 1
if self.count > self.interval:
self.count = 0
if self.change:
self.sync_to_file()
self.change = False
print(f"已经保存交易状态映射到交易状态映射!")
def save_status(self,status:StatusData):
"""
保存交易状态映射到交易状态映射
"""
vt_symbol = status.vt_symbol
if vt_symbol not in self.trade_status_map:
self.trade_status_map[vt_symbol] = {}
segment_sn_str = f"{status.trading_segment_sn}".rjust(2,'0')
self.trade_status_map[vt_symbol].update({"current":segment_sn_str})
self.trade_status_map[vt_symbol].update({segment_sn_str:status})
next_segment_sn = self._get_next_segment_sn(status.vt_symbol)
self.trade_status_map[vt_symbol].update({"next":next_segment_sn})
self.change = True
def get_status(self,vt_symbol:str):
"""
获得一个datetime所处的交易状态
"""
# 构造
symbol,exchange = extract_vt_symbol(vt_symbol)
instrument = left_alphas(symbol)
vt_symbol1 = f"{instrument}.{exchange.value}"
ret_status = None
# dt_time = dt.time()
if vt_symbol1 in self.trade_status_map:
current = self.trade_status_map[vt_symbol1]['current']
# for enter_time_str,status in self.trade_status_map[vt_symbol1].items():
# enter_time = datetime.strptime(enter_time_str,"%H:%M:%S").time()
# if dt_time >= enter_time:
# ret_status = status
ret_status = self.trade_status_map[vt_symbol1][current]
return ret_status
def _get_next_segment_sn(self,vt_symbol:str):
""" 得到合约的下一交易状态编号 """
next_key:str = ""
if vt_symbol in self.trade_status_map:
instrment_dict = self.trade_status_map[vt_symbol]
current_key = instrment_dict['current']
keys = [key for key in sorted(instrment_dict.keys()) if key not in ["current","next"]]
if current_key == keys[-1]:
next_key = keys[0]
else:
index = 0
for key in keys:
index += 1
if key == current_key:
break
next_key = keys[index]
return next_key
def get_current_status(self,vt_symbol:str):
""" 得到合约的当前交易状态 """
status:StatusData = None
symbol,exchange = extract_vt_symbol(vt_symbol)
instrument = left_alphas(symbol)
vt_symbol1 = f"{instrument}.{exchange.value}"
if vt_symbol1 in self.trade_status_map:
status_dict = self.trade_status_map[vt_symbol1]
key = status_dict['current']
status = status_dict[key]
return status
def get_next_status(self,vt_symbol:str):
""" 得到合约的下一交易状态 """
status:StatusData = None
symbol,exchange = extract_vt_symbol(vt_symbol)
instrument = left_alphas(symbol)
vt_symbol1 = f"{instrument}.{exchange.value}"
if vt_symbol1 in self.trade_status_map:
status_dict = self.trade_status_map[vt_symbol1]
key = status_dict['next']
status = status_dict[key]
return status
def get_tick_status(self,tick:TickData):
"""
得到一个tick数据的合约所处交易状态
"""
status:StatusData = None
instrument = left_alphas(tick.symbol)
tick_time = tick.datetime.strftime("%H:%M:%S")
vt_symbol = f"{instrument}.{tick.exchange.value}"
if vt_symbol in self.trade_status_map:
status_dict = self.trade_status_map[vt_symbol]
curr_key = status_dict["current"]
next_key = status_dict["next"]
curr_status:StatusData = status_dict[curr_key]
next_status:StatusData = status_dict[next_key]
if curr_status.enter_time < next_status.enter_time:
if curr_status.enter_time <= tick_time < next_status.enter_time:
status = curr_status
elif next_status.enter_time <= tick_time:
status = next_status
else:
if curr_status.enter_time <= tick_time:
status = curr_status
elif next_status.enter_time <= tick_time:
status = next_status
return status
因为TradeStatusManager的trade_status_map字典是一个复杂字典,对其进行json转换需要自定义一个编码器。实现如下:
class TradeStatusEncoder(json.JSONEncoder):
"""
交易状态相关类型的编码器
"""
def default(self, obj):
d = {}
d['__class__'] = obj.__class__.__name__
if isinstance(obj,Exchange):
d['_value_'] = obj.value
elif isinstance(obj,InstrumentStatus):
d['_value_'] = obj.value
elif isinstance(obj,StatusEnterReason):
d['_value_'] = obj.value
elif isinstance(obj,StatusData):
d.update(obj.__dict__)
else:
d['__module__'] = obj.__module__
d.update(obj.__dict__)
return d
与2的原因相同,对把json转换到TradeStatusManager的trade_status_map,需要自定义一个译码器。实现如下:
class TradeStatusDecoder(json.JSONDecoder):
"""
交易状态相关类型的译码器
"""
def __init__(self):
json.JSONDecoder.__init__(self, object_hook=self.dict2obj)
def dict2obj(self, d):
if '__class__' in d:
class_name = d.pop('__class__')
if class_name == 'Exchange':
value = d['_value_']
instance = Exchange(value)
elif class_name == 'InstrumentStatus':
value = d['_value_']
instance = InstrumentStatus(value)
elif class_name == 'StatusEnterReason':
value = d['_value_']
instance = StatusEnterReason(value)
elif class_name == 'StatusData':
vt_symbol = d.pop('vt_symbol')
symbol,exchange = extract_vt_symbol(vt_symbol)
args = dict((key,value) for key, value in d.items())
args.update({"symbol":symbol,"exchange":exchange})
instance = StatusData(**args)
else:
module_name = d.pop('__module__')
# print(f"!!!! class_name:{class_name} module_name:{module_name}")
module = __import__(module_name)
class_ = getattr(module, class_name)
args = dict((key,value) for key, value in d.items())
instance = class_(**args)
else:
instance = d
return instance
这个文件通常保存在系统的用户目录下的.vntrader目录中。
这里仅仅展示一部分内容,如果您足够敏感的化,您应该能够立即领悟到这些信息的作用!
它是以合约品种为主键值,以进入时间为次键值的嵌套的字典,字典值为交易状态。
有了它你可以知道某个品种、合约在某个时刻之后处于什么状态。
这些状态包含:
有了这个状态信息,你可以:
{
"TA.CZCE": {
"19:44:24": {
"__class__": "StatusData",
"gateway_name": "CTP",
"symbol": "TA",
"exchange": {
"__class__": "Exchange",
"_value_": "CZCE"
},
"settlement_group_id": "00000001",
"instrument_status": {
"__class__": "InstrumentStatus",
"_value_": "非交易"
},
"trading_segment_sn": 4,
"enter_time": "19:44:24",
"enter_reason": {
"__class__": "StatusEnterReason",
"_value_": "自动切换"
},
"exchange_inst_id": "TA",
"vt_symbol": "TA.CZCE"
},
"20:04:29": {
"__class__": "StatusData",
"gateway_name": "CTP",
"symbol": "TA",
"exchange": {
"__class__": "Exchange",
"_value_": "CZCE"
},
"settlement_group_id": "00000001",
"instrument_status": {
"__class__": "InstrumentStatus",
"_value_": "非交易"
},
"trading_segment_sn": 4,
"enter_time": "20:04:29",
"enter_reason": {
"__class__": "StatusEnterReason",
"_value_": "自动切换"
},
"exchange_inst_id": "TA",
"vt_symbol": "TA.CZCE"
},
"20:55:00": {
"__class__": "StatusData",
"gateway_name": "CTP",
"symbol": "TA",
"exchange": {
"__class__": "Exchange",
"_value_": "CZCE"
},
"settlement_group_id": "00000001",
"instrument_status": {
"__class__": "InstrumentStatus",
"_value_": "集合竞价报单"
},
"trading_segment_sn": 7,
"enter_time": "20:55:00",
"enter_reason": {
"__class__": "StatusEnterReason",
"_value_": "自动切换"
},
"exchange_inst_id": "TA",
"vt_symbol": "TA.CZCE"
},
"20:59:00": {
"__class__": "StatusData",
"gateway_name": "CTP",
"symbol": "TA",
"exchange": {
"__class__": "Exchange",
"_value_": "CZCE"
},
"settlement_group_id": "00000001",
"instrument_status": {
"__class__": "InstrumentStatus",
"_value_": "集合竞价撮合"
},
"trading_segment_sn": 9,
"enter_time": "20:59:00",
"enter_reason": {
"__class__": "StatusEnterReason",
"_value_": "自动切换"
},
"exchange_inst_id": "TA",
"vt_symbol": "TA.CZCE"
},
"21:00:00": {
"__class__": "StatusData",
"gateway_name": "CTP",
"symbol": "TA",
"exchange": {
"__class__": "Exchange",
"_value_": "CZCE"
},
"settlement_group_id": "00000001",
"instrument_status": {
"__class__": "InstrumentStatus",
"_value_": "连续交易"
},
"trading_segment_sn": 13,
"enter_time": "21:00:00",
"enter_reason": {
"__class__": "StatusEnterReason",
"_value_": "自动切换"
},
"exchange_inst_id": "TA",
"vt_symbol": "TA.CZCE"
},
"23:00:00": {
"__class__": "StatusData",
"gateway_name": "CTP",
"symbol": "TA",
"exchange": {
"__class__": "Exchange",
"_value_": "CZCE"
},
"settlement_group_id": "00000001",
"instrument_status": {
"__class__": "InstrumentStatus",
"_value_": "非交易"
},
"trading_segment_sn": 15,
"enter_time": "23:00:00",
"enter_reason": {
"__class__": "StatusEnterReason",
"_value_": "自动切换"
},
"exchange_inst_id": "TA",
"vt_symbol": "TA.CZCE"
},
"current": "23:00:00"
},
"OI.CZCE": {
"19:44:24": {
"__class__": "StatusData",
"gateway_name": "CTP",
"symbol": "OI",
"exchange": {
"__class__": "Exchange",
"_value_": "CZCE"
},
"settlement_group_id": "00000001",
"instrument_status": {
"__class__": "InstrumentStatus",
"_value_": "非交易"
},
"trading_segment_sn": 4,
"enter_time": "19:44:24",
"enter_reason": {
"__class__": "StatusEnterReason",
"_value_": "自动切换"
},
"exchange_inst_id": "OI",
"vt_symbol": "OI.CZCE"
},
"20:04:29": {
"__class__": "StatusData",
"gateway_name": "CTP",
"symbol": "OI",
"exchange": {
"__class__": "Exchange",
"_value_": "CZCE"
},
"settlement_group_id": "00000001",
"instrument_status": {
"__class__": "InstrumentStatus",
"_value_": "非交易"
},
"trading_segment_sn": 4,
"enter_time": "20:04:29",
"enter_reason": {
"__class__": "StatusEnterReason",
"_value_": "自动切换"
},
"exchange_inst_id": "OI",
"vt_symbol": "OI.CZCE"
},
"20:55:00": {
"__class__": "StatusData",
"gateway_name": "CTP",
"symbol": "OI",
"exchange": {
"__class__": "Exchange",
"_value_": "CZCE"
},
"settlement_group_id": "00000001",
"instrument_status": {
"__class__": "InstrumentStatus",
"_value_": "集合竞价报单"
},
"trading_segment_sn": 7,
"enter_time": "20:55:00",
"enter_reason": {
"__class__": "StatusEnterReason",
"_value_": "自动切换"
},
"exchange_inst_id": "OI",
"vt_symbol": "OI.CZCE"
},
"20:59:00": {
"__class__": "StatusData",
"gateway_name": "CTP",
"symbol": "OI",
"exchange": {
"__class__": "Exchange",
"_value_": "CZCE"
},
"settlement_group_id": "00000001",
"instrument_status": {
"__class__": "InstrumentStatus",
"_value_": "集合竞价撮合"
},
"trading_segment_sn": 9,
"enter_time": "20:59:00",
"enter_reason": {
"__class__": "StatusEnterReason",
"_value_": "自动切换"
},
"exchange_inst_id": "OI",
"vt_symbol": "OI.CZCE"
},
"21:00:00": {
"__class__": "StatusData",
"gateway_name": "CTP",
"symbol": "OI",
"exchange": {
"__class__": "Exchange",
"_value_": "CZCE"
},
"settlement_group_id": "00000001",
"instrument_status": {
"__class__": "InstrumentStatus",
"_value_": "连续交易"
},
"trading_segment_sn": 13,
"enter_time": "21:00:00",
"enter_reason": {
"__class__": "StatusEnterReason",
"_value_": "自动切换"
},
"exchange_inst_id": "OI",
"vt_symbol": "OI.CZCE"
},
"23:00:00": {
"__class__": "StatusData",
"gateway_name": "CTP",
"symbol": "OI",
"exchange": {
"__class__": "Exchange",
"_value_": "CZCE"
},
"settlement_group_id": "00000001",
"instrument_status": {
"__class__": "InstrumentStatus",
"_value_": "非交易"
},
"trading_segment_sn": 15,
"enter_time": "23:00:00",
"enter_reason": {
"__class__": "StatusEnterReason",
"_value_": "自动切换"
},
"exchange_inst_id": "OI",
"vt_symbol": "OI.CZCE"
},
"current": "23:00:00"
},
... ... 内容太多,略去
}
未完待续 ... ...
说明:本帖只讨论了如何解决CTP接口中,集合竞价后立即下单被拒的问题
对集合竞价不太了解的童鞋们可以参阅 期货集合竞价时间期货集合竞价规则
做量化交易的都知道,在各个交易日的前都有集合竞价时段,集合竞价产生该交易日的开盘价。这个开盘价格是自上一个交易日结束至今所有市场因素对交易对象价格影响的集中体现,这些因素既包括基本面、技术面和消息面等因素!
行情也往往出现这些时候,尤其在经过一个交叉的节假日或者突发利空或者利多消息出现的时候更是如此!
行情出现了,当然策略就会发出信号,进而下单交易。
可是当策略收到收到开盘价的时候,市场还处在集合竞价阶段,此时下单将会被拒(相信大家都遇到过),而这个问题目前vnpy并没有解决。
如果是手工操盘,当然没有问题,可是我们的通过使用策略或者算法来交易的,怎么可以不解决这个问题 ?!!!
此时下单会被拒主要是因为市场还处在集合竞价状态,我们应该选择在连续竞价状态时下单即可。
就这么简单,可是实现起来有点儿复杂。
你可以把下单的逻辑简单地规定为21:00~23:00,9:00~11:30和13:30~15:00这几个时段才可以下单,其他时段禁止下单。
这倒是也可以解决一部分问题,但这样总会顾头不顾尾,很多品种你是无法处理的!
下面是合约交易状态通知推送接口函数和合约交易状态通知数据结构。
合约交易状态通知,主动推送。私有流回报。
◇ 1.函数原型virtual void OnRtnInstrumentStatus(CThostFtdcInstrumentStatusField *pInstrumentStatus) {};
回到顶部
◇ 2.参数pInstrumentStatus:合约状态
struct CThostFtdcInstrumentStatusField
{
///交易所代码
TThostFtdcExchangeIDType ExchangeID;
///合约在交易所的代码
TThostFtdcExchangeInstIDType ExchangeInstID;
///结算组代码
TThostFtdcSettlementGroupIDType SettlementGroupID;
///合约代码
TThostFtdcInstrumentIDType InstrumentID;
///合约交易状态
TThostFtdcInstrumentStatusType InstrumentStatus;
///交易阶段编号
TThostFtdcTradingSegmentSNType TradingSegmentSN;
///进入本状态时间
TThostFtdcTimeType EnterTime;
///进入本状态原因
TThostFtdcInstStatusEnterReasonType EnterReason;
};
◇ 3.返回无
/////////////////////////////////////////////////////////////////////////
///TFtdcInstrumentStatusType是一个合约交易状态类型
/////////////////////////////////////////////////////////////////////////
///开盘前
#define THOST_FTDC_IS_BeforeTrading '0'
///非交易
#define THOST_FTDC_IS_NoTrading '1'
///连续交易
#define THOST_FTDC_IS_Continous '2'
///集合竞价报单
#define THOST_FTDC_IS_AuctionOrdering '3'
///集合竞价价格平衡
#define THOST_FTDC_IS_AuctionBalance '4'
///集合竞价撮合
#define THOST_FTDC_IS_AuctionMatch '5'
///收盘
#define THOST_FTDC_IS_Closed '6'
typedef char TThostFtdcInstrumentStatusType;
在vnpy\trader\constant.py中增加下面的合约交易状态InstrumentStatus常量类型定义:
class InstrumentStatus(Enum):
"""
合约交易状态类型 hxxjava debug
"""
BEFORE_TRADING = "开盘前"
NO_TRADING = "非交易"
CONTINOUS = "连续交易"
AUCTION_ORDERING = "集合竞价报单"
AUCTION_BALANCE = "集合竞价价格平衡"
AUCTION_MATCH = "集合竞价撮合"
CLOSE = "收盘"
在vnpy\trader\object.py中增加下面的交易状态数据类StatusData:
@dataclass
class StatusData(BaseData):
"""
合约品种状态信息 hxxjava debug
"""
# 合约代码
symbol:str
# 交易所代码
exchange : Exchange
# 结算组代码
settlement_group_id : str = ""
# 合约交易状态
instrument_status : InstrumentStatus = None
# 交易阶段编号
trading_segment_sn : int = None
# 进入本状态时间
enter_time : str = ""
# 进入本状态原因
enter_reason : str = ""
# 合约在交易所的代码
exchange_inst_id : str = ""
def __post_init__(self):
""" """
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
def belongs_to(self,vt_symbol:str):
""" 判断状态是否属于一个合约 """
symbol,exchange_str = vt_symbol.split(".")
instrument = left_alphas(symbol).upper()
return (self.symbol.upper() == instrument) and (self.exchange.value == exchange_str)
在vnpy\trader\event.py中增加交易状态消息类型
EVENT_STATUS = "eStatus" # hxxjava debug
在vnpy\trader\gateway.py中修改网关父类gateway,为其增加on_status()函数:
def on_status(self, status: StatusData) -> None: # hxxjava debug
"""
Instrument Status event push.
"""
self.on_event(EVENT_STATUS, status)
在vnpy_ctp\gateway\ctp_gateway.py中为CtpTdApi增加交易状态通知推送接口函数onRtnInstrumentStatus():
def onRtnInstrumentStatus(self,data:dict):
"""
当接收到合约品种状态信息 # hxxjava debug
"""
if data:
# print(f"【data={data}】")
status = StatusData(
symbol = data["InstrumentID"],
exchange = EXCHANGE_CTP2VT[data["ExchangeID"]],
settlement_group_id = data["SettlementGroupID"],
instrument_status = INSTRUMENTSTATUS_CTP2VT[data["InstrumentStatus"]],
trading_segment_sn = data["TradingSegmentSN"],
enter_time = data["EnterTime"],
enter_reason = ENTERREASON_CTP2VT[data["EnterReason"]],
exchange_inst_id = data["ExchangeInstID"],
gateway_name=self.gateway_name
)
# print(f"status={status}")
self.gateway.on_status(status)
映射字典INSTRUMENTSTATUS_CTP2VT:
# 品种状态进入原因映射 hxxjava debug
INSTRUMENTSTATUS_CTP2VT: Dict[str, InstrumentStatus] = {
"0": InstrumentStatus.BEFORE_TRADING,
"1": InstrumentStatus.NO_TRADING,
"2": InstrumentStatus.CONTINOUS,
"3": InstrumentStatus.AUCTION_ORDERING,
"4": InstrumentStatus.AUCTION_BALANCE,
"5": InstrumentStatus.AUCTION_MATCH,
"6": InstrumentStatus.CLOSE,
"7": InstrumentStatus.CLOSE
}
映射字典ENTERREASON_CTP2VT:
# 品种状态进入原因映射 hxxjava debug
ENTERREASON_CTP2VT: Dict[str, StatusEnterReason] = {
"1": StatusEnterReason.AUTOMATIC,
"2": StatusEnterReason.MANUAL,
"3": StatusEnterReason.FUSE
}
修改vnpy\trader\engine.py中的OmsEngine,修改如下:
class OmsEngine(BaseEngine):
"""
Provides order management system function for VN Trader.
"""
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super(OmsEngine, self).__init__(main_engine, event_engine, "oms")
self.ticks: Dict[str, TickData] = {}
self.orders: Dict[str, OrderData] = {}
self.trades: Dict[str, TradeData] = {}
self.positions: Dict[str, PositionData] = {}
self.accounts: Dict[str, AccountData] = {}
self.contracts: Dict[str, ContractData] = {}
self.statuses: Dict[str, StatusData] = {} # hxxjava debug
self.active_orders: Dict[str, OrderData] = {}
self.add_function()
self.register_event()
def add_function(self) -> None:
"""Add query function to main engine."""
self.main_engine.get_tick = self.get_tick
self.main_engine.get_order = self.get_order
self.main_engine.get_trade = self.get_trade
self.main_engine.get_position = self.get_position
self.main_engine.get_account = self.get_account
self.main_engine.get_contract = self.get_contract
self.main_engine.get_all_ticks = self.get_all_ticks
self.main_engine.get_all_orders = self.get_all_orders
self.main_engine.get_all_trades = self.get_all_trades
self.main_engine.get_all_positions = self.get_all_positions
self.main_engine.get_all_accounts = self.get_all_accounts
self.main_engine.get_all_contracts = self.get_all_contracts
self.main_engine.get_all_active_orders = self.get_all_active_orders
self.main_engine.get_all_statuses = self.get_all_statuses # hxxjava debug
self.main_engine.get_status = self.get_status # hxxjava debug
def register_event(self) -> None:
""""""
self.event_engine.register(EVENT_TICK, self.process_tick_event)
self.event_engine.register(EVENT_ORDER, self.process_order_event)
self.event_engine.register(EVENT_TRADE, self.process_trade_event)
self.event_engine.register(EVENT_POSITION, self.process_position_event)
self.event_engine.register(EVENT_ACCOUNT, self.process_account_event)
self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)
self.event_engine.register(EVENT_STATUS, self.process_status_event) # hxxjava debug
def process_tick_event(self, event: Event) -> None:
""""""
tick = event.data
self.ticks[tick.vt_symbol] = tick
def process_order_event(self, event: Event) -> None:
""""""
order = event.data
self.orders[order.vt_orderid] = order
# If order is active, then update data in dict.
if order.is_active():
self.active_orders[order.vt_orderid] = order
# Otherwise, pop inactive order from in dict
elif order.vt_orderid in self.active_orders:
self.active_orders.pop(order.vt_orderid)
def process_trade_event(self, event: Event) -> None:
""""""
trade = event.data
self.trades[trade.vt_tradeid] = trade
def process_position_event(self, event: Event) -> None:
""""""
position = event.data
self.positions[position.vt_positionid] = position
def process_account_event(self, event: Event) -> None:
""""""
account = event.data
self.accounts[account.vt_accountid] = account
def process_contract_event(self, event: Event) -> None:
""""""
contract = event.data
self.contracts[contract.vt_symbol] = contract
def process_status_event(self, event: Event) -> None: # hxxjava debug
""""""
status = event.data
# print(f"got a status = {status}")
self.statuses[status.vt_symbol] = status
def get_tick(self, vt_symbol: str) -> Optional[TickData]:
"""
Get latest market tick data by vt_symbol.
"""
return self.ticks.get(vt_symbol, None)
def get_order(self, vt_orderid: str) -> Optional[OrderData]:
"""
Get latest order data by vt_orderid.
"""
return self.orders.get(vt_orderid, None)
def get_trade(self, vt_tradeid: str) -> Optional[TradeData]:
"""
Get trade data by vt_tradeid.
"""
return self.trades.get(vt_tradeid, None)
def get_position(self, vt_positionid: str) -> Optional[PositionData]:
"""
Get latest position data by vt_positionid.
"""
return self.positions.get(vt_positionid, None)
def get_account(self, vt_accountid: str) -> Optional[AccountData]:
"""
Get latest account data by vt_accountid.
"""
return self.accounts.get(vt_accountid, None)
def get_contract(self, vt_symbol: str) -> Optional[ContractData]:
"""
Get contract data by vt_symbol.
"""
return self.contracts.get(vt_symbol, None)
def get_all_ticks(self) -> List[TickData]:
"""
Get all tick data.
"""
return list(self.ticks.values())
def get_all_orders(self) -> List[OrderData]:
"""
Get all order data.
"""
return list(self.orders.values())
def get_all_trades(self) -> List[TradeData]:
"""
Get all trade data.
"""
return list(self.trades.values())
def get_all_positions(self) -> List[PositionData]:
"""
Get all position data.
"""
return list(self.positions.values())
def get_all_accounts(self) -> List[AccountData]:
"""
Get all account data.
"""
return list(self.accounts.values())
def get_all_contracts(self) -> List[ContractData]:
"""
Get all contract data.
"""
return list(self.contracts.values())
def get_all_statuses(self) -> List[StatusData]: # hxxjava debug
"""
Get all status data.
"""
return list(self.statuses.values())
def get_status(self,vt_symbol:str) -> List[StatusData]: # hxxjava debug
"""
Get the vt_symbol's status data.
"""
symbol,exchange_str = vt_symbol.split('.')
instrment = left_alphas(symbol)
instrment_vt_symbol = f"{instrment}.{exchange_str}"
return self.statuses.get(instrment_vt_symbol,None)
def get_all_active_orders(self, vt_symbol: str = "") -> List[OrderData]:
"""
Get all active orders by vt_symbol.
If vt_symbol is empty, return all active orders.
"""
if not vt_symbol:
return list(self.active_orders.values())
else:
active_orders = [
order
for order in self.active_orders.values()
if order.vt_symbol == vt_symbol
]
return active_orders
修改vnpy\app\cta_strategy\template.py,增加得到交易合约当前交易状态的函数get_trade_status():
def get_trade_status(self):
"""
得到交易合约当前交易状态
"""
main_engine = self.cta_engine.main_engine
return main_engine.get_status(self.vt_symbol)
在你的策略中调用buy()、sell()、short()、cover()或者send_order()下单函数之前,使用类似这样的判断:
下面使用某个CTA策略的on_bar()函数的来说明对交易状态的使用。
def on_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
self.cancel_all()
am = self.am
am.update_bar(bar)
if not am.inited:
return
atr_array = am.atr(self.atr_length, array=True)
self.atr_value = atr_array[-1]
self.atr_ma = atr_array[-self.atr_ma_length:].mean()
self.rsi_value = am.rsi(self.rsi_length)
if self.get_trade_status() != InstrumentStatus.CONTINOUS:
# 注意:这里,如果当前合约不在连续竞价状态,放弃下面的信号计算和下单操作!!!
self.put_event()
return
if self.pos == 0:
self.intra_trade_high = bar.high_price
self.intra_trade_low = bar.low_price
if self.atr_value > self.atr_ma:
if self.rsi_value > self.rsi_buy:
self.buy(bar.close_price + 5, self.fixed_size)
elif self.rsi_value < self.rsi_sell:
self.short(bar.close_price - 5, self.fixed_size)
elif self.pos > 0:
self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
self.intra_trade_low = bar.low_price
long_stop = self.intra_trade_high * \
(1 - self.trailing_percent / 100)
self.sell(long_stop, abs(self.pos), stop=True)
elif self.pos < 0:
self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
self.intra_trade_high = bar.high_price
short_stop = self.intra_trade_low * \
(1 + self.trailing_percent / 100)
self.cover(short_stop, abs(self.pos), stop=True)
self.put_event()
修改vnpy\app\spread_trading\template.py中的SpreadAlgoTemplate模板,增加get_legs_status()读取价差算法的各个腿当前交易状态:
def get_legs_status(self):
"""
得到当前策略所交易的价差交易状态 # hxxjava add
"""
legs_status:Dict[str:InstrumentStatus] = {}
main_engine = self.algo_engine.main_engine
for leg in self.spread.legs.values():
astatus:StatusData = main_engine.get_status(leg.vt_symbol)
legs_status[leg.vt_symbol] = astatus.instrument_status
return legs_status
修改vnpy\app\spread_trading\template.py中的SpreadStrategyTemplate模板,增加get_legs_status()读取价差交易策略的各个腿当前交易状态:
def get_legs_status(self):
"""
得到当前策略所交易的价差交易状态 # hxxjava add
"""
legs_status:Dict[str:InstrumentStatus] = {}
main_engine = self.strategy_engine.main_engine
for leg in self.spread.legs.values():
astatus:StatusData = main_engine.get_status(leg.vt_symbol)
legs_status[leg.vt_symbol] = astatus.instrument_status
return legs_status
在你的价差交易策略start_long_algo()和start_short_algo()下单函数之前,使用类似这样的判断:
下面使用某个价差交易策略的on_spread_data()函数的来说明对交易状态的使用,交易指令部分无需纠结从哪里来,只是举例。
def on_spread_data(self):
"""
Callback when spread price is updated.
"""
tick = self.get_spread_tick()
self.bg.update_tick(tick)
# 得到价差交易策略的各个腿的交易状态
legs_status = list(self.get_legs_status().values())
# 这里要求价差交易策略的每个腿的交易状态均为连续竞价状态
is_continous_status = legs_status.count(InstrumentStatus.CONTINOUS) == len(self.spread.legs)
if self.trading and is_continous_status:
# 这里是交易指令,无需纠结代码从哪里来
pc = PriceData(sponsor=self.strategy_name,price_name=self.spread_name,price=tick.last_price)
self.event_engine.put(Event(EVENT_PRICE_CHANGE,data=pc))
如果您感兴趣,其他的模块,如算法交易模块algo、投资组合模块PortfolioStrategy等,在对交易状态信息的使用,可以参考上述的CTA策略米筐和价差交易模块的做法,应该不难。
也许您也和我一样饱受这个问题的困扰,希望本人的辛苦付出能够帮到你!
禅悟 wrote:
hxxjava wrote:
xiaohe wrote:
请教下,如何用BarGenerator生成小时线和日线? 是否有已经实现的范例? 谢谢!
self.bg = BarGenerator(self.on_spread_bar,1,self.on_hour_spread_bar,interval=Interval.HOUR)
self.bg = BarGenerator(self.on_spread_bar,1,self.on_day_spread_bar,interval=Interval.DAILY)
在vnpy\trader\event.py中自己增加定义即可.
谢谢xiaohe !!!
首先说明,这里所合成出来的价差K线只是简易价差K线,它和交易所发表的套利品种的K线相比还是有所差别的,主要的差别在最高价、最低价和开盘价,但收盘价是准确的。
# hxxjava debug spread_trading
def query_tick_from_rq(
symbol: str, exchange: Exchange, start: datetime, end: datetime
):
"""
Query tick data from RQData.
"""
from vnpy.trader.rqdata import rqdata_client
from vnpy.trader.object import HistoryRequest
if not rqdata_client.inited:
rqdata_client.init()
req = HistoryRequest(
symbol=symbol,
exchange=exchange,
interval=Interval.TICK,
start=start,
end=end
)
data = rqdata_client.query_tick_history(req)
return data
有两种方式:一种是从米筐加载数据下载tick,另一种是从数据库中读取已经录制的该价差的tick数据。
@lru_cache(maxsize=999)
def load_tick_data(
spread: SpreadData,
start: datetime,
end: datetime,
pricetick: float = 0
):
""""""
# hxxjava debug spread_trading
# 目前没有考虑反向合约的情况,以后解决
spread_ticks: List[TickData] = []
try:
# 防止因为用户没有米筐tick数据权限而发生异常
# Load tick data of each spread leg
dt_legs: Dict[str, Dict] = {} # datetime string : Dict[vt_symbol:tick]
format_str = "%Y%m%d%H%M%S.%f"
for vt_symbol in spread.legs.keys():
symbol, exchange = extract_vt_symbol(vt_symbol)
# hxxjava debug spread_trading
tick_data = query_tick_from_rq(symbol=symbol, exchange=exchange,start=start,end=end)
if tick_data:
print(f"load from rqdatac {symbol}.{exchange} tick_data, len of = {len(tick_data)}")
# save all the spread's legs tick into a dictionary by tick's datetime
for tick in tick_data:
dt_str = tick.datetime.strftime(format_str)
if dt_str in dt_legs:
dt_legs[dt_str].update({vt_symbol:tick})
else:
dt_legs[dt_str] = {vt_symbol:tick}
# Calculate spread bar data
# snapshot of all legs's ticks
snapshot:Dict[str,TickData] = {}
spread_leg_count = len(spread.legs)
for dt_str in sorted(dt_legs.keys()):
dt = datetime.strptime(dt_str,format_str).astimezone(LOCAL_TZ)
# get each datetime
spread_price = 0
spread_value = 0
# get all legs's ticks dictionary at the datetime
leg_ticks = dt_legs.get(dt_str)
for vt_symbol,tick in leg_ticks.items():
# save each tick into the snapshot
snapshot.update({vt_symbol:tick})
if len(snapshot) < spread_leg_count:
# if not all legs tick saved in the snapshot
continue
# out_str = f"{dt_str} "
# format_str1 = "%Y-%m-%d %H:%M:%S.%f "
for vt_symbol,tick in snapshot.items():
price_multiplier = spread.price_multipliers[vt_symbol]
spread_price += price_multiplier * tick.last_price
spread_value += abs(price_multiplier) * tick.last_price
# out_str += f"[{vt_symbol} {tick.datetime.strftime(format_str1)} {tick.last_price}],"
# print(out_str)
if pricetick:
spread_price = round_to(spread_price, pricetick)
spread_tick = TickData(
symbol=spread.name,
exchange=exchange.LOCAL,
datetime=dt,
open_price=spread_price,
high_price=spread_price,
low_price=spread_price,
last_price=spread_price,
gateway_name="SPREAD")
spread_tick.value = spread_value
spread_ticks.append(spread_tick)
if spread_ticks:
print(f"load {symbol}.{exchange}' ticks from rqdatac, len of = {len(tick_data)}")
finally:
if not spread_ticks:
# 读取数据库中已经录制过的该价差的tick数据
spread_ticks = database_manager.load_tick_data(spread.name, Exchange.LOCAL, start, end)
return spread_ticks
只要在你的价差策略中on_init()中添加如下代码,就可以调用:
def on_init(self):
"""
Callback when strategy is inited.
"""
self.write_log("策略初始化")
self.load_tick(days=3)
修改vnpy\app\spread_trading\engine.py
增加下面函数:
def get_previous_trading_date(dt:datetime,days:int): # hxxjava add
"""
得到某个日期dt的去除了节假日的前days个交易日
"""
from vnpy.trader.rqdata import rqdata_client
import rqdatac as rq
if not rqdata_client.inited:
rqdata_client.init()
prev_td = rq.get_previous_trading_date(date=dt.date(),n=days)
return prev_td
修改如下,修改后两个函数的days参数就代表交易日了。
def load_bar(
self, spread: SpreadData, days: int, interval: Interval, callback: Callable
):
""""""
end = datetime.now()
# start = end - timedelta(days)
start = get_previous_trading_date(dt = end,days=days) # hxxjava change
bars = load_bar_data(spread, interval, start, end)
print(f"{spread.name} {start}-{end} len of bars = {len(bars)}") # hxxjava debug spead_trading
for bar in bars:
callback(bar)
def load_tick(self, spread: SpreadData, days: int, callback: Callable):
""""""
end = datetime.now()
# start = end - timedelta(days=days)
start = get_previous_trading_date(dt = end,days=days) # hxxjava change
ticks = load_tick_data(spread, start, end)
for tick in ticks:
callback(tick)
价差策略目标中SpreadStrategyTemplate中有load_bar()和load_tick()两个历史数据加载方法,代码如下:
class SpreadStrategyTemplate:
... ... 其他略去
def load_bar(
self,
days: int,
interval: Interval = Interval.MINUTE,
callback: Callable = None,
):
"""
Load historical bar data for initializing strategy.
"""
if not callback:
callback = self.on_spread_bar
self.strategy_engine.load_bar(self.spread, days, interval, callback)
def load_tick(self, days: int):
"""
Load historical tick data for initializing strategy.
"""
self.strategy_engine.load_tick(self.spread, days, self.on_spread_tick)
def on_init(self):
"""
Callback when strategy is inited.
"""
self.write_log("策略初始化")
# 加载各腿的历史1分钟K线数据来合成价差1分钟K线
self.load_bar(days=10)
def on_init(self):
"""
Callback when strategy is inited.
"""
self.write_log("策略初始化")
# 各腿的历史tick数据合成价差1分钟K线,当然比load_bar()慢多了
self.load_tick(days=10)
在客户端是永远无法合成出与交易所中一样的价差数据的,具体原因我会抽时间在另外的帖子里专门讨论此问题。
实际运行self.load_tick(days=10),加载不到任何历史1分钟K线数据。
代码如下:
def load_tick(self, spread: SpreadData, days: int, callback: Callable):
""""""
end = datetime.now()
start = end - timedelta(days)
ticks = load_tick_data(spread, start, end)
for tick in ticks:
callback(tick)
而这里用到的load_tick_data()的代码是这样的:
@lru_cache(maxsize=999)
def load_tick_data(
spread: SpreadData,
start: datetime,
end: datetime,
):
""""""
return database_manager.load_tick_data(
spread.name, Exchange.LOCAL, start, end
)
那意思是直接从本地数据库中读取价差的历史数据。注意是直接读取!
我们知道价差策略中使用的价差都是自定义的本地价差,使用vnpy中的DataRecorder是无法录制本地价差组合的数据的。
但是我们可以使用vnpy中的DataRecorder是录制交易所中套利合约的价差数据,它和其他的普通合约没有是么差别,它的历史tick是可以使用load_tick_data()来读取加载的。
可是价差策略无法交易套利合约。
五彩城 wrote:
如果tick数据不全,比如这一分钟有100条数据,正常应该是120条 1秒2跳才对,那么vnpy合成的1分钟会用100条数据还是120条数据?
比如只有3个tick,就用3条。
为什么?因当盘口中买1价总是低于卖1价,当没有人愿意出价比买1价高的价格买入,
也没有人愿意出比卖1价低的价格卖出时,盘口就处于安静状态,此时没有任何该合约
的tick推送给的客户端,那么可能这一分钟就不足120个tick,当下一分钟的tick被收
到时,就算是只收到了3个tick,当前的这一分钟K线也必须结束了。
xiaohe wrote:
如果tick.last_price是0的话,等同于not tick.last_price的
不对,价差的tick.last_price可以是0,也可以是负值,依据这个来判断是价差的tick是否为有效值是不正确的!