VeighNa量化社区
你的开源社区量化交易平台
hxxjava's Avatar
Member
离线
419 帖子
声望: 155

下周simnow就可以恢复正常服务了,到时候你可以再连接simnow,如果没有问题,就是国金转发行情的时候出问题了。

c787297017 wrote:

hxxjava wrote:

答复:

问题分析

  1. 目前simnow调试,停止服务。我猜你现在使用的实盘账户在运作策略。所以你才可以继续使用CTP网关行情的。
  2. 你目前应该是连接到你的期货经纪商给提供的连接地址和端口号上的,也就是说,你可能是从你的期货经纪商的行情服务器获取的行情转发。
  3. 行情数据错误可能是两个方面造成的:①期货经纪商的行情转发错误;②本地行情接收成为

解决思路:

  1. 检查期货经纪商的行情使用的CTP版本,看看他们是否对CZCE的品种行情转发的毫秒数是错误的。(需要和你的期货经纪商沟通)
  2. 更换本地的CTP网关版本,最简单的时恢复到vnpy 3.0,订阅一个CZCE的合约,然后在onRtnDepthMarketData()中直接答应data看看。

这样就可以定位到问题了。

还有请问下哪个ctp版本可以获取到郑商所的毫秒数据

至少vnpy 3.0中的那个是没有问题。

答复:

问题分析

  1. 目前simnow调试,停止服务。我猜你现在使用的实盘账户在运作策略。所以你才可以继续使用CTP网关行情的。
  2. 你目前应该是连接到你的期货经纪商给提供的连接地址和端口号上的,也就是说,你可能是从你的期货经纪商的行情服务器获取的行情转发。
  3. 行情数据错误可能是两个方面造成的:①期货经纪商的行情转发错误;②本地行情接收成为

解决思路:

  1. 检查期货经纪商的行情使用的CTP版本,看看他们是否对CZCE的品种行情转发的毫秒数是错误的。(需要和你的期货经纪商沟通)
  2. 更换本地的CTP网关版本,最简单的时恢复到vnpy 3.0,订阅一个CZCE的合约,然后在onRtnDepthMarketData()中直接答应data看看。

这样就可以定位到问题了。

c787297017 wrote:

在将大佬设计的过滤器布置到实盘环境后,测试发现郑商所品种的tick_data中的datetime未精确到毫秒,所以每秒的两次tick数据datetime标识完全一致,后面的一次会被过滤器认为是无效tick,并且打印了【特别tick =】,上期所和大商所的品种tick数据datetime是精确到毫秒的,所以不存在该问题,过滤器正常运行,请问大佬是怎么处理郑商所这个问题的?

答复

  • 在simnow停止服务之前从来不会发现你说的这种情况,当下你应该用的不是CTP的gateway,请检查你所使用的gateway。
  • 打印了【特别tick =】的时候,应该是收到了不在有效交易状态下的tick数据,是应该被过滤掉的,正常。
  • 找到你所使用的gateway的MdApi的tick数据推送接口,仔细读代码,保证代码是类似下面的样子(这是CTP网关中的代码——有我修改的部分——看注释):
    def onRtnDepthMarketData(self, data: dict) -> None:
        """行情数据推送"""
        # 过滤没有时间戳的异常行情数据
        if not data["UpdateTime"]:
            return

        # 过滤还没有收到合约数据前的行情推送
        symbol: str = data["InstrumentID"]
        contract: ContractData = symbol_contract_map.get(symbol, None)
        if not contract:
            return

        # 对大商所的交易日字段取本地日期
        if not data["ActionDay"] or contract.exchange == Exchange.DCE:
            date_str: str = self.current_date

        else:
            date_str: str = data["ActionDay"]

        # 这里错了,按照这样的字符串"%Y%m%d %H:%M:%S.%f",会错把tick.datetime的微秒当成0.1秒计数
        # timestamp: str = f"{date_str} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}"        
        timestamp: str = f"{date_str} {data['UpdateTime']}." + str(data['UpdateMillisec']*1000).zfill(6) # hxxjava edit
        dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
        dt: datetime = CHINA_TZ.localize(dt)

        tick: TickData = TickData(
            symbol=symbol,
            exchange=contract.exchange,
            datetime=dt,
            name=contract.name,
            volume=data["Volume"],
            turnover=data["Turnover"],
            open_interest=data["OpenInterest"],
            last_price=adjust_price(data["LastPrice"]),
            limit_up=data["UpperLimitPrice"],
            limit_down=data["LowerLimitPrice"],
            open_price=adjust_price(data["OpenPrice"]),
            high_price=adjust_price(data["HighestPrice"]),
            low_price=adjust_price(data["LowestPrice"]),
            pre_close=adjust_price(data["PreClosePrice"]),
            bid_price_1=adjust_price(data["BidPrice1"]),
            ask_price_1=adjust_price(data["AskPrice1"]),
            bid_volume_1=data["BidVolume1"],
            ask_volume_1=data["AskVolume1"],
            gateway_name=self.gateway_name
        )

        if data["BidVolume2"] or data["AskVolume2"]:
            tick.bid_price_2 = adjust_price(data["BidPrice2"])
            tick.bid_price_3 = adjust_price(data["BidPrice3"])
            tick.bid_price_4 = adjust_price(data["BidPrice4"])
            tick.bid_price_5 = adjust_price(data["BidPrice5"])

            tick.ask_price_2 = adjust_price(data["AskPrice2"])
            tick.ask_price_3 = adjust_price(data["AskPrice3"])
            tick.ask_price_4 = adjust_price(data["AskPrice4"])
            tick.ask_price_5 = adjust_price(data["AskPrice5"])

            tick.bid_volume_2 = data["BidVolume2"]
            tick.bid_volume_3 = data["BidVolume3"]
            tick.bid_volume_4 = data["BidVolume4"]
            tick.bid_volume_5 = data["BidVolume5"]

            tick.ask_volume_2 = data["AskVolume2"]
            tick.ask_volume_3 = data["AskVolume3"]
            tick.ask_volume_4 = data["AskVolume4"]
            tick.ask_volume_5 = data["AskVolume5"]

        self.gateway.on_tick(tick)

zjcha wrote:

解决tick这些问题,是不是不用再解决K线合成器的问题了?

解决tick这些问题,仍然需要再解决K线合成器的问题,二者相辅相成。

c787297017 wrote:

请问大佬使用vnpy可以实现期货集合竞价的前4分钟下单吗?
是不是可以在on_start中加一个阻塞,当确定市场已进入集合竞价(时间判定或者事件触发)时,在此处立即下单。

回复:

  1. vnpy可以实现期货集合竞价期间手动下单
  2. 但是目前vnpy中的用户策略模板还没有提供对合约交易状态的感知功能,因此策略无法知道当前时间是否是集合竞价时段。
  3. 在可以收到合约交易状态前提下,在CtaTemplate中增加:合竞价报单、集合竞价撮合和集合竞价结束【连续竞价状态】等状态推送接口,以便推动策略进行委托交易。
  4. 如果策略足够复杂,你还可以考虑集合竞价期间的撤单操作。
  5. 当然4和5两条都需要对目前vnpy的gateway(如CTP网关)、应用引擎(如CtaEngine)和策略模板(如CtaTemplate)进行扩展才能够实现的。

c787297017 wrote:

感谢hxx分享,我模仿您的方法对vnpy_portfoliostrategy和vnpy_tts(gateway)进行了修改,运行起来发现所有的tick都被过滤掉了,检查发现是TickFilter.process_status_event从未被触发过,因此TickFilter.statuses始终为空,这样的话说明EVENT_STATUS从未发生?请问有什么办法能排查某个事件从未发生的原因?

如果你使用的是TTS或者UFT的gateway的话,是不可以的,因为它们都不是完整的CTP柜台,它们都没有实现合约状态推送功能。

小刘小刘 wrote:

hxxjava老哥你好!看了很多你的帖子,你应该是会用CTP订阅大量合约行情的人。请教一个问题,CTP订阅合约一台设备一次订阅多少个合约不会出现堵塞情况,和设备的网络带宽有关系吗?

我的理解:

报单流控、查询流控和会话数控制都是针对交易服务器接口而言的,与行情服务器接口无关。
因为行情是交易所透过网络推送的,行情客户端通过套接字来接收,所以一台设备一次能够订阅的最大合约数量与多个因素有关:

  1. 入户的网络的下行速率及客户端电脑能够分到的流量份额,10M mps,100M bps,1000M bps的网络接入是不可能一样的。
  2. 客户端电脑的硬件配置,想要不堵塞,CPU、内容、硬盘速度太差能够与你的目标想一致。

渔哥 wrote:

能不能直接根据合约的交易时间来过滤?

可以,

  • 目前是判断tick的合约或者品种当前的交易状态是否是有效的,
  • 并没有判断tick的时间戳与当时交易状态中的起止时间是否矛盾,可以加上

这样修改后策略得到的account是整个账户的资金情况。
而实际情况是你可能在跑若干个Cta策略,组合策略,算法交易,期权交易。获取整个账户的资金情况对交易几乎有什么意义。
如果能够为每个策略配置一个逻辑账户,分配一个起始资金,然后根据策略的交易历史,统计该逻辑账户下的资金情况还是非常有意义的。

MTF wrote:

感谢分享!

系统如果引入上面的tick过滤器,BarGenerator的跨日成交量和成交额处理错误问题就可以解决了,详细的解决方法请参见第5节吧。

1. 问题的由来

  • 在文章 发现了vnpy的BarGenerator两个隐藏很深的错误 !中我就已经分析过tick数据对bar生成器的影响。
  • 当前vnpy系统对集合竞价tick与其他tick没有区分能力
  • 当前vnpy系统没有充分利用行情接口提供的状态信息,无法识别有效tick与无效tick,一股脑地发送到策略和应用中,导致bar合成的错误。

2. 问题的解决方法

在行情接口与策略和应用之间建起一个tick过滤器——TickFilter,对tick数据进行过滤。

tick数据过滤器的功能:

  1. 过滤重复tick,保证已经参与K线合成的tick不会再次被系统使用,每个网关对应一个ick数据过滤;
    要做到这一条,就必须做到对所有已经订阅过的合约的tick的缓存,否则你再次重启系统的时候是无法知道你收到第一个tick是否已经参与过之前bar的合成了。这样你可能重复使用该tick,这是错误的。
    为此我们需要将所有已经订阅过的合约的最新tick进行实时更新,并定期做持久化保存,且在每次系统启动的时候读取加载到系统中。
  2. 过滤无效tick,转发有效交易状态下的tick到系统中,不在有效交易状态下tick做丢弃处理,有效交易状态包括:集合竞价状态和连续竞价状态;
    CTP系统的行情接口中包含的实时更新的合约交易状态通知推送接口,OnRtnInstrumentStatus()。关于这个问题我已经在如何更有效地利用合约交易状态信息——交易状态信息管理器。一文中做了详细的介绍,再次就不赘述。总之合约交易状态通知可以让我识别一个tick是否是有些大tick。
  3. 识别集合竞价tick,为使用tick的应用或用户策略处理集合竞价tick提供支持。
    合约交易状态通知可以让我们知道那些tick是tick,同时可以可以让我们区分那个tick是集合竞价tick,那些是连续竞价tick。对有效tick进性分析利用于我们策略或者应用生成出正确的bar。
  4. 本文只对CtpGateway,CtaEngine、CtaTemplate进行了更改,其他网关系统的道理都是相同的。如果您觉得对您有启发,也可以按同样的方法修改。

3. 过滤无效tick数据的实现代码

声明:本文基于【CTP接口规范6.3.15_API接口说明】做出的修改。

3.1 相关数据类型定义

4.1 定义相关的常量和数据类
在vnpy\trader\constant.py中增加下面的合约交易状态InstrumentStatus常量类型定义:

class InstrumentStatus(Enum):
    """
    合约交易状态类型 hxxjava debug
    """
    BEFORE_TRADING = "开盘前"
    NO_TRADING = "非交易"
    CONTINOUS = "连续交易" 
    AUCTION_ORDERING = "集合竞价报单"
    AUCTION_BALANCE = "集合竞价价格平衡"
    AUCTION_MATCH = "集合竞价撮合"
    CLOSE = "收盘"


# 有效交易状态
VALID_TRADE_STATUSES = [
    InstrumentStatus.CONTINOUS,
    InstrumentStatus.AUCTION_ORDERING,
    InstrumentStatus.AUCTION_BALANCE,
    InstrumentStatus.AUCTION_MATCH
]

# 集合竞价交易状态
AUCTION_STATUS = [
    InstrumentStatus.AUCTION_ORDERING,
    InstrumentStatus.AUCTION_BALANCE,
    InstrumentStatus.AUCTION_MATCH
]


class StatusEnterReason(Enum):
    """
    品种进入交易状态原因类型 hxxjava debug
    """
    AUTOMATIC = "自动切换"
    MANUAL = "手动切换"
    FUSE = "熔断"

在vnpy\trader\object.py中增加下面的交易状态数据类StatusData:

@dataclass
class StatusData(BaseData):
    """
    hxxjava debug
    """
    symbol:str       
    exchange : Exchange    
    settlement_group_id : str = ""  
    instrument_status : InstrumentStatus = None   
    trading_segment_sn : int = None 
    enter_time : str = ""      
    enter_reason : str = ""  
    exchange_inst_id : str = ""     

    def __post_init__(self):
        """  """
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"

    def belongs_to(self,vt_symbol:str):
        symbol,exchange_str = vt_symbol.split(".")
        instrument = left_alphas(symbol).upper()
        return (self.symbol.upper() == instrument) and (self.exchange.value == exchange_str)

3.2 相关消息定义

在vnpy\trader\event.py中增加交易状态消息类型

EVENT_STATUS = "eStatus"                        # hxxjava debug
EVENT_ORIGIN_TICK = "eOriginTick."              # hxxjava debug
EVENT_AUCTION_TICK = "eAuctionTick."            # hxxjava debug

3.3 Gateway的修改

在vnpy\trader\gateway.py中合约状态接口,修改tick推送接口:

引用部分增加:

from .event import EVENT_ORIGIN_TICK,EVENT_STATUS         # hxxjava add
from .object import StatusData    # hxxjava add

修改class BaseGateway的on_tick()接口,增加on_status()接口:

    def on_tick(self, tick: TickData) -> None:
        """
        Tick event push.
        Tick event of a specific vt_symbol is also pushed.
        """
        self.on_event(EVENT_ORIGIN_TICK, tick)  # hxxjava add             
        # self.on_event(EVENT_TICK, tick)                   
        # self.on_event(EVENT_TICK + tick.vt_symbol, tick)  

    def on_status(self, status: StatusData) -> None:    # hxxjava debug
        """
        Instrument Status event push.
        """
        self.on_event(EVENT_STATUS, status)
        self.on_event(EVENT_STATUS + status.vt_symbol, status)

3.4 CtpGateway的修改

修改vnpy_cpt\ctp_gateway.py:

增加引用部分

from vnpy.trader.constant import InstrumentStatus,StatusEnterReason  # hxxjava debug
rom vnpy.trader.object import StatusData,     # hxxjava debug

增加几个映射字典:

# 品种状态进入原因映射  hxxjava debug
INSTRUMENTSTATUS_CTP2VT: Dict[str, InstrumentStatus] = {
    "0": InstrumentStatus.BEFORE_TRADING,
    "1": InstrumentStatus.NO_TRADING,
    "2": InstrumentStatus.CONTINOUS,
    "3": InstrumentStatus.AUCTION_ORDERING,
    "4": InstrumentStatus.AUCTION_BALANCE,
    "5": InstrumentStatus.AUCTION_MATCH,
    "6": InstrumentStatus.CLOSE,
    "7": InstrumentStatus.CLOSE
}


# 品种状态进入原因映射  hxxjava debug
ENTERREASON_CTP2VT: Dict[str, StatusEnterReason] = {
    "1": StatusEnterReason.AUTOMATIC,
    "2": StatusEnterReason.MANUAL,
    "3": StatusEnterReason.FUSE
}

为class CtpTdApi增加下面合约状态推送接口:

    def onRtnInstrumentStatus(self,data:dict):
        """ 
        当接收到合约品种状态信息 # hxxjava debug 
        """
        if data:
            # print(f"【data={data}】")
            status =  StatusData(
                symbol = data["InstrumentID"],
                exchange = EXCHANGE_CTP2VT[data["ExchangeID"]],
                settlement_group_id = data["SettlementGroupID"],
                instrument_status = INSTRUMENTSTATUS_CTP2VT[data["InstrumentStatus"]],
                trading_segment_sn = data["TradingSegmentSN"],
                enter_time = data["EnterTime"],
                enter_reason = ENTERREASON_CTP2VT[data["EnterReason"]],
                exchange_inst_id = data["ExchangeInstID"],
                gateway_name=self.gateway_name
            )
            # print(f"status={status}")
            self.gateway.on_status(status)

3.5 对CtaEngine的进行扩展

增加引用部分

from vnpy.trader.event import EVENT_AUCTION_TICK  # hxxjava add

增加一个对CtaEgine的扩展MyCtaEngine

class MyCtaEngine(CtaEngine):
    """  """

    condition_filename = "condition_order.json"     # 历史条件单存储文件


    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super().__init__(main_engine,event_engine)

        self.condition_orders:Dict[str,ConditionOrder] = {}         # strategy_name: ConditionOrder

        self.triggered_condition_orders:List[ConditionOrder] = []   # 已经触发点条件单,为流控设计

    def load_active_condtion_orders(self):
        """  """
        return {}

    def register_event(self):
        """"""
        super().register_event()
        self.event_engine.register(EVENT_AUCTION_TICK, self.process_auction_tick_event)

    def process_auction_tick_event(self,event:Event):
        """ 集合竞价消息处理 """

        tick:TickData = event.data
        strategies = self.symbol_strategy_map[tick.vt_symbol]
        if not strategies:
            return

        for strategy in strategies:
            if strategy.inited:
                # 执行策略的集合竞价消息处理
                self.call_strategy_func(strategy, strategy.on_auction_tick, tick)

    def process_tick_event(self,event:Event):
        """ 用tick的价格检查条件单 """
        super().process_tick_event(event)

        tick:TickData = event.data
        all_condition_orders = [order for order in self.condition_orders.values() \
            if order.vt_symbol == tick.vt_symbol and order.status == CondOrderStatus.WAITING]
        for order in all_condition_orders:
            # 检查条件单是否满足条件
            self.check_condition_order(order,tick)

    def check_condition_order(self,order:ConditionOrder,tick:TickData):
        """ 检查条件单是否满足条件 """       
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        price = tick.last_price

        is_be = order.condition == Condition.BE and price >= order.price
        is_le = order.condition == Condition.LE and price <= order.price
        is_bt = order.condition == Condition.BT and price > order.price
        is_lt = order.condition == Condition.LT and price < order.price

        if is_be or is_le or is_bt or is_lt:
            # 满足触发条件
            if order.execute_price == ExecutePrice.MARKET:
                # 取市场价
                price = tick.last_price
            elif order.execute_price == ExecutePrice.EXTREME:
                # 取极限价
                price = tick.limit_up if order.direction == Direction.LONG else tick.limit_down
            else:
                # 取设定价
                price = order.price

            # 执行委托
            order_ids = strategy.send_order(
                    direction = order.direction,
                    offset=order.offset,
                    price=price,
                    volume=order.volume 
                )

            if order_ids:
                order.trigger_time = tick.datetime
                order.status = CondOrderStatus.TRIGGERED
                order.vt_orderids = order_ids

                self.call_strategy_func(strategy,strategy.on_condition_order,order)
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))

    def find_condition_order(self,vt_orderid:str):
        """ 根据委托单号查询所属条件单 """
        corder:ConditionOrder = None
        for order in self.condition_orders.values():
            if vt_orderid in order.vt_orderids:
                corder = order
                break

        return corder

    def process_trade_event(self, event: Event):
        """ 委托单推送处理 """
        super().process_trade_event(event)
        trade:TradeData = event.data
        vt_orderid = trade.vt_orderid

        corder = self.find_condition_order(vt_orderid)
        if corder:
            # 该成交单属于某个条件单
            strategy = self.strategies.get(corder.strategy_name,None)
            if strategy and strategy.trading:
                # 找到了该条件单属实策略实例且正在交易中

                # 累计条件单的成交量
                corder.traded += trade.volume
                # 推送该条件单给策略
                self.call_strategy_func(strategy,strategy.on_condition_order,corder)
                # 刷新条件单列表控件
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,corder))

    def send_condition_order(self,order:ConditionOrder):
        """  """
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        if order.cond_orderid not in self.condition_orders:
            self.condition_orders[order.cond_orderid] = order
            self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
            return True

        return False

    def cancel_condition_order(self,cond_orderid:str):
        """  """
        order:ConditionOrder = self.condition_orders.get(cond_orderid,None)
        if not order:
            return False

        order.status = CondOrderStatus.CANCELLED
        self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
        return True

    def cancel_all_condition_orders(self,strategy_name:str):
        """  """
        for order in self.condition_orders.values():
            if order.strategy_name == strategy_name and order.status == CondOrderStatus.WAITING:
                order.status = CondOrderStatus.CANCELLED
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))

        return True

把vnpy_ctastrategy目录下 的__init__.py中的CtaStrategyApp做如下修改

class CtaStrategyApp(BaseApp):
    """"""

    app_name = APP_NAME
    app_module = __module__
    app_path = Path(__file__).parent
    display_name = "CTA策略"
    # engine_class = CtaEngine
    engine_class = MyCtaEngine    # hxxjava add
    widget_name = "CtaManager"
    icon_name = str(app_path.joinpath("ui", "cta.ico"))

3.6 CtaTemplate的修改

修改vnpy_ctastrategy\CtaTemplate.py如下,为CtaTemplate增加on_auction_tick():

    @virtual
    def on_auction_tick(self, tick: TickData):
        """
        Callback of new tick data update.   # hxxjava add for auction tick
        """
        pass

3.7 为数据库增加最新Tick保存函数

3.7.1 修改vnpy\trader\database.py

为class BaseDatabase增加下面两个接口函数:

    @abstractmethod
    def save_last_tick(self, ticks: List[TickData]) -> bool:
        """
        Save last tick data into database.  # hxxjava add
        """
        pass

    @abstractmethod
    def load_last_tick(
        self,
        gateway_name : str,
        exchange: Exchange = None,
        symbol: str = None
    ) -> List[TickData]:
        """
        Load last tick data from database.  # hxxjava add
        """
        pass

3.7.2 修改vnpy_mysql\mysql_database.py

class MyDateTimeField(DateTimeField):
    def get_modifiers(self):
        return [6]

class DbLastTick(Model):    # hxxjava add
    """ 最新TICK数据表映射对象 """

    id = AutoField()

    gateway_name: str = CharField()

    symbol: str = CharField()
    exchange: str = CharField()
    datetime: datetime = MyDateTimeField()

    name: str = CharField()
    volume: float = FloatField()
    turnover: float = FloatField()
    open_interest: float = FloatField()
    last_price: float = FloatField()
    last_volume: float = FloatField()
    limit_up: float = FloatField()
    limit_down: float = FloatField()

    open_price: float = FloatField()
    high_price: float = FloatField()
    low_price: float = FloatField()
    pre_close: float = FloatField()

    bid_price_1: float = FloatField()
    bid_price_2: float = FloatField(null=True)
    bid_price_3: float = FloatField(null=True)
    bid_price_4: float = FloatField(null=True)
    bid_price_5: float = FloatField(null=True)

    ask_price_1: float = FloatField()
    ask_price_2: float = FloatField(null=True)
    ask_price_3: float = FloatField(null=True)
    ask_price_4: float = FloatField(null=True)
    ask_price_5: float = FloatField(null=True)

    bid_volume_1: float = FloatField()
    bid_volume_2: float = FloatField(null=True)
    bid_volume_3: float = FloatField(null=True)
    bid_volume_4: float = FloatField(null=True)
    bid_volume_5: float = FloatField(null=True)

    ask_volume_1: float = FloatField()
    ask_volume_2: float = FloatField(null=True)
    ask_volume_3: float = FloatField(null=True)
    ask_volume_4: float = FloatField(null=True)
    ask_volume_5: float = FloatField(null=True)

    localtime: datetime = DateTimeField(null=True)

    class Meta:
        database = db
        indexes = ((("gateway_name","symbol", "exchange", "datetime"), True),)

class MysqlDatabase的初始化做如下修改:

    def __init__(self) -> None:
        """"""
        self.db = db
        self.db.connect()
        self.db.create_tables([DbContractData, DbBarData, DbTickData, DbLastTick, DbBarOverview])   # hxxjava add DbLastTick,DbContractData

再为class MysqlDatabase添加下面两个函数:

    def save_last_tick(self, ticks: List[TickData]) -> bool:
        """
        Save last tick data into database.  # hxxjava add
        """
        vt_symbols = [t.vt_symbol for t in ticks]

        # 删除ticks列表中包含合约的旧的tick记录
        d: ModelDelete = DbLastTick.delete().where(
            (DbLastTick.symbol+'.'+DbLastTick.exchange in vt_symbols)
        )
        count = d.execute()
        # print(f"delete {count} last ticks")

        # 构造最新的ticks列表数据
        data = []
        for t in ticks:
            tick:TickData = deepcopy(t)     # hxxjava change
            tick.datetime = tick.datetime

            d = tick.__dict__
            d["exchange"] = d["exchange"].value
            d.pop("vt_symbol")
            data.append(d)
            # print(tick.symbol,tick.exchange,tick.datetime.strftime('%Y-%m-%d %H:%M:%S %f'))

        # 使用upsert操作将数据更新到数据库中
        with self.db.atomic():
            for c in chunked(data, 50):
                DbLastTick.insert_many(c).on_conflict_replace().execute()

        return True

    def load_last_tick(
        self,
        gateway_name : str,
        exchange: Exchange = None,
        symbol: str = None
    ) -> List[TickData]:
        """
        Load last tick data from database.  # hxxjava add
        """
        try:
            # 从DbLastTick查询符合条件的最新tick记录
            s: ModelSelect = (
                DbLastTick.select().where(
                    (DbLastTick.gateway_name == gateway_name)
                    & (exchange is None or DbLastTick.exchange == exchange.value)
                    & (symbol is None or DbLastTick.symbol == symbol)
                ).order_by(DbLastTick.gateway_name,DbLastTick.datetime)
            )

            # 利用最新tick记录构造ticks列表
            ticks: List[TickData] = []
            for db_tick in s:
                tick:TickData = TickData(
                    symbol=db_tick.symbol,
                    exchange=Exchange(db_tick.exchange),
                    datetime=to_china_tz(db_tick.datetime),
                    name=db_tick.name,
                    volume=db_tick.volume,
                    turnover=db_tick.turnover,
                    open_interest=db_tick.open_interest,
                    last_price=db_tick.last_price,
                    last_volume=db_tick.last_volume,
                    limit_up=db_tick.limit_up,
                    limit_down=db_tick.limit_down,
                    open_price=db_tick.open_price,
                    high_price=db_tick.high_price,
                    low_price=db_tick.low_price,
                    pre_close=db_tick.pre_close,
                    bid_price_1=db_tick.bid_price_1,
                    bid_price_2=db_tick.bid_price_2,
                    bid_price_3=db_tick.bid_price_3,
                    bid_price_4=db_tick.bid_price_4,
                    bid_price_5=db_tick.bid_price_5,
                    ask_price_1=db_tick.ask_price_1,
                    ask_price_2=db_tick.ask_price_2,
                    ask_price_3=db_tick.ask_price_3,
                    ask_price_4=db_tick.ask_price_4,
                    ask_price_5=db_tick.ask_price_5,
                    bid_volume_1=db_tick.bid_volume_1,
                    bid_volume_2=db_tick.bid_volume_2,
                    bid_volume_3=db_tick.bid_volume_3,
                    bid_volume_4=db_tick.bid_volume_4,
                    bid_volume_5=db_tick.bid_volume_5,
                    ask_volume_1=db_tick.ask_volume_1,
                    ask_volume_2=db_tick.ask_volume_2,
                    ask_volume_3=db_tick.ask_volume_3,
                    ask_volume_4=db_tick.ask_volume_4,
                    ask_volume_5=db_tick.ask_volume_5,
                    localtime=db_tick.localtime,
                    gateway_name=db_tick.gateway_name
                )
                ticks.append(tick)

            return ticks

        except:
            # 当DbLastTick表不存在的时候,会发生错误
            return []

3.8 tick数据过滤器的实现

在vnpy.usertools下创建tickfilter.py文件,其内容如下:

"""
本文件主要实现tick数据过滤器——TickFilter。

tick数据过滤器的功能:
1. 过滤重复tick,保证已经参与K线合成的tick不会再次被系统使用
2. 过滤无效tick,抛弃不在交易状态下的tick
3. 识别集合竞价tick,为使用tick的应用或用户策略处理集合竞价tick提供支持

作者:hxxjava
日期:2022-06-16
修改日期:              修改原因:  
"""
from typing import Dict,List,Tuple
from threading import Thread
from vnpy.event import Event,EVENT_TIMER,EventEngine
from vnpy.trader.constant import InstrumentStatus,VALID_TRADE_STATUSES
from vnpy.trader.object import TickData,StatusData
from vnpy.trader.event import (
    EVENT_ORIGIN_TICK,
    EVENT_AUCTION_TICK,
    EVENT_TICK,
    EVENT_STATUS
)
from vnpy.trader.database import get_database
from vnpy.trader.utility import extract_vt_symbol


def left_alphas(instr:str):
    """
    得到字符串左边的字符部分
    """
    ret_str = ''
    for s in instr:
        if s.isalpha():
            ret_str += s
        else:
            break
    return ret_str

def get_vt_instrument(vt_symbol:str):
    """
    从完整合约代码转换到完整品种代码
    """    
    symbol,exchange = extract_vt_symbol(vt_symbol)
    instrument = left_alphas(symbol)
    return f"{instrument}.{exchange.value}"


class TickFilter():
    """ tick数据过滤器 """
    CHECK_INTERVAL:int = 5  # 更新到数据库间隔

    def __init__(self,event_engine:EventEngine,gateway_name:str):
        """ tick数据过滤器初始化 """
        self.event_engine = event_engine
        self.gateway_name = gateway_name
        self.db = get_database()

        # 最新tick字典 {(gateway_name,vt_symbol),(update,tick)}
        self.last_ticks:Dict[Tuple[str,str],Tuple[bool,TickData]] = {}

        # 品种及合约状态字典 { vt_symbol : StatusData }
        self.statuses:Dict[str,StatusData] = {}
        self.second_cnt = 0

        self.load_last_ticks()
        self.register_event()

        # print(f"TickFilter {gateway_name}")

    def load_last_ticks(self):
        """ 
        加载属于网关名称为self.gateway_name的最新tick列表 
        """
        last_ticks:List[TickData] = self.db.load_last_tick(gateway_name=self.gateway_name)
        for tick in last_ticks:
            self.last_ticks[(tick.gateway_name,tick.vt_symbol)] = (False,tick)

        # print(f"load {len(last_ticks)} last ticks")

    def register_event(self):
        """ 注册消息 """
        self.event_engine.register(EVENT_ORIGIN_TICK,self.process_tick_event)
        self.event_engine.register(EVENT_STATUS,self.process_status_event)
        self.event_engine.register(EVENT_TIMER,self.check_last_ticks)        

    def process_tick_event(self,event:Event):
        """ 对原始tick进行过滤 """
        tick:TickData = event.data

        # 检查tick合约的经验状态是否位有效交易状态
        status:StatusData = self.statuses.get(tick.vt_symbol,None)
        if not status:
            vt_instrument = get_vt_instrument(tick.vt_symbol)
            status = self.statuses.get(vt_instrument,None)
            if not status:
                # 未收到交易状态,返回
                return

        if status.instrument_status not in VALID_TRADE_STATUSES:
            # 不在有效交易状态,返回
            return

        key = (tick.gateway_name,tick.vt_symbol)
        _,oldtick = self.last_ticks.get(key,(None,None))
        valid_tick = False
        if not oldtick:
            # 没有该合约的历史tick
            self.last_ticks[key] = (True,tick)
            valid_tick = True

        elif tick.datetime > oldtick.datetime:
            # 
            self.last_ticks[key] = (True,tick)
            valid_tick = True

        else:
            print(f"【特别tick = {tick}】")

        if valid_tick == True:
            # 如果是有效的tick
            if status.instrument_status != InstrumentStatus.CONTINOUS:
                # 发送集合竞价tic消息到系统中
                self.event_engine.put(Event(EVENT_AUCTION_TICK,tick))
                self.event_engine.put(Event(EVENT_AUCTION_TICK + tick.vt_symbol, tick))  

            else:
                # 发送连续竞价tic消息到系统中
                self.event_engine.put(Event(EVENT_TICK,tick))
                self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, tick))  

    def process_status_event(self, event: Event):  
        """ 交易状态通知消息处理 """
        status:StatusData = event.data
        self.statuses[status.vt_symbol] = status

        # print(f"【{status.gateway_name} {status}】")

    def check_last_ticks(self,event:Event) -> None:
        """ 原始tick过滤器 """
        self.second_cnt += 1
        if self.second_cnt % self.CHECK_INTERVAL == 0:
            # 如果到了定时间隔

            # 查询所有更新的tick
            changed_ticks = [] 

            for key,(update,tick) in self.last_ticks.items():
                if update:
                    changed_ticks.append(tick)
                    self.last_ticks[key] = (False,tick)

            if changed_ticks:
                # 如果存在更新的tick,保存到数据库
                t = Thread(target=self.db.save_last_tick,kwargs=({"ticks":changed_ticks}),daemon=True)
                t.start()
                # print(f"{self.second_cnt}: status count={len(self.statuses)} save {len(changed_ticks)} ticks")

3.9 把tick数据过滤器安装到主引擎MainEngine上去

修改vnpy\trader\engine.py

添加引用部分

from vnpy.usertools.tickfilter import TickFilter    # hxxjava add

修改MainEngine的

在MainEngine的初始化函数def init(self, event_engine: EventEngine = None)中增加如下内容:

self.tick_filters:Dict[str,TickFilter] = {} # hxxjava add

修改其add_gateway(),内容如下:

    def add_gateway(self, gateway_class: Type[BaseGateway], gateway_name: str = "") -> BaseGateway:
        """
        Add gateway.
        """
        # Use default name if gateway_name not passed
        if not gateway_name:
            gateway_name = gateway_class.default_name

        gateway = gateway_class(self.event_engine, gateway_name)
        self.gateways[gateway_name] = gateway

        # Add gateway supported exchanges into engine
        for exchange in gateway.exchanges:
            if exchange not in self.exchanges:
                self.exchanges.append(exchange)

        # add a tick data filter for the gateway    #  hxxjava add  
        if gateway_name not in self.tick_filters:
            self.tick_filters[gateway_name] = TickFilter(self.event_engine,gateway_name)

        return gateway

4. 经过上面的一系列修改,你获得了哪些好处?

  • 你的策略再也不会收到重复数据和垃圾数据
  • 此以后你的CTA策略中必须加入一个on_auction_tick()接口函数,用来接受每个交易日集合竞价所产生的tick。如何使用这个tick你有你的方法。
  • 在合成K线的时候你才可能构成正确的K线,比如BarGenerator对跨日tick时间戳的处理错误问题,在此也会迎刃而解。

4.1 现在来梳理下我们都干了哪些事情

  1. 在CtpGateway中引入了合约交易状态,这可以用来过滤无效数据,同时还能够识别集合竞价tick。
  2. 在database中增加了最新tick持久化保存,这为新的tick是否是重复的判断提供支持。
  3. 提供有效tick的分类,在CTA策略的模板中增加on_auction_tick()接口使得BarGenerator正确处1分钟bar的成交量和成交额成为可能。

4.2 非CTP网关使用者是否也可以这样做?

只要你能够从网关行情接口实时得到合约的交易状态推送,把网关的行情接口做出类似的修改,这套方法同样是可用的。tickfilter的代码可以不用修改直接使用。

5. 解决BarGenerator统计bar成交量和成交额错误的方法

5.1 这是对BarGenerator做出点修改,

  • 修改BarGenerator的初始化函数
    def __init__(
        self,
        on_bar: Callable,
        window: int = 0,
        on_window_bar: Callable = None,
        interval: Interval = Interval.MINUTE
    ):
        """ Constructor """      

        ... ...  # 其他代码省略

        self.auction_tick:TickData = None
        self.last_tick: TickData = None
  • 增加BarGenerator的集合竞价tick处理函数
    def update_auction_tick(self,tick:TickData):
        """ 更新集合竞价tick """
        self.auction_tick = tick
  • 修改BarGenerator的1分钟bar合成函数
    def update_tick(self, tick: TickData) -> None:
        """
        Update new tick data into generator.
        """
        new_minute = False

        if self.auction_tick:
            # 合约集合竞价tick到当前tick
            tick.high_price = max(tick.high_price,self.auction_tick.high_price)
            tick.low_price = min(tick.low_price,self.auction_tick.low_price)

            # 构造最新tick,以便把集合竞价的成交量和成交额合成到1分钟bar中
            self.last_tick = deepcopy(self.auction_tick)
            # 成交量和成交额每天从0开始单调递增
            self.last_tick.volume = 0.0   
            self.last_tick.turnover = 0.0

            # 用完集合竞价tick就丢弃
            self.auction_tick = None

      ... ...  # 其他代码省略

5.2 您的策略关于集合竞价tick更新的回调函数:

    def on_auction_tick(self, tick: TickData):
        """
        集合竞价tick处理
        """
        self.bg.update_auction_tick(tick)    # 假设self.bg是已经创建过的bar生成器

两点说明:

  1. 如果你在阅读本文的时候觉得有点一头雾水,可以搜索'hxxjava'字符串,将会显示大部分修改的代码,仔细揣摩下,就会知道我做了什么了!
  2. 另外本贴中还有一部分涉及到条件单的代码,如果出现错误,可以查找我的关于条件单的帖子比停止单更好用的条件单——ConditionOrder,这里就不再重复贴出那部分代码了。

MTF wrote:

这里是因为每个交易日收盘后系统要重启,所以这么设计的吧

答复:

每个交易日收盘后在下一个盘前系统要重启时,此时收到的第一个tick通常是上一个交易日收盘时最后一个tick,它的成交量是v2。随后进入集合竞价时段,在20:59时会收到一个集合竞价tick,它的成交量是v3。

  1. 当发生了v3-v2<0(图1所示,一般情况下是如此)的情况也是错误的,应该用V3来代替0,因为第一个tick往往是集合竞价的tick,而它的volume代表了在集合竞价时间段发生的成交量,如果用0来替代,则表示把集合竞价的成交量抛弃了,集合竞价期间的4分钟发生的成交量可一定不是个小数目,对吧?
  2. 如果发生了图2所示的情况,此时v3-v2>0(为什么会发生这种情况我已经在帖子里说过),那么volume_change代表什么?它是个无意义的值,直接累计volume_change到1分钟bar中,更是错的离谱,不是吗?

1. 来看看BarGenerator的tick合成1分钟tick过程update_tick()

    def update_tick(self, tick: TickData) -> None:
        """
        Update new tick data into generator.
        """
        new_minute = False

        # Filter tick data with 0 last price
        if not tick.last_price:
            return

        # Filter tick data with older timestamp
        if self.last_tick and tick.datetime < self.last_tick.datetime:
            return

        if not self.bar:
            new_minute = True
        elif (
            (self.bar.datetime.minute != tick.datetime.minute)
            or (self.bar.datetime.hour != tick.datetime.hour)
        ):
            self.bar.datetime = self.bar.datetime.replace(
                second=0, microsecond=0
            )
            self.on_bar(self.bar)

            new_minute = True

        if new_minute:
            self.bar = BarData(
                symbol=tick.symbol,
                exchange=tick.exchange,
                interval=Interval.MINUTE,
                datetime=tick.datetime,
                gateway_name=tick.gateway_name,
                open_price=tick.last_price,
                high_price=tick.last_price,
                low_price=tick.last_price,
                close_price=tick.last_price,
                open_interest=tick.open_interest
            )
        else:
            self.bar.high_price = max(self.bar.high_price, tick.last_price)
            if tick.high_price > self.last_tick.high_price:
                self.bar.high_price = max(self.bar.high_price, tick.high_price)

            self.bar.low_price = min(self.bar.low_price, tick.last_price)
            if tick.low_price < self.last_tick.low_price:
                self.bar.low_price = min(self.bar.low_price, tick.low_price)

            self.bar.close_price = tick.last_price
            self.bar.open_interest = tick.open_interest
            self.bar.datetime = tick.datetime

        if self.last_tick:
            # a: 成交量累计
            volume_change = tick.volume - self.last_tick.volume
            self.bar.volume += max(volume_change, 0)
            # b: 成交额累计
            turnover_change = tick.turnover - self.last_tick.turnover
            self.bar.turnover += max(turnover_change, 0)

        # c: 记录最新tick
        self.last_tick = tick

2. tick的volume和turnover有什么特性?

交易所播报的tick这成交量和成交额是从每个交易日的集合竞价阶段开始累计的,它们都有单调递增的特性。
也就是说volume和turnover在一个交易日内只会增加,不会减少。

2.1 前后两个交易日之间有没有什么关系呢?

答案是没有的!

  • 这是通常情况下的两个交易日成交量的变化示意图:
description
图1

此时,前一天的最后收到的tick都volume为v2,当前天的集合竞价或者一个收到的tick都volume为v3,v3<v2。

  • 特别情况下的两个交易日成交量的变化示意图:
description
图2

此时,前一天的最后收到的tick都volume为v2,当前天的集合竞价或者一个收到的tick都volume为v3,v3>v2。
这种情况会发生吗?会的 !前一天的交易特别清淡,导致成交量非常小,而当前天因为收到什么特别因素刺激,开盘就非常火爆,光是集合竞价的成交量就比前一天的整体的成交量都大,就会出现此图所示的情况。

3. a,b两处代码是错误的

a处代码中的volume_change 是什么?它代表了从上一个tick到当前tick发生的成交量。
1分钟K线到成交量就是对这个volume_change的累计。但是有一个例外,当volume_change < 0时,累加的值就只能够为0了。
那么什么情况下会发生volume_change < 0呢?答案是发生在如图1所示的跨日的情况下!
可是还会发生如图2所示的跨日的情况,此时volume_change > 0,那么volume_change 就不应该是v3-v2,而应该是v3 !
所以a处成交量累计的代码是错误的,同理b处对成交额累计的代码也是错误的!

答复:

前天在你给我私信里,已经给你回复了,你没有看到吗?
这个问题已经修改了,因为我看到你贴出来的代码是原来有问题的代码,往二楼查看下。
需要更新下面的几个class:

  • TradingHours
  • MyBarGenerator

你看不到回复是吗?可能是系统提示我的私信邮箱已经超了,发不了回信吧。

王中锋 wrote:

复现过程中遇到两个报错,解决不了,请大佬指点,拜托!!!
一、加载策略时遇到如下报错:
File "D:\softwork\python3.10\lib\site-packages\vnpy\trader\ui\mainwindow.py", line 268, in open_widget
widget = widget_class(self.main_engine, self.event_engine)
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctastrategy\ui\widget.py", line 38, in init
self.cta_engine.init_engine()
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctastrategy\engine.py", line 117, in init_engine
self.load_strategy_setting()
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctastrategy\engine.py", line 907, in load_strategy_setting
self.add_strategy(
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctastrategy\engine.py", line 662, in add_strategy strategy: CtaTemplate = strategy_class(self, strategy_name, vt_symbol, setting)
File "C:\Users\Administrator\strategies\demo009_MA20903.py", line 94, in init
self.bg_x = MyBarGenerator(self.on_bar, self.x_min, self.on_x_bar, trading_hours=trading_hours)
File "D:\softwork\python3.10\lib\site-packages\vnpy\usertools\utility.py", line 52, in init
raise ValueError(f"MyBarGenerator need trading hours setting , please check it !")
ValueError: MyBarGenerator need trading hours setting , please check it !

二、回测时遇到如下报错:
Traceback (most recent call last):
File "D:\softwork\python3.10\lib\site-packages\vnpy\trader\ui\mainwindow.py", line 268, in open_widget
widget = widget_class(self.main_engine, self.event_engine)
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctabacktester\ui\widget.py", line 55, in init
self.backtester_engine.init_engine()
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctabacktester\engine.py", line 63, in init_engine
self.backtesting_engine = BacktestingEngine()
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctastrategy\backtesting.py", line 87, in init
self.load_contracts() # hxxjava add
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctastrategy\backtesting.py", line 170, in load_contracts
contracts:List[ContractData] = database.load_contract_data()
AttributeError: 'SqliteDatabase' object has no attribute 'load_contract_data'

你可能不仔细按照我的帖子中去做,楼上“龙津”已经用了照着做,代码是没有问题的。你可以私信他,他应该会告诉你经验。

1. 合约信息是一种基础信息,应该在本地持久化保存

策略中使用到交易合约的合约信息是非常困难发生的事情,例如使用其中的合约乘数、最小交易交易单位、最小价格变动等 ... ... 。
如果您的策略中使用到交易合约的合约信息时,当您在休市的时候无法连接Gateway,那么你在回测的时候将无法进行下去。
怎么办,坐等交易所开市吗?本帖就讨论这样一个问题。

2. 解决方法

2.1 合约信息包含哪些内容

vnpy\trader\object.py中ContractData是这样定义的:

@dataclass
class ContractData(BaseData):
    """
    Contract data contains basic information about each contract traded.
    """

    symbol: str
    exchange: Exchange
    name: str
    product: Product
    size: float
    pricetick: float

    open_date:datetime = None       # 上市日    open date   hxxjava add
    expire_date:datetime = None     # 到期日    expire date hxxjava add

    min_volume: float = 1           # minimum trading volume of the contract
    stop_supported: bool = False    # whether server supports stop order
    net_position: bool = False      # whether gateway uses net position volume
    history_data: bool = False      # whether gateway provides bar history data

    option_strike: float = 0
    option_underlying: str = ""     # vt_symbol of underlying contract
    option_type: OptionType = None
    option_listed: datetime = None
    option_expiry: datetime = None
    option_portfolio: str = ""
    option_index: str = ""          # for identifying options with same strike price

    def __post_init__(self):
        """"""
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"

2.2 修改一下CTP的网关:

修改vnpy_ctp\gateway\ctp_gateway.py中的CtpTdApi接口onRspQryInstrument():

    def onRspQryInstrument(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """合约查询回报"""
        product: Product = PRODUCT_CTP2VT.get(data["ProductClass"], None)
        if product:
            contract: ContractData = ContractData(
                symbol=data["InstrumentID"],
                exchange=EXCHANGE_CTP2VT[data["ExchangeID"]],
                name=data["InstrumentName"],
                product=product,
                size=data["VolumeMultiple"],
                pricetick=data["PriceTick"],
                open_date=datetime.strptime(data["OpenDate"], "%Y%m%d"),        # hxxjava add
                expire_date=datetime.strptime(data["ExpireDate"], "%Y%m%d"),    # hxxjava add
                gateway_name=self.gateway_name
            )

            # 期权相关
            if contract.product == Product.OPTION:
                # 移除郑商所期权产品名称带有的C/P后缀
                if contract.exchange == Exchange.CZCE:
                    contract.option_portfolio = data["ProductID"][:-1]
                else:
                    contract.option_portfolio = data["ProductID"]

                contract.option_underlying = data["UnderlyingInstrID"]
                contract.option_type = OPTIONTYPE_CTP2VT.get(data["OptionsType"], None)
                contract.option_strike = data["StrikePrice"]
                contract.option_index = str(data["StrikePrice"])
                contract.option_listed = datetime.strptime(data["OpenDate"], "%Y%m%d")
                contract.option_expiry = datetime.strptime(data["ExpireDate"], "%Y%m%d")

            self.gateway.on_contract(contract)

            symbol_contract_map[contract.symbol] = contract

        if last:
            self.contract_inited = True
            self.gateway.write_log("合约信息查询成功")

            for data in self.order_data:
                self.onRtnOrder(data)
            self.order_data.clear()

            for data in self.trade_data:
                self.onRtnTrade(data)
            self.trade_data.clear()

2.3 选择用数据库保存合约信息

在vnpy\trader\database.py中为BaseDatabase增加两个与合约信息相关的接口:

    @abstractmethod
    def save_constract_data(self, constracts: List[ContractData]) -> bool:
        """
        Save constract data into database.    hxxjava add
        """
        pass

    @abstractmethod
    def load_contract_data(self,vt_symbol:str="") -> List[ContractData]:
        """
        Load constract data from database.    hxxjava add
        """
        pass

2.4 为MySqlDatabase扩展save_constract_data()和load_contract_data():

本人使用的是MySql Server,所以就在vnpy_mysql\mysql_database.py中扩展这两个接口:

引用部分添加:

from vnpy.trader.constant import Exchange, Interval, Product    # hxxjava add Product

定义合约信息数据表模型

class DbContractData(Model):     # hxxjava add
    """K线数据表映射对象"""

    id = AutoField()

    symbol: str = CharField()
    exchange: str = CharField()
    name : str = CharField()
    product : str = CharField()
    size : float = FloatField()
    pricetick : float = FloatField()

    open_date: datetime = DateTimeField(default=None)
    expire_date: datetime = DateTimeField(default=None)

    min_volume : float = FloatField(default=1)
    stop_supported : bool = BooleanField(default=False)
    net_position : bool = BooleanField(default=False)
    history_data : bool = BooleanField(default=False)

    option_strike : float = FloatField(default=0)
    option_underlying: str = CharField(default="")
    option_type: str = CharField(default="")
    option_listed: datetime = DateTimeField(default=None)
    option_expiry: datetime = DateTimeField(default=None)

    option_portfolio: str = CharField(default="")
    option_index: str = CharField(default="")

    gateway_name:str = CharField()

    class Meta:
        database = db
        indexes = ((("open_date","exchange","symbol"), True),)

修改MysqlDatabase的init(),添加上面两个接口函数:

class MysqlDatabase(BaseDatabase):
    """Mysql数据库接口"""

    def __init__(self) -> None:
        """"""
        self.db = db
        self.db.connect()
        self.db.create_tables([DbContractData, DbBarData, DbTickData, DbBarOverview])   # hxxjava add DbContractData

    def save_constract_data(self, contracts: List[ContractData]) -> bool:
        """
        Save constract data into database.    hxxjava add
        """
        # 将constracts数据转换为字典
        data = []

        for contract in contracts:
            copy_c = deepcopy(contract)
            d = copy_c.__dict__
            d["exchange"] = d["exchange"].value
            d["product"] = d["product"].value
            d.pop("vt_symbol")
            data.append(d)

        # 使用upsert操作将数据更新到数据库中
        with self.db.atomic():
            for c in chunked(data, 50):
                DbContractData.insert_many(c).on_conflict_replace().execute()

        return True

    def load_contract_data(self,vt_symbol:str="") -> List[ContractData]:
        """
        Load constract data from database.    hxxjava add
        """
        symbol,exchange = "",""
        if vt_symbol:
            symbol,exchange = vt_symbol.split('.')

        s: ModelSelect = DbContractData.select().where(
                (not symbol or DbContractData.symbol == symbol)
                & (not symbol or DbContractData.exchange == exchange)
                ).order_by(DbContractData.open_date,DbContractData.exchange,DbContractData.symbol)

        contracts: List[ContractData] = []
        for db_c in s:
            # 取出四个时间
            open_date = datetime.fromtimestamp(db_c.open_date.timestamp(), DB_TZ)
            expire_date = datetime.fromtimestamp(db_c.expire_date.timestamp(), DB_TZ)

            option_listed = None
            option_expiry = None

            product = Product(db_c.product)
            if product == Product.OPTION:               
                option_listed = datetime.fromtimestamp(db_c.option_listed.timestamp(), DB_TZ)
                option_expiry = datetime.fromtimestamp(db_c.option_expiry.timestamp(), DB_TZ)

            contract = ContractData(
                    symbol = db_c.symbol,
                    exchange = Exchange(db_c.exchange),
                    name = db_c.name,
                    product = Product(db_c.product),
                    size = db_c.size,
                    pricetick = db_c.pricetick,

                    open_date = open_date,
                    expire_date = expire_date,

                    min_volume = db_c.min_volume,
                    stop_supported = db_c.stop_supported,
                    net_position = db_c.net_position,
                    history_data = db_c.history_data,

                    option_strike = db_c.option_strike,
                    option_underlying = db_c.option_underlying,
                    option_type = db_c.option_type,

                    option_listed = option_listed,
                    option_expiry = option_expiry,

                    option_portfolio = db_c.option_portfolio,
                    option_index = db_c.option_index,
                    gateway_name = db_c.gateway_name,
            )
            contracts.append(contract)

        return contracts

2.5 修改OmsEngine,使之能够持久化保存合约信息

修改vnpy\trader\engine.py做如下修改:

2.5.1 添加引用

from threading import Thread            # hxxjava add
from copy import deepcopy               # hxxjava add
from .database import get_database      # hxxjava add

2.5.2 为OmsEngine添加下面函数:

修改OmsEngine的init():

class OmsEngine(BaseEngine):
    """
    Provides order management system function.
    """

    contract_file = "contracts.json"    # hxxjava add

    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super(OmsEngine, self).__init__(main_engine, event_engine, "oms")

        self.ticks: Dict[str, TickData] = {}
        self.orders: Dict[str, OrderData] = {}
        self.trades: Dict[str, TradeData] = {}
        self.positions: Dict[str, PositionData] = {}
        self.accounts: Dict[str, AccountData] = {}
        self.contracts: Dict[str, ContractData] = {}
        self.quotes: Dict[str, QuoteData] = {}

        self.active_orders: Dict[str, OrderData] = {}
        self.active_quotes: Dict[str, QuoteData] = {}

        self.db = get_database()

        self.load_contracts()   # hxxjava add   启动时就从数据库读取所有合约信息

        self.add_function()
        self.register_event()

在收到账户信息时才保存变化的合约信息。

    def process_account_event(self, event: Event) -> None:
        """"""
        account: AccountData = event.data
        self.accounts[account.vt_accountid] = account

        self.save_contracts_changed()   # hxxjava add

    def process_contract_event(self, event: Event) -> None:
        """"""
        contract: ContractData = event.data
        # self.contracts[contract.vt_symbol] = contract
        # hxxjava change
        if contract.vt_symbol not in self.contracts:
            self.contracts[contract.vt_symbol] = contract
            self.contracts_changed.append(contract)

启动线程对变化的合约进行增量保存,否则会堵塞系统的消息循环。

    def save_contracts_changed(self):
        """ 启动线程保存所有新增的合约 """
        # 复制所有新增的合约
        contracts = [deepcopy(contract) for contract in self.contracts_changed]
        self.contracts_changed = []

        if contracts:
            # 如果有新增的合约,启动线程保存
            t = Thread(target=self.save_contracts,kwargs=({"contracts":contracts}))
            t.start()

    def save_contracts(self,contracts:List):   # hxxjava add
        """ save contracts into database """
        self.db.save_constract_data(contracts)
        print(f"一共保存了{len(contracts)}个合约 !")

    def load_contracts(self):   # hxxjava add
        """ save all contracts into a json file """      
        contracts = self.db.load_contract_data()
        for c in contracts:
            self.contracts[c.vt_symbol] = c

        self.contracts_changed:List[ContractData] = []

        print(f"一共读取了{len(contracts)}个合约 !")
        # print(f"self.contracts={self.contracts}个合约 !")

2.6 为CtaEngine增加获取合约信息函数get_contract()

修改vnpy_ctastrategy\engine.py中的CtaEngine,添加get_contract():

    def get_contract(self, strategy: CtaTemplate) -> Optional[ContractData]:    # hxxjava add
        """
        Get strategy's contract data.
        """
        return self.main_engine.get_contract(strategy.vt_symbol)

2.7 为CTA策略模板添加合约信息获取函数get_contract()

修改vnpy_ctastrategy\template.py中的CtaTemplate,添加get_contract():

    def get_contract(self):
        """
        Return trading_hours of trading contract.   # hxxjava add
        """
        return self.cta_engine.get_contract(self)

3 策略如何使用get_contract()?

经过什么这么复杂的步骤,就赋予了您的CTA策略一种直接读取交易合约的合约信息的能力。
如何使用呢?非常简单,这里给出一段用户策略的例子代码:

        """ 得到交易合约的合约信息 """              
        contract:ContractData = self.get_contract()
        size_n = contract.size   # 合约乘数
        price_tick = contract.price_tick # 最小价格变动

本人这么修改了之后,觉得非常方便,呼吁官方能够把这样的功能合并到系统中,惠及广大vnpy会员。

龙津 wrote:

请注意,如需使用MyBarGenerator进行CTA策略回测,回测脚本调用engine.engine.set_parameters函数的时候不要忘记传入trading_hours参数
回测时,trading_hours参数如何传入,能给出详细的代码吗?

MyBarGenerator进行CTA策略回测的修改已经放在7.1节中给出来,仔细修改下。

所有可自由交易标的行情变动都是随机过程的累积,本质上就是马尔可夫过程,注意:一定要是可自由交易,不信请往下看。

1. 几行代码

import numpy as np
from matplotlib import pyplot as plt

N = 10*600
List1 = list(np.random.randn(N))
data3m = []
data30m = []
tt3m = 0.0

for i in range(N):
    tt3m += List1[i]
    if i % 10==0:
        data30m.append(tt3m)
    data3m.append(tt3m)

plt.figure(figsize=(20,6))
plt.plot(data3m)
plt.show()
plt.figure(figsize=(20,6))
plt.plot(data30m)

2. 某一次的运行结果

description

3. 这段代码是否会为您带来一些思考和某种启示呢?

它所形成的曲线是那么自然,优美,和真实的行情只差一个合约名称 ... ...

老秦 wrote:

真心请教各位,特别是hxxjava,
每次校验tick时,直接用最新的合约状态校验不行吗?为啥还涉及curr_status, next_status?
每次收到推送的状态,就更新最新合约状态,据此检验tick,这样不行吗?没太明白您的curr_status, next_status是啥意思,为何会有两个状态。

答复:

  • 您的想法是对的,不用curr_status, next_status,这样也是可以的
  • 有没有想过,curr_status, next_status可以帮你提前知道下一个交易状态将发生在什么时间,只要你连续交易一个完整的交易日,你就已经有了它完整的交易时间段信息了
  • 当然主要做也可以有问题的,加入有一天因为某种原因(涨停或跌停)临时休市3分钟,这个信息json文件后,会影响下一个交易日的对next_status状态判断,需要手动修改json文件
  • 稳妥点方法是只有使用接口实时报告的状态信息。
© 2015-2022 微信 18391752892
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】