VeighNa量化社区
你的开源社区量化交易平台
hxxjava's Avatar
Member
离线
419 帖子
声望: 155

下载zeal

zeal的官网网址:https://zealdocs.org/
description

选择属于自己需要的平台

Windows,Linux和BSD各种平台,选择属于自己需要的平台。我选择的是免安装的Portable的压缩包:

description

windows平台的Portable压缩包下载之后无需安装,所以设置和文档都存放在应用目录下,比较方便。下载之后直接解压到指定目录下,找到运行其中zeal.exe就可以了。

运行zeal

设置离线文档存放目录

description

下载离线文档

description

已经下载的离线文档

description

打开离线文档,查询感兴趣的主题

浏览离线文档既可以在zeal内部浏览器显示感兴趣的主题,也可以选择外边浏览器。至此你再也不用为查询python语法、函数解释和用法而烦恼了。

description

1、vnpy系统缺少音乐和语音播放功能

vnpy中多采用各种应用系统的策略进行交易的,虽然也有各种日志和提示出现,但平常总是静悄悄的。
如果你想了解系统和策略的运行情况,可以查看各种运行日志,例如MainWindow的日志,委托列表,成交的列表,账户列表。想要查询你的策略运行情况,可以查看你的策略管理器的变量输出,等等。可是人总不能一直盯着屏幕,那样太累了。
如果能够有个声音和语音播报各种交易活动,用户会及时得到提醒。例如:

  • 当网络连接和断开时可以提醒特定的音乐或语音,可以让你及时处理网络故障;
  • 用户策略中可以添加音乐或语音,可以提醒策略的交易所发生的交易活动是否正常;
  • 当你的账户资金出现保证金不足,可以提醒及时入金;
  • 如果逆勢日内交易,你可以设置提前3分钟提醒你即将收盘了。

2、实现方法

2.1 音乐和语音播放器的实现

在vnpy\usertools目录下添加文件sound_player.py,内容如下:

"""
多线程音乐和文本播放器,介绍如下:
特点:
    既可以播放wav,mp3 等格式,有可以对文本进行播放。
    使用消息引擎创建该音乐播放器,多线程并行播放声音,不会因为播放声音而阻塞业务流程。
    假设其他应用或者策略中需要播放声音, sound_name为字符型的声音文件名称,使用方法有两种:   
    方法1:  先获取消息引擎event_engine(注:与SoundPlayer实例成交时使用的消息引擎是
            相同的),那么可以这样播放:

            event_engine.put(Event(EVENT_SOUND,"Connected.wav"))
            event_engine.put(Event(EVENT_SPEEK,"您收到一条委托单"))

    方法2:  将SoundPlayer多play_sound()接口安装到MainEngine到实例main_engine,那么
            可以先回去获取main_engine,然后这样播放:

            event_engine.play_sound("Connected.wav")
            event_engine.speek_text("您收到一条委托单")

作者:hxxjava    时间:2023-2-14,情人节————献给心爱的人!

修改:增加回测与实盘的区分功能,使得只在实盘环境才播放声音和文本。
修改:hxxjava    时间:2023-2-28

依赖库:pyttsx3, 安装:pip install pyttsx3
"""
from typing import Any
from pathlib import Path
from threading import Thread
from vnpy.trader.engine import EventEngine,Event
from winsound import PlaySound,SND_FILENAME
import pyttsx3

EVENT_SOUND = "eSound."
EVENT_SPEEK = "eSpeak."

class SoundPlayer():
    """
    多线程声音播放器
    """
    def __new__(cls, *args, **kwargs):
        """ singleton constructor """
        if not hasattr(cls, "_instance"):
            cls._instance = super(SoundPlayer, cls).__new__(cls)
        return cls._instance

    def __init__(self,event_engine:EventEngine,switch:bool=True):
        """ 初始化函数 """
        self.event_engine = event_engine
        # control play sound file
        self.switch = switch   
        self.register_event()

    def register_event(self):
        """ """
        self.event_engine.register(EVENT_SOUND,self.process_sound_event)
        self.event_engine.register(EVENT_SPEEK,self.process_speak_event)

    def set_switch(self,switch:bool=True):
        """ set the swith which control play sound file """
        self.switch = switch

    def _get_sound_path(self,sound_name: str):
        """
        Get path for sound file with sound name.
        """
        this_file_path:Path = Path(__file__).parent
        sound_path:Path = this_file_path.joinpath("sounds", sound_name)
        return str(sound_path)

    def process_sound_event(self,event:Event):
        """ EVENT消息处理过程 """
        wavname,is_testing = event.data['wavname'],event.data['is_testing']
        if self.switch == True and is_testing == False:
            filename = self._get_sound_path(wavname) 
            thread = Thread(target=self._play_sound,kwargs=({"filename":filename}),daemon=True)     
            thread.start()  

    def process_speak_event(self,event:Event):
        """ EVENT消息处理过程 """
        santence,is_testing = event.data['santence'],event.data['is_testing']
        if self.switch == True and is_testing == False:
            santence:str = event.data
            thread = Thread(target=self._do_speak,kwargs=({"santence":santence}),daemon=True)     
            thread.start()  

    def _play_sound(self,filename:str):
        """ 音乐文件播放线程执行过程 """
        PlaySound(filename,SND_FILENAME)

    def _do_speak(self,santence:str):
        """ 文本播放线程执行过程 """
        print(santence)
        speaker = pyttsx3.init()
        speaker.say(santence)
        speaker.runAndWait()

    def play_sound(self,sound_name:str,is_testing:bool=False):
        """ 
        用户音乐播放接口。
        参数:
            sound_name:传入声音文件名 
            is_testing:回测=True;实盘=False(默认)
        """
        self.event_engine.put(Event(EVENT_SOUND,{"wavname":sound_name,"is_testing":is_testing}))

    def speak_text(self,santence:str,is_testing:bool=False):
        """ 
        用户文字播放接口。
        参数:
            santence:传入声音文件名 
            is_testing:回测=True;实盘=False(默认)
        """
        self.event_engine.put(Event(EVENT_SPEEK,{"santence":santence,"is_testing":is_testing}))

2.2 把音乐和语音播放器安装到vnpy系统

在vnpy\trader\engine.py中做如下修改:

1)在引用部分添加这些内容

from vnpy.usertools.sound_player import SoundPlayer

2)在class OmsEngine的init()中创建音乐和语音播放器

      self.sound_player = SoundPlayer(event_engine,True)   # test sound player

3)在class OmsEngine的add_function()函数中为MainEngine添加下面的函数

    def add_function(self) -> None:
        """Add query function to main engine."""
         ... ...
        self.main_engine.play_sound = self.sound_player.play_sound
        self.main_engine.speak_text = self.sound_player.speak_text

这样你的MainEngine就有了可以音乐和语音功能了。

2.3 音频文件存放在哪里?

class sound_player规定音频文件存放在vnpy\usertools\sounds\目录下,当然你也可以修改代码中规定的目录,放在自己喜欢的目录下。
文件可以是wav、mp3格式的音乐文件均可,可以自己录制。
取一些有意义的文件名,如connected.wav代表网络连接成功,disconnection.wav代表网络断开,自己发挥吧,方便自己在自己vnpy系统中用函数调用。
本来本人有一套音乐文件的,可是论坛里没有文件上传功能,所以无法共享给大家,如果需要可以私信我。

3. 如何使用音乐和语音播放功能

下面用连接网关成功和连接断开,分别给出音乐和语音播放的示例:

3.1 音乐播放功能使用

3.1.1 通过main_engine调用play_sound()播放语音

    def process_connect_event(self, event: Event) -> None:  # hxxjava add
        """ CTP接口连接消息处理 """
        gateway:GatewayData = event.data
        self.main_engine.play_sound("Connected.wav")

    def process_disconnect_event(self, event: Event) -> None: # hxxjava add
        """ CTP接口断开消息处理 """
        gateway:GatewayData = event.data
        self.main_engine.play_sound("ConnectionLost.wav")

3.1.2 发送EVENT_SOUND消息播放音乐

# 增加引用
from vnpy.usertools.sound_player import EVENT_SOUND,EVENT_SPEEK

    def on_order(self, order: OrderData):
        """
        Callback of new order data update.
        """
        # 当策略收到委托单时播放提示音乐
        event_engine:EventEngine = self.cta_engine.event_engine
        is_testing = self.cta_engine.get_engine_type() != EngineType.LIVE
        event = Event(EVENT_SOUND,
                      {"wavname":"order.wav","is_testing":is_testing}
                )
        event_engine.put(event)

    def on_trade(self, trade: TradeData):
        """
        Callback of new trade data update.
        """
        # 当策略收到成交单时播放提示音乐
        event_engine:EventEngine = self.cta_engine.event_engine
        is_testing = self.cta_engine.get_engine_type() != EngineType.LIVE
        event = Event(EVENT_SOUND, {"wavname":"traded.wav","is_testing":is_testing} )
        event_engine.put(event)

3.2 语音播放功能使用

3.2.1 通过main_engine调用speak_text()播放语音

    def process_connect_event(self, event: Event) -> None:  # hxxjava add
        """ CTP接口连接消息处理 """
        gateway:GatewayData = event.data
        self.main_engine.speak_text(f"感谢您,连接{gateway.name}的{gateway.type}接口成功!")

    def process_disconnect_event(self, event: Event) -> None: # hxxjava add
        """ CTP接口断开消息处理 """
        gateway:GatewayData = event.data
        self.main_engine.speak_text(f"请注意:{gateway.name}的{gateway.type}接口已断开!")

3.2.2 发送EVENT_SPEEK消息播放语音

假设你的策略中实现了on_order()和on_trade()这两个回调函数:

    def on_order(self, order: OrderData):
        """
        Callback of new order data update.
        """
        event_engine:EventEngine = self.cta_engine.event_engine
        is_testing = self.cta_engine.get_engine_type() != EngineType.LIVE
        event = Event(EVENT_SPEEK,
                      {"santance":f"策略{self.strategy_name}收到委托单,价格{order.price},手数{order.volume},已经成交{order.traded}",
                       "is_testing":is_testing})
        event_engine.put(event)

    def on_trade(self, trade: TradeData):
        """
        Callback of new trade data update.
        """
        event_engine:EventEngine = self.cta_engine.event_engine
        is_testing = self.cta_engine.get_engine_type() != EngineType.LIVE
        event = Event(EVENT_SPEEK,
                      {"santance":f"策略{self.strategy_name}收到{trade.vt_symbol}成交单,成交价{trade.price}手数{trade.volume}.",
                       "is_testing":is_testing})
        event_engine.put(event)

3.3 音乐和语音播放功能使用注意事项

实盘中用户策略是可以通过应用应用引擎获得vnpy系统的MainEngine的,这样就可以使用 play_sound() 和 speak_text()函数来播放音乐和语音了。但是,在策略中使用这个两个播放函数,应该考虑到回测时不要有声音的。应该根据应用引擎的不同,在策略中使用 play_sound() 和 speak_text()时,将参数is_testing设置为True,这样策略回测就不会有音乐和语音了。

4 将音乐和语音播放功能封装到CTA策略中

这里以CTA策略模板CtaTemplate为例,演示如何将音乐和语音播放功能封装到各种应用的策略中,其他应用系统的模板可以参考以下的做法去封装,就不再一一讲解。

# 在引用部分增加对音乐和语音播器的引用
from vnpy.usertools.sound_player import SoundPlayer

class CtaTemplate(ABC):
    """"""

    author: str = ""
    parameters: list = []
    variables: list = []

    def __init__(
        self,
        cta_engine: Any,
        strategy_name: str,
        vt_symbol: str,
        setting: dict,
    ) -> None:
        """"""

        ... ... # 原来的初始化代码 

        # 音乐和语音播放器 hxxjava add
        self.sound_player:SoundPlayer = SoundPlayer(self.cta_engine.event_engine)

    def play_sound(self,sound_name:str): 
        """ 播放音乐 hxxjava add """
        if self.cta_engine.get_engine_type() == EngineType.LIVE:
            self.sound_player.play_sound(sound_name)

    def speak_text(self,santence:str): 
        """ 播放语音 hxxjava add """
        if self.cta_engine.get_engine_type() == EngineType.LIVE:
            self.sound_player.speak_text(santence)

经过上面对CtaTemplate的修改,用户策略中就可以像下面的语句一样直接调用音乐和语音播放了,更加简便。

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("策略初始化开始")
        self.load_bar(10)
        self.speak_text(f"策略{self.strategy_name}开始初始化")

1. 停止单、条件单存在的问题

CTA策略模块原来是有停止单的,本人后来又在添加了条件单功能。使用中很多用户反应有这样的问题,那就是策略已经发出过停止单或条件单,但是还未触发,但是因为某种原因策略被关机了,再次启动该策略时发现之前发出过停止单或条件单没有了,非常不方便。如果能够在策略再次启动的时候,把历史的停止单或条件单回复出来就好了。

2. 如何解决加载停止单、条件单

那么如何把历史的停止单或条件单回复出来呢?把策略运行时曾经发出的停止单或条件单保存到文件或者数据库,在策略再次启动时,从文件或者数据库读取出来,恢复到CTA策略管理器的停止单或条件单列表,让它们继续运行就可以了。
这就有选择的问题:

  1. 是否希望加载历史停止单或条件单,这是可以选择的,应该可以设置;
  2. 全部恢复历史停止单或条件单,还是止恢复仍然有效的,这也应该可以选择的;

3. 实现方法

3.1 先实现停止单字典和条件单字典的存取功能

包括如下:

  • 保存内存中的停止单字典到json文件
  • 从json文件读取停止单字典
  • 保存内存中的条件单字典到json文件
  • 从json文件读取条件单字典

在vnpy_ctastrategy命令下新建一个文件utitlity.py,其内容如下:

"""
实现停止单字典和条件单字典的存取功能,功能如下:
- 保存内存中的停止单字典到json文件
- 从json文件读取停止单字典
- 保存内存中的条件单字典到json文件
- 从json文件读取条件单字典
作者:hxxjava       
时间:2023-2-12
"""
import json
from datetime import datetime
from vnpy.trader.constant import Direction,Offset
from vnpy_ctastrategy.base import (
    StopOrder,
    StopOrderStatus,
    ConditionOrder,
    Condition,
    ExecutePrice,
    CondOrderStatus,
)
from vnpy.trader.utility import get_file_path,get_folder_path,save_json

class StopOrderEncoder(json.JSONEncoder):
    """
    停止单相关类型的编码器————用来保存json文件
    """
    def default(self, obj):
        d = {}
        d['__class__'] = obj.__class__.__name__
        if isinstance(obj,Direction):
            d['_value_'] = obj.value
        elif isinstance(obj,Offset):
            d['_value_'] = obj.value
        elif isinstance(obj,StopOrderStatus):
            d['_value_'] = obj.value
        elif isinstance(obj, datetime):
            d['_value_'] = obj.strftime("%Y-%m-%d %H:%M:%S.%f")
        elif isinstance(obj,StopOrder):
            d.update(obj.__dict__)
        else:
            d['__module__'] = obj.__module__
            d.update(obj.__dict__)            
        return d

class StopOrderDecoder(json.JSONDecoder):
    """
    停止单相关类型的译码器————用来从json文件读取
    """
    def __init__(self):
        json.JSONDecoder.__init__(self, object_hook=self.dict2obj)

    def dict2obj(self, d):
        if '__class__' in d:       
            class_name = d.pop('__class__')
            if class_name == 'Direction':
                value = d['_value_']
                instance = Direction(value)
            elif class_name == 'Offset':
                value = d['_value_']
                instance = Offset(value)
            elif class_name == 'StopOrderStatus':
                value = d['_value_']
                instance = StopOrderStatus(value)
            elif class_name == 'datetime':
                value = d['_value_']
                instance = datetime.strptime(value, '%Y-%m-%d %H:%M:%S.%f')
            elif class_name == 'StopOrder':
                instance = StopOrder(**d)
            else:    
                module_name = d.pop('__module__')
                module = __import__(module_name)
                class_ = getattr(module, class_name)
                args = dict((key,value) for key, value in d.items())
                instance = class_(**args)
        else:

            instance = d
        return instance

class CondOrderEncoder(json.JSONEncoder):
    """
    条件单相关类型的编码器————用来保存json文件
    """
    def default(self, obj):
        d = {}
        d['__class__'] = obj.__class__.__name__
        if isinstance(obj,Direction):
            d['_value_'] = obj.value
        elif isinstance(obj,Offset):
            d['_value_'] = obj.value
        elif isinstance(obj,Condition):
            d['_value_'] = obj.value
        elif isinstance(obj,ExecutePrice):
            d['_value_'] = obj.value
        elif isinstance(obj,CondOrderStatus):
            d['_value_'] = obj.value
        elif isinstance(obj, datetime):
            d['_value_'] = obj.strftime("%Y-%m-%d %H:%M:%S.%f")
        elif isinstance(obj,ConditionOrder):
            d.update(obj.__dict__)
        else:
            d['__module__'] = obj.__module__
            d.update(obj.__dict__)            
        return d

class CondOrderDecoder(json.JSONDecoder):
    """
    条件单相关类型的译码器————用来从json文件读取
    """
    def __init__(self):
        json.JSONDecoder.__init__(self, object_hook=self.dict2obj)

    def dict2obj(self, d):
        if '__class__' in d:       
            class_name = d.pop('__class__')
            if class_name == 'Direction':
                value = d['_value_']
                instance = Direction(value)
            elif class_name == 'Offset':
                value = d['_value_']
                instance = Offset(value)
            elif class_name == 'Condition':
                value = d['_value_']
                instance = Condition(value)
            elif class_name == 'ExecutePrice':
                value = d['_value_']
                instance = ExecutePrice(value)
            elif class_name == 'CondOrderStatus':
                value = d['_value_']
                instance = CondOrderStatus(value)
            elif class_name == 'datetime':
                value = d['_value_']
                instance = datetime.strptime(value, '%Y-%m-%d %H:%M:%S.%f')
            elif class_name == 'ConditionOrder':
                instance = ConditionOrder(**d)
            else:    
                module_name = d.pop('__module__')
                module = __import__(module_name)
                class_ = getattr(module, class_name)
                args = dict((key,value) for key, value in d.items())
                instance = class_(**args)
        else:

            instance = d
        return instance

def save_stop_order(filename: str,data: dict) -> None:
    """
    Save StopOrder dict into {.vntrader}\stop_orders\{filename}.json.
    """
    path = get_folder_path("stop_orders")
    path_file = path.joinpath(filename)
    with open(path_file, mode="w+",encoding="UTF-8") as f:
        json.dump(
            data,
            f,
            indent=4,
            cls=StopOrderEncoder,
            ensure_ascii=False
        )

def load_stop_order(filename: str) -> dict:
    """
    Load StopOrder dict from {.vntrader}\stop_orders\{filename}.json.
    """
    path = get_folder_path("stop_orders")
    path_file = path.joinpath(filename)
    filepath: Path = get_file_path(path_file)
    if filepath.exists():
        with open(filepath, mode="r", encoding="UTF-8") as f:
            data: dict = json.load(f,cls=StopOrderDecoder)
        return data
    else:
        save_json(filepath, {})
        return {}

def save_condition_order(filename: str,data: dict) -> None:
    """
    Save ConditionOrder dict into {.vntrader}\cond_orders\{filename}.json.
    """
    path = get_folder_path("cond_orders")
    path_file = path.joinpath(filename)
    with open(path_file, mode="w+",encoding="UTF-8") as f:
        json.dump(
            data,
            f,
            indent=4,
            cls=CondOrderEncoder,
            ensure_ascii=False
        )

def load_condition_order(filename: str) -> dict:
    """
    Load ConditionOrder dict from {.vntrader}\cond_orders\{filename}.json.
    """
    path = get_folder_path("cond_orders")
    path_file = path.joinpath(filename)
    filepath: Path = get_file_path(path_file)
    if filepath.exists():
        with open(filepath, mode="r", encoding="UTF-8") as f:
            data: dict = json.load(f,cls=CondOrderDecoder)
        return data
    else:
        save_json(filepath, {})
        return {}

3.2 为CtaEngine增加停止单和条件单保存和加载功能

在文件cta_strategy\engine.py的class MyCtaEngine下面增加下面的代码。class MyCtaEngine已经在比停止单更好用的条件单——ConditionOrder一文中分享给大家来,虽然这次贴出其完整代码,但这里只介绍与停止单和条件单的保存与恢复有关的内容。

  • 增加了save_stop_orders()接口,用来保存策略停止单
  • 增加了load_stop_orders()接口,用来回复策略停止单
  • 增加了save_condition_orders()接口,用来保存策略条件单
  • 增加了load_condition_orders()接口,用来回复策略条件单
  • 增加了对EVENT_TIMER消息的订阅,其消息处理函数process_timer_event()用来定时对所以初始化的策略进行监视,保存其停止单和条件单到以其策略名称为文件名的json文件。

在cta_strategy\engine.py的引用部分增加这些代码:

from .utility import save_stop_order, load_stop_order, save_condition_order, load_condition_order   # hxxjava add

class MyCtaEngine下面增加下面的代码:

class MyCtaEngine(CtaEngine):
    """ 
    CTA策略引擎,对CtaEngine的功能进行扩展。 
    功能:
    1. 订阅集合竞价tick数据,并且转发给各个已经初始化的CTA策略;
    2. 订阅交易状态消息数据,并且转发给各个已经初始化的CTA策略;
    3. 条件单的功能:包括发送、监视、更新和取消条件单的功能。
    4. 定时保存已经初始化策略的停止单和条件单到json文件。
    5. 提供历史策略的停止单和条件单的查询接口。
    """

    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super().__init__(main_engine,event_engine)

        self.condition_orders:Dict[str,ConditionOrder] = {}         # strategy_name: ConditionOrder

        self.triggered_condition_orders:List[ConditionOrder] = []   # 已经触发点条件单,为流控设计

        self.seconds = 0
        self.save_orders_interval = 10

    def register_event(self):
        """"""
        super().register_event()
        self.event_engine.register(EVENT_AUCTION_TICK, self.process_auction_tick_event)
        self.event_engine.register(EVENT_STATUS, self.process_status_event)
        self.event_engine.register(EVENT_ALL_PENDING_ORDER, self.process_pending_order_event)
        self.event_engine.register(EVENT_TIMER, self.process_timer_event)

    def process_pending_order_event(self,event:Event):
        """ 集合竞价消息处理 hxxjava add """
        pending_orders:PendingOrders = event.data
        strategies:List[CtaTemplate] = self.symbol_strategy_map.get(pending_orders.vt_symbol,[])
        if not strategies:
            return

        for strategy in strategies:
            if strategy.inited:
                # 执行全挂单消息推送
                self.call_strategy_func(strategy, strategy.on_pending_orders, pending_orders)

    def process_auction_tick_event(self,event:Event):
        """ 集合竞价消息处理 hxxjava add """
        tick:TickData = event.data
        strategies:List[CtaTemplate] = self.symbol_strategy_map.get(tick.vt_symbol,[])
        if not strategies:
            return

        for strategy in strategies:
            if strategy.inited:
                # 执行策略的集合竞价消息处理
                self.call_strategy_func(strategy, strategy.on_auction_tick, tick)

    def process_status_event(self,event:Event):
        """ 交易状态消息处理 hxxjava add """
        status:StatusData = event.data
        strategies:List[CtaTemplate] = []

        # step1: find strategies related to this status data 
        vt_instrument0 = get_vt_instrument(status.vt_symbol)
        if vt_instrument0 == status.vt_symbol:
            # 交易品种的交易状态
            for vt_symbol in self.symbol_strategy_map.keys():
                vt_instrument = get_vt_instrument(vt_symbol)
                if vt_instrument == vt_instrument0:
                    # 交易品种的交易状态属于策略交易的合约
                    strategies.extend(self.symbol_strategy_map[vt_symbol]) 

        else:
            # 单独合约的交易状态
            strategies.extend(self.symbol_strategy_map.get(status.vt_symbol,[]))

        if not strategies:
            return

        # step 2: push status data to all relate strategies
        for strategy in strategies:
            if strategy.inited:
                # 执行策略的集合竞价消息处理
                self.call_strategy_func(strategy, strategy.on_status, status)

    def process_tick_event(self,event:Event):
        """ 用tick的价格检查条件单 """
        super().process_tick_event(event)

        tick:TickData = event.data
        all_condition_orders = [order for order in self.condition_orders.values() \
            if order.vt_symbol == tick.vt_symbol and order.status == CondOrderStatus.WAITING]
        for order in all_condition_orders:
            # 检查条件单是否满足条件
            self.check_condition_order(order,tick)

    def check_condition_order(self,order:ConditionOrder,tick:TickData):
        """ 检查条件单是否满足条件 """       
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        price = tick.last_price

        is_be = order.condition == Condition.BE and price >= order.price
        is_le = order.condition == Condition.LE and price <= order.price
        is_bt = order.condition == Condition.BT and price > order.price
        is_lt = order.condition == Condition.LT and price < order.price

        if is_be or is_le or is_bt or is_lt:
            # 满足触发条件
            if order.execute_price == ExecutePrice.MARKET:
                # 取市场价
                price = tick.last_price
            elif order.execute_price == ExecutePrice.EXTREME:
                # 取极限价
                price = tick.limit_up if order.direction == Direction.LONG else tick.limit_down
            else:
                # 取设定价
                price = order.price

            # 执行委托
            order_ids = strategy.send_order(
                    direction = order.direction,
                    offset=order.offset,
                    price=price,
                    volume=order.volume 
                )

            if order_ids:
                order.trigger_time = tick.datetime
                order.status = CondOrderStatus.TRIGGERED
                order.vt_orderids = order_ids

                self.call_strategy_func(strategy,strategy.on_condition_order,order)
                self.put_cond_order_event(order)

    def find_condition_order(self,vt_orderid:str):
        """ 根据委托单号查询所属条件单 """
        corder:ConditionOrder = None
        for order in self.condition_orders.values():
            if vt_orderid in order.vt_orderids:
                corder = order
                break

        return corder

    def process_trade_event(self, event: Event):
        """ 委托单推送处理 """
        super().process_trade_event(event)
        trade:TradeData = event.data
        vt_orderid = trade.vt_orderid

        corder = self.find_condition_order(vt_orderid)
        if corder:
            # 该成交单属于某个条件单
            strategy = self.strategies.get(corder.strategy_name,None)
            if strategy and strategy.trading:
                # 找到了该条件单属实策略实例且正在交易中

                # 累计条件单的成交量
                corder.traded += trade.volume
                # 推送该条件单给策略
                self.call_strategy_func(strategy,strategy.on_condition_order,corder)
                # 刷新条件单列表控件
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,corder))

    def send_condition_order(self,order:ConditionOrder):
        """  """
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        if order.cond_orderid not in self.condition_orders:
            self.condition_orders[order.cond_orderid] = order
            self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
            return True

        return False

    def cancel_condition_order(self,cond_orderid:str):
        """  """
        order:ConditionOrder = self.condition_orders.get(cond_orderid,None)
        if not order:
            return False

        order.status = CondOrderStatus.CANCELLED
        self.put_cond_order_event(order)
        return True

    def cancel_all_condition_orders(self,strategy_name:str):
        """  """
        for order in self.condition_orders.values():
            if order.strategy_name == strategy_name and order.status == CondOrderStatus.WAITING:
                order.status = CondOrderStatus.CANCELLED
                self.put_cond_order_event(order)

        return True

    def put_cond_order_event(self, cond_order: ConditionOrder) -> None:
        """
        Put an event to update condition order status.
        """
        event: Event = Event(EVENT_CONDITION_ORDER, cond_order)
        self.event_engine.put(event)

    def save_stop_orders(self,strategy_name:str,active_only:bool=False):
        """ 保存加载历史停止单到json文件 """
        count:int = 0
        strategy:CtaTemplate = self.strategies.get(strategy_name,None)
        if not strategy:
            return count

        stop_orders = {}
        for order_id,stop_order in self.stop_orders.items():
            if stop_order.strategy_name == strategy_name:
                if active_only and stop_order.status == StopOrderStatus.WAITING:
                    stop_orders[order_id] = stop_order
                    count += 1
                else:
                    stop_orders[order_id] = stop_order
                    count += 1
        file_name = f"{strategy_name}.json"
        save_stop_order(file_name,stop_orders)
        return count

    def load_stop_orders(self,strategy_name:str,active_only:bool=True):
        """ 从json文件加载历史停止单 """
        file_name = f"{strategy_name}.json"
        stop_orders:Dict[str,StopOrder] = load_stop_order(file_name)
        if not stop_orders:
            return

        if not active_only:
            loaded_orders = stop_orders
        else:
            loaded_orders = {}
            for id,stop_order in stop_orders.items():
                if stop_order.status == StopOrderStatus.WAITING:
                    loaded_orders[id] = stop_order

        if loaded_orders:
            self.stop_orders.update(loaded_orders)
            # 更新GUI中加载的停止单列表
            for stop_order in loaded_orders.values():
                self.put_stop_order_event(stop_order)

    def save_condition_orders(self,strategy_name:str,active_only:bool=True):
        """ 保存加载历史条件单到json文件 """
        count:int = 0
        strategy:CtaTemplate = self.strategies.get(strategy_name,None)
        if not strategy:
            return count

        cond_orders = {}
        for order_id,cond_order in self.condition_orders.items():
            if cond_order.strategy_name == strategy_name:
                if active_only and cond_order.status == CondOrderStatus.WAITING:
                    cond_orders[order_id] = cond_order
                    count += 1
                else:
                    cond_orders[order_id] = cond_order
                    count += 1
        file_name = f"{strategy_name}.json"
        save_condition_order(file_name,cond_orders)
        return count

    def load_condition_orders(self,strategy_name:str,active_only:bool=True):
        """ 从json文件加载历史条件单 """
        file_name = f"{strategy_name}.json"
        cond_orders:Dict[str,ConditionOrder] = load_condition_order(file_name)
        if not cond_orders:
            return

        if not active_only:
            loaded_orders = cond_orders
        else:
            loaded_orders = {}
            for id,cond_order in cond_orders.items():
                if cond_order.status == CondOrderStatus.WAITING:
                    loaded_orders[id] = cond_order

        if loaded_orders:
            self.condition_orders.update(loaded_orders)
            # 更新GUI中加载的条件单列表
            for cond_order in loaded_orders.values():
                self.put_cond_order_event(cond_order)

    def process_timer_event(self,event:Event) -> None:
        # 定时保存策略的
        self.seconds += 1
        if self.seconds % self.save_orders_interval:
            return

        if self.get_engine_type() != EngineType.LIVE:
            # 只有实盘引擎才保存停止单和条件单,回测引擎则不保存
            return

        for strategy in self.strategies.values():
            if strategy.inited:
                cnt1 = self.save_stop_orders(strategy.strategy_name)
                cnt2 = self.save_condition_orders(strategy.strategy_name)
                # print(f"保存了策略 {strategy.strategy_name} 的 {cnt1} 个停止单,{cnt2} 个条件。")

3.3 为CTA策略模板CtaTemplate增加停止单和条件单加载选项

这里主要介绍在CtaTemplate中增加点与加载历史停止单和条件单相关的成员变量:
其中:

  • self.history_order表示策略是否在启动之后加载历史停止单和条件单,
  • self.active_only表示是否只加载仍然处于等待状态的历史停止单和条件单;

class CtaTemplate的其他代码见本人之前的帖子中的代码:比停止单更好用的条件单——ConditionOrder

class CtaTemplate(ABC):
    """"""

    author: str = ""
    parameters: list = []
    variables: list = []

    def __init__(
        self,
        cta_engine: Any,
        strategy_name: str,
        vt_symbol: str,
        setting: dict,
    ) -> None:
        """"""
        self.cta_engine: Any = cta_engine
        self.strategy_name: str = strategy_name
        self.vt_symbol: str = vt_symbol

        self.inited: bool = False
        self.trading: bool = False
        self.pos: int = 0

        # 是否在启动之后加载历史停止单和条件单
        self.history_order:bool = False  # hxxjava add
        self.active_only:bool = False  # hxxjava add

        # Copy a new variables list here to avoid duplicate insert when multiple
        # strategy instances are created with the same strategy class.
        self.variables = copy(self.variables)
        self.variables.insert(0, "inited")
        self.variables.insert(1, "trading")
        self.variables.insert(2, "pos")

        self.update_setting(setting)

       # 其他代码省略 ... ...

4. 加载停止单和条件单委托选项的应用

4.1 用户CTA策略如何使用停止单和条件单加载功能?

增加历史停止单和条件单委托选项

class DemoStrategy(CtaTemplate):
    """ 示例策略 """

    author = "hxxjava"

    capital : float = 200000.0  # 交易资金
    max_loss_ratio : int = 6         # 每次开仓最大亏损比例
    max_open_times : int = 3         # 每次开仓最大亏损比例

    dir_interval : str = '1m'   # 方向周期单位,只能够是:'1m','1h','d'或'w'中的一个
    dir_window : int = 30       # 方向周期窗口
    op_interval : str = '1m'    # 操作周期单位,只能够是:'1m','1h','d'或'w'中的一个
    op_window : int = 3         # 操作周期窗口
    load_days : int = 10        # 加载历史行情的天数
    OpenSelect:str = "逆转价"
    show_chart: bool = True     # 是否需要显示图表
    dir_trend: str = ""
    op_trend: str = ""


    parameters = [
        "capital",     
        "max_loss_ratio",   
        "max_open_times",   
        "dir_interval",
        "dir_window",
        "op_interval",
        "op_window",
        "load_days",
        "OpenSelect",
        "show_chart",
        "history_order",    # 启动时是否加载停止单和条件单选项
        "active_only",       # 是否只加载仍然有效的停止单和条件单选项,
    ]

    long_pos:float = 0  # 持有多仓
    short_pos:float = 0 # 持有空仓

    variables = [
        "dir_trend",
        "op_trend",
        "long_pos",
        "short_pos"
    ]

    def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
        """"""
        super(GsjyStrategy, self).__init__(cta_engine, strategy_name, vt_symbol, setting)

        self.history_order = True  # 启动时载历史停止单和条件单
        self.active_only = False  # 加载又有点历史停止单和条件单

       # 其他的代码省略
    def on_start(self):
        """
        Callback when strategy is started.
        """
        is_live = self.cta_engine.get_engine_type() == EngineType.LIVE
        if is_live and self.history_order:
            # 只有实盘才保存策略的历史停止单和条件单
            from vnpy_ctastrategy.engine import MyCtaEngine
            cta_engine:MyCtaEngine = self.cta_engine
            cta_engine.load_stop_orders(self.strategy_name,self.active_only)
            cta_engine.load_condition_orders(self.strategy_name,self.active_only)
            self.write_log("加载历史停止单和条件单已执行。")

    # 其他的代码省略

4.2 设置加载停止单和条件单委托选项

在用户策略未启动的情况下,还可以设置有关加载停止单和条件单委托的选项,如图所示:

description

4.3 停止单和条件单会在用户CTA策略启动之后立即加载

下图是策略执行了条件单之后被关闭,再次重新启动之后回复的条件单。当然,停止单也是可以的实现恢复历史的,大家可以去试。
description

4.4 保存的停止单和条件单在哪里,内容是什么样子?

以条件单为例,如下图所示,条件单通常保存在用户目录下的.vntrader\cond_orders{策略名称}.json文件中:

description

其内容及格式如下:

{
    "0213082700975": {
        "__class__": "ConditionOrder",
        "strategy_name": "gs-rb2305",
        "vt_symbol": "rb2305.SHFE",
        "direction": {
            "__class__": "Direction",
            "_value_": "多"
        },
        "offset": {
            "__class__": "Offset",
            "_value_": "开"
        },
        "price": 4072.0,
        "volume": 4.0,
        "condition": {
            "__class__": "Condition",
            "_value_": ">"
        },
        "execute_price": {
            "__class__": "ExecutePrice",
            "_value_": "设定价"
        },
        "create_time": {
            "__class__": "datetime",
            "_value_": "2023-02-13 08:27:00.975422"
        },
        "trigger_time": null,
        "cond_orderid": "0213082700975",
        "traded": 0.0,
        "vt_orderids": [],
        "status": {
            "__class__": "CondOrderStatus",
            "_value_": "已撤销"
        },
        "before_trigger": null,
        "after_traded": null
    },
    "0213090325941": {
        "__class__": "ConditionOrder",
        "strategy_name": "gs-rb2305",
        "vt_symbol": "rb2305.SHFE",
        "direction": {
            "__class__": "Direction",
            "_value_": "空"
        },
        "offset": {
            "__class__": "Offset",
            "_value_": "开"
        },
        "price": 4063.0,
        "volume": 4.0,
        "condition": {
            "__class__": "Condition",
            "_value_": "<="
        },
        "execute_price": {
            "__class__": "ExecutePrice",
            "_value_": "极限价"
        },
        "create_time": {
            "__class__": "datetime",
            "_value_": "2023-02-13 09:03:25.941102"
        },
        "trigger_time": {
            "__class__": "datetime",
            "_value_": "2023-02-13 09:03:25.500000"
        },
        "cond_orderid": "0213090325941",
        "traded": 0.0,
        "vt_orderids": [
            "CTP.12_-2076558284_1"
        ],
        "status": {
            "__class__": "CondOrderStatus",
            "_value_": "已触发"
        },
        "before_trigger": null,
        "after_traded": null
    }
}

1. 为什么要隐藏与显示K线图表的信息板?

K线图表中主图与副图都有信息板,当我们用选择某个K线时,它用文字的形式表达每个K线修改的信息以及指标数值,它很有用。但是有时因为主图或者副图的指标过多,抑或是显示的信息过多,它会覆盖主图或副图的左上角或者右上角开始的很大一片区域,很影响我们的观看效果,尤其是遇到下跌行情时,K线从主图的左上角开始,到主图的右下角结束,那么这个时候你无论如何也看不到左上角的K线,因为它们被信息板遮住了。怎么办?隐藏信息板是个好办法。

2. 实现方法

修改vnpy\chart\widget.py中的class ChartCursor,代码如下:

class ChartCursor(QtCore.QObject):
    """"""

    def __init__(
        self,
        widget: ChartWidget,
        manager: BarManager,
        plots: Dict[str, pg.GraphicsObject],
        item_plot_map: Dict[ChartItem, pg.GraphicsObject]
    ) -> None:
        """"""
        super().__init__()

        self._widget: ChartWidget = widget
        self._manager: BarManager = manager
        self._plots: Dict[str, pg.GraphicsObject] = plots
        self._item_plot_map: Dict[ChartItem, pg.GraphicsObject] = item_plot_map

        self._x: int = 0
        self._y: int = 0
        self._plot_name: str = ""

        self._info_visibles:dict[str,bool] = {} # hxxjava add 2023-2-10

        self._init_ui()
        self._connect_signal()  

    def _init_ui(self) -> None:
        """"""
        self._init_line()
        self._init_label()
        self._init_info()

    def _init_line(self) -> None:
        """
        Create line objects.
        """
        self._v_lines: Dict[str, pg.InfiniteLine] = {}
        self._h_lines: Dict[str, pg.InfiniteLine] = {}
        self._views: Dict[str, pg.ViewBox] = {}

        pen: QtGui.QPen = pg.mkPen(WHITE_COLOR)

        for plot_name, plot in self._plots.items():
            v_line: pg.InfiniteLine = pg.InfiniteLine(angle=90, movable=False, pen=pen)
            h_line: pg.InfiniteLine = pg.InfiniteLine(angle=0, movable=False, pen=pen)
            view: pg.ViewBox = plot.getViewBox()

            for line in [v_line, h_line]:
                line.setZValue(0)
                line.hide()
                view.addItem(line)

            self._v_lines[plot_name] = v_line
            self._h_lines[plot_name] = h_line
            self._views[plot_name] = view

    def _init_label(self) -> None:
        """
        Create label objects on axis.
        """
        self._y_labels: Dict[str, pg.TextItem] = {}
        for plot_name, plot in self._plots.items():
            label: pg.TextItem = pg.TextItem(
                plot_name, fill=CURSOR_COLOR, color=BLACK_COLOR)
            label.hide()
            label.setZValue(2)
            label.setFont(NORMAL_FONT)
            plot.addItem(label, ignoreBounds=True)
            self._y_labels[plot_name] = label

        self._x_label: pg.TextItem = pg.TextItem(
            "datetime", fill=CURSOR_COLOR, color=BLACK_COLOR)
        self._x_label.hide()
        self._x_label.setZValue(2)
        self._x_label.setFont(NORMAL_FONT)
        plot.addItem(self._x_label, ignoreBounds=True)

    def _init_info(self) -> None:
        """
        """
        self._infos: Dict[str, pg.TextItem] = {}
        for plot_name, plot in self._plots.items():
            info: pg.TextItem = pg.TextItem(
                "info",
                color=CURSOR_COLOR,
                border=CURSOR_COLOR,
                fill=BLACK_COLOR
            )
            info.hide()
            info.setZValue(2)
            info.setFont(NORMAL_FONT)
            plot.addItem(info)  # , ignoreBounds=True)
            self._infos[plot_name] = info

    def _connect_signal(self) -> None:
        """
        Connect mouse move signal to update function.
        """
        self._widget.scene().sigMouseMoved.connect(self._mouse_moved)
        self._widget.scene().sigMouseClicked.connect(self._mouse_clicked) # hxxjava add 2023-2-10

    def isInfoVisible(self,plot_name:str):
        """ 获取信息板的隐显 hxxjava add 2023-2-10 """
        if plot_name not in self._info_visibles:
            self._info_visibles[plot_name] = True
        return self._info_visibles[plot_name]

    def setInfoVisible(self,plot_name:str,visible:bool):
        """ 设置信息板隐显 hxxjava add 2023-2-10 """
        self._info_visibles[plot_name] = visible    

    def _mouse_clicked(self,evt): # hxxjava add 2023-2-10
        """ 用鼠标左键+CTRL键隐显信息板 2023-2-10 """
        button = evt.button()
        modifiers = evt.modifiers()
        if button == QtCore.Qt.LeftButton and modifiers == QtCore.Qt.ControlModifier:
            if self._plot_name in self._infos:
                text_info = self._infos.get(self._plot_name,None)  
                old_value = self.isInfoVisible(self._plot_name)    
                self.setInfoVisible(self._plot_name,not old_value)      
                text_info.setVisible(not old_value)

    def _mouse_moved(self, evt: tuple) -> None:
        """
        Callback function when mouse is moved.
        """
        if not self._manager.get_count():
            return

        # First get current mouse point
        pos: tuple = evt

        for plot_name, view in self._views.items():
            rect = view.sceneBoundingRect()

            if rect.contains(pos):
                mouse_point = view.mapSceneToView(pos)
                self._x = to_int(mouse_point.x())
                self._y = mouse_point.y()
                self._plot_name = plot_name
                break

        # Then update cursor component
        self._update_line()
        self._update_label()
        self.update_info()

    def _update_line(self) -> None:
        """"""
        for v_line in self._v_lines.values():
            v_line.setPos(self._x)
            v_line.show()

        for plot_name, h_line in self._h_lines.items():
            if plot_name == self._plot_name:
                h_line.setPos(self._y)
                h_line.show()
            else:
                h_line.hide()

    def _update_label(self) -> None:
        """"""
        bottom_plot: pg.PlotItem = list(self._plots.values())[-1]
        axis_width = bottom_plot.getAxis("right").width()
        axis_height = bottom_plot.getAxis("bottom").height()
        axis_offset: QtCore.QPointF = QtCore.QPointF(axis_width, axis_height)

        bottom_view: pg.ViewBox = list(self._views.values())[-1]
        bottom_right = bottom_view.mapSceneToView(
            bottom_view.sceneBoundingRect().bottomRight() - axis_offset
        )

        for plot_name, label in self._y_labels.items():
            if plot_name == self._plot_name:
                label.setText(str(self._y))
                label.show()
                label.setPos(bottom_right.x(), self._y)
            else:
                label.hide()

        dt: datetime = self._manager.get_datetime(self._x)
        if dt:
            self._x_label.setText(dt.strftime("%Y-%m-%d %H:%M:%S"))
            self._x_label.show()
            self._x_label.setPos(self._x, bottom_right.y())
            self._x_label.setAnchor((0, 0))

    def update_info(self) -> None:
        """"""
        buf: dict = {}

        for item, plot in self._item_plot_map.items():
            item_info_text: str = item.get_info_text(self._x)

            if plot not in buf:
                buf[plot] = item_info_text
            else:
                if item_info_text:
                    buf[plot] += ("\n\n" + item_info_text)

        for plot_name, plot in self._plots.items():
            plot_info_text: str = buf[plot]
            info: pg.TextItem = self._infos[plot_name]
            info.setText(plot_info_text)
            if self.isInfoVisible(plot_name):    # hxxjava add 2023-2-10
                info.show()

            view: pg.ViewBox = self._views[plot_name]
            top_left = view.mapSceneToView(view.sceneBoundingRect().topLeft())
            info.setPos(top_left)

    def move_right(self) -> None:
        """
        Move cursor index to right by 1.
        """
        if self._x == self._manager.get_count() - 1:
            return
        self._x += 1

        self._update_after_move()

    def move_left(self) -> None:
        """
        Move cursor index to left by 1.
        """
        if self._x == 0:
            return
        self._x -= 1

        self._update_after_move()

    def _update_after_move(self) -> None:
        """
        Update cursor after moved by left/right.
        """
        bar: BarData = self._manager.get_bar(self._x)
        self._y = bar.close_price

        self._update_line()
        self._update_label()

    def clear_all(self) -> None:
        """
        Clear all data.
        """
        self._x = 0
        self._y = 0
        self._plot_name = ""

        for line in list(self._v_lines.values()) + list(self._h_lines.values()):
            line.hide()

        for label in list(self._y_labels.values()) + [self._x_label]:
            label.hide()

3. 运行效果

3.1 未隐藏信息板图表

description

3.2 隐藏信息板图表

description

3.3 注意事项

  • 使用中使用CTRL键+鼠标左键单击实现信息板的隐藏与显示;
  • 可以单独隐藏与显示单个主图/副图的信息板,不会相互影响。

1、参与集中竞价的资格问题

详细的请见中国人大网2000年12月17日发布的进入证券交易所参与集中竞价交易必须具备什么资格?

由此可以知道:

  • 所谓集中竞价是指在交易所的证券买卖的有形市场内,普通投资者无权直接进人交易所,只能委托在交易所里拥有席位的经纪人或者证券公司代理买卖。
  • 进人证券交易所参与集中竞价交易的,必须是具有证券交易所会员资格的证券公司。也就是说,只有具备证券交易所会员资格的证券公司,才可以进人证券交易所进行证券交易。

2 、普通投资者期货集合竞价时间集合竞价时间可以委托下单吗?

答案是肯定的,可以。

2.1 合竞价时间

  1. 期货集合竞价时间
    集合竞价时间是每个交易09:25--09:30..其8:25-09:92是指令申报时间,09:29--09:30是指令撮合时间
  2. 商品期货:
    白天盘品种集合竞价时间是每个交易日08:55--09:00.其中08:55--08:59是指令申报时间,08:59--09:00是指令 撮合时间:
    夜盘品种集合竞价时间是每个交易日20:55--21:00.其中20:55--20: 59是指令 申报时间,20:59--21:00是指令 撮合时间,夜盘品种白天不再进行集合竞价。

  3. 国债期货:
    集合竞价时间是每个交易日09:10--09:15.其中09:10--09: 14是指令申报时间,09:14--09: 15是指令撮合时间。
    这里要提醒大家如果您委托单子,但是系统显示的是“没开盘”,那么这是因为您是在非集合竞价报单时间段委托。

3、vnpy中如何实现集合竞价委托

作为普通投资者,只要合法连接了CTP接口,您就同时登录了行您选择的经纪商的行情服务器和交易服务器,注意:经纪商的而不是交易所的,也就是您开户并且做了CTP入网认证的那个证券公司的。
行情服务器会实时通知各个合约的交易状态信息。 当您所交易的合约的交易状态为集合竞价状态时,您就可以同手动或者策略自动使用CtpGateway的send_order(),cancel_order()函数,调用交易接口(CtpTdApi)进行委托下单和委托撤单,但是委托下单的结果必须等到集合竞价撮合之后才会得到成交与否。
但是与连续竞价期间的委托不同,在集合竞价撮合结束前,是没有任何成交结果推送给客户端的,而绝大部分策略是依靠行情变化来生成交易信号的,这是最主要的困难!
当然非行情驱动的策略可以在没有行情变化到情况下,自动实现在集合竞价阶段实现委托下单,但是这样做的意义不大。因为由基本面信息、黑天鹅、自然灾害、战争等突发事件等非行情输入驱动的策略,通常是长线策略或者是需要由行情来证实的策略,无需急迫地在集合竞价阶段来参与,为什么不在今天的集合竞价结果出来之后在做打算?

当然,在解决了交易信号生成的情况下,用户策略是可以在集合竞价状态实现自动委托下单的,至于此时如何生成交易信号是另外一个话题了。

1. 被冤枉的通达信函数“未来函数”XMA

提到通达信函数XMA,人们最常见到的词汇是“未来函数”、“欺骗”、“陷阱”、“坑”......等等不好的字眼,仿佛XMA函数是个捉摸不定的未来函数,是你亏损的根源!
其实大家对这个函数不了解,如果你了解了它的实现机理,它的优点和不得已的缺点,扬长避短,是完全可以使用的。

1.1 MA与XMA的区别

未来叙述的方便,先指标一个供MA与XMA计算的数组:
位置:[ A B C D E F H I J K L M N O P Q R S T U ]
数据:[ 1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18. 19. 20.]
分析周期N=5

1.1.1 滞后的N日MA均价

位置:[ A B C D E F H I J K L M N O P Q R S T U ]
MA5 [nan nan nan nan 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18.]

  • A = nan
  • B = nan
  • C = nan
  • D = nan
  • E = (1+2+3+4+5)/5
  • F = (2+3+4+5+6)/5
  • G = (3+4+5+6+7)/5
  • ... ...
  • U = (16+17+18+19+20)/5

1.1.2 XMA的N日均价

位置:[ A B C D E F H I J K L M N O P Q R S T U ]
XMA5:[ 2. 2.5 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18. 18.5 19. ]

  • A = (1+2+3)/3=2
  • B = (1+2+3+4)/4=2.5
  • C = (1+2+3+4+5)/5=3
  • D = (2+3+4+5+6)/5=4
  • E = (3+4+5+6+7)/5=5
  • F = (4+5+6+7+8)/5=6
  • ... ...
  • S = (16+17+18+19+20)/5
  • T = (17+18+19+20)/4
  • U = (18+19+20)/3

1.1.3 人们期望的N日均价

位置:[ A B C D E F H I J K L M N O P Q R S T U W(21) X(22)]
XMA5:[ nan nan nan nan 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18. 19. 20. ]

1.2 MA与XMA的相同与区别

它们的相同点是均值算法都是一样的,都是N个数的算术平均值,但是均值代表的位置是不同的。
N日MA均值是必须满足N个数据,不足时为nan,其所代表的位置是第N日的均值;
N日XMA均值是必须满足N//2+1个数据,不足时为没有意义,其所代表的位置是第N//2+1日的均值,它的计算是用前N//2个数数据、当前日和后N//2个数数据。当数据个数多于N//2个但不够N数的时候,是有几个数算几个数的均线。
以N=5为XMA例:

  • A位置时它取A,B,C位置的数,求3个算术均值,
  • 位置时它取A,B,C,D位置的数,求4个算术均值,
  • 到了C位置时它取A,B,C,D,E位置的数,求5个数的算术均值。
  • 到了S位置时它取Q、R、S、T、U位置的数,求5个数的算术均值。
  • 到了T位置时它取R、S、T、U位置的数,求4个数的算术均值。
  • 到了U位置时它取S、T、U位置的数,求3个数的算术均值。

MA具有确定性,不会受到下一个数据的变化的影响,而其计算出来的结果一定是滞后的。
从这里我们可以看到,XMA其实中历史数据个数大于N的时候,它的结果能够代表人们通常理解的均值,但是因为它用的数据包含当前位置之后N//2日的数据,因为使用预测的数据,XMA随着计算位置的不同,其计算的数据范围发生了变化。

注意:XMA并没有用到预测的数据,它只是在不满足N个计算数据时缩短了N!随着新的数据的到来,它后面N//2个数是会发生变化的,XMA明确而且简单,不神秘!

2. XMA的python实现

以下为用python实现XMA函数代码,已经与通达信的自带函数XMA函数做了严格的验证,数据相同的情况下,XMA的均值是完全相同的,也就是说是可靠移植的,可以信赖的!
如果有怀疑,可以自行验证的。

2.1 XMA()函数

def XMA(src:np.ndarray,N:int,array=False) -> np.ndarray:
    """ XMA函数的python实现 """
    data_len = len(src)
    half_len : int = (N // 2)+(1 if N % 2 else 0)
    if data_len < half_len:
        out = np.array([np.nan for i in range(data_len)],dtype=float)
        if array:
            return out
        return out[-half_len:] if len(out) else np.array([],dtype=float)

    head = np.array([talib.MA(src[0:ilen],ilen)[-1] for ilen in range(half_len,N)])  
    out = head
    if data_len >= N:
        body = talib.MA(src,N)[N-1:]  
        out = np.append(out,body)
        tail = np.array([talib.MA(src[-ilen:],ilen)[-1] for ilen in range(N-1,half_len-1,-1)]) 
        out = np.append(out,tail)

    if array:
        return out

    return out[-half_len:]

2.2 为vnpy的数组管理器增加xma()成员函数


class DynaArrayManager(ArrayManager):
    """ 
    DynaArrayManager是对ArrayManager的扩展,解决它无法用于临时K线的指标计算问题。 
    作者:hxxjava
    """
    def __init__(self, size: int = 100) -> None:
        super().__init__(size)

        self.bar_datetimes:List[datetime] = []

    def update_bar(self, bar: BarData) -> None:
        if not self.bar_datetimes or self.bar_datetimes[-1] < bar.datetime:
            self.bar_datetimes.append(bar.datetime)
            super().update_bar(bar)        

        else:
            """
            Only Update all arrays in array manager with temporary bar data.
            """
            self.open_array[-1] = bar.open_price
            self.high_array[-1] = bar.high_price
            self.low_array[-1] = bar.low_price
            self.close_array[-1] = bar.close_price
            self.volume_array[-1] = bar.volume
            self.turnover_array[-1] = bar.turnover
            self.open_interest_array[-1] = bar.open_interest

    # ... ... 其他无关XMA部分略去

    def xma(self,select:str="C",N:int=3,array: bool = False) -> np.ndarray:
        """ XMA函数的 """
        if   select.upper() == "C":
            src = self.close
        elif select.upper() == "O":
            src = self.open
        elif select.upper() == "H":
            src = self.high
        elif select.upper() == "L":
            src = self.low
        else:
            assert(False) 

        data_len = len(src)
        half_len : int = (N // 2)+(1 if N % 2 else 0)
        if data_len < half_len:
            out = np.array([np.nan for i in range(data_len)],dtype=float)
            if array:
                return out
            return out[-half_len:] if len(out) else np.array([],dtype=float)

        head = np.array([talib.MA(src[0:ilen],ilen)[-1] for ilen in range(half_len,N)])  
        out = head
        if data_len >= N:
            body = talib.MA(src,N)[N-1:]  
            out = np.append(out,body)
            tail = np.array([talib.MA(src[-ilen:],ilen)[-1] for ilen in range(N-1,half_len-1,-1)]) 
            out = np.append(out,tail)

        if array:
            return out

        return out[-half_len:]

3. 在vnpy实现显示XMA的指标组件XmaItem

3.1 XmaItem指标组件实现的难点

可以这么说:如果会写XMA的曲线的vnpy显示组件,已经可以写作任何vnpy显示组件了,它不是一般的难!

  1. 考虑到XMA的后N//2的均值,会随随着新的数据的到来,后N//2个数是会发生变化的特点,XmaItem需要在每次计算新的XMA的值之后,会得到N//2+1个均值结果。我们需要将这个N//2+1个均值替换之前的XMA结果值。
  2. 还是因为后N//2个数是会不停发生变化,XmaItem指标显示组件,需要不同刷新后N//2+1个K线的显示曲线。否则会看到一个断裂带尾部曲线。

3.2 XmaItem指标显示组件的实现代码

class XmaItem(CandleItem):
    """"""
    def __init__(self, manager: BarManager,xma_window:int=10):
        """"""
        super().__init__(manager)

        self.line_pen: QtGui.QPen = pg.mkPen(color=(100, 100, 255), width=2)

        self.xma_window = xma_window
        self.dyna_am = DynaArrayManager(xma_window)
        self.xma_data: Dict[int, float] = {}

    def update_history(self, history: List[BarData]) -> None:
        """ reimpliment of update_history """
        for bar in history:
            self.dyna_am.update_bar(bar)
        super().update_history(history)

    def update_bar(self, bar: BarData) -> None:
        """ reimpliment of update_bar """
        self.dyna_am.update_bar(bar)
        super().update_bar(bar)

    def _get_xma_value(self, ix: int) -> float:
        """ """
        max_ix = self._manager.get_count() - 1
        if ix < 0 or ix > max_ix:
            return np.nan

        # When initialize, calculate all rsi value
        if not self.xma_data:
            bars = self._manager.get_all_bars()
            close_data = [bar.close_price for bar in bars]
            sma_array = XMA(np.array(close_data), self.xma_window,array=True)

            for n, value in enumerate(sma_array):
                self.xma_data[n] = value

        # Return if already calcualted

        if ix != max_ix and ix in self.xma_data:
            return self.xma_data[ix]

        if self.dyna_am.inited:
            values = self.dyna_am.xma(select='C',N=self.xma_window)
            vlen = len(values)
            # print(f"vlen={vlen},values={list(values)}")
            start_ix = ix-vlen+1
            tail_idxs = [(start_ix+i,values[i]) for i in range(vlen)]
            for idx,xma in tail_idxs:
                self.xma_data[idx] = xma

            return self.xma_data[ix]

        return np.nan

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """ """
        # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        # Draw XMA Line
        xma_value = self._get_xma_value(ix)
        last_xma_value = self._get_xma_value(ix - 1)

        if not (np.isnan(xma_value) or np.isnan(last_xma_value)):
            # Set painter color
            painter.setPen(self.line_pen)
            # draw XMA line
            start_point = QtCore.QPointF(ix-1, last_xma_value)
            end_point = QtCore.QPointF(ix, xma_value)
            painter.drawLine(start_point, end_point)

        # Finish
        painter.end()
        return picture

    def paint(self, painter: QtGui.QPainter, opt: QtWidgets.QStyleOptionGraphicsItem, w: QtWidgets.QWidget) -> None:
        """
        Reimplement the paint method of parent class.
        This function is called by external QGraphicsView.
        """
        # return super().paint(painter, opt, w)
        rect = opt.exposedRect
        half = (self.xma_window // 2)
        min_ix: int = int(rect.left()) - half 
        min_ix: int = max(min_ix,0)
        max_ix: int = int(rect.right())
        max_ix: int = min(max_ix, len(self._bar_picutures))

        rect_area: tuple = (min_ix, max_ix)
        if (
            self._to_update
            or rect_area != self._rect_area
            or not self._item_picuture
        ):
            self._to_update = False
            self._rect_area = rect_area
            # print(f"XmaItem _draw_item_picture:{min_ix}--{max_ix}")
            self._draw_item_picture(min_ix, max_ix)

        self._item_picuture.play(painter)

    def _draw_item_picture(self, min_ix: int, max_ix: int) -> None:
        """
        Draw the picture of item in specific range.
        """
        self._item_picuture = QtGui.QPicture()
        painter: QtGui.QPainter = QtGui.QPainter(self._item_picuture)

        for ix in range(min_ix, max_ix):
            bar_picture: QtGui.QPicture = self._bar_picutures[ix]

            # if bar_picture is None:
            bar: BarData = self._manager.get_bar(ix)
            bar_picture = self._draw_bar_picture(ix, bar)
            self._bar_picutures[ix] = bar_picture

            bar_picture.play(painter)

        painter.end()

    def get_info_text(self, ix: int) -> str:
        """"""
        if ix in self.xma_data:
            xma_value = self.xma_data[ix]
            text = f"XMA({self.xma_window}): {xma_value:.2f}"
        else:
            text = "XMA({self.xma_window}) -"

        return text

3.3 XMA指标显示的效果

尾部不断变化的XMA曲线:
description

绘制K线图表时有两种选择,一种是不显示未结束的临时K线,另外一种是显示未结束的临时K线。
不知道您有没有发现,当您选择显示未结束的临时K线时,麻烦也就来了,计算机显得非常的慢!
什么原因?是因为python不适合做此类的图表显示吗?还是指标添加的太多?
非也,是因为我们图表控件的写作思路出了问题!
答案是:主要的原因处在临时K线的指标的计算上。让我们仔细地分析下......

1. 计算临时K线指标与历史K线指标有什么不同?

无论什么指标,历史K线的指标的计算是一次的,即每根K线只计算一次,这是一定的!
但是计算临时K线的指标却不同,原理上是每个tick都可能改变K线的开、高、低、收和成交量等的值,那么收到一个tick都需要重新计算一下该临时K线的指标,收到的tick越快,显示的指标越多,计算的次数就越多,计算量就越大,这也是一定的!
目前我们的指标在计算历史K线的指标时,是把历史数据一次性的调用talib的相关函数, 把一个巨大数组进行计算一次性地出来,然后把每个指标对应地保存到一个按K线索引为键值对字典中,方便绘制图形的时候查询的。
目前vnpy就没有考虑计算临时K线的指标显示,更不用说考虑如何高效地计算计算临时K线的指标了。我探索了临时K线的显示了,做法是对于没有计算过的K线,查询self._manager中的bar的,将最后的若干bar的需要的属性值查询出来,然后用talib的相关函数来计算出此时临时K线的指标,然后保存到指标字典的对应键值下,方便绘制图形的时候查询的。不过正是这个做法,让临时K线的指标的计算效率极低!

2. 如何提升计算临时K线指标的速度

临时K线的指标的计算效率低,与反复不断地查询和复制self._manager中的bar的属性有关,其实准备数据的时间比调用talib函数的时间长多了!
怎么办?把准备数据的时间分散到平时K线更新的函数中,用一个数组管理器来管理这些数据。也就是用存储空间换取一次性准备大量数据的时间。这样就可以提升临时K线的指标的计算效率了。

3. 实现方法

3.1 扩展vnpy的ArrayManager成为数组数字管理器

vnpy的ArrayManager很好,计算效率也不错。可是它不是为动态变化的临时K线设计的。因为每次调用update_bar它都会整体把数据平移一个位置,这对历史K线是可以的,但对应临时K线就不可以。我们希望它在更新临时K线时不要移动位置,只更新尾部(索引为-1)就OK了。
为此我们需要对其进行扩展,设计一个可以适应临时K线的动态数组管理器,我把它叫做DynaArrayManager。
DynaArrayManager的代码如下:

class DynaArrayManager(ArrayManager):
    """ 
    DynaArrayManager是对ArrayManager的扩展,解决它无法用于临时K线的指标计算问题。 
    作者:hxxjava
    """
    def __init__(self, size: int = 100) -> None:
        super().__init__(size)

        self.bar_datetimes:List[datetime] = []

    def update_bar(self, bar: BarData) -> None:
        if not self.bar_datetimes or self.bar_datetimes[-1] < bar.datetime:
            self.bar_datetimes.append(bar.datetime)
            super().update_bar(bar)        

        else:
            """
            Only Update all arrays in array manager with temporary bar data.
            """
            self.open_array[-1] = bar.open_price
            self.high_array[-1] = bar.high_price
            self.low_array[-1] = bar.low_price
            self.close_array[-1] = bar.close_price
            self.volume_array[-1] = bar.volume
            self.turnover_array[-1] = bar.turnover
            self.open_interest_array[-1] = bar.open_interest

    def dmi(self,N:int=14,M:int=7,array: bool = False)-> Union[Tuple[np.ndarray, np.ndarray, np.ndarray,np.ndarray], Tuple[float, float, float,float]]:
        """ 
        Directional Movement Indicator:
        TR := SUM(MAX(MAX(HIGH-LOW,ABS(HIGH-REF(CLOSE,1))),ABS(LOW-REF(CLOSE,1))),N);
        HD := HIGH-REF(HIGH,1); // 创新高量
        LD := REF(LOW,1)-LOW;   // 创新低量
        DMP:= SUM(IFELSE(HD>0 && HD>LD,HD,0),N); // N日创新高量累计,赢家通吃——做多力量
        DMM:= SUM(IFELSE(LD>0 && LD>HD,LD,0),N); // N日创新低量累计,赢家通吃——做空力量
        PDI: DMP*100/TR,LINETHICK2; //做多力量
        MDI: DMM*100/TR,LINETHICK2; //做空力量
        ADX: MA(ABS(MDI-PDI)/(MDI+PDI)*100,M),LINETHICK2; // ADX:多空力量差值M日平滑
        ADXR:(ADX+REF(ADX,M))/2,LINETHICK2;  // ADR:ADX与M日前ADX的均值        
        """
        TR = SUM(talib.TRANGE(self.high,self.low,self.close),N)
        # TR = SUM(MAX(MAX(self.high-self.low,ABS(self.high-REF(self.close,1))),ABS(self.low-REF(self.close,1))),N)
        HD = self.high - REF(self.high,1)
        LD = REF(self.low,1) - self.low 

        DMP = SUM(IIF((HD>0)&(HD>LD),HD,0),N)
        DMM = SUM(IIF((LD>0)&(LD>HD),LD,0),N)

        PDI = DMP*100/TR;
        MDI = DMM*100/TR;

        ADX = talib.MA(np.abs(MDI-PDI)/(MDI+PDI)*100,M)  # ADX:多空力量差值M日平滑
        ADXR = (ADX+REF(ADX,M))/2                         # ADR:ADX与M日前ADX的均值     

        if array:
            return PDI,MDI,ADX,ADXR

        return PDI[-1],MDI[-1],ADX[-1],ADXR[-1]

    def macd3(self, fast_period: int, slow_period: int, signal_period: int, array: bool = False) -> Union[Tuple[np.ndarray, np.ndarray, np.ndarray,np.ndarray], Tuple[float, float, float,float]]:
        """
        MACD having three lines:(diff,dea,slow_dea) and macd histgram
        """
        diff, dea, macd = talib.MACD(
            self.close, fast_period, slow_period, signal_period
        )
        slow_dea = talib.EMA(dea,signal_period)

        if array:
            return diff, dea, macd, slow_dea
        return diff[-1], dea[-1], macd[-1],slow_dea[-1]

3.2 改变CandleItem和ChartItem的实现机制

下面是两个典型的ChartItem的实现方法,其中都用到了DynaArrayManager,并且都重新实现了ChartItem的update_history()和update_bar()两个方法,这两个方法就是为了用存储空间换取一次性准备大量数据的时间。
另外还要注意_get_macd_value()和_get_dmi_value()的后半段,它们的作用就是为了适应临时K线的指标计算的特点而特别这样写的。

3.2.1 MacdItem的修改方法

class Macd3Item(ChartItem):
    """ 三根线的MACD """
    def __init__(self, manager: BarManager,short_window:int=12,long_window:int=26,M:int=9):
        """"""
        super().__init__(manager)

        self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
        self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=1)
        self.red_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=1)
        self.green_pen: QtGui.QPen = pg.mkPen(color=(0, 255, 0), width=1)
        self.magetan_pen: QtGui.QPen = pg.mkPen(color=(255, 0,255),width=1,style = QtCore.Qt.DashLine)

        self._values_ranges: Dict[Tuple[int, int], Tuple[float, float]] = {}

        self.short_window = short_window
        self.long_window = long_window
        self.M = M

        self.dyna_am = DynaArrayManager(max(short_window,long_window) + 2*(M-1))
        self.macd_data: Dict[int, List[float,float,float]] = {}

    def update_history(self, history: List[BarData]) -> None:
        """ reimpliment of update_history """
        for bar in history:
            self.dyna_am.update_bar(bar)
        super().update_history(history)

    def update_bar(self, bar: BarData) -> None:
        """ reimpliment of update_bar """
        self.dyna_am.update_bar(bar)
        super().update_bar(bar)

    def _get_macd_value(self, ix: int) -> List:
        """"""
        max_ix = self._manager.get_count() - 1
        invalid_value = [np.nan,np.nan,np.nan,np.nan]
        if ix < 0 or ix > max_ix:
            return invalid_value

        # When initialize, calculate all macd value
        if not self.macd_data:
            bars:List[BarData] = self._manager.get_all_bars()
            close_prices = [bar.close_price for bar in bars]

            diffs,deas,macds = talib.MACD(np.array(close_prices), 
                                    fastperiod=self.short_window, 
                                    slowperiod=self.long_window, 
                                    signalperiod=self.M)
            slow_deas = talib.EMA(deas,self.M)

            for n in range(0,len(diffs)):
                self.macd_data[n] = [diffs[n],deas[n],macds[n],slow_deas[n]]

        # Return if already calcualted
        if ix != max_ix and ix in self.macd_data:
            return self.macd_data[ix]

        if self.dyna_am.inited:
            diff,dea,macd,slow_dea = self.dyna_am.macd3(self.short_window,self.long_window,self.M)
            self.macd_data[ix] = [diff,dea,macd,slow_dea]
            return [diff,dea,macd,slow_dea]

        return [np.nan,np.nan,np.nan,np.nan]

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """"""
        # # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        if ix < 1:
            return picture

        macd_value = self._get_macd_value(ix)
        last_macd_value = self._get_macd_value(ix - 1)

        # # Draw macd lines
        if np.isnan(macd_value[0]) or np.isnan(last_macd_value[0]):
            # print("略过macd lines0")
            pass
        else:
            end_point0 = QtCore.QPointF(ix, macd_value[0])
            start_point0 = QtCore.QPointF(ix - 1, last_macd_value[0])
            painter.setPen(self.white_pen)
            painter.drawLine(start_point0, end_point0)

        if np.isnan(macd_value[1]) or np.isnan(last_macd_value[1]):
            # print("略过macd lines1")
            pass
        else:
            end_point1 = QtCore.QPointF(ix, macd_value[1])
            start_point1 = QtCore.QPointF(ix - 1, last_macd_value[1])
            painter.setPen(self.yellow_pen)
            painter.drawLine(start_point1, end_point1)

        if np.isnan(macd_value[3]) or np.isnan(last_macd_value[3]):
            pass
        else:
            end_point2 = QtCore.QPointF(ix, macd_value[3])
            start_point2 = QtCore.QPointF(ix - 1, last_macd_value[3])
            painter.setPen(self.magetan_pen)
            painter.drawLine(start_point2, end_point2)

        if not np.isnan(macd_value[2]):
            if (macd_value[2]>0):
                painter.setPen(self.red_pen)
                painter.setBrush(pg.mkBrush(255,0,0))
            else:
                painter.setPen(self.green_pen)
                painter.setBrush(pg.mkBrush(0,255,0))
            painter.drawRect(QtCore.QRectF(ix-0.3,0,0.6,macd_value[2]))
        else:
            # print("略过macd lines2")
            pass

        painter.end()
        return picture

    def boundingRect(self) -> QtCore.QRectF:
        """"""
        min_y, max_y = self.get_y_range()
        # print(f"{self.short_window,self.long_window,self.M} min_y, max_y={min_y, max_y}")
        rect = QtCore.QRectF(
            0,
            min_y,
            len(self._bar_picutures),
            max_y
        )
        return rect

    def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
        """ 
        获得4个指标在y轴方向的范围 
        hxxjava 修改,2022-12-14
        当显示范围改变时,min_ix,max_ix的值不为None,当显示范围不变时,min_ix,max_ix的值不为None,
        """  
        if not self.macd_data:
            # 如果Ofcd数据没有计算过
            return (-100,100)

        this_range = (min_ix,max_ix)       
        if this_range == (None,None):
            # 如果是查询全范围
            min_ix,max_ix = self._rect_area                 # 显示索引范围
            max_ix = min(max_ix,len(self.macd_data)) - 1    # 数据索引范围
            this_range = min_ix,max_ix

        if this_range in self._values_ranges:
            # 查询范围已经存在,直接返回已经计算过的y范围值
            result = self._values_ranges[this_range]
            return result

        # 查询范围不存在,重新计算y范围值
        macd_list = list(self.macd_data.values())[min_ix:max_ix + 1]
        ndarray = np.array(macd_list)  
        if ndarray.shape[0] == 0:
            return (-100,100)

        # 求值范围内的的MACD值的最大和最小值
        max_price = np.nanmax(ndarray)
        min_price = np.nanmin(ndarray)
        if np.isnan(max_price) or np.isnan(max_price):
            return (-100,100)

        # 保存y方向范围,同时返回结果
        result = (min_price, max_price)
        self._values_ranges[this_range] = result
        return result

    def get_info_text(self, ix: int) -> str:
        """ """
        barscount = len(self._manager._bars) # hxxjava debug
        if ix in self.macd_data:
            [diff,dea,macd,slow_dea] = self.macd_data[ix]
            words = [
                f"Macd3{(self.short_window,self.long_window,self.M)}:",
                f"diff {diff:.3f}",
                f"dea {dea:.3f}",
                f"slow_dea={slow_dea:.3f}",
                f"macd {macd:.3f}",
                ]
            text = "\n".join(words)
        else:
            text = "diff - \ndea - \nslow_dea - \nmacd -"

        return text

3.2.1 DmiItem的修改方法

class DmiItem(ChartItem): 
    """  """
    def __init__(self, manager: BarManager,N:int=14,M:int=7):
        """"""
        super().__init__(manager)

        self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
        self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=1)
        self.magenta_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 255), width=1)
        self.red_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=1)
        self.green_pen: QtGui.QPen = pg.mkPen(color=(0, 255, 0), width=1)
        self.ref_pen: QtGui.QPen = pg.mkPen(color=(127, 127, 127,127), width=1, style = QtCore.Qt.DashLine)

        self._values_ranges: Dict[Tuple[int, int], Tuple[float, float]] = {}

        self.N = N
        self.M = M

        self.dyna_am = DynaArrayManager(2*max(N,M))

        self.dmi_data: Dict[int, Tuple[float,float,float,float]] = {} # (PDI,MDI,ADX,ADXR)

    def update_history(self, history: List[BarData]) -> None:
        """ reimpliment of update_history """
        for bar in history:
            self.dyna_am.update_bar(bar)
        super().update_history(history)

    def update_bar(self, bar: BarData) -> None:
        """ reimpliment of update_bar """
        self.dyna_am.update_bar(bar)
        super().update_bar(bar)

    def _get_dmi_value(self, ix: int) -> Tuple[float,float,float,float]:
        """"""
        max_ix = self._manager.get_count()-1
        invalid_data = (np.nan,np.nan,np.nan,np.nan)
        if ix < 0 or ix > max_ix:
            print(f"DmiItem{ix},invalid_data1")
            return invalid_data

        # When initialize, calculate all macd value
        if not self.dmi_data:
            bars:List[BarData] = self._manager.get_all_bars()
            highs = np.array([bar.high_price for bar in bars])
            lows = np.array([bar.low_price for bar in bars])
            closes = np.array([bar.close_price for bar in bars])

            pdi,mdi,adx,adxr = DMI(high=highs,low=lows,close=closes,N=self.N,M=self.M,array=True)

            for n in range(0,len(adx)):
                self.dmi_data[n] = (pdi[n],mdi[n],adx[n],adxr[n])


        # Return if already calcualted
        if ix != max_ix and ix in self.dmi_data:
            return self.dmi_data[ix]

        if self.dyna_am.inited:
            pdi,mdi,adx,adxr = self.dyna_am.dmi(N=self.N,M=self.M)
            self.dmi_data[ix] = [pdi,mdi,adx,adxr]
            return [pdi,mdi,adx,adxr]

        return invalid_data

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """"""
        # # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        if ix > self.N + self.M:
            # 画参考线
            painter.setPen(self.ref_pen)
            for ref in [20.0,50,80]:
                painter.drawLine(QtCore.QPointF(ix-0.5,ref),QtCore.QPointF(ix+0.5,ref))  

            # 画4根线
            dmi_value = self._get_dmi_value(ix)
            last_dmi_value = self._get_dmi_value(ix - 1)
            pens = [self.white_pen,self.yellow_pen,self.magenta_pen,self.green_pen]
            for i in range(4):
                end_point0 = QtCore.QPointF(ix, dmi_value[i])
                start_point0 = QtCore.QPointF(ix - 1, last_dmi_value[i])
                painter.setPen(pens[i])
                painter.drawLine(start_point0, end_point0)        

            # 多空颜色标示
            pdi,mdi = dmi_value[0],dmi_value[1] 
            if not(np.isnan(pdi) or np.isnan(mdi)):
                if abs(pdi - mdi) > 1e-2:
                    painter.setPen(pg.mkPen(color=(168, 0, 0) if pdi > mdi else (0, 168, 0),width=3))
                    painter.drawLine(QtCore.QPointF(ix,pdi),QtCore.QPointF(ix,mdi))  

        painter.end()
        return picture

    def boundingRect(self) -> QtCore.QRectF:
        """"""
        min_y, max_y = self.get_y_range()
        rect = QtCore.QRectF(
            0,
            min_y,
            len(self._bar_picutures),
            max_y
        )
        return rect

    def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
        """  """
        return (0.0,100.0)  

    def get_info_text(self, ix: int) -> str:
        """ """
        if ix in self.dmi_data:
            [pdi,mdi,adx,adxr] = self.dmi_data.get(ix,None)
            words = [
                f"DMI{self.N,self.M}:",
                f"PDI {pdi:.2f}",
                f"MDI {mdi:.2f}",
                f"ADX {adx:.2f}",
                f"ADXR {adxr:.2f}",
                ]
            text = "\n".join(words)
        else:
            text = f"DMI{self.N,self.M}:-"

        return text

4. 快了,果然快了

这样写作的,再加载您的K线图表时,你会发现即使显示是需要显示临时K线指标,显示的速度也会快多了!!!

1. tick数据模拟器

tick数据模拟器可以从数据库读取历史tick数据,以一定的速度将其按时间顺序放入消息队列中,以此实现历史行情回放。
它可以用来复盘或者调试策略,加快策略的开发进度。

1.1 功能:

  • 可以启动、暂停、恢复和停止回放tick数据。
  • 速度可调,根据需要决定合适的速度
  • 模拟的接口选,默认CTP
  • 时间范围可变

1.2 tick数据来源

tick数据模拟器回放的tick数据不是凭空生造的,它是真实的历史行情。通常先用vnpy中的DataRecorder录制您感兴趣的tick数据,它会把tick数据记录到vnpy数据库中,
然后就可以利用tick数据模拟器在休市时间进行回放了。

2. tick数据模拟器的实现

"""
tick数据模拟器
作者:hxxjava
时间:2022-12-10
功能:从数据库读取历史tick数据,以一定的速度将其按时间顺序放入消息队列中,以此实现历史行情回放。
"""
from typing import List,Dict
from datetime import datetime,timedelta
from time import sleep 
from vnpy.event.engine import Event,EventEngine
from vnpy.trader.engine import MainEngine,BaseEngine
from vnpy.trader.constant import InstrumentStatus
from vnpy.trader.object import TickData,StatusData
from vnpy.trader.event import EVENT_TICK,EVENT_STATUS
from vnpy.trader.utility import extract_vt_symbol,CHINA_TZ

from vnpy.trader.database import get_database

from vnpy.trader.app import BaseApp
from threading import Thread

EVENT_TICKSIM_START = "eTickSimStart."
EVENT_TICKSIM_PAUSE = "eTickSimPause."
EVENT_TICKSIM_STOP = "eTickSimStop."

EVENT_TICKSIM_FINISHED = "eTickSimFinished."

TICK_SIM_ENGINE_NAME = "tick_sim"

class TickSimEngine(BaseEngine):
    """ tick数据模拟器 """
    def __init__(self, main_engine: MainEngine, event_engine: EventEngine) -> None:
        """ constructor """
        super(TickSimEngine, self).__init__(main_engine, event_engine, TICK_SIM_ENGINE_NAME)

        self.ticks:List[TickData] = []

        self.db = get_database()
        self.speed = 1.0
        self._active = False
        self._pause = False
        self.gateway_name = "CTP"

        self.register_event()

        print(f"tick数据模拟器已经安装。")

    def register_event(self) -> None:
        """ """
        # self.event_engine.register(EVENT_TICK,self.process_tick_event)
        self.event_engine.register(EVENT_TICKSIM_START,self.process_ticksim_start_event) 
        self.event_engine.register(EVENT_TICKSIM_PAUSE,self.process_ticksim_pause_event) 
        self.event_engine.register(EVENT_TICKSIM_STOP,self.process_ticksim_stop_event) 

        self.event_engine.register(EVENT_TICKSIM_FINISHED,self.process_ticksim_finished)     

    def load_ticks(self) -> None:
        """  """
        symbol,exchange = extract_vt_symbol(self.vt_symbol)
        self.ticks = self.db.load_tick_data(symbol=symbol,exchange=exchange,start=self.start_time,end=self.end_time)
        print(f"TickSimulator load {len(self.ticks)} ticks")

    def process_tick_event(self,event:Event) -> None:
        """ """
        tick:Dict = event.data
        print(f"recieve:{tick}")

    def process_ticksim_start_event(self,event:Event) -> None:
        """ """
        data:Dict = event.data
        print(f"I got EVENT_TICKSIM_START:{data}")
        self.start_simulator(**data)

    def process_ticksim_pause_event(self,event:Event) -> None:
        """ """
        self._pause = not self._pause
        print(f"I got EVENT_TICKSIM_PAUSE:{data}")

    def process_ticksim_stop_event(self,event:Event) -> None:
        """ """
        self._active = True
        print(f"I got EVENT_TICKSIM_STOP:{data}")

    def process_ticksim_finished(self,event:Event) -> None:
        """ """
        # 送出收盘交易状态
        tick:TickData = event.data

        next_second:datetime = tick.datetime + timedelta(seconds=1)
        next_second.replace(second=0,microsecond=0)
        status = StatusData(
            gateway_name=self.gateway_name,
            symbol=tick.symbol,
            exchange=tick.exchange,
            settlement_group_id="100",
            instrument_status=InstrumentStatus.CLOSE,
            trading_segment_sn = 100,
            enter_time = next_second.strftime("%H:%M"),
        )
        self.event_engine.put(Event(EVENT_STATUS,data=status))

        self.stop()

        print(f"finish_time:{tick.datetime}")

    def start(self) -> None:
        """ """
        self._active = True
        self.thread = Thread(target=self.run)
        self.thread.start()

    def stop(self) -> None:
        """ """
        self._active = False
        self._pause = False
        self.speed = 1.0
        self.ticks.clear()

    def run(self) -> None:
        """ 仿真线程函数 """
        while self._active:
            # 遍历发送
            if not self._pause:
                if self.ticks:
                    tick = self.ticks.pop(0)
                    tick.gateway_name = self.gateway_name

                    event = Event(EVENT_TICK,tick)

                    # print(f"EVENT_TICK event data = {tick}")
                    self.event_engine.put(event)

                    if len(self.ticks) == 0:
                        print(f"all ticks are send out !")
                        self.event_engine.put(Event(EVENT_TICKSIM_FINISHED,data=tick))

            sleep(0.5/self.speed)

        print(f"TickSimulator is stoped !")

    def start_simulator(self,vt_symbol:str,start_time:datetime,end_time:datetime,speed:float=1.0,gateway_name:str='CTP') -> None:
        """ 启动tick模拟器 """
        self.vt_symbol = vt_symbol
        self.start_time = start_time
        self.end_time = end_time
        self.speed = speed
        self.gateway_name = gateway_name
        self.load_ticks()
        self.start()

3. tick数据模拟器使用

这里给出最简单的使用方法,具体您可以发挥想象力。

3.1 加载tick数据模器到MainEgine

    """ """
    event_engine = EventEngine()
    main_engine = MainEngine(event_engine=event_engine)

    main_engine.add_engine(TickSimEngine)

3.2 启动回放方法1

    tick_sim_engine:TickSimEngine = main_engine.get_engine(TICK_SIM_ENGINE_NAME)
    end_time = datetime.now().replace(tzinfo=CHINA_TZ)
    start_time = datetime(2022,12,9,22,50,0).replace(tzinfo=CHINA_TZ)
    tick_sim_engine.start_simulator(
             vt_symbol='p2301.DCE',  # 回放合约
             start_time=start_time,      # 开始时间
             end_time=end_time,        # 结束时间
             speed=10.0                      # 回放速度
    )

3.3 启动回放方法2

    data = {
        "vt_symbol":'p2301.DCE',                   
        "start_time":datetime(2022,12,9,22,50,0).replace(tzinfo=CHINA_TZ),
        "end_time": datetime.now().replace(tzinfo=CHINA_TZ),
        "speed":10,
        "gateway_name":"CTP"
    }  # 意义同上
    event_engine.put(Event(EVENT_TICKSIM_START,data))

3.4 暂停回放方法

    event_engine.put(Event(EVENT_TICKSIM_PAUSE))

3.5 停止回放方法

    event_engine.put(Event(EVENT_TICKSIM_STOP))

注释:

无论3.2~3.5中的代码在哪里,event_engine必须和3.1加载tick数据模器时是同一个消息引擎。

1. 鼠标双击隐藏/显示K线图表全部或部分副图的好处

K线图表通常有主图和多个副图组成,它是我们观察合约 K线和指标的工具。但有时往往因为要显示的指标副图太多,去观察效果受到太大的影响。我们可以通过关闭全部或者是一个富途的方式来让显示变得更为简洁和细致。最方便的方式是使用鼠标来进行操作,而很多市面上的第三方软件也多采用此方法。可是看看维恩派的 K线图表因为没有考虑用户使用时候的这个需求,想实现这个功能还是需要费一番心思的。

那么怎么做呢?初步设想是:

  • 当用户用鼠标双击主图时,把所有的附图全部关闭;当用户再次双击主图时,把所有的富途再次显示出来。
  • 当用户用鼠标双击其中一个副图时,将其余副图全部隐藏起来,当用户再次双击该副图时,再把之前隐藏的其余副图显示出来。

2. 具体的实现方法:

修改文件vnpy\chart\widget.py

2.1 扩展PlotItem使之可以捕获鼠标双击事件

在引用部分加入下面内容:

from typing import Callable # hxxjava add

PlotItem的扩展类MyPlotItem:

class MyPlotItem(pg.PlotItem):
    """ 
    MyPlotItem:re-implement of PlotItem.
    hxxjava add 
    """
    def __init__(self,name:str,on_mouseDblClicked:Callable=None,*args,**kwargs) -> None:
        """ add name attribute and on_mouseDblClicked callback for PlotItem """
        super().__init__(*args,**kwargs)

        self.name = name
        self.on_mouseDblClicked = on_mouseDblClicked

    def mouseDoubleClickEvent(self, event: QtGui.QMouseEvent) -> None:
        """ re-implement PlotItem's mouseDoubleClickEvent """
        super().mouseDoubleClickEvent(event)

        if self.on_mouseDblClicked:
            self.on_mouseDblClicked(self)

2.2 修改 ChartWidget使之可以具有处理鼠标双击事件

为class ChartWidget添加下面的鼠标双击的下面的回调函数。需要说明的是这里有个约定:容纳主图的PlotItem名称必须为"candle",其他的名称认为是副图,无需纠结是不是太过特别订制了。

    def on_mouseDblClicked(self,plot:pg.PlotItem):
        """ 
        所有内含图表被双击的回调函数。 hxxjava add 
        """
        if plot.name == "candle":
            # 选择所有副图
            others = [pi for pi in self._plots.values() if pi.name != plot.name]

        else:
            # 选择其他副图
            others = [pi for pi in self._plots.values() if pi.name not in [plot.name,'candle']]

            # 求当前幅图的高度变化量
            delta_h = 0
            for pi in others:
                delta_h += pi.height()*(1 if pi.isVisible() else -1)
            plot.setFixedHeight(plot.height()+delta_h)

        # 隐藏/显示未变双击的图表
        for pi in others:
            pi.setVisible(not pi.isVisible())

修改class ChartWidget的add_plot()函数,代码如下:

    def add_plot(
        self,
        plot_name: str,
        minimum_height: int = 80,
        maximum_height: int = None,
        hide_x_axis: bool = False
    ) -> None:
        """
        Add plot area.
        """
        # Create plot object
        # plot: pg.PlotItem = pg.PlotItem(axisItems={'bottom': self._get_new_x_axis()})
        plot: pg.PlotItem = MyPlotItem(axisItems={'bottom': self._get_new_x_axis()},name=plot_name,on_mouseDblClicked=self.on_mouseDblClicked)
        plot.setMenuEnabled(False)
        plot.setClipToView(True)
        plot.hideAxis('left')
        plot.showAxis('right')
        plot.setDownsampling(mode='peak')
        plot.setRange(xRange=(0, 1), yRange=(0, 1))
        plot.hideButtons()
        plot.setMinimumHeight(minimum_height)

        if maximum_height:
            plot.setMaximumHeight(maximum_height)

        if hide_x_axis:
            plot.hideAxis("bottom")

        if not self._first_plot:
            self._first_plot = plot

        # Connect view change signal to update y range function
        view: pg.ViewBox = plot.getViewBox()
        view.sigXRangeChanged.connect(self._update_y_range)
        view.setMouseEnabled(x=True, y=True)   # hxxjava change,old---view.setMouseEnabled(x=True, y=False)

        # Set right axis
        right_axis: pg.AxisItem = plot.getAxis('right')
        right_axis.setWidth(60)
        right_axis.tickFont = NORMAL_FONT

        # Connect x-axis link
        if self._plots:
            first_plot: pg.PlotItem = list(self._plots.values())[0]
            plot.setXLink(first_plot)

        # Store plot object in dict
        self._plots[plot_name] = plot

        # Add plot onto the layout
        self._layout.nextRow()
        self._layout.addItem(plot)

3. 实现的效果

完成了第2部分的代码后,您就可以使用ChartWidget像以往一样创建你的K线图表了。

3.1 一个较为复杂的、完整的K线图表窗口

description

3.2 双击主图后的K线图表窗口

description

3.3 双击其中一个副图后的K线图表窗口

双击其中的MACD副图后,K线图表窗口MACD副图会占据其他副图的显示区域,再次双击该MACD副图,K线图表窗口中的其他副图就再次显示出来了。
description
双击其中的的delta副图后,K线图表窗口delta副图会占据其他副图的显示区域,再次双击该delta副图,K线图表窗口中的其他副图就再次显示出来了。
description

是不是觉得这样显示会给您看图带来一些希望的好处!!!

1. 问题的由来

实盘中,如果你用K线图表对你的CTA策略生成的K线进行显示,你会发现明明已经休市了或者收盘了,最新的日K线、30分钟、5分钟K线本应该能够有了,可是迟迟见不到这些K线的生成。
以国内期货举例:

  • 10:10~10:15的那根5分钟K线在10:15:00到10:30:00的休市时间段内是出不来的;
  • 11:25:00到11:30:00的那根5分钟K线在11:30:00到13:30:00的休市时间段内是出不来的;
  • 还有日K线,已经到15:00:00收盘了,可是因为没有收到下一秒tick而不能够及时生成出来。如果您不关机,等到下一个交易日的第一个tick收到的时候才能生成这根日K线。
  • 另外如果你是个细心的人,注意下策略在加载历史数据的时候,最后一根日K线、30分钟、5分钟K线乃至最后一根1分钟K线也是见不到的。

目前vnpy的BarGenerator没有出错,但出现这种情况有违常理!

为什么会出现情况?原因是因为你的BarGenerator的生成K线的机制导致的,因为这些大K线是由1分钟K线合成的,就是说它们依赖1分钟K线的生成。
目前1分钟K线只是单纯由tick数据推动的,当收到下一分钟tick才判断当前1分钟K线的结束。如果遇到了中间休市时间或者收盘时间,网关接口就不再有新的tick推送,这样最后1分钟K线也就一直呆在BarGenerator中,无法推动5分钟、30分钟、日等大K线的生成,这就是目前BarGenerator的问题所在。

2. 合约交易状态可以解决这个问题。

交易所用合约交易状态通知交易客户端交易合约的状态已经发生了变化。它表明交易合约当前的交易时间段,在每个交易时间的开始和结束时推送,时间为分钟的开始。这个信息正好可以用于BarGenerator结束各个休市和收盘前1分钟的K线生成,进而一举解决比1分钟大的K线的生成。

3. 实现这个问题一共分成5步

套用一句宋丹丹的话,要把大象关进冰箱总共分三步:第一步把冰箱门打开,第二步把冰箱塞进冰箱中,第三步把冰箱门关上!
让BarGenerator在休市和收市时及时生成K线分五步:

  • 第一步扩展CTA策略引擎CtaEngine
  • 第二步修改CTA策略模板CtaTemplate
  • 第三步扩展K线生成器BarGenerator
  • 第四步修改CTA策略
  • 第五步启用扩展的MyCtaEngine

3.1 第一步扩展CTA策略引擎CtaEngine

在vnpy_ctastrategy\engine.py中增加下面的内容:

class MyCtaEngine(CtaEngine):
    """ 
    CTA策略引擎,对CtaEngine的功能进行扩展。 
    功能:
    1. 订阅集合竞价tick数据,并且转发给各个已经初始化的CTA策略;
    2. 订阅交易状态消息数据,并且转发给各个已经初始化的CTA策略;
    3. 条件单的功能:包括发送、监视、更新和取消条件单的功能。
    """

    condition_filename = "condition_order.json"     # 历史条件单存储文件


    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super().__init__(main_engine,event_engine)

        self.condition_orders:Dict[str,ConditionOrder] = {}         # strategy_name: ConditionOrder

        self.triggered_condition_orders:List[ConditionOrder] = []   # 已经触发点条件单,为流控设计

    def load_active_condtion_orders(self):
        """  """
        return {}

    def register_event(self):
        """"""
        super().register_event()
        self.event_engine.register(EVENT_AUCTION_TICK, self.process_auction_tick_event)
        self.event_engine.register(EVENT_STATUS, self.process_status_event)

    def process_auction_tick_event(self,event:Event):
        """ 集合竞价消息处理 hxxjava add """

        tick:TickData = event.data
        strategies:List[CtaTemplate] = self.symbol_strategy_map[tick.vt_symbol]
        if not strategies:
            return

        for strategy in strategies:
            if strategy.inited:
                # 执行策略的集合竞价消息处理
                self.call_strategy_func(strategy, strategy.on_auction_tick, tick)

    def process_status_event(self,event:Event):
        """ 交易状态消息处理 hxxjava add """
        status:StatusData = event.data
        strategies:List[CtaTemplate] = []

        # step1: find strategies related to this status data 
        vt_instrument0 = get_vt_instrument(status.vt_symbol)
        if vt_instrument0 == status.vt_symbol:
            # 交易品种的交易状态
            for vt_symbol in self.symbol_strategy_map.keys():
                vt_instrument = get_vt_instrument(vt_symbol)
                if vt_instrument == vt_instrument0:
                    # 交易品种的交易状态属于策略交易的合约
                    strategies.extend(self.symbol_strategy_map[vt_symbol]) 

        else:
            # 单独合约的交易状态
            strategies.extend(self.symbol_strategy_map.get(status.vt_symbol,[]))

        if not strategies:
            return

        # step 2: push status data to all relate strategies
        for strategy in strategies:
            if strategy.inited:
                # 执行策略的集合竞价消息处理
                self.call_strategy_func(strategy, strategy.on_status, status)

    def process_tick_event(self,event:Event):
        """ 用tick的价格检查条件单 """
        super().process_tick_event(event)

        tick:TickData = event.data
        all_condition_orders = [order for order in self.condition_orders.values() \
            if order.vt_symbol == tick.vt_symbol and order.status == CondOrderStatus.WAITING]
        for order in all_condition_orders:
            # 检查条件单是否满足条件
            self.check_condition_order(order,tick)

    def check_condition_order(self,order:ConditionOrder,tick:TickData):
        """ 检查条件单是否满足条件 """       
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        price = tick.last_price

        is_be = order.condition == Condition.BE and price >= order.price
        is_le = order.condition == Condition.LE and price <= order.price
        is_bt = order.condition == Condition.BT and price > order.price
        is_lt = order.condition == Condition.LT and price < order.price

        if is_be or is_le or is_bt or is_lt:
            # 满足触发条件
            if order.execute_price == ExecutePrice.MARKET:
                # 取市场价
                price = tick.last_price
            elif order.execute_price == ExecutePrice.EXTREME:
                # 取极限价
                price = tick.limit_up if order.direction == Direction.LONG else tick.limit_down
            else:
                # 取设定价
                price = order.price

            # 执行委托
            order_ids = strategy.send_order(
                    direction = order.direction,
                    offset=order.offset,
                    price=price,
                    volume=order.volume 
                )

            if order_ids:
                order.trigger_time = tick.datetime
                order.status = CondOrderStatus.TRIGGERED
                order.vt_orderids = order_ids

                self.call_strategy_func(strategy,strategy.on_condition_order,order)
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))

    def find_condition_order(self,vt_orderid:str):
        """ 根据委托单号查询所属条件单 """
        corder:ConditionOrder = None
        for order in self.condition_orders.values():
            if vt_orderid in order.vt_orderids:
                corder = order
                break

        return corder

    def process_trade_event(self, event: Event):
        """ 委托单推送处理 """
        super().process_trade_event(event)
        trade:TradeData = event.data
        vt_orderid = trade.vt_orderid

        corder = self.find_condition_order(vt_orderid)
        if corder:
            # 该成交单属于某个条件单
            strategy = self.strategies.get(corder.strategy_name,None)
            if strategy and strategy.trading:
                # 找到了该条件单属实策略实例且正在交易中

                # 累计条件单的成交量
                corder.traded += trade.volume
                # 推送该条件单给策略
                self.call_strategy_func(strategy,strategy.on_condition_order,corder)
                # 刷新条件单列表控件
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,corder))

    def send_condition_order(self,order:ConditionOrder):
        """  """
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        if order.cond_orderid not in self.condition_orders:
            self.condition_orders[order.cond_orderid] = order
            self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
            return True

        return False

    def cancel_condition_order(self,cond_orderid:str):
        """  """
        order:ConditionOrder = self.condition_orders.get(cond_orderid,None)
        if not order:
            return False

        order.status = CondOrderStatus.CANCELLED
        self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
        return True

    def cancel_all_condition_orders(self,strategy_name:str):
        """  """
        for order in self.condition_orders.values():
            if order.strategy_name == strategy_name and order.status == CondOrderStatus.WAITING:
                order.status = CondOrderStatus.CANCELLED
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))

        return True

3.2 第二步修改CTA策略模板CtaTemplate

在vnpy_ctastrategy\template.py中增加一个on_status()虚函数,它是接收交易状态信息数据推送的接口,on_status()代码如下:


class CtaTemplate(ABC):
    """"""

    author: str = ""
    parameters: list = []
    variables: list = []

    def __init__(
        self,
        cta_engine: Any,
        strategy_name: str,
        vt_symbol: str,
        setting: dict,
    ) -> None:
        """"""
        self.cta_engine: Any = cta_engine
        self.strategy_name: str = strategy_name
        self.vt_symbol: str = vt_symbol

        self.inited: bool = False
        self.trading: bool = False
        self.pos: int = 0

        # Copy a new variables list here to avoid duplicate insert when multiple
        # strategy instances are created with the same strategy class.
        self.variables = copy(self.variables)
        self.variables.insert(0, "inited")
        self.variables.insert(1, "trading")
        self.variables.insert(2, "pos")

        self.update_setting(setting)

    def update_setting(self, setting: dict) -> None:
        """
        Update strategy parameter wtih value in setting dict.
        """
        for name in self.parameters:
            if name in setting:
                setattr(self, name, setting[name])

    @classmethod
    def get_class_parameters(cls) -> dict:
        """
        Get default parameters dict of strategy class.
        """
        class_parameters: dict = {}
        for name in cls.parameters:
            class_parameters[name] = getattr(cls, name)
        return class_parameters

    def get_parameters(self) -> dict:
        """
        Get strategy parameters dict.
        """
        strategy_parameters: dict = {}
        for name in self.parameters:
            strategy_parameters[name] = getattr(self, name)
        return strategy_parameters

    def get_variables(self) -> dict:
        """
        Get strategy variables dict.
        """
        strategy_variables: dict = {}
        for name in self.variables:
            strategy_variables[name] = getattr(self, name)
        return strategy_variables

    def get_data(self) -> dict:
        """
        Get strategy data.
        """
        strategy_data: dict = {
            "strategy_name": self.strategy_name,
            "vt_symbol": self.vt_symbol,
            "class_name": self.__class__.__name__,
            "author": self.author,
            "parameters": self.get_parameters(),
            "variables": self.get_variables(),
        }
        return strategy_data

    @virtual
    def on_init(self) -> None:
        """
        Callback when strategy is inited.
        """
        pass

    @virtual
    def on_inited(self):    # hxxjava add
        """
        Callback when strategy is inited.
        """
        pass

    @virtual
    def on_start(self):
        """
        Callback when strategy is started.
        """
        pass

    @virtual
    def on_stop(self) -> None:
        """
        Callback when strategy is stopped.
        """
        pass

    @virtual
    def on_auction_tick(self, tick: TickData):
        """
        Callback of new tick data update.   # hxxjava add for auction tick
        """
        pass

    @virtual
    def on_status(self, status: StatusData=None):
        """
        Callback of new status data update.   # hxxjava add for trading status
        """
        pass

    @virtual
    def on_tick(self, tick: TickData) -> None:
        """
        Callback of new tick data update.
        """
        pass

    @virtual
    def on_bar(self, bar: BarData) -> None:
        """
        Callback of new bar data update.
        """
        pass

    @virtual
    def on_trade(self, trade: TradeData) -> None:
        """
        Callback of new trade data update.
        """
        pass

    @virtual
    def on_order(self, order: OrderData) -> None:
        """
        Callback of new order data update.
        """
        pass

    @virtual
    def on_stop_order(self, stop_order: StopOrder) -> None:
        """
        Callback of stop order update.
        """
        pass

    def buy(
        self,
        price: float,
        volume: float,
        stop: bool = False,
        lock: bool = False,
        net: bool = False
    ) -> list:
        """
        Send buy order to open a long position.
        """
        return self.send_order(
            Direction.LONG,
            Offset.OPEN,
            price,
            volume,
            stop,
            lock,
            net
        )

    def sell(
        self,
        price: float,
        volume: float,
        stop: bool = False,
        lock: bool = False,
        net: bool = False
    ) -> list:
        """
        Send sell order to close a long position.
        """
        return self.send_order(
            Direction.SHORT,
            Offset.CLOSE,
            price,
            volume,
            stop,
            lock,
            net
        )

    def short(
        self,
        price: float,
        volume: float,
        stop: bool = False,
        lock: bool = False,
        net: bool = False
    ) -> list:
        """
        Send short order to open as short position.
        """
        return self.send_order(
            Direction.SHORT,
            Offset.OPEN,
            price,
            volume,
            stop,
            lock,
            net
        )

    def cover(
        self,
        price: float,
        volume: float,
        stop: bool = False,
        lock: bool = False,
        net: bool = False
    ) -> list:
        """
        Send cover order to close a short position.
        """
        return self.send_order(
            Direction.LONG,
            Offset.CLOSE,
            price,
            volume,
            stop,
            lock,
            net
        )

    def send_order(
        self,
        direction: Direction,
        offset: Offset,
        price: float,
        volume: float,
        stop: bool = False,
        lock: bool = False,
        net: bool = False
    ) -> list:
        """
        Send a new order.
        """
        if self.trading:
            vt_orderids: list = self.cta_engine.send_order(
                self, direction, offset, price, volume, stop, lock, net
            )
            return vt_orderids
        else:
            return []

    def cancel_order(self, vt_orderid: str) -> None:
        """
        Cancel an existing order.
        """
        if self.trading:
            self.cta_engine.cancel_order(self, vt_orderid)

    def cancel_all(self) -> None:
        """
        Cancel all orders sent by strategy.
        """
        if self.trading:
            self.cta_engine.cancel_all(self)

    def write_log(self, msg: str) -> None:
        """
        Write a log message.
        """
        self.cta_engine.write_log(msg, self)

    def get_engine_type(self) -> EngineType:
        """
        Return whether the cta_engine is backtesting or live trading.
        """
        return self.cta_engine.get_engine_type()

    def get_pricetick(self) -> float:
        """
        Return pricetick data of trading contract.
        """
        return self.cta_engine.get_pricetick(self)

    def load_bar(
        self,
        days: int,
        interval: Interval = Interval.MINUTE,
        callback: Callable = None,
        use_database: bool = False
    ) -> None:
        """
        Load historical bar data for initializing strategy.
        """
        if not callback:
            callback: Callable = self.on_bar

        bars: List[BarData] = self.cta_engine.load_bar(
            self.vt_symbol,
            days,
            interval,
            callback,
            use_database
        )

        for bar in bars:
            callback(bar)

    def load_tick(self, days: int) -> None:
        """
        Load historical tick data for initializing strategy.
        """
        ticks: List[TickData] = self.cta_engine.load_tick(self.vt_symbol, days, self.on_tick)

        for tick in ticks:
            self.on_tick(tick)

    def put_event(self) -> None:
        """
        Put an strategy data event for ui update.
        """
        if self.inited:
            self.cta_engine.put_strategy_event(self)

    def send_email(self, msg) -> None:
        """
        Send email to default receiver.
        """
        if self.inited:
            self.cta_engine.send_email(msg, self)

    def sync_data(self) -> None:
        """
        Sync strategy variables value into disk storage.
        """
        if self.trading:
            self.cta_engine.sync_strategy_data(self)

    def get_trading_hours(self):
        """
        Return trading_hours of trading hours.      # hxxjava add
        """
        return self.cta_engine.get_trading_hours(self)

    def get_actual_days(self,last_time:datetime,days:int):  # hxxjava add
        """
        得到从last_time开始往前days天的实际天数。
        """
        if days <= 0:
            return 0

        th = TradingHours(self.get_trading_hours())

        # 找到有效的最后时间
        till_time = last_time
        while not th.get_trade_hours(till_time):
            till_time = timedelta(minutes=1)

        availble_days = 0
        #找到有些多开始时间
        from_time = till_time
        while availble_days <= days:
            from_time -= timedelta(days=1)
            if th.get_trade_hours(from_time):
                availble_days += 1

        actual_days = (last_time-from_time).days
        # print(f"till_time={till_time},from_time{from_time},days={days},acutal_days={actual_days}")
        return actual_days

    def get_contract(self):
        """
        Return trading_hours of trading contract.   # hxxjava add
        """
        return self.cta_engine.get_contract(self)

    @virtual
    def on_condition_order(self, cond_order: ConditionOrder):
        """
        Callback of condition order update.
        """
        pass

    def send_condition_order(self,order:ConditionOrder):    # hxxjava add
        """ """
        if not self.trading:
            return False
        return self.cta_engine.send_condition_order(order)

    def cancel_condition_order(self,cond_orderid:str):      # hxxjava add
        """ """
        return self.cta_engine.cancel_condition_order(cond_orderid)

    def cancel_all_condition_orders(self):                  # hxxjava add
        """ """
        return self.cta_engine.cancel_all_condition_orders(self)

    def send_margin_ratio_request(self):   # hxxjava add
        """ """
        main_engine = self.cta_engine.main_engine
        contract:ContractData = self.get_contract()
        symbol,exchange = extract_vt_symbol(self.vt_symbol)
        req = MarginRequest(symbol=symbol,exchange=exchange)
        main_engine.send_margin_ratio_request(req,contract.gateway_name)

    def send_commission_request(self):   # hxxjava add
        """ """
        main_engine = self.cta_engine.main_engine
        contract:ContractData = self.get_contract()
        symbol,exchange = extract_vt_symbol(self.vt_symbol)
        req = CommissionRequest(symbol=symbol,exchange=exchange)
        main_engine.send_commission_request(req,contract.gateway_name)

3.3 第三步修改K线生成器BarGenerator

修改vnpy\trader\utility.py中的BarGenerator,为其添加下面的update_status()函数:

class BarGenerator:
    ... ...
    def update_status(self,status:StatusData=None):
        """  """
        # if status:
        #     hh,mm = status.enter_time.split(':')
        #     st_time = datetime.now().replace(hour=int(hh),minute=int(mm),second=0,microsecond=0,tzinfo=CHINA_TZ)

        if self.bar:
            # 只要接收到交易状态信息,一定是整分钟,立即推送当前分钟bar
            self.on_bar(self.bar)
            self.bar = None

3.4 第四步修改Cta策略

在您的CTA策略中增加一个on_status()函数,这里只给出一个CTA策略与本文主题相关部分的代码,其他部分用省略号表示,一般的代码如下:

class XxxStrategy(CtaTemplate):
    """ XXX交易策略 """
    self.window = 30

    ... ...

    def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
        """"""
        super(GsjyStrategy, self).__init__(cta_engine, strategy_name, vt_symbol, setting)
        ... ...
        self.bg : BarGenerator:BarGenerator(self.on_bar,self.window,self.on_xmin_bar)
        ... ...

    def on_inited(self):
         """ 
        Callback after strategy is inited. 
        """
        self.bg.update_status()   # 用来强制生成最后的1分钟K线
        self.write_log("策略初始化结束。")

    def on_auction_tick(self, tick: TickData):
        """
        Callback of auction tick data update.
        """
        self.bg.update_auction_tick(tick)

    def on_status(self,status:StatusData=None):  # 
        """ 
        收到合约交易状态信息时,更新所有的K线生成器 。
        注意:合约交易状态信息推送只会在策略初始化后才执行!
        """
        self.bg.update_status(status=status)

    def on_tick(self, tick: TickData):
        """
        Callback of new tick data update.
        """
        self.bg.update_tick(tick)

    def on_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        self.bg.update_bar(bar)

    def on_xmin_bar(self, bar: BarData):
        """  """
        # 这里应该是利用大周期bar进行出入场交易信号计算和委托交易指令发出的代码
        pass

    ... ...

3.5 第五步启用扩展的MyCtaEngine

修改vnpy_ctastrategy__init__.py中的CtaStrategyApp,将其engine_class更改为扩展的MyCtaEngine:

class CtaStrategyApp(BaseApp):
    """"""

    app_name = APP_NAME
    app_module = __module__
    app_path = Path(__file__).parent
    display_name = "CTA策略"
    # engine_class = CtaEngine
    engine_class = MyCtaEngine    # hxxjava add
    widget_name = "CtaManager"
    icon_name = str(app_path.joinpath("ui", "cta.ico"))

在ChartWidget基础上创建的指标是不能改变主图或副图的参数的,如何做才能给让指标传递不同参数?
以MacdItem为例,步骤如下:

1. MacdItem是一个有参数的显示控件

MacdItem的实现见:为K线图表添砖加瓦——解决MACD绘图部件显示范围BUG,这里就不再贴代码。

2. 修改ChartWidget的add_item()方法,使得它可以接受指标参数

    def add_item(
        self,
        item_class: Type[ChartItem],
        item_name: str,
        plot_name: str,     
        **kwargs     # hxxjava add
    ) -> None:
        """
        Add chart item.
        """
        # item: ChartItem = item_class(self._manager) 
        item: ChartItem = item_class(self._manager) if not kwargs else item_class(self._manager,**kwargs)   # hxxjava change

        self._items[item_name] = item

        plot: pg.PlotItem = self._plots.get(plot_name)
        plot.addItem(item)

        self._item_plot_map[item] = plot

3. MyChartWidget是一个包含ChartWidget的窗口

class MyChartWidget(QtWidgets.QFrame):
    """
    订单流K线图表控件,
    """
    def __init__(self,title:str="K线图表"):
        """"""
        super().__init__()
        self.init_ui()
        self.setWindowTitle(title)

    def init_ui(self):
        """"""
        self.resize(1400, 800)

        # Create chart widget
        self.chart = ChartWidget()
        self.chart.add_plot("candle", hide_x_axis=True)
        self.chart.add_plot("fast_macd", maximum_height=100,hide_x_axis=True)
        self.chart.add_plot("slow_macd", maximum_height=100,hide_x_axis=True)
        self.chart.add_plot("volume", maximum_height=100,hide_x_axis=False)

        self.chart.add_item(CandleItem, "candle", "candle")  
        self.chart.add_item(MacdItem, "fast_macd", "fast_macd",short_window=6,long_window=19,M=9)   # 相当于MACD(6,19,9)
        self.chart.add_item(MacdItem, "slow_macd", "slow_macd",short_window=19,long_window=39,M=9)  # 相当于MACD(19,39,9)
        self.chart.add_item(VolumeItem, "volume", "volume")

        self.chart.add_cursor()

最近经常有朋友问MacdItem绘图部件显示范围有问题,因为老是有任务在手上,一直没有时间修改,现修改过了,测试没有问题,代码如下:

class MacdItem(ChartItem):
    """"""
    def __init__(self, manager: BarManager,short_window:int=12,long_window:int=26,M:int=9):
        """"""
        super().__init__(manager)

        self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
        self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=1)
        self.red_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=1)
        self.green_pen: QtGui.QPen = pg.mkPen(color=(0, 255, 0), width=1)

        self._values_ranges: Dict[Tuple[int, int], Tuple[float, float]] = {}
        self.last_range:Tuple[int, int] = (-1,-1)    # 最新显示K线索引范围

        self.short_window = short_window
        self.long_window = long_window
        self.M = M

        self.macd_data: Dict[int, List[float,float,float]] = {}


    def get_macd_value(self, ix: int) -> List:
        """"""
        if ix < 0:
            return (0.0,0.0,0.0)

        # When initialize, calculate all macd value
        if not self.macd_data:
            bars = self._manager.get_all_bars()
            close_data = [bar.close_price for bar in bars]

            diffs,deas,macds = talib.MACD(np.array(close_data), 
                                    fastperiod=self.short_window, 
                                    slowperiod=self.long_window, 
                                    signalperiod=self.M)

            for n in range(0,len(diffs)):
                self.macd_data[n] = [diffs[n],deas[n],macds[n]]

        # Return if already calcualted
        if ix in self.macd_data:
            return self.macd_data[ix]

        # Else calculate new value
        close_data = []
        for n in range(ix-self.long_window-self.M+1, ix + 1):
            bar = self._manager.get_bar(n)
            close_data.append(bar.close_price)

        diffs,deas,macds = talib.MACD(np.array(close_data), 
                                            fastperiod=self.short_window, 
                                            slowperiod=self.long_window, 
                                            signalperiod=self.M) 
        diff,dea,macd = diffs[-1],deas[-1],macds[-1]
        self.macd_data[ix] = [diff,dea,macd]

        return [diff,dea,macd]

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """"""
        macd_value = self.get_macd_value(ix)
        last_macd_value = self.get_macd_value(ix - 1)

        # # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        # # Draw macd lines
        if np.isnan(macd_value[0]) or np.isnan(last_macd_value[0]):
            # print("略过macd lines0")
            pass
        else:
            end_point0 = QtCore.QPointF(ix, macd_value[0])
            start_point0 = QtCore.QPointF(ix - 1, last_macd_value[0])
            painter.setPen(self.white_pen)
            painter.drawLine(start_point0, end_point0)

        if np.isnan(macd_value[1]) or np.isnan(last_macd_value[1]):
            # print("略过macd lines1")
            pass
        else:
            end_point1 = QtCore.QPointF(ix, macd_value[1])
            start_point1 = QtCore.QPointF(ix - 1, last_macd_value[1])
            painter.setPen(self.yellow_pen)
            painter.drawLine(start_point1, end_point1)

        if not np.isnan(macd_value[2]):
            if (macd_value[2]>0):
                painter.setPen(self.red_pen)
                painter.setBrush(pg.mkBrush(255,0,0))
            else:
                painter.setPen(self.green_pen)
                painter.setBrush(pg.mkBrush(0,255,0))
            painter.drawRect(QtCore.QRectF(ix-0.3,0,0.6,macd_value[2]))
        else:
            # print("略过macd lines2")
            pass

        painter.end()
        return picture

    def boundingRect(self) -> QtCore.QRectF:
        """"""
        min_y, max_y = self.get_y_range()
        rect = QtCore.QRectF(
            0,
            min_y,
            len(self._bar_picutures),
            max_y
        )
        return rect

    def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
        """ 
        获得3个指标在y轴方向的范围 
        hxxjava 修改,2022-11-15
        当显示范围改变时,min_ix,max_ix的值不为None,当显示范围不变时,min_ix,max_ix的值不为None,
        """  
        if not self.macd_data:
            # 如果MACD三个数据没有计算过
            return (-100,100)

        last_range = (min_ix,max_ix)
        if last_range == (None,None):   
            # 本次索引范围未改变           
            if self.last_range in self._values_ranges:  
                # 如果y方向范围已经保存,读取y方向范围
                result = self._values_ranges[self.last_range]
                return adjust_range(result)

            else:
                # 如果y方向范围没有保存,从macd_data重新计算y方向范围
                min_ix,max_ix = 0,len(self.macd_data)-1

                macd_list = list(self.macd_data.values())[min_ix:max_ix + 1]
                ndarray = np.array(macd_list)  

                max_price = np.nanmax(ndarray)
                min_price = np.nanmin(ndarray)

                # 保存y方向范围,同时返回结果
                result = (min_price, max_price)
                if result == (np.nan,np.nan):
                    # 前面的未计算出有效值会出现这种情况
                    return (-100,100)

                self.last_range = (min_ix,max_ix)
                self._values_ranges[self.last_range] = result
                return adjust_range(result)

        """ 以下为显示范围变化时 """
        if last_range in self._values_ranges:
            # 该范围已经保存过y方向范围
            # 取得y方向范围,返回结果
            self.last_range = last_range
            result = self._values_ranges[last_range]
            return adjust_range(result)

        # 该范围没有保存过y方向范围,从macd_data重新计算y方向范围
        min_ix = max(0,min_ix)
        max_ix = min(max_ix,len(self.macd_data)-1)

        macd_list = list(self.macd_data.values())[min_ix:max_ix+1]
        ndarray = np.array(macd_list) 

        max_price = np.nanmax(ndarray)
        min_price = np.nanmin(ndarray)

        # 取得y方向范围,返回结果
        result = (min_price, max_price)

        if result == (np.nan,np.nan):
            # 前面的未计算出有效值会出现这种情况
            return (-100,100)

        # print(f"result1={result}")
        self.last_range = (min_ix,max_ix)
        self._values_ranges[self.last_range] = result

        self.min_ix,self.max_ix = last_range
        return adjust_range(result)


    def get_info_text(self, ix: int) -> str:
        """ """
        barscount = len(self._manager._bars) # hxxjava debug
        if ix in self.macd_data:
            [diff,dea,macd] = self.macd_data[ix]
            words = [
                f"diff {diff:.3f}"," ",
                f"dea {dea:.3f}"," ",
                f"macd {macd:.3f}",
                f"barscount={ix,barscount}"
                ]
            text = "\n".join(words)
        else:
            text = "diff - \ndea - \nmacd -"

        return text

一、问题是怎么发现的

本人在对郑商所行情数据中时间秒以下的部分进行补充的修改一文中,给出来对郑商所行情数据中时间秒以下的部分进行了补充,ctp_gateway也已经按照预想的那样工作了。之后启动DataRecorder对多个合约进行tick数据的录制。可是在价差tick数据的时候却发现对有夜盘的合约进行录制的时间,如果在23:00以后还有tick数据推送的话,会出现日期错误,具体的表现是:2022-10-19 23:00:00以后的tick数据中的datetime字段的日期部分居然是2022-10-20!也就是说我昨天晚上就录到了今天晚上23:00:00以后tick数据(当然是未来时间),这是错误的!

二、证据

打开mysql数据库(我用的是mysql数据库):

输入下面的SQL查询语句:

select symbol,datetime from dbtickdata  where time(datetime) > "22:59:59.5";

得到的结果:

+--------+-------------------------+
| symbol | datetime                |
+--------+-------------------------+
| i2301  | 2022-10-19 22:59:59.522 |
| i2301  | 2022-10-19 23:00:00.013 |
| i2301  | 2022-10-19 23:00:00.038 |
| i2301  | 2022-10-20 23:00:00.038 | ---- 日期错误,应该是2022-10-19 
| p2301  | 2022-10-19 23:00:00.025 |
| p2301  | 2022-10-20 23:00:00.025 | ---- 日期错误,应该是2022-10-19 
| rb2301 | 2022-10-19 22:59:59.500 |
| rb2301 | 2022-10-19 23:00:00.000 |
| TA301  | 2022-10-19 22:59:59.500 |
| TA301  | 2022-10-19 22:59:59.750 |
| TA301  | 2022-10-19 22:59:59.875 |
+--------+-------------------------+

注意:我发现此问题的时间是2022-10-20 16:00!

三、原因找到了!

经过仔细研读ctp_gateway.py的CtpMdApi的onRtnDepthMarketData(),具体的实现见下面的代码,发现对大商所合约tick的datetime错误导致的。

3.1 大商所合约的深度行情中没有日期字段

按照ctp接口规范文档知道,大商所合约的深度行情中没有日期字段,需要客户端收到后自行用本地日期去替代。
CtpMdApi中用定时器维护了一个self.current_date的本地日期成员,其主要作用就是做为了补齐大商所合约的深度行情日期和时间之用的。

3.2 何时会出现这个错误?

因为CtpMdApi每次重新连接之后,再次订阅一些合约的行情的时候,必然会推送一条该合约在交易所中最后更新的tick时间给订阅方。只要在每天8:59(不含)之前重新连接大商所的行情服务器,就一定会收到一个日期为当日,时间为该合约最后一次推送的tick,而其中的补足日期是不对的!

请看看上面的证据中,发生日期错误的全部是大商所的合约,分析正确 !

   def onRtnDepthMarketData(self, data: dict) -> None:
        """行情数据推送"""
        # 过滤没有时间戳的异常行情数据
        if not data["UpdateTime"]:
            return

        # 过滤还没有收到合约数据前的行情推送
        symbol: str = data["InstrumentID"]
        contract: ContractData = symbol_contract_map.get(symbol, None)
        if not contract:
            return

        # 对大商所的交易日字段取本地日期
        if not data["ActionDay"] or contract.exchange == Exchange.DCE:
            date_str: str = self.current_date

        else:
            date_str: str = data["ActionDay"]

        # 这里错了,按照这样的字符串"%Y%m%d %H:%M:%S.%f",会错把tick.datetime的微秒当成0.1秒计数
        # timestamp: str = f"{date_str} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}"        
        timestamp: str = f"{date_str} {data['UpdateTime']}." + str(data['UpdateMillisec']*1000).zfill(6) # hxxjava edit
        dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
        dt: datetime = CHINA_TZ.localize(dt)

        # hxxjava add start
        if contract.exchange == Exchange.CZCE:
            # 对郑商所配置的秒以下的部分进行特别处理
            key = f"{symbol}.{contract.exchange.value}"
            # 读取上次的tick时间
            last_time:datetime = self.czce_last_times.get(key,None)
            if last_time:
                # 取得上次时间的整秒时间
                dt1 = last_time - timedelta(microseconds = last_time.microsecond)
                if dt == dt1:
                    # 如果有收到一次秒内的tick,计算应该添加到秒以下的部分
                    next_second = dt1 + timedelta(seconds=1)
                    delta = next_second - last_time
                    microsecond = (delta.seconds*1000000 + delta.microseconds) / 2
                    # 时间调整为上次tick时间到下一秒到中间值
                    dt = last_time + timedelta(microseconds = microsecond)

            # 更新最新tick时间字典   
            self.czce_last_times[key] = dt
        # hxxjava add end

        tick: TickData = TickData(
            symbol=symbol,
            exchange=contract.exchange,
            datetime=dt,
            name=contract.name,
            volume=data["Volume"],
            turnover=data["Turnover"],
            open_interest=data["OpenInterest"],
            last_price=adjust_price(data["LastPrice"]),
            limit_up=data["UpperLimitPrice"],
            limit_down=data["LowerLimitPrice"],
            open_price=adjust_price(data["OpenPrice"]),
            high_price=adjust_price(data["HighestPrice"]),
            low_price=adjust_price(data["LowestPrice"]),
            pre_close=adjust_price(data["PreClosePrice"]),
            bid_price_1=adjust_price(data["BidPrice1"]),
            ask_price_1=adjust_price(data["AskPrice1"]),
            bid_volume_1=data["BidVolume1"],
            ask_volume_1=data["AskVolume1"],
            gateway_name=self.gateway_name
        )

        if data["BidVolume2"] or data["AskVolume2"]:
            tick.bid_price_2 = adjust_price(data["BidPrice2"])
            tick.bid_price_3 = adjust_price(data["BidPrice3"])
            tick.bid_price_4 = adjust_price(data["BidPrice4"])
            tick.bid_price_5 = adjust_price(data["BidPrice5"])

            tick.ask_price_2 = adjust_price(data["AskPrice2"])
            tick.ask_price_3 = adjust_price(data["AskPrice3"])
            tick.ask_price_4 = adjust_price(data["AskPrice4"])
            tick.ask_price_5 = adjust_price(data["AskPrice5"])

            tick.bid_volume_2 = data["BidVolume2"]
            tick.bid_volume_3 = data["BidVolume3"]
            tick.bid_volume_4 = data["BidVolume4"]
            tick.bid_volume_5 = data["BidVolume5"]

            tick.ask_volume_2 = data["AskVolume2"]
            tick.ask_volume_3 = data["AskVolume3"]
            tick.ask_volume_4 = data["AskVolume4"]
            tick.ask_volume_5 = data["AskVolume5"]

        self.gateway.on_tick(tick)

3.3 怎么解决?

方法1. 问题已经找到,怎么保证这个日期一定是正确的,可不是一件简单的事情,因为它牵涉到交易时间段和节假日的计算问题!
方法2. 有一个简单的办法,既然正确日期不好计算,可以对大商所的tick与当前本地时间进行比较,如果tick的datetime是未来时间,直接该tick抛弃掉,这样也不会有太大影响。

3.4 采用方法2的实现方法

打开vnpy_ctpgateway.py文件,对class CtpMdApi进行如下修改,具体的修改请查找 '# hxxjava'就可以找到修改的语句了。
1、CtpMdApi中增加self.current_time,用来记录本地时间

class CtpMdApi(MdApi):
    """"""

    def __init__(self, gateway: CtpGateway) -> None:
        """构造函数"""
        super().__init__()

        self.gateway: CtpGateway = gateway
        self.gateway_name: str = gateway.gateway_name

        self.reqid: int = 0

        self.connect_status: bool = False
        self.login_status: bool = False
        self.subscribed: Set = set()  

        self.userid: str = ""
        self.password: str = ""
        self.brokerid: str = ""

        # self.current_date: str = datetime.now().strftime("%Y%m%d")    # hxxjava comments
        self.current_time: datetime = datetime.now()                    # hxxjava adds
        self.current_date: str = self.current_time.strftime("%Y%m%d")   # hxxjava adds

        self.czce_last_times:Dict[str,datetime] = {}    # hxxjava add

2、修改CtpMdApi的update_date()函数

    def update_date(self) -> None:
        """更新当前日期"""
        # self.current_date = datetime.now().strftime("%Y%m%d") # hxxjava comments

        self.current_time = datetime.now()                      # hxxjava adds
        self.current_date = self.current_time.strftime("%Y%m%d")   # hxxjava adds

3、修改CtpMdApi的深度行情推送函数onRtnDepthMarketData()

    def onRtnDepthMarketData(self, data: dict) -> None:
        """行情数据推送"""
        # 过滤没有时间戳的异常行情数据
        if not data["UpdateTime"]:
            return

        # 过滤还没有收到合约数据前的行情推送
        symbol: str = data["InstrumentID"]
        contract: ContractData = symbol_contract_map.get(symbol, None)
        if not contract:
            return

        # 对大商所的交易日字段取本地日期
        if not data["ActionDay"] or contract.exchange == Exchange.DCE:
            date_str: str = self.current_date

        else:
            date_str: str = data["ActionDay"]

        # 这里错了,按照这样的字符串"%Y%m%d %H:%M:%S.%f",会错把tick.datetime的微秒当成0.1秒计数
        # timestamp: str = f"{date_str} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}"        
        timestamp: str = f"{date_str} {data['UpdateTime']}." + str(data['UpdateMillisec']*1000).zfill(6) # hxxjava edit
        dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
        dt: datetime = CHINA_TZ.localize(dt)

        # hxxjava add start
        if contract.exchange == Exchange.CZCE:
            # 对郑商所配置的秒以下的部分进行特别处理
            key = f"{symbol}.{contract.exchange.value}"
            # 读取上次的tick时间
            last_time:datetime = self.czce_last_times.get(key,None)
            if last_time:
                # 取得上次时间的整秒时间
                dt1 = last_time - timedelta(microseconds = last_time.microsecond)
                if dt == dt1:
                    # 如果有收到一次秒内的tick,计算应该添加到秒以下的部分
                    next_second = dt1 + timedelta(seconds=1)
                    delta = next_second - last_time
                    microsecond = (delta.seconds*1000000 + delta.microseconds) / 2
                    # 时间调整为上次tick时间到下一秒到中间值
                    dt = last_time + timedelta(microseconds = microsecond)

            # 更新最新tick时间字典   
            self.czce_last_times[key] = dt

        elif contract.exchange == Exchange.DCE:
            # 对郑商所配置的秒以下的部分进行特别处理
            if dt > CHINA_TZ.localize(self.current_time) + timedelta(seconds = 10):
                # 如果大商所的补足本地日期的dt超前每2秒更新一次的本地时间2秒,判断为无数的行情数据,做丢弃处理。
                # 但是为了降低对本地时间与交易所时间同步要求,放宽为10秒或者更多都可以
                return
        # hxxjava add end

        tick: TickData = TickData(
            symbol=symbol,
            exchange=contract.exchange,
            datetime=dt,
            name=contract.name,
            volume=data["Volume"],
            turnover=data["Turnover"],
            open_interest=data["OpenInterest"],
            last_price=adjust_price(data["LastPrice"]),
            limit_up=data["UpperLimitPrice"],
            limit_down=data["LowerLimitPrice"],
            open_price=adjust_price(data["OpenPrice"]),
            high_price=adjust_price(data["HighestPrice"]),
            low_price=adjust_price(data["LowestPrice"]),
            pre_close=adjust_price(data["PreClosePrice"]),
            bid_price_1=adjust_price(data["BidPrice1"]),
            ask_price_1=adjust_price(data["AskPrice1"]),
            bid_volume_1=data["BidVolume1"],
            ask_volume_1=data["AskVolume1"],
            gateway_name=self.gateway_name
        )

        if data["BidVolume2"] or data["AskVolume2"]:
            tick.bid_price_2 = adjust_price(data["BidPrice2"])
            tick.bid_price_3 = adjust_price(data["BidPrice3"])
            tick.bid_price_4 = adjust_price(data["BidPrice4"])
            tick.bid_price_5 = adjust_price(data["BidPrice5"])

            tick.ask_price_2 = adjust_price(data["AskPrice2"])
            tick.ask_price_3 = adjust_price(data["AskPrice3"])
            tick.ask_price_4 = adjust_price(data["AskPrice4"])
            tick.ask_price_5 = adjust_price(data["AskPrice5"])

            tick.bid_volume_2 = data["BidVolume2"]
            tick.bid_volume_3 = data["BidVolume3"]
            tick.bid_volume_4 = data["BidVolume4"]
            tick.bid_volume_5 = data["BidVolume5"]

            tick.ask_volume_2 = data["AskVolume2"]
            tick.ask_volume_3 = data["AskVolume3"]
            tick.ask_volume_4 = data["AskVolume4"]
            tick.ask_volume_5 = data["AskVolume5"]

        self.gateway.on_tick(tick)

1. 问题的由来

在使用vnpy中的DataRecorder进行tick情录制的时候,本人发现郑商所的tick中时间在一秒内会收到两个tick,但是同一秒内的tick的时间是相同的。这会给后端的CTA策略、价差交易策略登录模块在合成K的时带来不必要的困扰,因为通常合成K线的BarGenerator通常对tick的时间进行有效性判断的时候,会把时间重复的tick做抛弃处理,这会再次郑商所的品种的tick被使用的量减半。如果重复的tick也用的话,这样倒是不会减半,可是在防止垃圾数据时又无法杜绝旧的数据在网关断网或者重新再次连接时,接口回再次推送最后的tick数据,而这个tick数据之前已经被实时推送给客户段了。所有在ctp_gateway中对郑商所行情数据中时间秒以下的部分进行补充是非常必要的!

2. 修改思路

  1. 在ctp_gateway中对郑商所tick行情数据进行特别处理,思路是每次接收到郑商所tick时t1,与上次的tick的时间t2进行比较,如果二者是同一秒的tick,那么将新的tick时间t1 = t2 +(t2到下一秒微秒数量÷2);如果t1,t2不是同一秒或t2不存在,这t1无需调整。
  2. 记录该t1时间到字典中,供下一次使用

3. 修改代码

本修改只需要对vnpy_ctp\ctp_gateway.py进行修改就可以了,修改步骤如下:

3.1 在CtpMdApi的init()增加郑商所最新tick时间字典:

    def __init__(self, gateway: CtpGateway) -> None:
        """构造函数"""
        super().__init__()

        self.gateway: CtpGateway = gateway
        self.gateway_name: str = gateway.gateway_name

        self.reqid: int = 0

        self.connect_status: bool = False
        self.login_status: bool = False
        self.subscribed: Set = set()  

        self.userid: str = ""
        self.password: str = ""
        self.brokerid: str = ""

        self.current_date: str = datetime.now().strftime("%Y%m%d")

        self.czce_last_times:Dict[str,datetime] = {}    # 郑商所最新tick时间字典 hxxjava add

3.2 修改CtpMdApi行情推送函数onRtnDepthMarketData():

    def onRtnDepthMarketData(self, data: dict) -> None:
        """行情数据推送"""
        # 过滤没有时间戳的异常行情数据
        if not data["UpdateTime"]:
            return

        # 过滤还没有收到合约数据前的行情推送
        symbol: str = data["InstrumentID"]
        contract: ContractData = symbol_contract_map.get(symbol, None)
        if not contract:
            return

        # 对大商所的交易日字段取本地日期
        if not data["ActionDay"] or contract.exchange == Exchange.DCE:
            date_str: str = self.current_date

        else:
            date_str: str = data["ActionDay"]

        # 这里错了,按照这样的字符串"%Y%m%d %H:%M:%S.%f",会错把tick.datetime的微秒当成0.1秒计数
        # timestamp: str = f"{date_str} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}"        
        timestamp: str = f"{date_str} {data['UpdateTime']}." + str(data['UpdateMillisec']*1000).zfill(6) # hxxjava edit
        dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
        dt: datetime = CHINA_TZ.localize(dt)

        # hxxjava add start
        if contract.exchange == Exchange.CZCE:
            # 对郑商所配置的秒以下的部分进行特别处理
            key = f"{symbol}.{contract.exchange.value}"
            # 读取上次的tick时间
            last_time:datetime = self.czce_last_times.get(key,None)
            if last_time:
                # 取得上次时间的整秒时间
                dt1 = last_time - timedelta(microseconds = last_time.microsecond)
                if dt == dt1:
                    # 如果有收到一次秒内的tick,计算应该添加到秒以下的部分
                    next_second = dt1 + timedelta(seconds=1)
                    delta = next_second - last_time
                    microsecond = (delta.seconds*1000000 + delta.microseconds) / 2
                    # 时间调整为上次tick时间到下一秒到中间值
                    dt = last_time + timedelta(microseconds = microsecond)

            # 更新最新tick时间字典   
            self.czce_last_times[key] = dt
        # hxxjava add end

        tick: TickData = TickData(
            symbol=symbol,
            exchange=contract.exchange,
            datetime=dt,
            name=contract.name,
            volume=data["Volume"],
            turnover=data["Turnover"],
            open_interest=data["OpenInterest"],
            last_price=adjust_price(data["LastPrice"]),
            limit_up=data["UpperLimitPrice"],
            limit_down=data["LowerLimitPrice"],
            open_price=adjust_price(data["OpenPrice"]),
            high_price=adjust_price(data["HighestPrice"]),
            low_price=adjust_price(data["LowestPrice"]),
            pre_close=adjust_price(data["PreClosePrice"]),
            bid_price_1=adjust_price(data["BidPrice1"]),
            ask_price_1=adjust_price(data["AskPrice1"]),
            bid_volume_1=data["BidVolume1"],
            ask_volume_1=data["AskVolume1"],
            gateway_name=self.gateway_name
        )

        if data["BidVolume2"] or data["AskVolume2"]:
            tick.bid_price_2 = adjust_price(data["BidPrice2"])
            tick.bid_price_3 = adjust_price(data["BidPrice3"])
            tick.bid_price_4 = adjust_price(data["BidPrice4"])
            tick.bid_price_5 = adjust_price(data["BidPrice5"])

            tick.ask_price_2 = adjust_price(data["AskPrice2"])
            tick.ask_price_3 = adjust_price(data["AskPrice3"])
            tick.ask_price_4 = adjust_price(data["AskPrice4"])
            tick.ask_price_5 = adjust_price(data["AskPrice5"])

            tick.bid_volume_2 = data["BidVolume2"]
            tick.bid_volume_3 = data["BidVolume3"]
            tick.bid_volume_4 = data["BidVolume4"]
            tick.bid_volume_5 = data["BidVolume5"]

            tick.ask_volume_2 = data["AskVolume2"]
            tick.ask_volume_3 = data["AskVolume3"]
            tick.ask_volume_4 = data["AskVolume4"]
            tick.ask_volume_5 = data["AskVolume5"]

        self.gateway.on_tick(tick)

4. 问题解决了

这是本人录制的郑商所合约TA301的结果:

description

description
都说国内期货的行情推送最多是2次,其实是错误的,可能超过2次 !看上图就知道,一秒3次一个经常存在的,所以不能够固定递增0.5秒,而应该向本文中的办法,每次加0.5,0.25,0.125......秒的比较好,既不重复有可以超过2次。

至此就完美地解决了郑商所合约行情秒内tick时间重复的问题了!

1. 问题的由来

  • 在文章 发现了vnpy的BarGenerator两个隐藏很深的错误 !中我就已经分析过tick数据对bar生成器的影响。
  • 当前vnpy系统对集合竞价tick与其他tick没有区分能力
  • 当前vnpy系统没有充分利用行情接口提供的状态信息,无法识别有效tick与无效tick,一股脑地发送到策略和应用中,导致bar合成的错误。

2. 问题的解决方法

在行情接口与策略和应用之间建起一个tick过滤器——TickFilter,对tick数据进行过滤。

tick数据过滤器的功能:

  1. 过滤重复tick,保证已经参与K线合成的tick不会再次被系统使用,每个网关对应一个ick数据过滤;
    要做到这一条,就必须做到对所有已经订阅过的合约的tick的缓存,否则你再次重启系统的时候是无法知道你收到第一个tick是否已经参与过之前bar的合成了。这样你可能重复使用该tick,这是错误的。
    为此我们需要将所有已经订阅过的合约的最新tick进行实时更新,并定期做持久化保存,且在每次系统启动的时候读取加载到系统中。
  2. 过滤无效tick,转发有效交易状态下的tick到系统中,不在有效交易状态下tick做丢弃处理,有效交易状态包括:集合竞价状态和连续竞价状态;
    CTP系统的行情接口中包含的实时更新的合约交易状态通知推送接口,OnRtnInstrumentStatus()。关于这个问题我已经在如何更有效地利用合约交易状态信息——交易状态信息管理器。一文中做了详细的介绍,再次就不赘述。总之合约交易状态通知可以让我识别一个tick是否是有些大tick。
  3. 识别集合竞价tick,为使用tick的应用或用户策略处理集合竞价tick提供支持。
    合约交易状态通知可以让我们知道那些tick是tick,同时可以可以让我们区分那个tick是集合竞价tick,那些是连续竞价tick。对有效tick进性分析利用于我们策略或者应用生成出正确的bar。
  4. 本文只对CtpGateway,CtaEngine、CtaTemplate进行了更改,其他网关系统的道理都是相同的。如果您觉得对您有启发,也可以按同样的方法修改。

3. 过滤无效tick数据的实现代码

声明:本文基于【CTP接口规范6.3.15_API接口说明】做出的修改。

3.1 相关数据类型定义

4.1 定义相关的常量和数据类
在vnpy\trader\constant.py中增加下面的合约交易状态InstrumentStatus常量类型定义:

class InstrumentStatus(Enum):
    """
    合约交易状态类型 hxxjava debug
    """
    BEFORE_TRADING = "开盘前"
    NO_TRADING = "非交易"
    CONTINOUS = "连续交易" 
    AUCTION_ORDERING = "集合竞价报单"
    AUCTION_BALANCE = "集合竞价价格平衡"
    AUCTION_MATCH = "集合竞价撮合"
    CLOSE = "收盘"


# 有效交易状态
VALID_TRADE_STATUSES = [
    InstrumentStatus.CONTINOUS,
    InstrumentStatus.AUCTION_ORDERING,
    InstrumentStatus.AUCTION_BALANCE,
    InstrumentStatus.AUCTION_MATCH
]

# 集合竞价交易状态
AUCTION_STATUS = [
    InstrumentStatus.AUCTION_ORDERING,
    InstrumentStatus.AUCTION_BALANCE,
    InstrumentStatus.AUCTION_MATCH
]


class StatusEnterReason(Enum):
    """
    品种进入交易状态原因类型 hxxjava debug
    """
    AUTOMATIC = "自动切换"
    MANUAL = "手动切换"
    FUSE = "熔断"

在vnpy\trader\object.py中增加下面的交易状态数据类StatusData:

@dataclass
class StatusData(BaseData):
    """
    hxxjava debug
    """
    symbol:str       
    exchange : Exchange    
    settlement_group_id : str = ""  
    instrument_status : InstrumentStatus = None   
    trading_segment_sn : int = None 
    enter_time : str = ""      
    enter_reason : str = ""  
    exchange_inst_id : str = ""     

    def __post_init__(self):
        """  """
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"

    def belongs_to(self,vt_symbol:str):
        symbol,exchange_str = vt_symbol.split(".")
        instrument = left_alphas(symbol).upper()
        return (self.symbol.upper() == instrument) and (self.exchange.value == exchange_str)

3.2 相关消息定义

在vnpy\trader\event.py中增加交易状态消息类型

EVENT_STATUS = "eStatus"                        # hxxjava debug
EVENT_ORIGIN_TICK = "eOriginTick."              # hxxjava debug
EVENT_AUCTION_TICK = "eAuctionTick."            # hxxjava debug

3.3 Gateway的修改

在vnpy\trader\gateway.py中合约状态接口,修改tick推送接口:

引用部分增加:

from .event import EVENT_ORIGIN_TICK,EVENT_STATUS         # hxxjava add
from .object import StatusData    # hxxjava add

修改class BaseGateway的on_tick()接口,增加on_status()接口:

    def on_tick(self, tick: TickData) -> None:
        """
        Tick event push.
        Tick event of a specific vt_symbol is also pushed.
        """
        self.on_event(EVENT_ORIGIN_TICK, tick)  # hxxjava add             
        # self.on_event(EVENT_TICK, tick)                   
        # self.on_event(EVENT_TICK + tick.vt_symbol, tick)  

    def on_status(self, status: StatusData) -> None:    # hxxjava debug
        """
        Instrument Status event push.
        """
        self.on_event(EVENT_STATUS, status)
        self.on_event(EVENT_STATUS + status.vt_symbol, status)

3.4 CtpGateway的修改

修改vnpy_cpt\ctp_gateway.py:

增加引用部分

from vnpy.trader.constant import InstrumentStatus,StatusEnterReason  # hxxjava debug
rom vnpy.trader.object import StatusData,     # hxxjava debug

增加几个映射字典:

# 品种状态进入原因映射  hxxjava debug
INSTRUMENTSTATUS_CTP2VT: Dict[str, InstrumentStatus] = {
    "0": InstrumentStatus.BEFORE_TRADING,
    "1": InstrumentStatus.NO_TRADING,
    "2": InstrumentStatus.CONTINOUS,
    "3": InstrumentStatus.AUCTION_ORDERING,
    "4": InstrumentStatus.AUCTION_BALANCE,
    "5": InstrumentStatus.AUCTION_MATCH,
    "6": InstrumentStatus.CLOSE,
    "7": InstrumentStatus.CLOSE
}


# 品种状态进入原因映射  hxxjava debug
ENTERREASON_CTP2VT: Dict[str, StatusEnterReason] = {
    "1": StatusEnterReason.AUTOMATIC,
    "2": StatusEnterReason.MANUAL,
    "3": StatusEnterReason.FUSE
}

为class CtpTdApi增加下面合约状态推送接口:

    def onRtnInstrumentStatus(self,data:dict):
        """ 
        当接收到合约品种状态信息 # hxxjava debug 
        """
        if data:
            # print(f"【data={data}】")
            status =  StatusData(
                symbol = data["InstrumentID"],
                exchange = EXCHANGE_CTP2VT[data["ExchangeID"]],
                settlement_group_id = data["SettlementGroupID"],
                instrument_status = INSTRUMENTSTATUS_CTP2VT[data["InstrumentStatus"]],
                trading_segment_sn = data["TradingSegmentSN"],
                enter_time = data["EnterTime"],
                enter_reason = ENTERREASON_CTP2VT[data["EnterReason"]],
                exchange_inst_id = data["ExchangeInstID"],
                gateway_name=self.gateway_name
            )
            # print(f"status={status}")
            self.gateway.on_status(status)

3.5 对CtaEngine的进行扩展

增加引用部分

from vnpy.trader.event import EVENT_AUCTION_TICK  # hxxjava add

增加一个对CtaEgine的扩展MyCtaEngine

class MyCtaEngine(CtaEngine):
    """  """

    condition_filename = "condition_order.json"     # 历史条件单存储文件


    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super().__init__(main_engine,event_engine)

        self.condition_orders:Dict[str,ConditionOrder] = {}         # strategy_name: ConditionOrder

        self.triggered_condition_orders:List[ConditionOrder] = []   # 已经触发点条件单,为流控设计

    def load_active_condtion_orders(self):
        """  """
        return {}

    def register_event(self):
        """"""
        super().register_event()
        self.event_engine.register(EVENT_AUCTION_TICK, self.process_auction_tick_event)

    def process_auction_tick_event(self,event:Event):
        """ 集合竞价消息处理 """

        tick:TickData = event.data
        strategies = self.symbol_strategy_map[tick.vt_symbol]
        if not strategies:
            return

        for strategy in strategies:
            if strategy.inited:
                # 执行策略的集合竞价消息处理
                self.call_strategy_func(strategy, strategy.on_auction_tick, tick)

    def process_tick_event(self,event:Event):
        """ 用tick的价格检查条件单 """
        super().process_tick_event(event)

        tick:TickData = event.data
        all_condition_orders = [order for order in self.condition_orders.values() \
            if order.vt_symbol == tick.vt_symbol and order.status == CondOrderStatus.WAITING]
        for order in all_condition_orders:
            # 检查条件单是否满足条件
            self.check_condition_order(order,tick)

    def check_condition_order(self,order:ConditionOrder,tick:TickData):
        """ 检查条件单是否满足条件 """       
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        price = tick.last_price

        is_be = order.condition == Condition.BE and price >= order.price
        is_le = order.condition == Condition.LE and price <= order.price
        is_bt = order.condition == Condition.BT and price > order.price
        is_lt = order.condition == Condition.LT and price < order.price

        if is_be or is_le or is_bt or is_lt:
            # 满足触发条件
            if order.execute_price == ExecutePrice.MARKET:
                # 取市场价
                price = tick.last_price
            elif order.execute_price == ExecutePrice.EXTREME:
                # 取极限价
                price = tick.limit_up if order.direction == Direction.LONG else tick.limit_down
            else:
                # 取设定价
                price = order.price

            # 执行委托
            order_ids = strategy.send_order(
                    direction = order.direction,
                    offset=order.offset,
                    price=price,
                    volume=order.volume 
                )

            if order_ids:
                order.trigger_time = tick.datetime
                order.status = CondOrderStatus.TRIGGERED
                order.vt_orderids = order_ids

                self.call_strategy_func(strategy,strategy.on_condition_order,order)
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))

    def find_condition_order(self,vt_orderid:str):
        """ 根据委托单号查询所属条件单 """
        corder:ConditionOrder = None
        for order in self.condition_orders.values():
            if vt_orderid in order.vt_orderids:
                corder = order
                break

        return corder

    def process_trade_event(self, event: Event):
        """ 委托单推送处理 """
        super().process_trade_event(event)
        trade:TradeData = event.data
        vt_orderid = trade.vt_orderid

        corder = self.find_condition_order(vt_orderid)
        if corder:
            # 该成交单属于某个条件单
            strategy = self.strategies.get(corder.strategy_name,None)
            if strategy and strategy.trading:
                # 找到了该条件单属实策略实例且正在交易中

                # 累计条件单的成交量
                corder.traded += trade.volume
                # 推送该条件单给策略
                self.call_strategy_func(strategy,strategy.on_condition_order,corder)
                # 刷新条件单列表控件
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,corder))

    def send_condition_order(self,order:ConditionOrder):
        """  """
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        if order.cond_orderid not in self.condition_orders:
            self.condition_orders[order.cond_orderid] = order
            self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
            return True

        return False

    def cancel_condition_order(self,cond_orderid:str):
        """  """
        order:ConditionOrder = self.condition_orders.get(cond_orderid,None)
        if not order:
            return False

        order.status = CondOrderStatus.CANCELLED
        self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
        return True

    def cancel_all_condition_orders(self,strategy_name:str):
        """  """
        for order in self.condition_orders.values():
            if order.strategy_name == strategy_name and order.status == CondOrderStatus.WAITING:
                order.status = CondOrderStatus.CANCELLED
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))

        return True

把vnpy_ctastrategy目录下 的__init__.py中的CtaStrategyApp做如下修改

class CtaStrategyApp(BaseApp):
    """"""

    app_name = APP_NAME
    app_module = __module__
    app_path = Path(__file__).parent
    display_name = "CTA策略"
    # engine_class = CtaEngine
    engine_class = MyCtaEngine    # hxxjava add
    widget_name = "CtaManager"
    icon_name = str(app_path.joinpath("ui", "cta.ico"))

3.6 CtaTemplate的修改

修改vnpy_ctastrategy\CtaTemplate.py如下,为CtaTemplate增加on_auction_tick():

    @virtual
    def on_auction_tick(self, tick: TickData):
        """
        Callback of new tick data update.   # hxxjava add for auction tick
        """
        pass

3.7 为数据库增加最新Tick保存函数

3.7.1 修改vnpy\trader\database.py

为class BaseDatabase增加下面两个接口函数:

    @abstractmethod
    def save_last_tick(self, ticks: List[TickData]) -> bool:
        """
        Save last tick data into database.  # hxxjava add
        """
        pass

    @abstractmethod
    def load_last_tick(
        self,
        gateway_name : str,
        exchange: Exchange = None,
        symbol: str = None
    ) -> List[TickData]:
        """
        Load last tick data from database.  # hxxjava add
        """
        pass

3.7.2 修改vnpy_mysql\mysql_database.py

class MyDateTimeField(DateTimeField):
    def get_modifiers(self):
        return [6]

class DbLastTick(Model):    # hxxjava add
    """ 最新TICK数据表映射对象 """

    id = AutoField()

    gateway_name: str = CharField()

    symbol: str = CharField()
    exchange: str = CharField()
    datetime: datetime = MyDateTimeField()

    name: str = CharField()
    volume: float = FloatField()
    turnover: float = FloatField()
    open_interest: float = FloatField()
    last_price: float = FloatField()
    last_volume: float = FloatField()
    limit_up: float = FloatField()
    limit_down: float = FloatField()

    open_price: float = FloatField()
    high_price: float = FloatField()
    low_price: float = FloatField()
    pre_close: float = FloatField()

    bid_price_1: float = FloatField()
    bid_price_2: float = FloatField(null=True)
    bid_price_3: float = FloatField(null=True)
    bid_price_4: float = FloatField(null=True)
    bid_price_5: float = FloatField(null=True)

    ask_price_1: float = FloatField()
    ask_price_2: float = FloatField(null=True)
    ask_price_3: float = FloatField(null=True)
    ask_price_4: float = FloatField(null=True)
    ask_price_5: float = FloatField(null=True)

    bid_volume_1: float = FloatField()
    bid_volume_2: float = FloatField(null=True)
    bid_volume_3: float = FloatField(null=True)
    bid_volume_4: float = FloatField(null=True)
    bid_volume_5: float = FloatField(null=True)

    ask_volume_1: float = FloatField()
    ask_volume_2: float = FloatField(null=True)
    ask_volume_3: float = FloatField(null=True)
    ask_volume_4: float = FloatField(null=True)
    ask_volume_5: float = FloatField(null=True)

    localtime: datetime = DateTimeField(null=True)

    class Meta:
        database = db
        indexes = ((("gateway_name","symbol", "exchange", "datetime"), True),)

class MysqlDatabase的初始化做如下修改:

    def __init__(self) -> None:
        """"""
        self.db = db
        self.db.connect()
        self.db.create_tables([DbContractData, DbBarData, DbTickData, DbLastTick, DbBarOverview])   # hxxjava add DbLastTick,DbContractData

再为class MysqlDatabase添加下面两个函数:

    def save_last_tick(self, ticks: List[TickData]) -> bool:
        """
        Save last tick data into database.  # hxxjava add
        """
        vt_symbols = [t.vt_symbol for t in ticks]

        # 删除ticks列表中包含合约的旧的tick记录
        d: ModelDelete = DbLastTick.delete().where(
            (DbLastTick.symbol+'.'+DbLastTick.exchange in vt_symbols)
        )
        count = d.execute()
        # print(f"delete {count} last ticks")

        # 构造最新的ticks列表数据
        data = []
        for t in ticks:
            tick:TickData = deepcopy(t)     # hxxjava change
            tick.datetime = tick.datetime

            d = tick.__dict__
            d["exchange"] = d["exchange"].value
            d.pop("vt_symbol")
            data.append(d)
            # print(tick.symbol,tick.exchange,tick.datetime.strftime('%Y-%m-%d %H:%M:%S %f'))

        # 使用upsert操作将数据更新到数据库中
        with self.db.atomic():
            for c in chunked(data, 50):
                DbLastTick.insert_many(c).on_conflict_replace().execute()

        return True

    def load_last_tick(
        self,
        gateway_name : str,
        exchange: Exchange = None,
        symbol: str = None
    ) -> List[TickData]:
        """
        Load last tick data from database.  # hxxjava add
        """
        try:
            # 从DbLastTick查询符合条件的最新tick记录
            s: ModelSelect = (
                DbLastTick.select().where(
                    (DbLastTick.gateway_name == gateway_name)
                    & (exchange is None or DbLastTick.exchange == exchange.value)
                    & (symbol is None or DbLastTick.symbol == symbol)
                ).order_by(DbLastTick.gateway_name,DbLastTick.datetime)
            )

            # 利用最新tick记录构造ticks列表
            ticks: List[TickData] = []
            for db_tick in s:
                tick:TickData = TickData(
                    symbol=db_tick.symbol,
                    exchange=Exchange(db_tick.exchange),
                    datetime=to_china_tz(db_tick.datetime),
                    name=db_tick.name,
                    volume=db_tick.volume,
                    turnover=db_tick.turnover,
                    open_interest=db_tick.open_interest,
                    last_price=db_tick.last_price,
                    last_volume=db_tick.last_volume,
                    limit_up=db_tick.limit_up,
                    limit_down=db_tick.limit_down,
                    open_price=db_tick.open_price,
                    high_price=db_tick.high_price,
                    low_price=db_tick.low_price,
                    pre_close=db_tick.pre_close,
                    bid_price_1=db_tick.bid_price_1,
                    bid_price_2=db_tick.bid_price_2,
                    bid_price_3=db_tick.bid_price_3,
                    bid_price_4=db_tick.bid_price_4,
                    bid_price_5=db_tick.bid_price_5,
                    ask_price_1=db_tick.ask_price_1,
                    ask_price_2=db_tick.ask_price_2,
                    ask_price_3=db_tick.ask_price_3,
                    ask_price_4=db_tick.ask_price_4,
                    ask_price_5=db_tick.ask_price_5,
                    bid_volume_1=db_tick.bid_volume_1,
                    bid_volume_2=db_tick.bid_volume_2,
                    bid_volume_3=db_tick.bid_volume_3,
                    bid_volume_4=db_tick.bid_volume_4,
                    bid_volume_5=db_tick.bid_volume_5,
                    ask_volume_1=db_tick.ask_volume_1,
                    ask_volume_2=db_tick.ask_volume_2,
                    ask_volume_3=db_tick.ask_volume_3,
                    ask_volume_4=db_tick.ask_volume_4,
                    ask_volume_5=db_tick.ask_volume_5,
                    localtime=db_tick.localtime,
                    gateway_name=db_tick.gateway_name
                )
                ticks.append(tick)

            return ticks

        except:
            # 当DbLastTick表不存在的时候,会发生错误
            return []

3.8 tick数据过滤器的实现

在vnpy.usertools下创建tickfilter.py文件,其内容如下:

"""
本文件主要实现tick数据过滤器——TickFilter。

tick数据过滤器的功能:
1. 过滤重复tick,保证已经参与K线合成的tick不会再次被系统使用
2. 过滤无效tick,抛弃不在交易状态下的tick
3. 识别集合竞价tick,为使用tick的应用或用户策略处理集合竞价tick提供支持

作者:hxxjava
日期:2022-06-16
修改日期:              修改原因:  
"""
from typing import Dict,List,Tuple
from threading import Thread
from vnpy.event import Event,EVENT_TIMER,EventEngine
from vnpy.trader.constant import InstrumentStatus,VALID_TRADE_STATUSES
from vnpy.trader.object import TickData,StatusData
from vnpy.trader.event import (
    EVENT_ORIGIN_TICK,
    EVENT_AUCTION_TICK,
    EVENT_TICK,
    EVENT_STATUS
)
from vnpy.trader.database import get_database
from vnpy.trader.utility import extract_vt_symbol


def left_alphas(instr:str):
    """
    得到字符串左边的字符部分
    """
    ret_str = ''
    for s in instr:
        if s.isalpha():
            ret_str += s
        else:
            break
    return ret_str

def get_vt_instrument(vt_symbol:str):
    """
    从完整合约代码转换到完整品种代码
    """    
    symbol,exchange = extract_vt_symbol(vt_symbol)
    instrument = left_alphas(symbol)
    return f"{instrument}.{exchange.value}"


class TickFilter():
    """ tick数据过滤器 """
    CHECK_INTERVAL:int = 5  # 更新到数据库间隔

    def __init__(self,event_engine:EventEngine,gateway_name:str):
        """ tick数据过滤器初始化 """
        self.event_engine = event_engine
        self.gateway_name = gateway_name
        self.db = get_database()

        # 最新tick字典 {(gateway_name,vt_symbol),(update,tick)}
        self.last_ticks:Dict[Tuple[str,str],Tuple[bool,TickData]] = {}

        # 品种及合约状态字典 { vt_symbol : StatusData }
        self.statuses:Dict[str,StatusData] = {}
        self.second_cnt = 0

        self.load_last_ticks()
        self.register_event()

        # print(f"TickFilter {gateway_name}")

    def load_last_ticks(self):
        """ 
        加载属于网关名称为self.gateway_name的最新tick列表 
        """
        last_ticks:List[TickData] = self.db.load_last_tick(gateway_name=self.gateway_name)
        for tick in last_ticks:
            self.last_ticks[(tick.gateway_name,tick.vt_symbol)] = (False,tick)

        # print(f"load {len(last_ticks)} last ticks")

    def register_event(self):
        """ 注册消息 """
        self.event_engine.register(EVENT_ORIGIN_TICK,self.process_tick_event)
        self.event_engine.register(EVENT_STATUS,self.process_status_event)
        self.event_engine.register(EVENT_TIMER,self.check_last_ticks)        

    def process_tick_event(self,event:Event):
        """ 对原始tick进行过滤 """
        tick:TickData = event.data

        # 检查tick合约的经验状态是否位有效交易状态
        status:StatusData = self.statuses.get(tick.vt_symbol,None)
        if not status:
            vt_instrument = get_vt_instrument(tick.vt_symbol)
            status = self.statuses.get(vt_instrument,None)
            if not status:
                # 未收到交易状态,返回
                return

        if status.instrument_status not in VALID_TRADE_STATUSES:
            # 不在有效交易状态,返回
            return

        key = (tick.gateway_name,tick.vt_symbol)
        _,oldtick = self.last_ticks.get(key,(None,None))
        valid_tick = False
        if not oldtick:
            # 没有该合约的历史tick
            self.last_ticks[key] = (True,tick)
            valid_tick = True

        elif tick.datetime > oldtick.datetime:
            # 
            self.last_ticks[key] = (True,tick)
            valid_tick = True

        else:
            print(f"【特别tick = {tick}】")

        if valid_tick == True:
            # 如果是有效的tick
            if status.instrument_status != InstrumentStatus.CONTINOUS:
                # 发送集合竞价tic消息到系统中
                self.event_engine.put(Event(EVENT_AUCTION_TICK,tick))
                self.event_engine.put(Event(EVENT_AUCTION_TICK + tick.vt_symbol, tick))  

            else:
                # 发送连续竞价tic消息到系统中
                self.event_engine.put(Event(EVENT_TICK,tick))
                self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, tick))  

    def process_status_event(self, event: Event):  
        """ 交易状态通知消息处理 """
        status:StatusData = event.data
        self.statuses[status.vt_symbol] = status

        # print(f"【{status.gateway_name} {status}】")

    def check_last_ticks(self,event:Event) -> None:
        """ 原始tick过滤器 """
        self.second_cnt += 1
        if self.second_cnt % self.CHECK_INTERVAL == 0:
            # 如果到了定时间隔

            # 查询所有更新的tick
            changed_ticks = [] 

            for key,(update,tick) in self.last_ticks.items():
                if update:
                    changed_ticks.append(tick)
                    self.last_ticks[key] = (False,tick)

            if changed_ticks:
                # 如果存在更新的tick,保存到数据库
                t = Thread(target=self.db.save_last_tick,kwargs=({"ticks":changed_ticks}),daemon=True)
                t.start()
                # print(f"{self.second_cnt}: status count={len(self.statuses)} save {len(changed_ticks)} ticks")

3.9 把tick数据过滤器安装到主引擎MainEngine上去

修改vnpy\trader\engine.py

添加引用部分

from vnpy.usertools.tickfilter import TickFilter    # hxxjava add

修改MainEngine的

在MainEngine的初始化函数def init(self, event_engine: EventEngine = None)中增加如下内容:

self.tick_filters:Dict[str,TickFilter] = {} # hxxjava add

修改其add_gateway(),内容如下:

    def add_gateway(self, gateway_class: Type[BaseGateway], gateway_name: str = "") -> BaseGateway:
        """
        Add gateway.
        """
        # Use default name if gateway_name not passed
        if not gateway_name:
            gateway_name = gateway_class.default_name

        gateway = gateway_class(self.event_engine, gateway_name)
        self.gateways[gateway_name] = gateway

        # Add gateway supported exchanges into engine
        for exchange in gateway.exchanges:
            if exchange not in self.exchanges:
                self.exchanges.append(exchange)

        # add a tick data filter for the gateway    #  hxxjava add  
        if gateway_name not in self.tick_filters:
            self.tick_filters[gateway_name] = TickFilter(self.event_engine,gateway_name)

        return gateway

4. 经过上面的一系列修改,你获得了哪些好处?

  • 你的策略再也不会收到重复数据和垃圾数据
  • 此以后你的CTA策略中必须加入一个on_auction_tick()接口函数,用来接受每个交易日集合竞价所产生的tick。如何使用这个tick你有你的方法。
  • 在合成K线的时候你才可能构成正确的K线,比如BarGenerator对跨日tick时间戳的处理错误问题,在此也会迎刃而解。

4.1 现在来梳理下我们都干了哪些事情

  1. 在CtpGateway中引入了合约交易状态,这可以用来过滤无效数据,同时还能够识别集合竞价tick。
  2. 在database中增加了最新tick持久化保存,这为新的tick是否是重复的判断提供支持。
  3. 提供有效tick的分类,在CTA策略的模板中增加on_auction_tick()接口使得BarGenerator正确处1分钟bar的成交量和成交额成为可能。

4.2 非CTP网关使用者是否也可以这样做?

只要你能够从网关行情接口实时得到合约的交易状态推送,把网关的行情接口做出类似的修改,这套方法同样是可用的。tickfilter的代码可以不用修改直接使用。

5. 解决BarGenerator统计bar成交量和成交额错误的方法

5.1 这是对BarGenerator做出点修改,

  • 修改BarGenerator的初始化函数
    def __init__(
        self,
        on_bar: Callable,
        window: int = 0,
        on_window_bar: Callable = None,
        interval: Interval = Interval.MINUTE
    ):
        """ Constructor """      

        ... ...  # 其他代码省略

        self.auction_tick:TickData = None
        self.last_tick: TickData = None
  • 增加BarGenerator的集合竞价tick处理函数
    def update_auction_tick(self,tick:TickData):
        """ 更新集合竞价tick """
        self.auction_tick = tick
  • 修改BarGenerator的1分钟bar合成函数
    def update_tick(self, tick: TickData) -> None:
        """
        Update new tick data into generator.
        """
        new_minute = False

        if self.auction_tick:
            # 合约集合竞价tick到当前tick
            tick.high_price = max(tick.high_price,self.auction_tick.high_price)
            tick.low_price = min(tick.low_price,self.auction_tick.low_price)

            # 构造最新tick,以便把集合竞价的成交量和成交额合成到1分钟bar中
            self.last_tick = deepcopy(self.auction_tick)
            # 成交量和成交额每天从0开始单调递增
            self.last_tick.volume = 0.0   
            self.last_tick.turnover = 0.0

            # 用完集合竞价tick就丢弃
            self.auction_tick = None

      ... ...  # 其他代码省略

5.2 您的策略关于集合竞价tick更新的回调函数:

    def on_auction_tick(self, tick: TickData):
        """
        集合竞价tick处理
        """
        self.bg.update_auction_tick(tick)    # 假设self.bg是已经创建过的bar生成器

两点说明:

  1. 如果你在阅读本文的时候觉得有点一头雾水,可以搜索'hxxjava'字符串,将会显示大部分修改的代码,仔细揣摩下,就会知道我做了什么了!
  2. 另外本贴中还有一部分涉及到条件单的代码,如果出现错误,可以查找我的关于条件单的帖子比停止单更好用的条件单——ConditionOrder,这里就不再重复贴出那部分代码了。

1. 来看看BarGenerator的tick合成1分钟tick过程update_tick()

    def update_tick(self, tick: TickData) -> None:
        """
        Update new tick data into generator.
        """
        new_minute = False

        # Filter tick data with 0 last price
        if not tick.last_price:
            return

        # Filter tick data with older timestamp
        if self.last_tick and tick.datetime < self.last_tick.datetime:
            return

        if not self.bar:
            new_minute = True
        elif (
            (self.bar.datetime.minute != tick.datetime.minute)
            or (self.bar.datetime.hour != tick.datetime.hour)
        ):
            self.bar.datetime = self.bar.datetime.replace(
                second=0, microsecond=0
            )
            self.on_bar(self.bar)

            new_minute = True

        if new_minute:
            self.bar = BarData(
                symbol=tick.symbol,
                exchange=tick.exchange,
                interval=Interval.MINUTE,
                datetime=tick.datetime,
                gateway_name=tick.gateway_name,
                open_price=tick.last_price,
                high_price=tick.last_price,
                low_price=tick.last_price,
                close_price=tick.last_price,
                open_interest=tick.open_interest
            )
        else:
            self.bar.high_price = max(self.bar.high_price, tick.last_price)
            if tick.high_price > self.last_tick.high_price:
                self.bar.high_price = max(self.bar.high_price, tick.high_price)

            self.bar.low_price = min(self.bar.low_price, tick.last_price)
            if tick.low_price < self.last_tick.low_price:
                self.bar.low_price = min(self.bar.low_price, tick.low_price)

            self.bar.close_price = tick.last_price
            self.bar.open_interest = tick.open_interest
            self.bar.datetime = tick.datetime

        if self.last_tick:
            # a: 成交量累计
            volume_change = tick.volume - self.last_tick.volume
            self.bar.volume += max(volume_change, 0)
            # b: 成交额累计
            turnover_change = tick.turnover - self.last_tick.turnover
            self.bar.turnover += max(turnover_change, 0)

        # c: 记录最新tick
        self.last_tick = tick

2. tick的volume和turnover有什么特性?

交易所播报的tick这成交量和成交额是从每个交易日的集合竞价阶段开始累计的,它们都有单调递增的特性。
也就是说volume和turnover在一个交易日内只会增加,不会减少。

2.1 前后两个交易日之间有没有什么关系呢?

答案是没有的!

  • 这是通常情况下的两个交易日成交量的变化示意图:
description
图1

此时,前一天的最后收到的tick都volume为v2,当前天的集合竞价或者一个收到的tick都volume为v3,v3<v2。

  • 特别情况下的两个交易日成交量的变化示意图:
description
图2

此时,前一天的最后收到的tick都volume为v2,当前天的集合竞价或者一个收到的tick都volume为v3,v3>v2。
这种情况会发生吗?会的 !前一天的交易特别清淡,导致成交量非常小,而当前天因为收到什么特别因素刺激,开盘就非常火爆,光是集合竞价的成交量就比前一天的整体的成交量都大,就会出现此图所示的情况。

3. a,b两处代码是错误的

a处代码中的volume_change 是什么?它代表了从上一个tick到当前tick发生的成交量。
1分钟K线到成交量就是对这个volume_change的累计。但是有一个例外,当volume_change < 0时,累加的值就只能够为0了。
那么什么情况下会发生volume_change < 0呢?答案是发生在如图1所示的跨日的情况下!
可是还会发生如图2所示的跨日的情况,此时volume_change > 0,那么volume_change 就不应该是v3-v2,而应该是v3 !
所以a处成交量累计的代码是错误的,同理b处对成交额累计的代码也是错误的!

1. 合约信息是一种基础信息,应该在本地持久化保存

策略中使用到交易合约的合约信息是非常困难发生的事情,例如使用其中的合约乘数、最小交易交易单位、最小价格变动等 ... ... 。
如果您的策略中使用到交易合约的合约信息时,当您在休市的时候无法连接Gateway,那么你在回测的时候将无法进行下去。
怎么办,坐等交易所开市吗?本帖就讨论这样一个问题。

2. 解决方法

2.1 合约信息包含哪些内容

vnpy\trader\object.py中ContractData是这样定义的:

@dataclass
class ContractData(BaseData):
    """
    Contract data contains basic information about each contract traded.
    """

    symbol: str
    exchange: Exchange
    name: str
    product: Product
    size: float
    pricetick: float

    open_date:datetime = None       # 上市日    open date   hxxjava add
    expire_date:datetime = None     # 到期日    expire date hxxjava add

    min_volume: float = 1           # minimum trading volume of the contract
    stop_supported: bool = False    # whether server supports stop order
    net_position: bool = False      # whether gateway uses net position volume
    history_data: bool = False      # whether gateway provides bar history data

    option_strike: float = 0
    option_underlying: str = ""     # vt_symbol of underlying contract
    option_type: OptionType = None
    option_listed: datetime = None
    option_expiry: datetime = None
    option_portfolio: str = ""
    option_index: str = ""          # for identifying options with same strike price

    def __post_init__(self):
        """"""
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"

2.2 修改一下CTP的网关:

修改vnpy_ctp\gateway\ctp_gateway.py中的CtpTdApi接口onRspQryInstrument():

    def onRspQryInstrument(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """合约查询回报"""
        product: Product = PRODUCT_CTP2VT.get(data["ProductClass"], None)
        if product:
            contract: ContractData = ContractData(
                symbol=data["InstrumentID"],
                exchange=EXCHANGE_CTP2VT[data["ExchangeID"]],
                name=data["InstrumentName"],
                product=product,
                size=data["VolumeMultiple"],
                pricetick=data["PriceTick"],
                open_date=datetime.strptime(data["OpenDate"], "%Y%m%d"),        # hxxjava add
                expire_date=datetime.strptime(data["ExpireDate"], "%Y%m%d"),    # hxxjava add
                gateway_name=self.gateway_name
            )

            # 期权相关
            if contract.product == Product.OPTION:
                # 移除郑商所期权产品名称带有的C/P后缀
                if contract.exchange == Exchange.CZCE:
                    contract.option_portfolio = data["ProductID"][:-1]
                else:
                    contract.option_portfolio = data["ProductID"]

                contract.option_underlying = data["UnderlyingInstrID"]
                contract.option_type = OPTIONTYPE_CTP2VT.get(data["OptionsType"], None)
                contract.option_strike = data["StrikePrice"]
                contract.option_index = str(data["StrikePrice"])
                contract.option_listed = datetime.strptime(data["OpenDate"], "%Y%m%d")
                contract.option_expiry = datetime.strptime(data["ExpireDate"], "%Y%m%d")

            self.gateway.on_contract(contract)

            symbol_contract_map[contract.symbol] = contract

        if last:
            self.contract_inited = True
            self.gateway.write_log("合约信息查询成功")

            for data in self.order_data:
                self.onRtnOrder(data)
            self.order_data.clear()

            for data in self.trade_data:
                self.onRtnTrade(data)
            self.trade_data.clear()

2.3 选择用数据库保存合约信息

在vnpy\trader\database.py中为BaseDatabase增加两个与合约信息相关的接口:

    @abstractmethod
    def save_constract_data(self, constracts: List[ContractData]) -> bool:
        """
        Save constract data into database.    hxxjava add
        """
        pass

    @abstractmethod
    def load_contract_data(self,vt_symbol:str="") -> List[ContractData]:
        """
        Load constract data from database.    hxxjava add
        """
        pass

2.4 为MySqlDatabase扩展save_constract_data()和load_contract_data():

本人使用的是MySql Server,所以就在vnpy_mysql\mysql_database.py中扩展这两个接口:

引用部分添加:

from vnpy.trader.constant import Exchange, Interval, Product    # hxxjava add Product

定义合约信息数据表模型

class DbContractData(Model):     # hxxjava add
    """K线数据表映射对象"""

    id = AutoField()

    symbol: str = CharField()
    exchange: str = CharField()
    name : str = CharField()
    product : str = CharField()
    size : float = FloatField()
    pricetick : float = FloatField()

    open_date: datetime = DateTimeField(default=None)
    expire_date: datetime = DateTimeField(default=None)

    min_volume : float = FloatField(default=1)
    stop_supported : bool = BooleanField(default=False)
    net_position : bool = BooleanField(default=False)
    history_data : bool = BooleanField(default=False)

    option_strike : float = FloatField(default=0)
    option_underlying: str = CharField(default="")
    option_type: str = CharField(default="")
    option_listed: datetime = DateTimeField(default=None)
    option_expiry: datetime = DateTimeField(default=None)

    option_portfolio: str = CharField(default="")
    option_index: str = CharField(default="")

    gateway_name:str = CharField()

    class Meta:
        database = db
        indexes = ((("open_date","exchange","symbol"), True),)

修改MysqlDatabase的init(),添加上面两个接口函数:

class MysqlDatabase(BaseDatabase):
    """Mysql数据库接口"""

    def __init__(self) -> None:
        """"""
        self.db = db
        self.db.connect()
        self.db.create_tables([DbContractData, DbBarData, DbTickData, DbBarOverview])   # hxxjava add DbContractData

    def save_constract_data(self, contracts: List[ContractData]) -> bool:
        """
        Save constract data into database.    hxxjava add
        """
        # 将constracts数据转换为字典
        data = []

        for contract in contracts:
            copy_c = deepcopy(contract)
            d = copy_c.__dict__
            d["exchange"] = d["exchange"].value
            d["product"] = d["product"].value
            d.pop("vt_symbol")
            data.append(d)

        # 使用upsert操作将数据更新到数据库中
        with self.db.atomic():
            for c in chunked(data, 50):
                DbContractData.insert_many(c).on_conflict_replace().execute()

        return True

    def load_contract_data(self,vt_symbol:str="") -> List[ContractData]:
        """
        Load constract data from database.    hxxjava add
        """
        symbol,exchange = "",""
        if vt_symbol:
            symbol,exchange = vt_symbol.split('.')

        s: ModelSelect = DbContractData.select().where(
                (not symbol or DbContractData.symbol == symbol)
                & (not symbol or DbContractData.exchange == exchange)
                ).order_by(DbContractData.open_date,DbContractData.exchange,DbContractData.symbol)

        contracts: List[ContractData] = []
        for db_c in s:
            # 取出四个时间
            open_date = datetime.fromtimestamp(db_c.open_date.timestamp(), DB_TZ)
            expire_date = datetime.fromtimestamp(db_c.expire_date.timestamp(), DB_TZ)

            option_listed = None
            option_expiry = None

            product = Product(db_c.product)
            if product == Product.OPTION:               
                option_listed = datetime.fromtimestamp(db_c.option_listed.timestamp(), DB_TZ)
                option_expiry = datetime.fromtimestamp(db_c.option_expiry.timestamp(), DB_TZ)

            contract = ContractData(
                    symbol = db_c.symbol,
                    exchange = Exchange(db_c.exchange),
                    name = db_c.name,
                    product = Product(db_c.product),
                    size = db_c.size,
                    pricetick = db_c.pricetick,

                    open_date = open_date,
                    expire_date = expire_date,

                    min_volume = db_c.min_volume,
                    stop_supported = db_c.stop_supported,
                    net_position = db_c.net_position,
                    history_data = db_c.history_data,

                    option_strike = db_c.option_strike,
                    option_underlying = db_c.option_underlying,
                    option_type = db_c.option_type,

                    option_listed = option_listed,
                    option_expiry = option_expiry,

                    option_portfolio = db_c.option_portfolio,
                    option_index = db_c.option_index,
                    gateway_name = db_c.gateway_name,
            )
            contracts.append(contract)

        return contracts

2.5 修改OmsEngine,使之能够持久化保存合约信息

修改vnpy\trader\engine.py做如下修改:

2.5.1 添加引用

from threading import Thread            # hxxjava add
from copy import deepcopy               # hxxjava add
from .database import get_database      # hxxjava add

2.5.2 为OmsEngine添加下面函数:

修改OmsEngine的init():

class OmsEngine(BaseEngine):
    """
    Provides order management system function.
    """

    contract_file = "contracts.json"    # hxxjava add

    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super(OmsEngine, self).__init__(main_engine, event_engine, "oms")

        self.ticks: Dict[str, TickData] = {}
        self.orders: Dict[str, OrderData] = {}
        self.trades: Dict[str, TradeData] = {}
        self.positions: Dict[str, PositionData] = {}
        self.accounts: Dict[str, AccountData] = {}
        self.contracts: Dict[str, ContractData] = {}
        self.quotes: Dict[str, QuoteData] = {}

        self.active_orders: Dict[str, OrderData] = {}
        self.active_quotes: Dict[str, QuoteData] = {}

        self.db = get_database()

        self.load_contracts()   # hxxjava add   启动时就从数据库读取所有合约信息

        self.add_function()
        self.register_event()

在收到账户信息时才保存变化的合约信息。

    def process_account_event(self, event: Event) -> None:
        """"""
        account: AccountData = event.data
        self.accounts[account.vt_accountid] = account

        self.save_contracts_changed()   # hxxjava add

    def process_contract_event(self, event: Event) -> None:
        """"""
        contract: ContractData = event.data
        # self.contracts[contract.vt_symbol] = contract
        # hxxjava change
        if contract.vt_symbol not in self.contracts:
            self.contracts[contract.vt_symbol] = contract
            self.contracts_changed.append(contract)

启动线程对变化的合约进行增量保存,否则会堵塞系统的消息循环。

    def save_contracts_changed(self):
        """ 启动线程保存所有新增的合约 """
        # 复制所有新增的合约
        contracts = [deepcopy(contract) for contract in self.contracts_changed]
        self.contracts_changed = []

        if contracts:
            # 如果有新增的合约,启动线程保存
            t = Thread(target=self.save_contracts,kwargs=({"contracts":contracts}))
            t.start()

    def save_contracts(self,contracts:List):   # hxxjava add
        """ save contracts into database """
        self.db.save_constract_data(contracts)
        print(f"一共保存了{len(contracts)}个合约 !")

    def load_contracts(self):   # hxxjava add
        """ save all contracts into a json file """      
        contracts = self.db.load_contract_data()
        for c in contracts:
            self.contracts[c.vt_symbol] = c

        self.contracts_changed:List[ContractData] = []

        print(f"一共读取了{len(contracts)}个合约 !")
        # print(f"self.contracts={self.contracts}个合约 !")

2.6 为CtaEngine增加获取合约信息函数get_contract()

修改vnpy_ctastrategy\engine.py中的CtaEngine,添加get_contract():

    def get_contract(self, strategy: CtaTemplate) -> Optional[ContractData]:    # hxxjava add
        """
        Get strategy's contract data.
        """
        return self.main_engine.get_contract(strategy.vt_symbol)

2.7 为CTA策略模板添加合约信息获取函数get_contract()

修改vnpy_ctastrategy\template.py中的CtaTemplate,添加get_contract():

    def get_contract(self):
        """
        Return trading_hours of trading contract.   # hxxjava add
        """
        return self.cta_engine.get_contract(self)

3 策略如何使用get_contract()?

经过什么这么复杂的步骤,就赋予了您的CTA策略一种直接读取交易合约的合约信息的能力。
如何使用呢?非常简单,这里给出一段用户策略的例子代码:

        """ 得到交易合约的合约信息 """              
        contract:ContractData = self.get_contract()
        size_n = contract.size   # 合约乘数
        price_tick = contract.price_tick # 最小价格变动

本人这么修改了之后,觉得非常方便,呼吁官方能够把这样的功能合并到系统中,惠及广大vnpy会员。

所有可自由交易标的行情变动都是随机过程的累积,本质上就是马尔可夫过程,注意:一定要是可自由交易,不信请往下看。

1. 几行代码

import numpy as np
from matplotlib import pyplot as plt

N = 10*600
List1 = list(np.random.randn(N))
data3m = []
data30m = []
tt3m = 0.0

for i in range(N):
    tt3m += List1[i]
    if i % 10==0:
        data30m.append(tt3m)
    data3m.append(tt3m)

plt.figure(figsize=(20,6))
plt.plot(data3m)
plt.show()
plt.figure(figsize=(20,6))
plt.plot(data30m)

2. 某一次的运行结果

description

3. 这段代码是否会为您带来一些思考和某种启示呢?

它所形成的曲线是那么自然,优美,和真实的行情只差一个合约名称 ... ...

1. 不要迷恋停止单,它缺点多的很

当你想以比市场价更高的价格买,或者以比市场价更低的价格卖时,使用send_order()是会立即执行的,但是用停止单却可以做到这一点,这是停止单的优点。
但是实际使用中停止单也是有缺点的:

  • 当以比市场价更低的价格买,它会立即被执行
  • 当以比市场价更高的价格卖,它会立即被执行
  • 实际运行中有多个停止单通知满足条件,接口在极短时间内接受多个停止单发出的委托,发生委托覆盖。表现为明明策略发出过多个停止单,但是只有最后一个停止单有结果,其他的委托莫名其妙的人间蒸发了,不见了,接口不回应、不通知,策略也不知道,用户无法查。
  • 停止单一经发出,触发价也是执行价,无法根据当时的市场价格做出价格调整
  • 只有CTA策略才可以使用停止单,其他类型的策略无法使用,因为它的执行逻辑和具体合约耦合度太高

2. 什么是条件单?

这是本人给它取的名字,它其实是本人以前提到的交易线(TradeLine)的改进和增强。
它主要就是为解决停止单上述缺点而设计的,当然应该具备上述优点。

  • 可以设定触发价格和触发条件
  • 触发条件包括四种:>=、<=、>、<,与执行的委托方向无关
  • 当市场价格满足触发条件时,条件单立即执行,执行收最小流控限制
  • 执行价格可以是触发价、市场价或极限价(买时为涨停价,卖时为跌停价)
  • 条件单管理器还可以提供手工取消条件单的功能,双击就可以取消

先看一眼条件单的效果图

description

2.1 它数据结构包含如下:

在vnpy_ctastrategy\base.py中增加如下代码:

class Condition(Enum):  # hxxjava add
    """ 条件单的条件 """
    BT = ">"
    LT = "<"
    BE = ">="
    LE = "<="

class ExecutePrice(Enum):  # hxxjava add
    """ 执行价格 """
    SETPRICE = "设定价"
    MARKET = "市场价"
    EXTREME = "极限价"

class CondOrderStatus(Enum):    # hxxjava add
    """ 条件单状态 """
    WAITING = "等待中"
    CANCELLED = "已撤销"
    TRIGGERED = "已触发"

@dataclass
class ConditionOrder:   # hxxjava add
    """ 条件单 """
    strategy_name: str
    vt_symbol: str
    direction: Direction
    offset: Offset
    price: float
    volume: float
    condition:Condition 
    execute_price:ExecutePrice = ExecutePrice.SETPRICE
    create_time: datetime = datetime.now()
    trigger_time: datetime = None
    cond_orderid: str = ""  # 条件单编号
    status: CondOrderStatus = CondOrderStatus.WAITING

    def __post_init__(self):
        """  """
        if not self.cond_orderid:
            self.cond_orderid = datetime.now().strftime("%m%d%H%M%S%f")[:13]

EVENT_CONDITION_ORDER = "eConditionOrder"       # hxxjava add

2.2 条件单管理器

2.2.1 修改CTA策略管理器

修改vnpy_ctastrategy\ui\widget.py中的class CtaManager,代码如下:

class CtaManager(QtWidgets.QWidget):
    """"""

    signal_log: QtCore.Signal = QtCore.Signal(Event)
    signal_strategy: QtCore.Signal = QtCore.Signal(Event)

    def __init__(self, main_engine: MainEngine, event_engine: EventEngine) -> None:
        """"""
        super().__init__()

        self.main_engine: MainEngine = main_engine
        self.event_engine: EventEngine = event_engine
        self.cta_engine: CtaEngine = main_engine.get_engine(APP_NAME)

        self.managers: Dict[str, StrategyManager] = {}

        self.init_ui()
        self.register_event()
        self.cta_engine.init_engine()
        self.update_class_combo()

    def init_ui(self) -> None:
        """"""
        self.setWindowTitle("CTA策略")

        # Create widgets
        self.class_combo: QtWidgets.QComboBox = QtWidgets.QComboBox()

        add_button: QtWidgets.QPushButton = QtWidgets.QPushButton("添加策略")
        add_button.clicked.connect(self.add_strategy)

        init_button: QtWidgets.QPushButton = QtWidgets.QPushButton("全部初始化")
        init_button.clicked.connect(self.cta_engine.init_all_strategies)

        start_button: QtWidgets.QPushButton = QtWidgets.QPushButton("全部启动")
        start_button.clicked.connect(self.cta_engine.start_all_strategies)

        stop_button: QtWidgets.QPushButton = QtWidgets.QPushButton("全部停止")
        stop_button.clicked.connect(self.cta_engine.stop_all_strategies)

        clear_button: QtWidgets.QPushButton = QtWidgets.QPushButton("清空日志")
        clear_button.clicked.connect(self.clear_log)

        roll_button: QtWidgets.QPushButton = QtWidgets.QPushButton("移仓助手")
        roll_button.clicked.connect(self.roll)

        self.scroll_layout: QtWidgets.QVBoxLayout = QtWidgets.QVBoxLayout()
        self.scroll_layout.addStretch()

        scroll_widget: QtWidgets.QWidget = QtWidgets.QWidget()
        scroll_widget.setLayout(self.scroll_layout)

        self.scroll_area: QtWidgets.QScrollArea = QtWidgets.QScrollArea()
        self.scroll_area.setWidgetResizable(True)
        self.scroll_area.setWidget(scroll_widget)

        self.log_monitor: LogMonitor = LogMonitor(self.main_engine, self.event_engine)

        self.stop_order_monitor: StopOrderMonitor = StopOrderMonitor(
            self.main_engine, self.event_engine
        )

        self.strategy_combo = QtWidgets.QComboBox()
        self.strategy_combo.setMinimumWidth(200)
        find_button = QtWidgets.QPushButton("查找")
        find_button.clicked.connect(self.find_strategy)

        # hxxjava add
        self.condition_order_monitor = ConditionOrderMonitor(self.cta_engine)

        # Set layout
        hbox1: QtWidgets.QHBoxLayout = QtWidgets.QHBoxLayout()
        hbox1.addWidget(self.class_combo)
        hbox1.addWidget(add_button)
        hbox1.addStretch()
        hbox1.addWidget(self.strategy_combo)
        hbox1.addWidget(find_button)
        hbox1.addStretch()
        hbox1.addWidget(init_button)
        hbox1.addWidget(start_button)
        hbox1.addWidget(stop_button)
        hbox1.addWidget(clear_button)
        hbox1.addWidget(roll_button)

        grid = QtWidgets.QGridLayout()
        # grid.addWidget(self.scroll_area, 0, 0, 2, 1)
        grid.addWidget(self.scroll_area, 0, 0, 3, 1) # hxxjava change 3 rows , 1 column 
        grid.addWidget(self.stop_order_monitor, 0, 1)
        grid.addWidget(self.condition_order_monitor, 1, 1)  # hxxjava add
        # grid.addWidget(self.log_monitor, 1, 1)
        grid.addWidget(self.log_monitor, 2, 1)  # hxxjava change
        vbox: QtWidgets.QVBoxLayout = QtWidgets.QVBoxLayout()
        vbox.addLayout(hbox1)
        vbox.addLayout(grid)

        self.setLayout(vbox)

    def update_class_combo(self) -> None:
        """"""
        names = self.cta_engine.get_all_strategy_class_names()
        names.sort()
        self.class_combo.addItems(names)

    def update_strategy_combo(self) -> None:
        """"""
        names = list(self.managers.keys())
        names.sort()

        self.strategy_combo.clear()
        self.strategy_combo.addItems(names)

    def register_event(self) -> None:
        """"""
        self.signal_strategy.connect(self.process_strategy_event)

        self.event_engine.register(
            EVENT_CTA_STRATEGY, self.signal_strategy.emit
        )

    def process_strategy_event(self, event) -> None:
        """
        Update strategy status onto its monitor.
        """
        data = event.data
        strategy_name: str = data["strategy_name"]

        if strategy_name in self.managers:
            manager: StrategyManager = self.managers[strategy_name]
            manager.update_data(data)
        else:
            manager: StrategyManager = StrategyManager(self, self.cta_engine, data)
            self.scroll_layout.insertWidget(0, manager)
            self.managers[strategy_name] = manager

            self.update_strategy_combo()

    def remove_strategy(self, strategy_name) -> None:
        """"""
        manager: StrategyManager = self.managers.pop(strategy_name)
        manager.deleteLater()

        self.update_strategy_combo()

    def add_strategy(self) -> None:
        """"""
        class_name: str = str(self.class_combo.currentText())
        if not class_name:
            return

        parameters: dict = self.cta_engine.get_strategy_class_parameters(class_name)
        editor: SettingEditor = SettingEditor(parameters, class_name=class_name)
        n: int = editor.exec_()

        if n == editor.Accepted:
            setting: dict = editor.get_setting()
            vt_symbol: str = setting.pop("vt_symbol")
            strategy_name: str = setting.pop("strategy_name")

            self.cta_engine.add_strategy(
                class_name, strategy_name, vt_symbol, setting
            )

    def find_strategy(self) -> None:
        """"""
        strategy_name = self.strategy_combo.currentText()
        manager = self.managers[strategy_name]
        self.scroll_area.ensureWidgetVisible(manager)

    def clear_log(self) -> None:
        """"""
        self.log_monitor.setRowCount(0)

    def show(self) -> None:
        """"""
        self.showMaximized()

    def roll(self) -> None:
        """"""
        dialog: RolloverTool = RolloverTool(self)
        dialog.exec_()

2.2.2 条件单管理器代码

在vnpy_ctastrategy\ui\widget.py中增加如下代码:

class ConditionOrderMonitor(BaseMonitor):   # hxxjava add
    """
    Monitor for condition order.
    """

    event_type = EVENT_CONDITION_ORDER
    data_key = "cond_orderid"
    sorting = True

    headers = {
        "cond_orderid": {
            "display": "条件单号",
            "cell": BaseCell,
            "update": False,
        },
        "vt_symbol": {"display": "本地代码", "cell": BaseCell, "update": False},
        "direction": {"display": "方向", "cell": EnumCell, "update": False},
        "offset": {"display": "开平", "cell": EnumCell, "update": False},
        "price": {"display": "触发价", "cell": BaseCell, "update": False},
        "volume": {"display": "数量", "cell": BaseCell, "update": False},
        "condition": {"display": "触发条件", "cell": EnumCell, "update": False},
        "execute_price": {"display": "执行价", "cell": EnumCell, "update": False},
        "create_time": {"display": "生成时间", "cell": TimeCell, "update": False},
        "trigger_time": {"display": "触发时间", "cell": TimeCell, "update": False},
        "status": {"display": "状态", "cell": EnumCell, "update": True},
        "strategy_name": {"display": "策略名称", "cell": BaseCell, "update": False},
    }

    def __init__(self,cta_engine : MyCtaEngine):
        """"""
        super().__init__(cta_engine.main_engine, cta_engine.event_engine)

        self.cta_engine = cta_engine

    def init_ui(self):
        """
        Connect signal.
        """
        super().init_ui()

        self.setToolTip("双击单元格可停止条件单")
        self.itemDoubleClicked.connect(self.stop_condition_order)

    def stop_condition_order(self, cell):
        """
        Stop algo if cell double clicked.
        """
        order = cell.get_data()
        if order:
            self.cta_engine.cancel_condition_order(order.cond_orderid)

2.2.3 加载条件单管理器

修改策略管理器StrategyManager的代码如下:

class StrategyManager(QtWidgets.QFrame):
    """
    Manager for a strategy
    """

    def __init__(
        self, cta_manager: CtaManager, cta_engine: CtaEngine, data: dict
    ):
        """"""
        super(StrategyManager, self).__init__()

        self.cta_manager = cta_manager
        self.cta_engine = cta_engine

        self.strategy_name = data["strategy_name"]
        self._data = data

        self.tradetool : TradingWidget = None                    # hxxjava add

        self.init_ui()

    def init_ui(self):
        """"""
        self.setFixedHeight(300)
        self.setFrameShape(self.Box)
        self.setLineWidth(1)

        self.init_button = QtWidgets.QPushButton("初始化")
        self.init_button.clicked.connect(self.init_strategy)

        self.start_button = QtWidgets.QPushButton("启动")
        self.start_button.clicked.connect(self.start_strategy)
        self.start_button.setEnabled(False)

        self.stop_button = QtWidgets.QPushButton("停止")
        self.stop_button.clicked.connect(self.stop_strategy)
        self.stop_button.setEnabled(False)

        self.trade_button = QtWidgets.QPushButton("交易")       # hxxjava add
        self.trade_button.clicked.connect(self.show_tradetool)  # hxxjava add
        self.trade_button.setEnabled(False)                      # hxxjava add

        self.edit_button = QtWidgets.QPushButton("编辑")
        self.edit_button.clicked.connect(self.edit_strategy)

        self.remove_button = QtWidgets.QPushButton("移除")
        self.remove_button.clicked.connect(self.remove_strategy)

        strategy_name = self._data["strategy_name"]
        vt_symbol = self._data["vt_symbol"]
        class_name = self._data["class_name"]
        author = self._data["author"]

        label_text = (
            f"{strategy_name}  -  {vt_symbol}  ({class_name} by {author})"
        )
        label = QtWidgets.QLabel(label_text)
        label.setAlignment(QtCore.Qt.AlignCenter)

        self.parameters_monitor = DataMonitor(self._data["parameters"])
        self.variables_monitor = DataMonitor(self._data["variables"])

        hbox = QtWidgets.QHBoxLayout()
        hbox.addWidget(self.init_button)
        hbox.addWidget(self.start_button)
        hbox.addWidget(self.trade_button)      # hxxjava add
        hbox.addWidget(self.stop_button)
        hbox.addWidget(self.edit_button)
        hbox.addWidget(self.remove_button)

        # hxxjava change to self.vbox,old is vbox
        self.vbox = QtWidgets.QVBoxLayout()     
        self.vbox.addWidget(label)
        self.vbox.addLayout(hbox)
        self.vbox.addWidget(self.parameters_monitor)
        self.vbox.addWidget(self.variables_monitor)
        self.setLayout(self.vbox)

    def update_data(self, data: dict):
        """"""
        self._data = data

        self.parameters_monitor.update_data(data["parameters"])
        self.variables_monitor.update_data(data["variables"])

        # Update button status
        variables = data["variables"]
        inited = variables["inited"]
        trading = variables["trading"]

        if not inited:
            return
        self.init_button.setEnabled(False)

        if trading:
            self.start_button.setEnabled(False)
            self.trade_button.setEnabled(True)  # hxxjava
            self.stop_button.setEnabled(True)
            self.edit_button.setEnabled(False)
            self.remove_button.setEnabled(False)
        else:
            self.start_button.setEnabled(True)
            self.trade_button.setEnabled(False)  # hxxjava
            self.stop_button.setEnabled(False)
            self.edit_button.setEnabled(True)
            self.remove_button.setEnabled(True)

    def init_strategy(self):
        """"""
        self.cta_engine.init_strategy(self.strategy_name)

    def start_strategy(self):
        """"""
        self.cta_engine.start_strategy(self.strategy_name)

    def show_tradetool(self):   # hxxjava add
        """ 为策略显示交易工具 """
        if not self.tradetool:
            strategy = self.cta_engine.strategies.get(self.strategy_name,None)
            if strategy and strategy.trading:
                self.tradetool = TradingWidget(strategy,self.cta_engine.event_engine)
                self.vbox.addWidget(self.tradetool)

        else:
            is_visible = self.tradetool.isVisible()
            self.tradetool.setVisible(not is_visible)

    def stop_strategy(self):
        """"""
        self.cta_engine.stop_strategy(self.strategy_name)

    def edit_strategy(self):
        """"""
        strategy_name = self._data["strategy_name"]

        parameters = self.cta_engine.get_strategy_parameters(strategy_name)
        editor = SettingEditor(parameters, strategy_name=strategy_name)
        n = editor.exec_()

        if n == editor.Accepted:
            setting = editor.get_setting()
            self.cta_engine.edit_strategy(strategy_name, setting)

    def remove_strategy(self):
        """"""
        result = self.cta_engine.remove_strategy(self.strategy_name)

        # Only remove strategy gui manager if it has been removed from engine
        if result:
            self.cta_manager.remove_strategy(self.strategy_name)

2.2.4 交易组件

创建vnpy\usertools\trading_widget.py文件,其中内容:

""" 
条件单交易组件 
作者:hxxjava
日线:2022-5-10
"""
from vnpy.trader.ui import QtCore, QtWidgets, QtGui

from vnpy.trader.constant import Direction,Offset
from vnpy.trader.event import EVENT_TICK
from vnpy.event.engine import Event,EventEngine 
from vnpy_ctastrategy.base import Condition,CondOrderStatus,ExecutePrice,ConditionOrder
from vnpy_ctastrategy.template import CtaTemplate


class TradingWidget(QtWidgets.QWidget):
    """
    CTA strategy manual trading widget.
    """

    signal_tick = QtCore.pyqtSignal(Event)

    def __init__(self, strategy: CtaTemplate, event_engine: EventEngine):
        """"""
        super().__init__()

        self.strategy: CtaTemplate = strategy
        self.event_engine: EventEngine = event_engine

        self.vt_symbol: str = strategy.vt_symbol
        self.price_digits: int = 0

        self.init_ui()
        self.register_event()

    def init_ui(self) -> None:
        """"""
        # 交易方向:多/空
        self.direction_combo = QtWidgets.QComboBox()
        self.direction_combo.addItems(
            [Direction.LONG.value, Direction.SHORT.value])

        # 开平选择:开/平
        self.offset_combo = QtWidgets.QComboBox()
        self.offset_combo.addItems([offset.value for offset in Offset])

        # 条件类型
        conditions = [Condition.BE,Condition.LE,Condition.BT,Condition.LT]
        self.condition_combo = QtWidgets.QComboBox()
        self.condition_combo.addItems(
            [condition.value for condition in conditions])

        double_validator = QtGui.QDoubleValidator()
        double_validator.setBottom(0)

        self.price_line = QtWidgets.QLineEdit()
        self.price_line.setValidator(double_validator)

        self.exit_line = QtWidgets.QLineEdit()
        self.exit_line.setValidator(double_validator)

        self.volume_line = QtWidgets.QLineEdit()
        self.volume_line.setValidator(double_validator)

        self.price_check = QtWidgets.QCheckBox()
        self.price_check.setToolTip("设置价格随行情更新")

        execute_prices = [ExecutePrice.SETPRICE,ExecutePrice.MARKET,ExecutePrice.EXTREME]
        self.execute_price_combo = QtWidgets.QComboBox()
        self.execute_price_combo.addItems(
            [execute_price.value for execute_price in execute_prices])

        send_button = QtWidgets.QPushButton("发出")
        send_button.clicked.connect(self.send_condition_order)

        hbox = QtWidgets.QHBoxLayout()
        hbox.addWidget(QtWidgets.QLabel(f"合约:{self.vt_symbol}"))
        hbox.addWidget(QtWidgets.QLabel("方向"))
        hbox.addWidget(self.direction_combo)
        hbox.addWidget(QtWidgets.QLabel("开平"))
        hbox.addWidget(self.offset_combo)
        hbox.addWidget(QtWidgets.QLabel("条件"))
        hbox.addWidget(self.condition_combo)
        hbox.addWidget(QtWidgets.QLabel("触发价"))
        hbox.addWidget(self.price_line)
        hbox.addWidget(self.price_check)
        hbox.addWidget(QtWidgets.QLabel("数量"))
        hbox.addWidget(self.volume_line)
        hbox.addWidget(QtWidgets.QLabel("执行价"))
        hbox.addWidget(self.execute_price_combo)

        hbox.addWidget(send_button)

        # Overall layout
        self.setLayout(hbox)

    def register_event(self) -> None:
        """"""
        self.signal_tick.connect(self.process_tick_event)
        self.event_engine.register(EVENT_TICK, self.signal_tick.emit)

    def process_tick_event(self, event: Event) -> None:
        """"""
        tick = event.data
        if tick.vt_symbol != self.vt_symbol:
            return

        if self.price_check.isChecked():
            self.price_line.setText(f"{tick.last_price}")

    def send_condition_order(self) -> bool:
        """
        Send new order manually.
        """
        try:
            direction = Direction(self.direction_combo.currentText())
            offset = Offset(self.offset_combo.currentText())
            condition = Condition(self.condition_combo.currentText())
            price = float(self.price_line.text())               
            volume = float(self.volume_line.text())
            execute_price = ExecutePrice(self.execute_price_combo.currentText())

            order = ConditionOrder(
                strategy_name = self.strategy.strategy_name,
                vt_symbol=self.vt_symbol,
                direction=direction,
                offset=offset,
                price=price,
                volume=volume,
                condition=condition,
                execute_price=execute_price
            )

            self.strategy.send_condition_order(order=order)

            print(f"发出条件单 : vt_symbol={self.vt_symbol},success ! {order}")
            return True

        except:
            print(f"发出条件单 : vt_symbol={self.vt_symbol},input error !")  
            return False

2.3 有条件单功能的CTA引擎——MyCtaEngine

2.3.1 MyCtaEngine的实现

在vnpy_ctastrategy\engine.py中对CtaEngine进行如下扩展:

class MyCtaEngine(CtaEngine):
    """  """

    condition_filename = "condition_order.json"     # 历史条件单存储文件

    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super().__init__(main_engine,event_engine)

        self.condition_orders:Dict[str,ConditionOrder] = {}     # strategy_name: dict

    def load_active_condtion_orders(self):
        """  """
        return {}

    def process_tick_event(self,event:Event):
        """ 用tick的价格检查条件单 """
        super().process_tick_event(event)

        tick:TickData = event.data
        all_condition_orders = [order for order in self.condition_orders.values() \
            if order.vt_symbol == tick.vt_symbol and order.status == CondOrderStatus.WAITING]
        for order in all_condition_orders:
            # 检查条件单是否满足条件
            self.check_condition_order(order,tick)

    def check_condition_order(self,order:ConditionOrder,tick:TickData):
        """ 检查条件单是否满足条件 """       
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        price = tick.last_price

        is_be = order.condition == Condition.BE and price >= order.price
        is_le = order.condition == Condition.LE and price <= order.price
        is_bt = order.condition == Condition.BT and price > order.price
        is_lt = order.condition == Condition.LT and price < order.price

        if is_be or is_le or is_bt or is_lt:
            # 满足触发条件
            if order.execute_price == ExecutePrice.MARKET:
                # 取市场价
                price = tick.last_price
            elif order.execute_price == ExecutePrice.EXTREME:
                # 取极限价
                price = tick.limit_up if order.direction == Direction.LONG else tick.limit_down
            else:
                # 取设定价
                price = order.price

            # 执行委托
            order_ids = strategy.send_order(
                    direction = order.direction,
                    offset=order.offset,
                    price=price,
                    volume=order.volume 
                )

            if order_ids:
                order.trigger_time = tick.datetime
                order.status = CondOrderStatus.TRIGGERED

                self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))

    def send_condition_order(self,order:ConditionOrder):
        """  """
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        if order.cond_orderid not in self.condition_orders:
            self.condition_orders[order.cond_orderid] = order
            self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
            return True

        return False

    def cancel_condition_order(self,cond_orderid:str):
        """  """
        order:ConditionOrder = self.condition_orders.get(cond_orderid,None)
        if not order:
            return False

        order.status = CondOrderStatus.CANCELLED
        self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
        return True

    def cancel_all_condition_orders(self,strategy_name:str):
        """  """
        for order in self.condition_orders.values():
            if order.strategy_name == strategy_name and order.status == CondOrderStatus.WAITING:
                order.status = CondOrderStatus.CANCELLED

                self.call_strategy_func(strategy,strategy.on_condition_order)
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))

        return True

2.3.2 更换CtaEngine

对vnpy_ctastrategy__init__.py中的CtaTemplate进行如下修改:

from .engine import MyCtaEngine   # hxxjava add
class CtaStrategyApp(BaseApp):
    """"""

    app_name = APP_NAME
    app_module = __module__
    app_path = Path(__file__).parent
    display_name = "CTA策略"
    # engine_class = CtaEngine
    engine_class = MyCtaEngine    # hxxjava add
    widget_name = "CtaManager"
    icon_name = str(app_path.joinpath("ui", "cta.ico"))

2.3.3 对CtaTemplate进行扩展

对vnpy_ctastrategy\template.py中的CtaTemplate进行如下扩展:

    @virtual
    def on_condition_order(self, cond_order: ConditionOrder):
        """
        Callback of condition order update.
        """
        pass

    def send_condition_order(self,order:ConditionOrder):    # hxxjava add
        """ """
        if not self.trading:
            return False
        return self.cta_engine.send_condition_order(order)

    def cancel_condition_order(self,cond_orderid:str):      # hxxjava add
        """ """
        return self.cta_engine.cancel_condition_order(cond_orderid)

    def cancel_all_condition_orders(self):                  # hxxjava add
        """ """
        return self.cta_engine.cancel_all_condition_orders(self)

2.3.4 CTA用户策略中如何使用条件单功能

1)CTA策略中的条件单被触发点回调通知:

    def on_condition_order(self, cond_order: ConditionOrder):
        """
        Callback of condition order update.
        """
        print(f"条件单已经执行,cond_order = {cond_order}")

2)发起条件单

    cond_order = ConditionOrder(... ...)
    self.send_condition_order(cond_order)

3)取消条件单

    self.cancel_condition_order(cond_orderid)

4)取消策略的所有条件单

    self.cancel_all_condition_orders()

3. 条件单应该在vnpy系统中被广泛使用

  • 它应该运行在整个vnpy系统的底层,为各类的应用策略提供委托支持,
  • 对连续密集触发点条件单实施流控限制,避免交易接口出现丢单的流控问题
  • 各类应用的引擎应该提供send_condition_order()接口,实现条件单到不同应用委托执行逻辑
  • 各类应用的模板应该提供on_condition_order回调,解决条件单触发后对不同类型用户策略的触发通知
  • 用户策略尽量使用条件单来执行交易,避免直接执行来自接口的委托函数

1. vnpy系统提供的K线图表的缺点

如果您在启动vntrader的时候勾选了【ChartWizard 实时K线图表模块】,您会简单主界面上vnpy系统提供的K线图表功能图标,进入该功能模块后就可以输入本地代码,新建K线图表了。
使用了该功能之后,你会发现它有如下缺点:

  • 这个K线图表只能提供一分钟的K线图表
  • 除了K线主图和成交量之外,你不可以增加其他的主图附加指标和副图指标

这样一个太简单的K线图表是远远满足了交易者对K线图表的需求的,有多少人使用就可想而知了。

2. 它应该提供不同周期单位和窗口大小的K线显示能力

绝大多数交易策略都是基于K线来实现的。可是很少部分是只在1分钟K线的基础上运行的,可能是n分钟,n小时,n天...,只能提供一分钟的K线图是不够用的。
所以应该提供用户如下的选择:

  • 窗口大小
  • 单位选择

3. 它应该提供其他的主图附加指标和副图指标的添加和删除功能

用户之所以想看K线图,可能是想看看自己策略的算法是否正确,这一般都是使用了一个或者多个运行在窗口K线上指标计算的值计算的入场和出场信号。
这也是可以显示的,而这种指标不可能全部是系统自带的指标显示控件能够涵盖的,所以应该有方法让用户自己增加自己的指标显示部件。
所以应该提供下面功能:

  • 更改主图指标功能
  • 增加/删除主图附加指标功能
  • 增加/删除副图指标功能
© 2015-2022 微信 18391752892
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】