下周simnow就可以恢复正常服务了,到时候你可以再连接simnow,如果没有问题,就是国金转发行情的时候出问题了。
c787297017 wrote:
hxxjava wrote:
答复:
问题分析
- 目前simnow调试,停止服务。我猜你现在使用的实盘账户在运作策略。所以你才可以继续使用CTP网关行情的。
- 你目前应该是连接到你的期货经纪商给提供的连接地址和端口号上的,也就是说,你可能是从你的期货经纪商的行情服务器获取的行情转发。
- 行情数据错误可能是两个方面造成的:①期货经纪商的行情转发错误;②本地行情接收成为
解决思路:
- 检查期货经纪商的行情使用的CTP版本,看看他们是否对CZCE的品种行情转发的毫秒数是错误的。(需要和你的期货经纪商沟通)
- 更换本地的CTP网关版本,最简单的时恢复到vnpy 3.0,订阅一个CZCE的合约,然后在onRtnDepthMarketData()中直接答应data看看。
这样就可以定位到问题了。
还有请问下哪个ctp版本可以获取到郑商所的毫秒数据
至少vnpy 3.0中的那个是没有问题。
这样就可以定位到问题了。
c787297017 wrote:
在将大佬设计的过滤器布置到实盘环境后,测试发现郑商所品种的tick_data中的datetime未精确到毫秒,所以每秒的两次tick数据datetime标识完全一致,后面的一次会被过滤器认为是无效tick,并且打印了【特别tick =】,上期所和大商所的品种tick数据datetime是精确到毫秒的,所以不存在该问题,过滤器正常运行,请问大佬是怎么处理郑商所这个问题的?
def onRtnDepthMarketData(self, data: dict) -> None:
"""行情数据推送"""
# 过滤没有时间戳的异常行情数据
if not data["UpdateTime"]:
return
# 过滤还没有收到合约数据前的行情推送
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map.get(symbol, None)
if not contract:
return
# 对大商所的交易日字段取本地日期
if not data["ActionDay"] or contract.exchange == Exchange.DCE:
date_str: str = self.current_date
else:
date_str: str = data["ActionDay"]
# 这里错了,按照这样的字符串"%Y%m%d %H:%M:%S.%f",会错把tick.datetime的微秒当成0.1秒计数
# timestamp: str = f"{date_str} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}"
timestamp: str = f"{date_str} {data['UpdateTime']}." + str(data['UpdateMillisec']*1000).zfill(6) # hxxjava edit
dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
dt: datetime = CHINA_TZ.localize(dt)
tick: TickData = TickData(
symbol=symbol,
exchange=contract.exchange,
datetime=dt,
name=contract.name,
volume=data["Volume"],
turnover=data["Turnover"],
open_interest=data["OpenInterest"],
last_price=adjust_price(data["LastPrice"]),
limit_up=data["UpperLimitPrice"],
limit_down=data["LowerLimitPrice"],
open_price=adjust_price(data["OpenPrice"]),
high_price=adjust_price(data["HighestPrice"]),
low_price=adjust_price(data["LowestPrice"]),
pre_close=adjust_price(data["PreClosePrice"]),
bid_price_1=adjust_price(data["BidPrice1"]),
ask_price_1=adjust_price(data["AskPrice1"]),
bid_volume_1=data["BidVolume1"],
ask_volume_1=data["AskVolume1"],
gateway_name=self.gateway_name
)
if data["BidVolume2"] or data["AskVolume2"]:
tick.bid_price_2 = adjust_price(data["BidPrice2"])
tick.bid_price_3 = adjust_price(data["BidPrice3"])
tick.bid_price_4 = adjust_price(data["BidPrice4"])
tick.bid_price_5 = adjust_price(data["BidPrice5"])
tick.ask_price_2 = adjust_price(data["AskPrice2"])
tick.ask_price_3 = adjust_price(data["AskPrice3"])
tick.ask_price_4 = adjust_price(data["AskPrice4"])
tick.ask_price_5 = adjust_price(data["AskPrice5"])
tick.bid_volume_2 = data["BidVolume2"]
tick.bid_volume_3 = data["BidVolume3"]
tick.bid_volume_4 = data["BidVolume4"]
tick.bid_volume_5 = data["BidVolume5"]
tick.ask_volume_2 = data["AskVolume2"]
tick.ask_volume_3 = data["AskVolume3"]
tick.ask_volume_4 = data["AskVolume4"]
tick.ask_volume_5 = data["AskVolume5"]
self.gateway.on_tick(tick)
c787297017 wrote:
请问大佬使用vnpy可以实现期货集合竞价的前4分钟下单吗?
是不是可以在on_start中加一个阻塞,当确定市场已进入集合竞价(时间判定或者事件触发)时,在此处立即下单。
c787297017 wrote:
感谢hxx分享,我模仿您的方法对vnpy_portfoliostrategy和vnpy_tts(gateway)进行了修改,运行起来发现所有的tick都被过滤掉了,检查发现是TickFilter.process_status_event从未被触发过,因此TickFilter.statuses始终为空,这样的话说明EVENT_STATUS从未发生?请问有什么办法能排查某个事件从未发生的原因?
如果你使用的是TTS或者UFT的gateway的话,是不可以的,因为它们都不是完整的CTP柜台,它们都没有实现合约状态推送功能。
小刘小刘 wrote:
hxxjava老哥你好!看了很多你的帖子,你应该是会用CTP订阅大量合约行情的人。请教一个问题,CTP订阅合约一台设备一次订阅多少个合约不会出现堵塞情况,和设备的网络带宽有关系吗?
报单流控、查询流控和会话数控制都是针对交易服务器接口而言的,与行情服务器接口无关。
因为行情是交易所透过网络推送的,行情客户端通过套接字来接收,所以一台设备一次能够订阅的最大合约数量与多个因素有关:
这样修改后策略得到的account是整个账户的资金情况。
而实际情况是你可能在跑若干个Cta策略,组合策略,算法交易,期权交易。获取整个账户的资金情况对交易几乎有什么意义。
如果能够为每个策略配置一个逻辑账户,分配一个起始资金,然后根据策略的交易历史,统计该逻辑账户下的资金情况还是非常有意义的。
在行情接口与策略和应用之间建起一个tick过滤器——TickFilter,对tick数据进行过滤。
声明:本文基于【CTP接口规范6.3.15_API接口说明】做出的修改。
4.1 定义相关的常量和数据类
在vnpy\trader\constant.py中增加下面的合约交易状态InstrumentStatus常量类型定义:
class InstrumentStatus(Enum):
"""
合约交易状态类型 hxxjava debug
"""
BEFORE_TRADING = "开盘前"
NO_TRADING = "非交易"
CONTINOUS = "连续交易"
AUCTION_ORDERING = "集合竞价报单"
AUCTION_BALANCE = "集合竞价价格平衡"
AUCTION_MATCH = "集合竞价撮合"
CLOSE = "收盘"
# 有效交易状态
VALID_TRADE_STATUSES = [
InstrumentStatus.CONTINOUS,
InstrumentStatus.AUCTION_ORDERING,
InstrumentStatus.AUCTION_BALANCE,
InstrumentStatus.AUCTION_MATCH
]
# 集合竞价交易状态
AUCTION_STATUS = [
InstrumentStatus.AUCTION_ORDERING,
InstrumentStatus.AUCTION_BALANCE,
InstrumentStatus.AUCTION_MATCH
]
class StatusEnterReason(Enum):
"""
品种进入交易状态原因类型 hxxjava debug
"""
AUTOMATIC = "自动切换"
MANUAL = "手动切换"
FUSE = "熔断"
在vnpy\trader\object.py中增加下面的交易状态数据类StatusData:
@dataclass
class StatusData(BaseData):
"""
hxxjava debug
"""
symbol:str
exchange : Exchange
settlement_group_id : str = ""
instrument_status : InstrumentStatus = None
trading_segment_sn : int = None
enter_time : str = ""
enter_reason : str = ""
exchange_inst_id : str = ""
def __post_init__(self):
""" """
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
def belongs_to(self,vt_symbol:str):
symbol,exchange_str = vt_symbol.split(".")
instrument = left_alphas(symbol).upper()
return (self.symbol.upper() == instrument) and (self.exchange.value == exchange_str)
在vnpy\trader\event.py中增加交易状态消息类型
EVENT_STATUS = "eStatus" # hxxjava debug
EVENT_ORIGIN_TICK = "eOriginTick." # hxxjava debug
EVENT_AUCTION_TICK = "eAuctionTick." # hxxjava debug
在vnpy\trader\gateway.py中合约状态接口,修改tick推送接口:
from .event import EVENT_ORIGIN_TICK,EVENT_STATUS # hxxjava add
from .object import StatusData # hxxjava add
def on_tick(self, tick: TickData) -> None:
"""
Tick event push.
Tick event of a specific vt_symbol is also pushed.
"""
self.on_event(EVENT_ORIGIN_TICK, tick) # hxxjava add
# self.on_event(EVENT_TICK, tick)
# self.on_event(EVENT_TICK + tick.vt_symbol, tick)
def on_status(self, status: StatusData) -> None: # hxxjava debug
"""
Instrument Status event push.
"""
self.on_event(EVENT_STATUS, status)
self.on_event(EVENT_STATUS + status.vt_symbol, status)
修改vnpy_cpt\ctp_gateway.py:
from vnpy.trader.constant import InstrumentStatus,StatusEnterReason # hxxjava debug
rom vnpy.trader.object import StatusData, # hxxjava debug
# 品种状态进入原因映射 hxxjava debug
INSTRUMENTSTATUS_CTP2VT: Dict[str, InstrumentStatus] = {
"0": InstrumentStatus.BEFORE_TRADING,
"1": InstrumentStatus.NO_TRADING,
"2": InstrumentStatus.CONTINOUS,
"3": InstrumentStatus.AUCTION_ORDERING,
"4": InstrumentStatus.AUCTION_BALANCE,
"5": InstrumentStatus.AUCTION_MATCH,
"6": InstrumentStatus.CLOSE,
"7": InstrumentStatus.CLOSE
}
# 品种状态进入原因映射 hxxjava debug
ENTERREASON_CTP2VT: Dict[str, StatusEnterReason] = {
"1": StatusEnterReason.AUTOMATIC,
"2": StatusEnterReason.MANUAL,
"3": StatusEnterReason.FUSE
}
def onRtnInstrumentStatus(self,data:dict):
"""
当接收到合约品种状态信息 # hxxjava debug
"""
if data:
# print(f"【data={data}】")
status = StatusData(
symbol = data["InstrumentID"],
exchange = EXCHANGE_CTP2VT[data["ExchangeID"]],
settlement_group_id = data["SettlementGroupID"],
instrument_status = INSTRUMENTSTATUS_CTP2VT[data["InstrumentStatus"]],
trading_segment_sn = data["TradingSegmentSN"],
enter_time = data["EnterTime"],
enter_reason = ENTERREASON_CTP2VT[data["EnterReason"]],
exchange_inst_id = data["ExchangeInstID"],
gateway_name=self.gateway_name
)
# print(f"status={status}")
self.gateway.on_status(status)
from vnpy.trader.event import EVENT_AUCTION_TICK # hxxjava add
class MyCtaEngine(CtaEngine):
""" """
condition_filename = "condition_order.json" # 历史条件单存储文件
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super().__init__(main_engine,event_engine)
self.condition_orders:Dict[str,ConditionOrder] = {} # strategy_name: ConditionOrder
self.triggered_condition_orders:List[ConditionOrder] = [] # 已经触发点条件单,为流控设计
def load_active_condtion_orders(self):
""" """
return {}
def register_event(self):
""""""
super().register_event()
self.event_engine.register(EVENT_AUCTION_TICK, self.process_auction_tick_event)
def process_auction_tick_event(self,event:Event):
""" 集合竞价消息处理 """
tick:TickData = event.data
strategies = self.symbol_strategy_map[tick.vt_symbol]
if not strategies:
return
for strategy in strategies:
if strategy.inited:
# 执行策略的集合竞价消息处理
self.call_strategy_func(strategy, strategy.on_auction_tick, tick)
def process_tick_event(self,event:Event):
""" 用tick的价格检查条件单 """
super().process_tick_event(event)
tick:TickData = event.data
all_condition_orders = [order for order in self.condition_orders.values() \
if order.vt_symbol == tick.vt_symbol and order.status == CondOrderStatus.WAITING]
for order in all_condition_orders:
# 检查条件单是否满足条件
self.check_condition_order(order,tick)
def check_condition_order(self,order:ConditionOrder,tick:TickData):
""" 检查条件单是否满足条件 """
strategy = self.strategies.get(order.strategy_name,None)
if not strategy or not strategy.trading:
return False
price = tick.last_price
is_be = order.condition == Condition.BE and price >= order.price
is_le = order.condition == Condition.LE and price <= order.price
is_bt = order.condition == Condition.BT and price > order.price
is_lt = order.condition == Condition.LT and price < order.price
if is_be or is_le or is_bt or is_lt:
# 满足触发条件
if order.execute_price == ExecutePrice.MARKET:
# 取市场价
price = tick.last_price
elif order.execute_price == ExecutePrice.EXTREME:
# 取极限价
price = tick.limit_up if order.direction == Direction.LONG else tick.limit_down
else:
# 取设定价
price = order.price
# 执行委托
order_ids = strategy.send_order(
direction = order.direction,
offset=order.offset,
price=price,
volume=order.volume
)
if order_ids:
order.trigger_time = tick.datetime
order.status = CondOrderStatus.TRIGGERED
order.vt_orderids = order_ids
self.call_strategy_func(strategy,strategy.on_condition_order,order)
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
def find_condition_order(self,vt_orderid:str):
""" 根据委托单号查询所属条件单 """
corder:ConditionOrder = None
for order in self.condition_orders.values():
if vt_orderid in order.vt_orderids:
corder = order
break
return corder
def process_trade_event(self, event: Event):
""" 委托单推送处理 """
super().process_trade_event(event)
trade:TradeData = event.data
vt_orderid = trade.vt_orderid
corder = self.find_condition_order(vt_orderid)
if corder:
# 该成交单属于某个条件单
strategy = self.strategies.get(corder.strategy_name,None)
if strategy and strategy.trading:
# 找到了该条件单属实策略实例且正在交易中
# 累计条件单的成交量
corder.traded += trade.volume
# 推送该条件单给策略
self.call_strategy_func(strategy,strategy.on_condition_order,corder)
# 刷新条件单列表控件
self.event_engine.put(Event(EVENT_CONDITION_ORDER,corder))
def send_condition_order(self,order:ConditionOrder):
""" """
strategy = self.strategies.get(order.strategy_name,None)
if not strategy or not strategy.trading:
return False
if order.cond_orderid not in self.condition_orders:
self.condition_orders[order.cond_orderid] = order
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
return False
def cancel_condition_order(self,cond_orderid:str):
""" """
order:ConditionOrder = self.condition_orders.get(cond_orderid,None)
if not order:
return False
order.status = CondOrderStatus.CANCELLED
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
def cancel_all_condition_orders(self,strategy_name:str):
""" """
for order in self.condition_orders.values():
if order.strategy_name == strategy_name and order.status == CondOrderStatus.WAITING:
order.status = CondOrderStatus.CANCELLED
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
__init__.py
中的CtaStrategyApp做如下修改class CtaStrategyApp(BaseApp):
""""""
app_name = APP_NAME
app_module = __module__
app_path = Path(__file__).parent
display_name = "CTA策略"
# engine_class = CtaEngine
engine_class = MyCtaEngine # hxxjava add
widget_name = "CtaManager"
icon_name = str(app_path.joinpath("ui", "cta.ico"))
修改vnpy_ctastrategy\CtaTemplate.py如下,为CtaTemplate增加on_auction_tick():
@virtual
def on_auction_tick(self, tick: TickData):
"""
Callback of new tick data update. # hxxjava add for auction tick
"""
pass
为class BaseDatabase增加下面两个接口函数:
@abstractmethod
def save_last_tick(self, ticks: List[TickData]) -> bool:
"""
Save last tick data into database. # hxxjava add
"""
pass
@abstractmethod
def load_last_tick(
self,
gateway_name : str,
exchange: Exchange = None,
symbol: str = None
) -> List[TickData]:
"""
Load last tick data from database. # hxxjava add
"""
pass
class MyDateTimeField(DateTimeField):
def get_modifiers(self):
return [6]
class DbLastTick(Model): # hxxjava add
""" 最新TICK数据表映射对象 """
id = AutoField()
gateway_name: str = CharField()
symbol: str = CharField()
exchange: str = CharField()
datetime: datetime = MyDateTimeField()
name: str = CharField()
volume: float = FloatField()
turnover: float = FloatField()
open_interest: float = FloatField()
last_price: float = FloatField()
last_volume: float = FloatField()
limit_up: float = FloatField()
limit_down: float = FloatField()
open_price: float = FloatField()
high_price: float = FloatField()
low_price: float = FloatField()
pre_close: float = FloatField()
bid_price_1: float = FloatField()
bid_price_2: float = FloatField(null=True)
bid_price_3: float = FloatField(null=True)
bid_price_4: float = FloatField(null=True)
bid_price_5: float = FloatField(null=True)
ask_price_1: float = FloatField()
ask_price_2: float = FloatField(null=True)
ask_price_3: float = FloatField(null=True)
ask_price_4: float = FloatField(null=True)
ask_price_5: float = FloatField(null=True)
bid_volume_1: float = FloatField()
bid_volume_2: float = FloatField(null=True)
bid_volume_3: float = FloatField(null=True)
bid_volume_4: float = FloatField(null=True)
bid_volume_5: float = FloatField(null=True)
ask_volume_1: float = FloatField()
ask_volume_2: float = FloatField(null=True)
ask_volume_3: float = FloatField(null=True)
ask_volume_4: float = FloatField(null=True)
ask_volume_5: float = FloatField(null=True)
localtime: datetime = DateTimeField(null=True)
class Meta:
database = db
indexes = ((("gateway_name","symbol", "exchange", "datetime"), True),)
class MysqlDatabase的初始化做如下修改:
def __init__(self) -> None:
""""""
self.db = db
self.db.connect()
self.db.create_tables([DbContractData, DbBarData, DbTickData, DbLastTick, DbBarOverview]) # hxxjava add DbLastTick,DbContractData
再为class MysqlDatabase添加下面两个函数:
def save_last_tick(self, ticks: List[TickData]) -> bool:
"""
Save last tick data into database. # hxxjava add
"""
vt_symbols = [t.vt_symbol for t in ticks]
# 删除ticks列表中包含合约的旧的tick记录
d: ModelDelete = DbLastTick.delete().where(
(DbLastTick.symbol+'.'+DbLastTick.exchange in vt_symbols)
)
count = d.execute()
# print(f"delete {count} last ticks")
# 构造最新的ticks列表数据
data = []
for t in ticks:
tick:TickData = deepcopy(t) # hxxjava change
tick.datetime = tick.datetime
d = tick.__dict__
d["exchange"] = d["exchange"].value
d.pop("vt_symbol")
data.append(d)
# print(tick.symbol,tick.exchange,tick.datetime.strftime('%Y-%m-%d %H:%M:%S %f'))
# 使用upsert操作将数据更新到数据库中
with self.db.atomic():
for c in chunked(data, 50):
DbLastTick.insert_many(c).on_conflict_replace().execute()
return True
def load_last_tick(
self,
gateway_name : str,
exchange: Exchange = None,
symbol: str = None
) -> List[TickData]:
"""
Load last tick data from database. # hxxjava add
"""
try:
# 从DbLastTick查询符合条件的最新tick记录
s: ModelSelect = (
DbLastTick.select().where(
(DbLastTick.gateway_name == gateway_name)
& (exchange is None or DbLastTick.exchange == exchange.value)
& (symbol is None or DbLastTick.symbol == symbol)
).order_by(DbLastTick.gateway_name,DbLastTick.datetime)
)
# 利用最新tick记录构造ticks列表
ticks: List[TickData] = []
for db_tick in s:
tick:TickData = TickData(
symbol=db_tick.symbol,
exchange=Exchange(db_tick.exchange),
datetime=to_china_tz(db_tick.datetime),
name=db_tick.name,
volume=db_tick.volume,
turnover=db_tick.turnover,
open_interest=db_tick.open_interest,
last_price=db_tick.last_price,
last_volume=db_tick.last_volume,
limit_up=db_tick.limit_up,
limit_down=db_tick.limit_down,
open_price=db_tick.open_price,
high_price=db_tick.high_price,
low_price=db_tick.low_price,
pre_close=db_tick.pre_close,
bid_price_1=db_tick.bid_price_1,
bid_price_2=db_tick.bid_price_2,
bid_price_3=db_tick.bid_price_3,
bid_price_4=db_tick.bid_price_4,
bid_price_5=db_tick.bid_price_5,
ask_price_1=db_tick.ask_price_1,
ask_price_2=db_tick.ask_price_2,
ask_price_3=db_tick.ask_price_3,
ask_price_4=db_tick.ask_price_4,
ask_price_5=db_tick.ask_price_5,
bid_volume_1=db_tick.bid_volume_1,
bid_volume_2=db_tick.bid_volume_2,
bid_volume_3=db_tick.bid_volume_3,
bid_volume_4=db_tick.bid_volume_4,
bid_volume_5=db_tick.bid_volume_5,
ask_volume_1=db_tick.ask_volume_1,
ask_volume_2=db_tick.ask_volume_2,
ask_volume_3=db_tick.ask_volume_3,
ask_volume_4=db_tick.ask_volume_4,
ask_volume_5=db_tick.ask_volume_5,
localtime=db_tick.localtime,
gateway_name=db_tick.gateway_name
)
ticks.append(tick)
return ticks
except:
# 当DbLastTick表不存在的时候,会发生错误
return []
在vnpy.usertools下创建tickfilter.py文件,其内容如下:
"""
本文件主要实现tick数据过滤器——TickFilter。
tick数据过滤器的功能:
1. 过滤重复tick,保证已经参与K线合成的tick不会再次被系统使用
2. 过滤无效tick,抛弃不在交易状态下的tick
3. 识别集合竞价tick,为使用tick的应用或用户策略处理集合竞价tick提供支持
作者:hxxjava
日期:2022-06-16
修改日期: 修改原因:
"""
from typing import Dict,List,Tuple
from threading import Thread
from vnpy.event import Event,EVENT_TIMER,EventEngine
from vnpy.trader.constant import InstrumentStatus,VALID_TRADE_STATUSES
from vnpy.trader.object import TickData,StatusData
from vnpy.trader.event import (
EVENT_ORIGIN_TICK,
EVENT_AUCTION_TICK,
EVENT_TICK,
EVENT_STATUS
)
from vnpy.trader.database import get_database
from vnpy.trader.utility import extract_vt_symbol
def left_alphas(instr:str):
"""
得到字符串左边的字符部分
"""
ret_str = ''
for s in instr:
if s.isalpha():
ret_str += s
else:
break
return ret_str
def get_vt_instrument(vt_symbol:str):
"""
从完整合约代码转换到完整品种代码
"""
symbol,exchange = extract_vt_symbol(vt_symbol)
instrument = left_alphas(symbol)
return f"{instrument}.{exchange.value}"
class TickFilter():
""" tick数据过滤器 """
CHECK_INTERVAL:int = 5 # 更新到数据库间隔
def __init__(self,event_engine:EventEngine,gateway_name:str):
""" tick数据过滤器初始化 """
self.event_engine = event_engine
self.gateway_name = gateway_name
self.db = get_database()
# 最新tick字典 {(gateway_name,vt_symbol),(update,tick)}
self.last_ticks:Dict[Tuple[str,str],Tuple[bool,TickData]] = {}
# 品种及合约状态字典 { vt_symbol : StatusData }
self.statuses:Dict[str,StatusData] = {}
self.second_cnt = 0
self.load_last_ticks()
self.register_event()
# print(f"TickFilter {gateway_name}")
def load_last_ticks(self):
"""
加载属于网关名称为self.gateway_name的最新tick列表
"""
last_ticks:List[TickData] = self.db.load_last_tick(gateway_name=self.gateway_name)
for tick in last_ticks:
self.last_ticks[(tick.gateway_name,tick.vt_symbol)] = (False,tick)
# print(f"load {len(last_ticks)} last ticks")
def register_event(self):
""" 注册消息 """
self.event_engine.register(EVENT_ORIGIN_TICK,self.process_tick_event)
self.event_engine.register(EVENT_STATUS,self.process_status_event)
self.event_engine.register(EVENT_TIMER,self.check_last_ticks)
def process_tick_event(self,event:Event):
""" 对原始tick进行过滤 """
tick:TickData = event.data
# 检查tick合约的经验状态是否位有效交易状态
status:StatusData = self.statuses.get(tick.vt_symbol,None)
if not status:
vt_instrument = get_vt_instrument(tick.vt_symbol)
status = self.statuses.get(vt_instrument,None)
if not status:
# 未收到交易状态,返回
return
if status.instrument_status not in VALID_TRADE_STATUSES:
# 不在有效交易状态,返回
return
key = (tick.gateway_name,tick.vt_symbol)
_,oldtick = self.last_ticks.get(key,(None,None))
valid_tick = False
if not oldtick:
# 没有该合约的历史tick
self.last_ticks[key] = (True,tick)
valid_tick = True
elif tick.datetime > oldtick.datetime:
#
self.last_ticks[key] = (True,tick)
valid_tick = True
else:
print(f"【特别tick = {tick}】")
if valid_tick == True:
# 如果是有效的tick
if status.instrument_status != InstrumentStatus.CONTINOUS:
# 发送集合竞价tic消息到系统中
self.event_engine.put(Event(EVENT_AUCTION_TICK,tick))
self.event_engine.put(Event(EVENT_AUCTION_TICK + tick.vt_symbol, tick))
else:
# 发送连续竞价tic消息到系统中
self.event_engine.put(Event(EVENT_TICK,tick))
self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, tick))
def process_status_event(self, event: Event):
""" 交易状态通知消息处理 """
status:StatusData = event.data
self.statuses[status.vt_symbol] = status
# print(f"【{status.gateway_name} {status}】")
def check_last_ticks(self,event:Event) -> None:
""" 原始tick过滤器 """
self.second_cnt += 1
if self.second_cnt % self.CHECK_INTERVAL == 0:
# 如果到了定时间隔
# 查询所有更新的tick
changed_ticks = []
for key,(update,tick) in self.last_ticks.items():
if update:
changed_ticks.append(tick)
self.last_ticks[key] = (False,tick)
if changed_ticks:
# 如果存在更新的tick,保存到数据库
t = Thread(target=self.db.save_last_tick,kwargs=({"ticks":changed_ticks}),daemon=True)
t.start()
# print(f"{self.second_cnt}: status count={len(self.statuses)} save {len(changed_ticks)} ticks")
修改vnpy\trader\engine.py
from vnpy.usertools.tickfilter import TickFilter # hxxjava add
在MainEngine的初始化函数def init(self, event_engine: EventEngine = None)中增加如下内容:
self.tick_filters:Dict[str,TickFilter] = {} # hxxjava add
修改其add_gateway(),内容如下:
def add_gateway(self, gateway_class: Type[BaseGateway], gateway_name: str = "") -> BaseGateway:
"""
Add gateway.
"""
# Use default name if gateway_name not passed
if not gateway_name:
gateway_name = gateway_class.default_name
gateway = gateway_class(self.event_engine, gateway_name)
self.gateways[gateway_name] = gateway
# Add gateway supported exchanges into engine
for exchange in gateway.exchanges:
if exchange not in self.exchanges:
self.exchanges.append(exchange)
# add a tick data filter for the gateway # hxxjava add
if gateway_name not in self.tick_filters:
self.tick_filters[gateway_name] = TickFilter(self.event_engine,gateway_name)
return gateway
只要你能够从网关行情接口实时得到合约的交易状态推送,把网关的行情接口做出类似的修改,这套方法同样是可用的。tickfilter的代码可以不用修改直接使用。
def __init__(
self,
on_bar: Callable,
window: int = 0,
on_window_bar: Callable = None,
interval: Interval = Interval.MINUTE
):
""" Constructor """
... ... # 其他代码省略
self.auction_tick:TickData = None
self.last_tick: TickData = None
def update_auction_tick(self,tick:TickData):
""" 更新集合竞价tick """
self.auction_tick = tick
def update_tick(self, tick: TickData) -> None:
"""
Update new tick data into generator.
"""
new_minute = False
if self.auction_tick:
# 合约集合竞价tick到当前tick
tick.high_price = max(tick.high_price,self.auction_tick.high_price)
tick.low_price = min(tick.low_price,self.auction_tick.low_price)
# 构造最新tick,以便把集合竞价的成交量和成交额合成到1分钟bar中
self.last_tick = deepcopy(self.auction_tick)
# 成交量和成交额每天从0开始单调递增
self.last_tick.volume = 0.0
self.last_tick.turnover = 0.0
# 用完集合竞价tick就丢弃
self.auction_tick = None
... ... # 其他代码省略
def on_auction_tick(self, tick: TickData):
"""
集合竞价tick处理
"""
self.bg.update_auction_tick(tick) # 假设self.bg是已经创建过的bar生成器
MTF wrote:
这里是因为每个交易日收盘后系统要重启,所以这么设计的吧
每个交易日收盘后在下一个盘前系统要重启时,此时收到的第一个tick通常是上一个交易日收盘时最后一个tick,它的成交量是v2。随后进入集合竞价时段,在20:59时会收到一个集合竞价tick,它的成交量是v3。
def update_tick(self, tick: TickData) -> None:
"""
Update new tick data into generator.
"""
new_minute = False
# Filter tick data with 0 last price
if not tick.last_price:
return
# Filter tick data with older timestamp
if self.last_tick and tick.datetime < self.last_tick.datetime:
return
if not self.bar:
new_minute = True
elif (
(self.bar.datetime.minute != tick.datetime.minute)
or (self.bar.datetime.hour != tick.datetime.hour)
):
self.bar.datetime = self.bar.datetime.replace(
second=0, microsecond=0
)
self.on_bar(self.bar)
new_minute = True
if new_minute:
self.bar = BarData(
symbol=tick.symbol,
exchange=tick.exchange,
interval=Interval.MINUTE,
datetime=tick.datetime,
gateway_name=tick.gateway_name,
open_price=tick.last_price,
high_price=tick.last_price,
low_price=tick.last_price,
close_price=tick.last_price,
open_interest=tick.open_interest
)
else:
self.bar.high_price = max(self.bar.high_price, tick.last_price)
if tick.high_price > self.last_tick.high_price:
self.bar.high_price = max(self.bar.high_price, tick.high_price)
self.bar.low_price = min(self.bar.low_price, tick.last_price)
if tick.low_price < self.last_tick.low_price:
self.bar.low_price = min(self.bar.low_price, tick.low_price)
self.bar.close_price = tick.last_price
self.bar.open_interest = tick.open_interest
self.bar.datetime = tick.datetime
if self.last_tick:
# a: 成交量累计
volume_change = tick.volume - self.last_tick.volume
self.bar.volume += max(volume_change, 0)
# b: 成交额累计
turnover_change = tick.turnover - self.last_tick.turnover
self.bar.turnover += max(turnover_change, 0)
# c: 记录最新tick
self.last_tick = tick
交易所播报的tick这成交量和成交额是从每个交易日的集合竞价阶段开始累计的,它们都有单调递增的特性。
也就是说volume和turnover在一个交易日内只会增加,不会减少。
答案是没有的!
![]() |
---|
图1 |
此时,前一天的最后收到的tick都volume为v2,当前天的集合竞价或者一个收到的tick都volume为v3,v3<v2。
![]() |
---|
图2 |
此时,前一天的最后收到的tick都volume为v2,当前天的集合竞价或者一个收到的tick都volume为v3,v3>v2。
这种情况会发生吗?会的 !前一天的交易特别清淡,导致成交量非常小,而当前天因为收到什么特别因素刺激,开盘就非常火爆,光是集合竞价的成交量就比前一天的整体的成交量都大,就会出现此图所示的情况。
a处代码中的volume_change 是什么?它代表了从上一个tick到当前tick发生的成交量。
1分钟K线到成交量就是对这个volume_change的累计。但是有一个例外,当volume_change < 0时,累加的值就只能够为0了。
那么什么情况下会发生volume_change < 0呢?答案是发生在如图1所示的跨日的情况下!
可是还会发生如图2所示的跨日的情况,此时volume_change > 0,那么volume_change 就不应该是v3-v2,而应该是v3 !
所以a处成交量累计的代码是错误的,同理b处对成交额累计的代码也是错误的!
前天在你给我私信里,已经给你回复了,你没有看到吗?
这个问题已经修改了,因为我看到你贴出来的代码是原来有问题的代码,往二楼查看下。
需要更新下面的几个class:
你看不到回复是吗?可能是系统提示我的私信邮箱已经超了,发不了回信吧。
王中锋 wrote:
复现过程中遇到两个报错,解决不了,请大佬指点,拜托!!!
一、加载策略时遇到如下报错:
File "D:\softwork\python3.10\lib\site-packages\vnpy\trader\ui\mainwindow.py", line 268, in open_widget
widget = widget_class(self.main_engine, self.event_engine)
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctastrategy\ui\widget.py", line 38, in init
self.cta_engine.init_engine()
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctastrategy\engine.py", line 117, in init_engine
self.load_strategy_setting()
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctastrategy\engine.py", line 907, in load_strategy_setting
self.add_strategy(
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctastrategy\engine.py", line 662, in add_strategy strategy: CtaTemplate = strategy_class(self, strategy_name, vt_symbol, setting)
File "C:\Users\Administrator\strategies\demo009_MA20903.py", line 94, in init
self.bg_x = MyBarGenerator(self.on_bar, self.x_min, self.on_x_bar, trading_hours=trading_hours)
File "D:\softwork\python3.10\lib\site-packages\vnpy\usertools\utility.py", line 52, in init
raise ValueError(f"MyBarGenerator need trading hours setting , please check it !")
ValueError: MyBarGenerator need trading hours setting , please check it !二、回测时遇到如下报错:
Traceback (most recent call last):
File "D:\softwork\python3.10\lib\site-packages\vnpy\trader\ui\mainwindow.py", line 268, in open_widget
widget = widget_class(self.main_engine, self.event_engine)
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctabacktester\ui\widget.py", line 55, in init
self.backtester_engine.init_engine()
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctabacktester\engine.py", line 63, in init_engine
self.backtesting_engine = BacktestingEngine()
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctastrategy\backtesting.py", line 87, in init
self.load_contracts() # hxxjava add
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctastrategy\backtesting.py", line 170, in load_contracts
contracts:List[ContractData] = database.load_contract_data()
AttributeError: 'SqliteDatabase' object has no attribute 'load_contract_data'
你可能不仔细按照我的帖子中去做,楼上“龙津”已经用了照着做,代码是没有问题的。你可以私信他,他应该会告诉你经验。
策略中使用到交易合约的合约信息是非常困难发生的事情,例如使用其中的合约乘数、最小交易交易单位、最小价格变动等 ... ... 。
如果您的策略中使用到交易合约的合约信息时,当您在休市的时候无法连接Gateway,那么你在回测的时候将无法进行下去。
怎么办,坐等交易所开市吗?本帖就讨论这样一个问题。
vnpy\trader\object.py中ContractData是这样定义的:
@dataclass
class ContractData(BaseData):
"""
Contract data contains basic information about each contract traded.
"""
symbol: str
exchange: Exchange
name: str
product: Product
size: float
pricetick: float
open_date:datetime = None # 上市日 open date hxxjava add
expire_date:datetime = None # 到期日 expire date hxxjava add
min_volume: float = 1 # minimum trading volume of the contract
stop_supported: bool = False # whether server supports stop order
net_position: bool = False # whether gateway uses net position volume
history_data: bool = False # whether gateway provides bar history data
option_strike: float = 0
option_underlying: str = "" # vt_symbol of underlying contract
option_type: OptionType = None
option_listed: datetime = None
option_expiry: datetime = None
option_portfolio: str = ""
option_index: str = "" # for identifying options with same strike price
def __post_init__(self):
""""""
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
修改vnpy_ctp\gateway\ctp_gateway.py中的CtpTdApi接口onRspQryInstrument():
def onRspQryInstrument(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""合约查询回报"""
product: Product = PRODUCT_CTP2VT.get(data["ProductClass"], None)
if product:
contract: ContractData = ContractData(
symbol=data["InstrumentID"],
exchange=EXCHANGE_CTP2VT[data["ExchangeID"]],
name=data["InstrumentName"],
product=product,
size=data["VolumeMultiple"],
pricetick=data["PriceTick"],
open_date=datetime.strptime(data["OpenDate"], "%Y%m%d"), # hxxjava add
expire_date=datetime.strptime(data["ExpireDate"], "%Y%m%d"), # hxxjava add
gateway_name=self.gateway_name
)
# 期权相关
if contract.product == Product.OPTION:
# 移除郑商所期权产品名称带有的C/P后缀
if contract.exchange == Exchange.CZCE:
contract.option_portfolio = data["ProductID"][:-1]
else:
contract.option_portfolio = data["ProductID"]
contract.option_underlying = data["UnderlyingInstrID"]
contract.option_type = OPTIONTYPE_CTP2VT.get(data["OptionsType"], None)
contract.option_strike = data["StrikePrice"]
contract.option_index = str(data["StrikePrice"])
contract.option_listed = datetime.strptime(data["OpenDate"], "%Y%m%d")
contract.option_expiry = datetime.strptime(data["ExpireDate"], "%Y%m%d")
self.gateway.on_contract(contract)
symbol_contract_map[contract.symbol] = contract
if last:
self.contract_inited = True
self.gateway.write_log("合约信息查询成功")
for data in self.order_data:
self.onRtnOrder(data)
self.order_data.clear()
for data in self.trade_data:
self.onRtnTrade(data)
self.trade_data.clear()
在vnpy\trader\database.py中为BaseDatabase增加两个与合约信息相关的接口:
@abstractmethod
def save_constract_data(self, constracts: List[ContractData]) -> bool:
"""
Save constract data into database. hxxjava add
"""
pass
@abstractmethod
def load_contract_data(self,vt_symbol:str="") -> List[ContractData]:
"""
Load constract data from database. hxxjava add
"""
pass
本人使用的是MySql Server,所以就在vnpy_mysql\mysql_database.py中扩展这两个接口:
from vnpy.trader.constant import Exchange, Interval, Product # hxxjava add Product
class DbContractData(Model): # hxxjava add
"""K线数据表映射对象"""
id = AutoField()
symbol: str = CharField()
exchange: str = CharField()
name : str = CharField()
product : str = CharField()
size : float = FloatField()
pricetick : float = FloatField()
open_date: datetime = DateTimeField(default=None)
expire_date: datetime = DateTimeField(default=None)
min_volume : float = FloatField(default=1)
stop_supported : bool = BooleanField(default=False)
net_position : bool = BooleanField(default=False)
history_data : bool = BooleanField(default=False)
option_strike : float = FloatField(default=0)
option_underlying: str = CharField(default="")
option_type: str = CharField(default="")
option_listed: datetime = DateTimeField(default=None)
option_expiry: datetime = DateTimeField(default=None)
option_portfolio: str = CharField(default="")
option_index: str = CharField(default="")
gateway_name:str = CharField()
class Meta:
database = db
indexes = ((("open_date","exchange","symbol"), True),)
class MysqlDatabase(BaseDatabase):
"""Mysql数据库接口"""
def __init__(self) -> None:
""""""
self.db = db
self.db.connect()
self.db.create_tables([DbContractData, DbBarData, DbTickData, DbBarOverview]) # hxxjava add DbContractData
def save_constract_data(self, contracts: List[ContractData]) -> bool:
"""
Save constract data into database. hxxjava add
"""
# 将constracts数据转换为字典
data = []
for contract in contracts:
copy_c = deepcopy(contract)
d = copy_c.__dict__
d["exchange"] = d["exchange"].value
d["product"] = d["product"].value
d.pop("vt_symbol")
data.append(d)
# 使用upsert操作将数据更新到数据库中
with self.db.atomic():
for c in chunked(data, 50):
DbContractData.insert_many(c).on_conflict_replace().execute()
return True
def load_contract_data(self,vt_symbol:str="") -> List[ContractData]:
"""
Load constract data from database. hxxjava add
"""
symbol,exchange = "",""
if vt_symbol:
symbol,exchange = vt_symbol.split('.')
s: ModelSelect = DbContractData.select().where(
(not symbol or DbContractData.symbol == symbol)
& (not symbol or DbContractData.exchange == exchange)
).order_by(DbContractData.open_date,DbContractData.exchange,DbContractData.symbol)
contracts: List[ContractData] = []
for db_c in s:
# 取出四个时间
open_date = datetime.fromtimestamp(db_c.open_date.timestamp(), DB_TZ)
expire_date = datetime.fromtimestamp(db_c.expire_date.timestamp(), DB_TZ)
option_listed = None
option_expiry = None
product = Product(db_c.product)
if product == Product.OPTION:
option_listed = datetime.fromtimestamp(db_c.option_listed.timestamp(), DB_TZ)
option_expiry = datetime.fromtimestamp(db_c.option_expiry.timestamp(), DB_TZ)
contract = ContractData(
symbol = db_c.symbol,
exchange = Exchange(db_c.exchange),
name = db_c.name,
product = Product(db_c.product),
size = db_c.size,
pricetick = db_c.pricetick,
open_date = open_date,
expire_date = expire_date,
min_volume = db_c.min_volume,
stop_supported = db_c.stop_supported,
net_position = db_c.net_position,
history_data = db_c.history_data,
option_strike = db_c.option_strike,
option_underlying = db_c.option_underlying,
option_type = db_c.option_type,
option_listed = option_listed,
option_expiry = option_expiry,
option_portfolio = db_c.option_portfolio,
option_index = db_c.option_index,
gateway_name = db_c.gateway_name,
)
contracts.append(contract)
return contracts
修改vnpy\trader\engine.py做如下修改:
from threading import Thread # hxxjava add
from copy import deepcopy # hxxjava add
from .database import get_database # hxxjava add
class OmsEngine(BaseEngine):
"""
Provides order management system function.
"""
contract_file = "contracts.json" # hxxjava add
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super(OmsEngine, self).__init__(main_engine, event_engine, "oms")
self.ticks: Dict[str, TickData] = {}
self.orders: Dict[str, OrderData] = {}
self.trades: Dict[str, TradeData] = {}
self.positions: Dict[str, PositionData] = {}
self.accounts: Dict[str, AccountData] = {}
self.contracts: Dict[str, ContractData] = {}
self.quotes: Dict[str, QuoteData] = {}
self.active_orders: Dict[str, OrderData] = {}
self.active_quotes: Dict[str, QuoteData] = {}
self.db = get_database()
self.load_contracts() # hxxjava add 启动时就从数据库读取所有合约信息
self.add_function()
self.register_event()
def process_account_event(self, event: Event) -> None:
""""""
account: AccountData = event.data
self.accounts[account.vt_accountid] = account
self.save_contracts_changed() # hxxjava add
def process_contract_event(self, event: Event) -> None:
""""""
contract: ContractData = event.data
# self.contracts[contract.vt_symbol] = contract
# hxxjava change
if contract.vt_symbol not in self.contracts:
self.contracts[contract.vt_symbol] = contract
self.contracts_changed.append(contract)
def save_contracts_changed(self):
""" 启动线程保存所有新增的合约 """
# 复制所有新增的合约
contracts = [deepcopy(contract) for contract in self.contracts_changed]
self.contracts_changed = []
if contracts:
# 如果有新增的合约,启动线程保存
t = Thread(target=self.save_contracts,kwargs=({"contracts":contracts}))
t.start()
def save_contracts(self,contracts:List): # hxxjava add
""" save contracts into database """
self.db.save_constract_data(contracts)
print(f"一共保存了{len(contracts)}个合约 !")
def load_contracts(self): # hxxjava add
""" save all contracts into a json file """
contracts = self.db.load_contract_data()
for c in contracts:
self.contracts[c.vt_symbol] = c
self.contracts_changed:List[ContractData] = []
print(f"一共读取了{len(contracts)}个合约 !")
# print(f"self.contracts={self.contracts}个合约 !")
修改vnpy_ctastrategy\engine.py中的CtaEngine,添加get_contract():
def get_contract(self, strategy: CtaTemplate) -> Optional[ContractData]: # hxxjava add
"""
Get strategy's contract data.
"""
return self.main_engine.get_contract(strategy.vt_symbol)
修改vnpy_ctastrategy\template.py中的CtaTemplate,添加get_contract():
def get_contract(self):
"""
Return trading_hours of trading contract. # hxxjava add
"""
return self.cta_engine.get_contract(self)
经过什么这么复杂的步骤,就赋予了您的CTA策略一种直接读取交易合约的合约信息的能力。
如何使用呢?非常简单,这里给出一段用户策略的例子代码:
""" 得到交易合约的合约信息 """
contract:ContractData = self.get_contract()
size_n = contract.size # 合约乘数
price_tick = contract.price_tick # 最小价格变动
龙津 wrote:
请注意,如需使用MyBarGenerator进行CTA策略回测,回测脚本调用engine.engine.set_parameters函数的时候不要忘记传入trading_hours参数
回测时,trading_hours参数如何传入,能给出详细的代码吗?
MyBarGenerator进行CTA策略回测的修改已经放在7.1节中给出来,仔细修改下。
所有可自由交易标的行情变动都是随机过程的累积,本质上就是马尔可夫过程,注意:一定要是可自由交易,不信请往下看。
import numpy as np
from matplotlib import pyplot as plt
N = 10*600
List1 = list(np.random.randn(N))
data3m = []
data30m = []
tt3m = 0.0
for i in range(N):
tt3m += List1[i]
if i % 10==0:
data30m.append(tt3m)
data3m.append(tt3m)
plt.figure(figsize=(20,6))
plt.plot(data3m)
plt.show()
plt.figure(figsize=(20,6))
plt.plot(data30m)
它所形成的曲线是那么自然,优美,和真实的行情只差一个合约名称 ... ...
老秦 wrote:
真心请教各位,特别是hxxjava,
每次校验tick时,直接用最新的合约状态校验不行吗?为啥还涉及curr_status, next_status?
每次收到推送的状态,就更新最新合约状态,据此检验tick,这样不行吗?没太明白您的curr_status, next_status是啥意思,为何会有两个状态。
答复: