VeighNa量化社区
你的开源社区量化交易平台
hxxjava's Avatar
Member
离线
419 帖子
声望: 155

ranjianlin wrote:

为什么不在停止单的基础上来修改了?增加执行价格,设定价、市场价或者极限价呢? 这样会不会简单一点。
答复:
您细细比较停止单和我的条件单的区别就知道为什么了。

  1. 在没有条件单的情况下,想买的价格比盘口价低会等待,可以使用停止单来实现;想想买的价格比盘口价高不会等待,立即执行。
  2. 在有条件单的情况下,想买的价格比盘口价低会等待,可以使用停止单来实现;想想买的价格比盘口价高也会等待。

哈哈哈哈baab695eb19449d0 wrote:

hxxjava老哥你好,

最近面临代码管理的问题:

直接修改源代码怕不能更新;
将文件复制到出来额外编写会有导包比较乱,以及编辑器无法定位代码的问题;
重写覆盖会有多文件引用需要全部重命名的问题.

想向你请教一下,你的系统改动这么多,代码是怎么管理的?

用git吧。

下载zeal

zeal的官网网址:https://zealdocs.org/
description

选择属于自己需要的平台

Windows,Linux和BSD各种平台,选择属于自己需要的平台。我选择的是免安装的Portable的压缩包:

description

windows平台的Portable压缩包下载之后无需安装,所以设置和文档都存放在应用目录下,比较方便。下载之后直接解压到指定目录下,找到运行其中zeal.exe就可以了。

运行zeal

设置离线文档存放目录

description

下载离线文档

description

已经下载的离线文档

description

打开离线文档,查询感兴趣的主题

浏览离线文档既可以在zeal内部浏览器显示感兴趣的主题,也可以选择外边浏览器。至此你再也不用为查询python语法、函数解释和用法而烦恼了。

description

黄sir wrote:

大佬请教两个问题:
1、是不是开市的时候,服务器才会返回相应的值
2、向远程服务器询问保证金和手续费的时候,不是每次询问都会有返回的,需要多次调用对吗?

答复:
只要交易服务器登录成功,每次询问保证金和手续费都会有返回的,无需在交易时段。
但只能够单个品种的查询,可能返回多个结果(单个品种的或者品种+单个合约的)。
而且在交易时段查询,询问保证金和手续费会占用上行流控,如果查询太多太密集,
可能影响委托申请的响应,因为上行流控每秒最多6次。

1、vnpy系统缺少音乐和语音播放功能

vnpy中多采用各种应用系统的策略进行交易的,虽然也有各种日志和提示出现,但平常总是静悄悄的。
如果你想了解系统和策略的运行情况,可以查看各种运行日志,例如MainWindow的日志,委托列表,成交的列表,账户列表。想要查询你的策略运行情况,可以查看你的策略管理器的变量输出,等等。可是人总不能一直盯着屏幕,那样太累了。
如果能够有个声音和语音播报各种交易活动,用户会及时得到提醒。例如:

  • 当网络连接和断开时可以提醒特定的音乐或语音,可以让你及时处理网络故障;
  • 用户策略中可以添加音乐或语音,可以提醒策略的交易所发生的交易活动是否正常;
  • 当你的账户资金出现保证金不足,可以提醒及时入金;
  • 如果逆勢日内交易,你可以设置提前3分钟提醒你即将收盘了。

2、实现方法

2.1 音乐和语音播放器的实现

在vnpy\usertools目录下添加文件sound_player.py,内容如下:

"""
多线程音乐和文本播放器,介绍如下:
特点:
    既可以播放wav,mp3 等格式,有可以对文本进行播放。
    使用消息引擎创建该音乐播放器,多线程并行播放声音,不会因为播放声音而阻塞业务流程。
    假设其他应用或者策略中需要播放声音, sound_name为字符型的声音文件名称,使用方法有两种:   
    方法1:  先获取消息引擎event_engine(注:与SoundPlayer实例成交时使用的消息引擎是
            相同的),那么可以这样播放:

            event_engine.put(Event(EVENT_SOUND,"Connected.wav"))
            event_engine.put(Event(EVENT_SPEEK,"您收到一条委托单"))

    方法2:  将SoundPlayer多play_sound()接口安装到MainEngine到实例main_engine,那么
            可以先回去获取main_engine,然后这样播放:

            event_engine.play_sound("Connected.wav")
            event_engine.speek_text("您收到一条委托单")

作者:hxxjava    时间:2023-2-14,情人节————献给心爱的人!

修改:增加回测与实盘的区分功能,使得只在实盘环境才播放声音和文本。
修改:hxxjava    时间:2023-2-28

依赖库:pyttsx3, 安装:pip install pyttsx3
"""
from typing import Any
from pathlib import Path
from threading import Thread
from vnpy.trader.engine import EventEngine,Event
from winsound import PlaySound,SND_FILENAME
import pyttsx3

EVENT_SOUND = "eSound."
EVENT_SPEEK = "eSpeak."

class SoundPlayer():
    """
    多线程声音播放器
    """
    def __new__(cls, *args, **kwargs):
        """ singleton constructor """
        if not hasattr(cls, "_instance"):
            cls._instance = super(SoundPlayer, cls).__new__(cls)
        return cls._instance

    def __init__(self,event_engine:EventEngine,switch:bool=True):
        """ 初始化函数 """
        self.event_engine = event_engine
        # control play sound file
        self.switch = switch   
        self.register_event()

    def register_event(self):
        """ """
        self.event_engine.register(EVENT_SOUND,self.process_sound_event)
        self.event_engine.register(EVENT_SPEEK,self.process_speak_event)

    def set_switch(self,switch:bool=True):
        """ set the swith which control play sound file """
        self.switch = switch

    def _get_sound_path(self,sound_name: str):
        """
        Get path for sound file with sound name.
        """
        this_file_path:Path = Path(__file__).parent
        sound_path:Path = this_file_path.joinpath("sounds", sound_name)
        return str(sound_path)

    def process_sound_event(self,event:Event):
        """ EVENT消息处理过程 """
        wavname,is_testing = event.data['wavname'],event.data['is_testing']
        if self.switch == True and is_testing == False:
            filename = self._get_sound_path(wavname) 
            thread = Thread(target=self._play_sound,kwargs=({"filename":filename}),daemon=True)     
            thread.start()  

    def process_speak_event(self,event:Event):
        """ EVENT消息处理过程 """
        santence,is_testing = event.data['santence'],event.data['is_testing']
        if self.switch == True and is_testing == False:
            santence:str = event.data
            thread = Thread(target=self._do_speak,kwargs=({"santence":santence}),daemon=True)     
            thread.start()  

    def _play_sound(self,filename:str):
        """ 音乐文件播放线程执行过程 """
        PlaySound(filename,SND_FILENAME)

    def _do_speak(self,santence:str):
        """ 文本播放线程执行过程 """
        print(santence)
        speaker = pyttsx3.init()
        speaker.say(santence)
        speaker.runAndWait()

    def play_sound(self,sound_name:str,is_testing:bool=False):
        """ 
        用户音乐播放接口。
        参数:
            sound_name:传入声音文件名 
            is_testing:回测=True;实盘=False(默认)
        """
        self.event_engine.put(Event(EVENT_SOUND,{"wavname":sound_name,"is_testing":is_testing}))

    def speak_text(self,santence:str,is_testing:bool=False):
        """ 
        用户文字播放接口。
        参数:
            santence:传入声音文件名 
            is_testing:回测=True;实盘=False(默认)
        """
        self.event_engine.put(Event(EVENT_SPEEK,{"santence":santence,"is_testing":is_testing}))

2.2 把音乐和语音播放器安装到vnpy系统

在vnpy\trader\engine.py中做如下修改:

1)在引用部分添加这些内容

from vnpy.usertools.sound_player import SoundPlayer

2)在class OmsEngine的init()中创建音乐和语音播放器

      self.sound_player = SoundPlayer(event_engine,True)   # test sound player

3)在class OmsEngine的add_function()函数中为MainEngine添加下面的函数

    def add_function(self) -> None:
        """Add query function to main engine."""
         ... ...
        self.main_engine.play_sound = self.sound_player.play_sound
        self.main_engine.speak_text = self.sound_player.speak_text

这样你的MainEngine就有了可以音乐和语音功能了。

2.3 音频文件存放在哪里?

class sound_player规定音频文件存放在vnpy\usertools\sounds\目录下,当然你也可以修改代码中规定的目录,放在自己喜欢的目录下。
文件可以是wav、mp3格式的音乐文件均可,可以自己录制。
取一些有意义的文件名,如connected.wav代表网络连接成功,disconnection.wav代表网络断开,自己发挥吧,方便自己在自己vnpy系统中用函数调用。
本来本人有一套音乐文件的,可是论坛里没有文件上传功能,所以无法共享给大家,如果需要可以私信我。

3. 如何使用音乐和语音播放功能

下面用连接网关成功和连接断开,分别给出音乐和语音播放的示例:

3.1 音乐播放功能使用

3.1.1 通过main_engine调用play_sound()播放语音

    def process_connect_event(self, event: Event) -> None:  # hxxjava add
        """ CTP接口连接消息处理 """
        gateway:GatewayData = event.data
        self.main_engine.play_sound("Connected.wav")

    def process_disconnect_event(self, event: Event) -> None: # hxxjava add
        """ CTP接口断开消息处理 """
        gateway:GatewayData = event.data
        self.main_engine.play_sound("ConnectionLost.wav")

3.1.2 发送EVENT_SOUND消息播放音乐

# 增加引用
from vnpy.usertools.sound_player import EVENT_SOUND,EVENT_SPEEK

    def on_order(self, order: OrderData):
        """
        Callback of new order data update.
        """
        # 当策略收到委托单时播放提示音乐
        event_engine:EventEngine = self.cta_engine.event_engine
        is_testing = self.cta_engine.get_engine_type() != EngineType.LIVE
        event = Event(EVENT_SOUND,
                      {"wavname":"order.wav","is_testing":is_testing}
                )
        event_engine.put(event)

    def on_trade(self, trade: TradeData):
        """
        Callback of new trade data update.
        """
        # 当策略收到成交单时播放提示音乐
        event_engine:EventEngine = self.cta_engine.event_engine
        is_testing = self.cta_engine.get_engine_type() != EngineType.LIVE
        event = Event(EVENT_SOUND, {"wavname":"traded.wav","is_testing":is_testing} )
        event_engine.put(event)

3.2 语音播放功能使用

3.2.1 通过main_engine调用speak_text()播放语音

    def process_connect_event(self, event: Event) -> None:  # hxxjava add
        """ CTP接口连接消息处理 """
        gateway:GatewayData = event.data
        self.main_engine.speak_text(f"感谢您,连接{gateway.name}的{gateway.type}接口成功!")

    def process_disconnect_event(self, event: Event) -> None: # hxxjava add
        """ CTP接口断开消息处理 """
        gateway:GatewayData = event.data
        self.main_engine.speak_text(f"请注意:{gateway.name}的{gateway.type}接口已断开!")

3.2.2 发送EVENT_SPEEK消息播放语音

假设你的策略中实现了on_order()和on_trade()这两个回调函数:

    def on_order(self, order: OrderData):
        """
        Callback of new order data update.
        """
        event_engine:EventEngine = self.cta_engine.event_engine
        is_testing = self.cta_engine.get_engine_type() != EngineType.LIVE
        event = Event(EVENT_SPEEK,
                      {"santance":f"策略{self.strategy_name}收到委托单,价格{order.price},手数{order.volume},已经成交{order.traded}",
                       "is_testing":is_testing})
        event_engine.put(event)

    def on_trade(self, trade: TradeData):
        """
        Callback of new trade data update.
        """
        event_engine:EventEngine = self.cta_engine.event_engine
        is_testing = self.cta_engine.get_engine_type() != EngineType.LIVE
        event = Event(EVENT_SPEEK,
                      {"santance":f"策略{self.strategy_name}收到{trade.vt_symbol}成交单,成交价{trade.price}手数{trade.volume}.",
                       "is_testing":is_testing})
        event_engine.put(event)

3.3 音乐和语音播放功能使用注意事项

实盘中用户策略是可以通过应用应用引擎获得vnpy系统的MainEngine的,这样就可以使用 play_sound() 和 speak_text()函数来播放音乐和语音了。但是,在策略中使用这个两个播放函数,应该考虑到回测时不要有声音的。应该根据应用引擎的不同,在策略中使用 play_sound() 和 speak_text()时,将参数is_testing设置为True,这样策略回测就不会有音乐和语音了。

4 将音乐和语音播放功能封装到CTA策略中

这里以CTA策略模板CtaTemplate为例,演示如何将音乐和语音播放功能封装到各种应用的策略中,其他应用系统的模板可以参考以下的做法去封装,就不再一一讲解。

# 在引用部分增加对音乐和语音播器的引用
from vnpy.usertools.sound_player import SoundPlayer

class CtaTemplate(ABC):
    """"""

    author: str = ""
    parameters: list = []
    variables: list = []

    def __init__(
        self,
        cta_engine: Any,
        strategy_name: str,
        vt_symbol: str,
        setting: dict,
    ) -> None:
        """"""

        ... ... # 原来的初始化代码 

        # 音乐和语音播放器 hxxjava add
        self.sound_player:SoundPlayer = SoundPlayer(self.cta_engine.event_engine)

    def play_sound(self,sound_name:str): 
        """ 播放音乐 hxxjava add """
        if self.cta_engine.get_engine_type() == EngineType.LIVE:
            self.sound_player.play_sound(sound_name)

    def speak_text(self,santence:str): 
        """ 播放语音 hxxjava add """
        if self.cta_engine.get_engine_type() == EngineType.LIVE:
            self.sound_player.speak_text(santence)

经过上面对CtaTemplate的修改,用户策略中就可以像下面的语句一样直接调用音乐和语音播放了,更加简便。

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("策略初始化开始")
        self.load_bar(10)
        self.speak_text(f"策略{self.strategy_name}开始初始化")

fy758496805 wrote:

大佬
在过程中 出现了一个错误

File "E:\VNPY\lib\site-packages\vnpy\trader\ui\mainwindow.py", line 276, in open_widget
2023-02-23 09:33:34 widget = widget_class(self.main_engine, self.event_engine)
File "E:\VNPY\lib\site-packages\vnpy_ctabacktester\ui\widget.py", line 100, in init
2023-02-23 09:33:35 self.init_ui()
File "E:\VNPY\lib\site-packages\vnpy_ctabacktester\ui\widget.py", line 259, in init_ui
self.candle_dialog: CandleChartDialog = CandleChartDialog()
File "E:\VNPY\lib\site-packages\vnpy_ctabacktester\ui\widget.py", line 1257, in init
self.init_ui()
File "E:\VNPY\lib\site-packages\vnpy_ctabacktester\ui\widget.py", line 1270, in init_ui
self.trade_item: TradeItem = TradeItem(plot=candle_plot, manager=manager)
File "E:\VNPY\lib\site-packages\vnpy\usertools\chart_items.py", line 1041, in init
super().init(plot=plot, manager=manager, size=15, pxMode=True, pen=pg.mkPen(None),
File "E:\VNPY\lib\site-packages\vnpy\usertools\chart_items.py", line 980, in init
self.plot.addItem(self)
AttributeError: 'NoneType' object has no attribute 'addItem'

是不是继承类的时候有问题呀?

答复:

你好像在策略中用了K线图表了,但是需要区分是用在实盘还是回测环境,回测时显示ChartWidget给谁看呀?

解决方法

把你的策略这样写:

def __init__(self, ... ...): # 你的策略的初始化函数
        ... ...
        self.chart:CandleChartDialog = None
        ... ...

def on_start(self):  # 策略的启动函数
        is_live = self.cta_engine.get_engine_type() == EngineType.LIVE    
        if is_live and self.show_chart and not self.chart:
            # 实盘,配置了显示图表同时未成交的情况下,创建图表——测试环境不会创建图表
            self.chart = CandleChartDialog(
                                strategy = self,
                               ... ... # 你的参数
                        )
            self.chart.update_dir_history(self.bars)
            self.chart.show()

cabinet wrote:

请问大神,能否告知下,使用sqlite数据库如何修改代码呢,万分感谢!

答复:

都是用peewee写的,代码我都已经分享了,只需要换下数据源,一个是MySQL另外一个是SQLite而已。
如果不懂peewee可以网上搜下,先找例子在jupyter notebook中练练手,明白了才好修改vnpy_sqlite下的数据库。

不同的数据库提供的接口都是一样的,都是实现vnpy.trader.database里规定的接口。

1. 停止单、条件单存在的问题

CTA策略模块原来是有停止单的,本人后来又在添加了条件单功能。使用中很多用户反应有这样的问题,那就是策略已经发出过停止单或条件单,但是还未触发,但是因为某种原因策略被关机了,再次启动该策略时发现之前发出过停止单或条件单没有了,非常不方便。如果能够在策略再次启动的时候,把历史的停止单或条件单回复出来就好了。

2. 如何解决加载停止单、条件单

那么如何把历史的停止单或条件单回复出来呢?把策略运行时曾经发出的停止单或条件单保存到文件或者数据库,在策略再次启动时,从文件或者数据库读取出来,恢复到CTA策略管理器的停止单或条件单列表,让它们继续运行就可以了。
这就有选择的问题:

  1. 是否希望加载历史停止单或条件单,这是可以选择的,应该可以设置;
  2. 全部恢复历史停止单或条件单,还是止恢复仍然有效的,这也应该可以选择的;

3. 实现方法

3.1 先实现停止单字典和条件单字典的存取功能

包括如下:

  • 保存内存中的停止单字典到json文件
  • 从json文件读取停止单字典
  • 保存内存中的条件单字典到json文件
  • 从json文件读取条件单字典

在vnpy_ctastrategy命令下新建一个文件utitlity.py,其内容如下:

"""
实现停止单字典和条件单字典的存取功能,功能如下:
- 保存内存中的停止单字典到json文件
- 从json文件读取停止单字典
- 保存内存中的条件单字典到json文件
- 从json文件读取条件单字典
作者:hxxjava       
时间:2023-2-12
"""
import json
from datetime import datetime
from vnpy.trader.constant import Direction,Offset
from vnpy_ctastrategy.base import (
    StopOrder,
    StopOrderStatus,
    ConditionOrder,
    Condition,
    ExecutePrice,
    CondOrderStatus,
)
from vnpy.trader.utility import get_file_path,get_folder_path,save_json

class StopOrderEncoder(json.JSONEncoder):
    """
    停止单相关类型的编码器————用来保存json文件
    """
    def default(self, obj):
        d = {}
        d['__class__'] = obj.__class__.__name__
        if isinstance(obj,Direction):
            d['_value_'] = obj.value
        elif isinstance(obj,Offset):
            d['_value_'] = obj.value
        elif isinstance(obj,StopOrderStatus):
            d['_value_'] = obj.value
        elif isinstance(obj, datetime):
            d['_value_'] = obj.strftime("%Y-%m-%d %H:%M:%S.%f")
        elif isinstance(obj,StopOrder):
            d.update(obj.__dict__)
        else:
            d['__module__'] = obj.__module__
            d.update(obj.__dict__)            
        return d

class StopOrderDecoder(json.JSONDecoder):
    """
    停止单相关类型的译码器————用来从json文件读取
    """
    def __init__(self):
        json.JSONDecoder.__init__(self, object_hook=self.dict2obj)

    def dict2obj(self, d):
        if '__class__' in d:       
            class_name = d.pop('__class__')
            if class_name == 'Direction':
                value = d['_value_']
                instance = Direction(value)
            elif class_name == 'Offset':
                value = d['_value_']
                instance = Offset(value)
            elif class_name == 'StopOrderStatus':
                value = d['_value_']
                instance = StopOrderStatus(value)
            elif class_name == 'datetime':
                value = d['_value_']
                instance = datetime.strptime(value, '%Y-%m-%d %H:%M:%S.%f')
            elif class_name == 'StopOrder':
                instance = StopOrder(**d)
            else:    
                module_name = d.pop('__module__')
                module = __import__(module_name)
                class_ = getattr(module, class_name)
                args = dict((key,value) for key, value in d.items())
                instance = class_(**args)
        else:

            instance = d
        return instance

class CondOrderEncoder(json.JSONEncoder):
    """
    条件单相关类型的编码器————用来保存json文件
    """
    def default(self, obj):
        d = {}
        d['__class__'] = obj.__class__.__name__
        if isinstance(obj,Direction):
            d['_value_'] = obj.value
        elif isinstance(obj,Offset):
            d['_value_'] = obj.value
        elif isinstance(obj,Condition):
            d['_value_'] = obj.value
        elif isinstance(obj,ExecutePrice):
            d['_value_'] = obj.value
        elif isinstance(obj,CondOrderStatus):
            d['_value_'] = obj.value
        elif isinstance(obj, datetime):
            d['_value_'] = obj.strftime("%Y-%m-%d %H:%M:%S.%f")
        elif isinstance(obj,ConditionOrder):
            d.update(obj.__dict__)
        else:
            d['__module__'] = obj.__module__
            d.update(obj.__dict__)            
        return d

class CondOrderDecoder(json.JSONDecoder):
    """
    条件单相关类型的译码器————用来从json文件读取
    """
    def __init__(self):
        json.JSONDecoder.__init__(self, object_hook=self.dict2obj)

    def dict2obj(self, d):
        if '__class__' in d:       
            class_name = d.pop('__class__')
            if class_name == 'Direction':
                value = d['_value_']
                instance = Direction(value)
            elif class_name == 'Offset':
                value = d['_value_']
                instance = Offset(value)
            elif class_name == 'Condition':
                value = d['_value_']
                instance = Condition(value)
            elif class_name == 'ExecutePrice':
                value = d['_value_']
                instance = ExecutePrice(value)
            elif class_name == 'CondOrderStatus':
                value = d['_value_']
                instance = CondOrderStatus(value)
            elif class_name == 'datetime':
                value = d['_value_']
                instance = datetime.strptime(value, '%Y-%m-%d %H:%M:%S.%f')
            elif class_name == 'ConditionOrder':
                instance = ConditionOrder(**d)
            else:    
                module_name = d.pop('__module__')
                module = __import__(module_name)
                class_ = getattr(module, class_name)
                args = dict((key,value) for key, value in d.items())
                instance = class_(**args)
        else:

            instance = d
        return instance

def save_stop_order(filename: str,data: dict) -> None:
    """
    Save StopOrder dict into {.vntrader}\stop_orders\{filename}.json.
    """
    path = get_folder_path("stop_orders")
    path_file = path.joinpath(filename)
    with open(path_file, mode="w+",encoding="UTF-8") as f:
        json.dump(
            data,
            f,
            indent=4,
            cls=StopOrderEncoder,
            ensure_ascii=False
        )

def load_stop_order(filename: str) -> dict:
    """
    Load StopOrder dict from {.vntrader}\stop_orders\{filename}.json.
    """
    path = get_folder_path("stop_orders")
    path_file = path.joinpath(filename)
    filepath: Path = get_file_path(path_file)
    if filepath.exists():
        with open(filepath, mode="r", encoding="UTF-8") as f:
            data: dict = json.load(f,cls=StopOrderDecoder)
        return data
    else:
        save_json(filepath, {})
        return {}

def save_condition_order(filename: str,data: dict) -> None:
    """
    Save ConditionOrder dict into {.vntrader}\cond_orders\{filename}.json.
    """
    path = get_folder_path("cond_orders")
    path_file = path.joinpath(filename)
    with open(path_file, mode="w+",encoding="UTF-8") as f:
        json.dump(
            data,
            f,
            indent=4,
            cls=CondOrderEncoder,
            ensure_ascii=False
        )

def load_condition_order(filename: str) -> dict:
    """
    Load ConditionOrder dict from {.vntrader}\cond_orders\{filename}.json.
    """
    path = get_folder_path("cond_orders")
    path_file = path.joinpath(filename)
    filepath: Path = get_file_path(path_file)
    if filepath.exists():
        with open(filepath, mode="r", encoding="UTF-8") as f:
            data: dict = json.load(f,cls=CondOrderDecoder)
        return data
    else:
        save_json(filepath, {})
        return {}

3.2 为CtaEngine增加停止单和条件单保存和加载功能

在文件cta_strategy\engine.py的class MyCtaEngine下面增加下面的代码。class MyCtaEngine已经在比停止单更好用的条件单——ConditionOrder一文中分享给大家来,虽然这次贴出其完整代码,但这里只介绍与停止单和条件单的保存与恢复有关的内容。

  • 增加了save_stop_orders()接口,用来保存策略停止单
  • 增加了load_stop_orders()接口,用来回复策略停止单
  • 增加了save_condition_orders()接口,用来保存策略条件单
  • 增加了load_condition_orders()接口,用来回复策略条件单
  • 增加了对EVENT_TIMER消息的订阅,其消息处理函数process_timer_event()用来定时对所以初始化的策略进行监视,保存其停止单和条件单到以其策略名称为文件名的json文件。

在cta_strategy\engine.py的引用部分增加这些代码:

from .utility import save_stop_order, load_stop_order, save_condition_order, load_condition_order   # hxxjava add

class MyCtaEngine下面增加下面的代码:

class MyCtaEngine(CtaEngine):
    """ 
    CTA策略引擎,对CtaEngine的功能进行扩展。 
    功能:
    1. 订阅集合竞价tick数据,并且转发给各个已经初始化的CTA策略;
    2. 订阅交易状态消息数据,并且转发给各个已经初始化的CTA策略;
    3. 条件单的功能:包括发送、监视、更新和取消条件单的功能。
    4. 定时保存已经初始化策略的停止单和条件单到json文件。
    5. 提供历史策略的停止单和条件单的查询接口。
    """

    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super().__init__(main_engine,event_engine)

        self.condition_orders:Dict[str,ConditionOrder] = {}         # strategy_name: ConditionOrder

        self.triggered_condition_orders:List[ConditionOrder] = []   # 已经触发点条件单,为流控设计

        self.seconds = 0
        self.save_orders_interval = 10

    def register_event(self):
        """"""
        super().register_event()
        self.event_engine.register(EVENT_AUCTION_TICK, self.process_auction_tick_event)
        self.event_engine.register(EVENT_STATUS, self.process_status_event)
        self.event_engine.register(EVENT_ALL_PENDING_ORDER, self.process_pending_order_event)
        self.event_engine.register(EVENT_TIMER, self.process_timer_event)

    def process_pending_order_event(self,event:Event):
        """ 集合竞价消息处理 hxxjava add """
        pending_orders:PendingOrders = event.data
        strategies:List[CtaTemplate] = self.symbol_strategy_map.get(pending_orders.vt_symbol,[])
        if not strategies:
            return

        for strategy in strategies:
            if strategy.inited:
                # 执行全挂单消息推送
                self.call_strategy_func(strategy, strategy.on_pending_orders, pending_orders)

    def process_auction_tick_event(self,event:Event):
        """ 集合竞价消息处理 hxxjava add """
        tick:TickData = event.data
        strategies:List[CtaTemplate] = self.symbol_strategy_map.get(tick.vt_symbol,[])
        if not strategies:
            return

        for strategy in strategies:
            if strategy.inited:
                # 执行策略的集合竞价消息处理
                self.call_strategy_func(strategy, strategy.on_auction_tick, tick)

    def process_status_event(self,event:Event):
        """ 交易状态消息处理 hxxjava add """
        status:StatusData = event.data
        strategies:List[CtaTemplate] = []

        # step1: find strategies related to this status data 
        vt_instrument0 = get_vt_instrument(status.vt_symbol)
        if vt_instrument0 == status.vt_symbol:
            # 交易品种的交易状态
            for vt_symbol in self.symbol_strategy_map.keys():
                vt_instrument = get_vt_instrument(vt_symbol)
                if vt_instrument == vt_instrument0:
                    # 交易品种的交易状态属于策略交易的合约
                    strategies.extend(self.symbol_strategy_map[vt_symbol]) 

        else:
            # 单独合约的交易状态
            strategies.extend(self.symbol_strategy_map.get(status.vt_symbol,[]))

        if not strategies:
            return

        # step 2: push status data to all relate strategies
        for strategy in strategies:
            if strategy.inited:
                # 执行策略的集合竞价消息处理
                self.call_strategy_func(strategy, strategy.on_status, status)

    def process_tick_event(self,event:Event):
        """ 用tick的价格检查条件单 """
        super().process_tick_event(event)

        tick:TickData = event.data
        all_condition_orders = [order for order in self.condition_orders.values() \
            if order.vt_symbol == tick.vt_symbol and order.status == CondOrderStatus.WAITING]
        for order in all_condition_orders:
            # 检查条件单是否满足条件
            self.check_condition_order(order,tick)

    def check_condition_order(self,order:ConditionOrder,tick:TickData):
        """ 检查条件单是否满足条件 """       
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        price = tick.last_price

        is_be = order.condition == Condition.BE and price >= order.price
        is_le = order.condition == Condition.LE and price <= order.price
        is_bt = order.condition == Condition.BT and price > order.price
        is_lt = order.condition == Condition.LT and price < order.price

        if is_be or is_le or is_bt or is_lt:
            # 满足触发条件
            if order.execute_price == ExecutePrice.MARKET:
                # 取市场价
                price = tick.last_price
            elif order.execute_price == ExecutePrice.EXTREME:
                # 取极限价
                price = tick.limit_up if order.direction == Direction.LONG else tick.limit_down
            else:
                # 取设定价
                price = order.price

            # 执行委托
            order_ids = strategy.send_order(
                    direction = order.direction,
                    offset=order.offset,
                    price=price,
                    volume=order.volume 
                )

            if order_ids:
                order.trigger_time = tick.datetime
                order.status = CondOrderStatus.TRIGGERED
                order.vt_orderids = order_ids

                self.call_strategy_func(strategy,strategy.on_condition_order,order)
                self.put_cond_order_event(order)

    def find_condition_order(self,vt_orderid:str):
        """ 根据委托单号查询所属条件单 """
        corder:ConditionOrder = None
        for order in self.condition_orders.values():
            if vt_orderid in order.vt_orderids:
                corder = order
                break

        return corder

    def process_trade_event(self, event: Event):
        """ 委托单推送处理 """
        super().process_trade_event(event)
        trade:TradeData = event.data
        vt_orderid = trade.vt_orderid

        corder = self.find_condition_order(vt_orderid)
        if corder:
            # 该成交单属于某个条件单
            strategy = self.strategies.get(corder.strategy_name,None)
            if strategy and strategy.trading:
                # 找到了该条件单属实策略实例且正在交易中

                # 累计条件单的成交量
                corder.traded += trade.volume
                # 推送该条件单给策略
                self.call_strategy_func(strategy,strategy.on_condition_order,corder)
                # 刷新条件单列表控件
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,corder))

    def send_condition_order(self,order:ConditionOrder):
        """  """
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        if order.cond_orderid not in self.condition_orders:
            self.condition_orders[order.cond_orderid] = order
            self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
            return True

        return False

    def cancel_condition_order(self,cond_orderid:str):
        """  """
        order:ConditionOrder = self.condition_orders.get(cond_orderid,None)
        if not order:
            return False

        order.status = CondOrderStatus.CANCELLED
        self.put_cond_order_event(order)
        return True

    def cancel_all_condition_orders(self,strategy_name:str):
        """  """
        for order in self.condition_orders.values():
            if order.strategy_name == strategy_name and order.status == CondOrderStatus.WAITING:
                order.status = CondOrderStatus.CANCELLED
                self.put_cond_order_event(order)

        return True

    def put_cond_order_event(self, cond_order: ConditionOrder) -> None:
        """
        Put an event to update condition order status.
        """
        event: Event = Event(EVENT_CONDITION_ORDER, cond_order)
        self.event_engine.put(event)

    def save_stop_orders(self,strategy_name:str,active_only:bool=False):
        """ 保存加载历史停止单到json文件 """
        count:int = 0
        strategy:CtaTemplate = self.strategies.get(strategy_name,None)
        if not strategy:
            return count

        stop_orders = {}
        for order_id,stop_order in self.stop_orders.items():
            if stop_order.strategy_name == strategy_name:
                if active_only and stop_order.status == StopOrderStatus.WAITING:
                    stop_orders[order_id] = stop_order
                    count += 1
                else:
                    stop_orders[order_id] = stop_order
                    count += 1
        file_name = f"{strategy_name}.json"
        save_stop_order(file_name,stop_orders)
        return count

    def load_stop_orders(self,strategy_name:str,active_only:bool=True):
        """ 从json文件加载历史停止单 """
        file_name = f"{strategy_name}.json"
        stop_orders:Dict[str,StopOrder] = load_stop_order(file_name)
        if not stop_orders:
            return

        if not active_only:
            loaded_orders = stop_orders
        else:
            loaded_orders = {}
            for id,stop_order in stop_orders.items():
                if stop_order.status == StopOrderStatus.WAITING:
                    loaded_orders[id] = stop_order

        if loaded_orders:
            self.stop_orders.update(loaded_orders)
            # 更新GUI中加载的停止单列表
            for stop_order in loaded_orders.values():
                self.put_stop_order_event(stop_order)

    def save_condition_orders(self,strategy_name:str,active_only:bool=True):
        """ 保存加载历史条件单到json文件 """
        count:int = 0
        strategy:CtaTemplate = self.strategies.get(strategy_name,None)
        if not strategy:
            return count

        cond_orders = {}
        for order_id,cond_order in self.condition_orders.items():
            if cond_order.strategy_name == strategy_name:
                if active_only and cond_order.status == CondOrderStatus.WAITING:
                    cond_orders[order_id] = cond_order
                    count += 1
                else:
                    cond_orders[order_id] = cond_order
                    count += 1
        file_name = f"{strategy_name}.json"
        save_condition_order(file_name,cond_orders)
        return count

    def load_condition_orders(self,strategy_name:str,active_only:bool=True):
        """ 从json文件加载历史条件单 """
        file_name = f"{strategy_name}.json"
        cond_orders:Dict[str,ConditionOrder] = load_condition_order(file_name)
        if not cond_orders:
            return

        if not active_only:
            loaded_orders = cond_orders
        else:
            loaded_orders = {}
            for id,cond_order in cond_orders.items():
                if cond_order.status == CondOrderStatus.WAITING:
                    loaded_orders[id] = cond_order

        if loaded_orders:
            self.condition_orders.update(loaded_orders)
            # 更新GUI中加载的条件单列表
            for cond_order in loaded_orders.values():
                self.put_cond_order_event(cond_order)

    def process_timer_event(self,event:Event) -> None:
        # 定时保存策略的
        self.seconds += 1
        if self.seconds % self.save_orders_interval:
            return

        if self.get_engine_type() != EngineType.LIVE:
            # 只有实盘引擎才保存停止单和条件单,回测引擎则不保存
            return

        for strategy in self.strategies.values():
            if strategy.inited:
                cnt1 = self.save_stop_orders(strategy.strategy_name)
                cnt2 = self.save_condition_orders(strategy.strategy_name)
                # print(f"保存了策略 {strategy.strategy_name} 的 {cnt1} 个停止单,{cnt2} 个条件。")

3.3 为CTA策略模板CtaTemplate增加停止单和条件单加载选项

这里主要介绍在CtaTemplate中增加点与加载历史停止单和条件单相关的成员变量:
其中:

  • self.history_order表示策略是否在启动之后加载历史停止单和条件单,
  • self.active_only表示是否只加载仍然处于等待状态的历史停止单和条件单;

class CtaTemplate的其他代码见本人之前的帖子中的代码:比停止单更好用的条件单——ConditionOrder

class CtaTemplate(ABC):
    """"""

    author: str = ""
    parameters: list = []
    variables: list = []

    def __init__(
        self,
        cta_engine: Any,
        strategy_name: str,
        vt_symbol: str,
        setting: dict,
    ) -> None:
        """"""
        self.cta_engine: Any = cta_engine
        self.strategy_name: str = strategy_name
        self.vt_symbol: str = vt_symbol

        self.inited: bool = False
        self.trading: bool = False
        self.pos: int = 0

        # 是否在启动之后加载历史停止单和条件单
        self.history_order:bool = False  # hxxjava add
        self.active_only:bool = False  # hxxjava add

        # Copy a new variables list here to avoid duplicate insert when multiple
        # strategy instances are created with the same strategy class.
        self.variables = copy(self.variables)
        self.variables.insert(0, "inited")
        self.variables.insert(1, "trading")
        self.variables.insert(2, "pos")

        self.update_setting(setting)

       # 其他代码省略 ... ...

4. 加载停止单和条件单委托选项的应用

4.1 用户CTA策略如何使用停止单和条件单加载功能?

增加历史停止单和条件单委托选项

class DemoStrategy(CtaTemplate):
    """ 示例策略 """

    author = "hxxjava"

    capital : float = 200000.0  # 交易资金
    max_loss_ratio : int = 6         # 每次开仓最大亏损比例
    max_open_times : int = 3         # 每次开仓最大亏损比例

    dir_interval : str = '1m'   # 方向周期单位,只能够是:'1m','1h','d'或'w'中的一个
    dir_window : int = 30       # 方向周期窗口
    op_interval : str = '1m'    # 操作周期单位,只能够是:'1m','1h','d'或'w'中的一个
    op_window : int = 3         # 操作周期窗口
    load_days : int = 10        # 加载历史行情的天数
    OpenSelect:str = "逆转价"
    show_chart: bool = True     # 是否需要显示图表
    dir_trend: str = ""
    op_trend: str = ""


    parameters = [
        "capital",     
        "max_loss_ratio",   
        "max_open_times",   
        "dir_interval",
        "dir_window",
        "op_interval",
        "op_window",
        "load_days",
        "OpenSelect",
        "show_chart",
        "history_order",    # 启动时是否加载停止单和条件单选项
        "active_only",       # 是否只加载仍然有效的停止单和条件单选项,
    ]

    long_pos:float = 0  # 持有多仓
    short_pos:float = 0 # 持有空仓

    variables = [
        "dir_trend",
        "op_trend",
        "long_pos",
        "short_pos"
    ]

    def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
        """"""
        super(GsjyStrategy, self).__init__(cta_engine, strategy_name, vt_symbol, setting)

        self.history_order = True  # 启动时载历史停止单和条件单
        self.active_only = False  # 加载又有点历史停止单和条件单

       # 其他的代码省略
    def on_start(self):
        """
        Callback when strategy is started.
        """
        is_live = self.cta_engine.get_engine_type() == EngineType.LIVE
        if is_live and self.history_order:
            # 只有实盘才保存策略的历史停止单和条件单
            from vnpy_ctastrategy.engine import MyCtaEngine
            cta_engine:MyCtaEngine = self.cta_engine
            cta_engine.load_stop_orders(self.strategy_name,self.active_only)
            cta_engine.load_condition_orders(self.strategy_name,self.active_only)
            self.write_log("加载历史停止单和条件单已执行。")

    # 其他的代码省略

4.2 设置加载停止单和条件单委托选项

在用户策略未启动的情况下,还可以设置有关加载停止单和条件单委托的选项,如图所示:

description

4.3 停止单和条件单会在用户CTA策略启动之后立即加载

下图是策略执行了条件单之后被关闭,再次重新启动之后回复的条件单。当然,停止单也是可以的实现恢复历史的,大家可以去试。
description

4.4 保存的停止单和条件单在哪里,内容是什么样子?

以条件单为例,如下图所示,条件单通常保存在用户目录下的.vntrader\cond_orders{策略名称}.json文件中:

description

其内容及格式如下:

{
    "0213082700975": {
        "__class__": "ConditionOrder",
        "strategy_name": "gs-rb2305",
        "vt_symbol": "rb2305.SHFE",
        "direction": {
            "__class__": "Direction",
            "_value_": "多"
        },
        "offset": {
            "__class__": "Offset",
            "_value_": "开"
        },
        "price": 4072.0,
        "volume": 4.0,
        "condition": {
            "__class__": "Condition",
            "_value_": ">"
        },
        "execute_price": {
            "__class__": "ExecutePrice",
            "_value_": "设定价"
        },
        "create_time": {
            "__class__": "datetime",
            "_value_": "2023-02-13 08:27:00.975422"
        },
        "trigger_time": null,
        "cond_orderid": "0213082700975",
        "traded": 0.0,
        "vt_orderids": [],
        "status": {
            "__class__": "CondOrderStatus",
            "_value_": "已撤销"
        },
        "before_trigger": null,
        "after_traded": null
    },
    "0213090325941": {
        "__class__": "ConditionOrder",
        "strategy_name": "gs-rb2305",
        "vt_symbol": "rb2305.SHFE",
        "direction": {
            "__class__": "Direction",
            "_value_": "空"
        },
        "offset": {
            "__class__": "Offset",
            "_value_": "开"
        },
        "price": 4063.0,
        "volume": 4.0,
        "condition": {
            "__class__": "Condition",
            "_value_": "<="
        },
        "execute_price": {
            "__class__": "ExecutePrice",
            "_value_": "极限价"
        },
        "create_time": {
            "__class__": "datetime",
            "_value_": "2023-02-13 09:03:25.941102"
        },
        "trigger_time": {
            "__class__": "datetime",
            "_value_": "2023-02-13 09:03:25.500000"
        },
        "cond_orderid": "0213090325941",
        "traded": 0.0,
        "vt_orderids": [
            "CTP.12_-2076558284_1"
        ],
        "status": {
            "__class__": "CondOrderStatus",
            "_value_": "已触发"
        },
        "before_trigger": null,
        "after_traded": null
    }
}

1. 为什么要隐藏与显示K线图表的信息板?

K线图表中主图与副图都有信息板,当我们用选择某个K线时,它用文字的形式表达每个K线修改的信息以及指标数值,它很有用。但是有时因为主图或者副图的指标过多,抑或是显示的信息过多,它会覆盖主图或副图的左上角或者右上角开始的很大一片区域,很影响我们的观看效果,尤其是遇到下跌行情时,K线从主图的左上角开始,到主图的右下角结束,那么这个时候你无论如何也看不到左上角的K线,因为它们被信息板遮住了。怎么办?隐藏信息板是个好办法。

2. 实现方法

修改vnpy\chart\widget.py中的class ChartCursor,代码如下:

class ChartCursor(QtCore.QObject):
    """"""

    def __init__(
        self,
        widget: ChartWidget,
        manager: BarManager,
        plots: Dict[str, pg.GraphicsObject],
        item_plot_map: Dict[ChartItem, pg.GraphicsObject]
    ) -> None:
        """"""
        super().__init__()

        self._widget: ChartWidget = widget
        self._manager: BarManager = manager
        self._plots: Dict[str, pg.GraphicsObject] = plots
        self._item_plot_map: Dict[ChartItem, pg.GraphicsObject] = item_plot_map

        self._x: int = 0
        self._y: int = 0
        self._plot_name: str = ""

        self._info_visibles:dict[str,bool] = {} # hxxjava add 2023-2-10

        self._init_ui()
        self._connect_signal()  

    def _init_ui(self) -> None:
        """"""
        self._init_line()
        self._init_label()
        self._init_info()

    def _init_line(self) -> None:
        """
        Create line objects.
        """
        self._v_lines: Dict[str, pg.InfiniteLine] = {}
        self._h_lines: Dict[str, pg.InfiniteLine] = {}
        self._views: Dict[str, pg.ViewBox] = {}

        pen: QtGui.QPen = pg.mkPen(WHITE_COLOR)

        for plot_name, plot in self._plots.items():
            v_line: pg.InfiniteLine = pg.InfiniteLine(angle=90, movable=False, pen=pen)
            h_line: pg.InfiniteLine = pg.InfiniteLine(angle=0, movable=False, pen=pen)
            view: pg.ViewBox = plot.getViewBox()

            for line in [v_line, h_line]:
                line.setZValue(0)
                line.hide()
                view.addItem(line)

            self._v_lines[plot_name] = v_line
            self._h_lines[plot_name] = h_line
            self._views[plot_name] = view

    def _init_label(self) -> None:
        """
        Create label objects on axis.
        """
        self._y_labels: Dict[str, pg.TextItem] = {}
        for plot_name, plot in self._plots.items():
            label: pg.TextItem = pg.TextItem(
                plot_name, fill=CURSOR_COLOR, color=BLACK_COLOR)
            label.hide()
            label.setZValue(2)
            label.setFont(NORMAL_FONT)
            plot.addItem(label, ignoreBounds=True)
            self._y_labels[plot_name] = label

        self._x_label: pg.TextItem = pg.TextItem(
            "datetime", fill=CURSOR_COLOR, color=BLACK_COLOR)
        self._x_label.hide()
        self._x_label.setZValue(2)
        self._x_label.setFont(NORMAL_FONT)
        plot.addItem(self._x_label, ignoreBounds=True)

    def _init_info(self) -> None:
        """
        """
        self._infos: Dict[str, pg.TextItem] = {}
        for plot_name, plot in self._plots.items():
            info: pg.TextItem = pg.TextItem(
                "info",
                color=CURSOR_COLOR,
                border=CURSOR_COLOR,
                fill=BLACK_COLOR
            )
            info.hide()
            info.setZValue(2)
            info.setFont(NORMAL_FONT)
            plot.addItem(info)  # , ignoreBounds=True)
            self._infos[plot_name] = info

    def _connect_signal(self) -> None:
        """
        Connect mouse move signal to update function.
        """
        self._widget.scene().sigMouseMoved.connect(self._mouse_moved)
        self._widget.scene().sigMouseClicked.connect(self._mouse_clicked) # hxxjava add 2023-2-10

    def isInfoVisible(self,plot_name:str):
        """ 获取信息板的隐显 hxxjava add 2023-2-10 """
        if plot_name not in self._info_visibles:
            self._info_visibles[plot_name] = True
        return self._info_visibles[plot_name]

    def setInfoVisible(self,plot_name:str,visible:bool):
        """ 设置信息板隐显 hxxjava add 2023-2-10 """
        self._info_visibles[plot_name] = visible    

    def _mouse_clicked(self,evt): # hxxjava add 2023-2-10
        """ 用鼠标左键+CTRL键隐显信息板 2023-2-10 """
        button = evt.button()
        modifiers = evt.modifiers()
        if button == QtCore.Qt.LeftButton and modifiers == QtCore.Qt.ControlModifier:
            if self._plot_name in self._infos:
                text_info = self._infos.get(self._plot_name,None)  
                old_value = self.isInfoVisible(self._plot_name)    
                self.setInfoVisible(self._plot_name,not old_value)      
                text_info.setVisible(not old_value)

    def _mouse_moved(self, evt: tuple) -> None:
        """
        Callback function when mouse is moved.
        """
        if not self._manager.get_count():
            return

        # First get current mouse point
        pos: tuple = evt

        for plot_name, view in self._views.items():
            rect = view.sceneBoundingRect()

            if rect.contains(pos):
                mouse_point = view.mapSceneToView(pos)
                self._x = to_int(mouse_point.x())
                self._y = mouse_point.y()
                self._plot_name = plot_name
                break

        # Then update cursor component
        self._update_line()
        self._update_label()
        self.update_info()

    def _update_line(self) -> None:
        """"""
        for v_line in self._v_lines.values():
            v_line.setPos(self._x)
            v_line.show()

        for plot_name, h_line in self._h_lines.items():
            if plot_name == self._plot_name:
                h_line.setPos(self._y)
                h_line.show()
            else:
                h_line.hide()

    def _update_label(self) -> None:
        """"""
        bottom_plot: pg.PlotItem = list(self._plots.values())[-1]
        axis_width = bottom_plot.getAxis("right").width()
        axis_height = bottom_plot.getAxis("bottom").height()
        axis_offset: QtCore.QPointF = QtCore.QPointF(axis_width, axis_height)

        bottom_view: pg.ViewBox = list(self._views.values())[-1]
        bottom_right = bottom_view.mapSceneToView(
            bottom_view.sceneBoundingRect().bottomRight() - axis_offset
        )

        for plot_name, label in self._y_labels.items():
            if plot_name == self._plot_name:
                label.setText(str(self._y))
                label.show()
                label.setPos(bottom_right.x(), self._y)
            else:
                label.hide()

        dt: datetime = self._manager.get_datetime(self._x)
        if dt:
            self._x_label.setText(dt.strftime("%Y-%m-%d %H:%M:%S"))
            self._x_label.show()
            self._x_label.setPos(self._x, bottom_right.y())
            self._x_label.setAnchor((0, 0))

    def update_info(self) -> None:
        """"""
        buf: dict = {}

        for item, plot in self._item_plot_map.items():
            item_info_text: str = item.get_info_text(self._x)

            if plot not in buf:
                buf[plot] = item_info_text
            else:
                if item_info_text:
                    buf[plot] += ("\n\n" + item_info_text)

        for plot_name, plot in self._plots.items():
            plot_info_text: str = buf[plot]
            info: pg.TextItem = self._infos[plot_name]
            info.setText(plot_info_text)
            if self.isInfoVisible(plot_name):    # hxxjava add 2023-2-10
                info.show()

            view: pg.ViewBox = self._views[plot_name]
            top_left = view.mapSceneToView(view.sceneBoundingRect().topLeft())
            info.setPos(top_left)

    def move_right(self) -> None:
        """
        Move cursor index to right by 1.
        """
        if self._x == self._manager.get_count() - 1:
            return
        self._x += 1

        self._update_after_move()

    def move_left(self) -> None:
        """
        Move cursor index to left by 1.
        """
        if self._x == 0:
            return
        self._x -= 1

        self._update_after_move()

    def _update_after_move(self) -> None:
        """
        Update cursor after moved by left/right.
        """
        bar: BarData = self._manager.get_bar(self._x)
        self._y = bar.close_price

        self._update_line()
        self._update_label()

    def clear_all(self) -> None:
        """
        Clear all data.
        """
        self._x = 0
        self._y = 0
        self._plot_name = ""

        for line in list(self._v_lines.values()) + list(self._h_lines.values()):
            line.hide()

        for label in list(self._y_labels.values()) + [self._x_label]:
            label.hide()

3. 运行效果

3.1 未隐藏信息板图表

description

3.2 隐藏信息板图表

description

3.3 注意事项

  • 使用中使用CTRL键+鼠标左键单击实现信息板的隐藏与显示;
  • 可以单独隐藏与显示单个主图/副图的信息板,不会相互影响。

比较完善的方法是:

先查询合约信息,其中有最小交易量(这已经有)和最大交易量(vnpy中目前没有设计),但是CTP接口中有,然后拆单交易:

CTP接口返回到合约信息包括:

struct CThostFtdcInstrumentField
{
    ///合约代码
    TThostFtdcInstrumentIDType InstrumentID;
    ///交易所代码
    TThostFtdcExchangeIDType ExchangeID;
    ///合约名称
    TThostFtdcInstrumentNameType InstrumentName;
    ///合约在交易所的代码
    TThostFtdcExchangeInstIDType ExchangeInstID;
    ///产品代码
    TThostFtdcInstrumentIDType ProductID;
    ///产品类型
    TThostFtdcProductClassType ProductClass;
    ///交割年份
    TThostFtdcYearType DeliveryYear;
    ///交割月
    TThostFtdcMonthType DeliveryMonth;
    ///市价单最大下单量
    TThostFtdcVolumeType MaxMarketOrderVolume;
    ///市价单最小下单量
    TThostFtdcVolumeType MinMarketOrderVolume;
    ///限价单最大下单量
    TThostFtdcVolumeType MaxLimitOrderVolume;
    ///限价单最小下单量
    TThostFtdcVolumeType MinLimitOrderVolume;
    ///合约数量乘数
    TThostFtdcVolumeMultipleType VolumeMultiple;
    ///最小变动价位
    TThostFtdcPriceType PriceTick;
    ///创建日
    TThostFtdcDateType CreateDate;
    ///上市日
    TThostFtdcDateType OpenDate;
    ///到期日
    TThostFtdcDateType ExpireDate;
    ///开始交割日
    TThostFtdcDateType StartDelivDate;
    ///结束交割日
    TThostFtdcDateType EndDelivDate;
    ///合约生命周期状态
    TThostFtdcInstLifePhaseType InstLifePhase;
    ///当前是否交易
    TThostFtdcBoolType IsTrading;
    ///持仓类型
    TThostFtdcPositionTypeType PositionType;
    ///持仓日期类型
    TThostFtdcPositionDateTypeType PositionDateType;
    ///多头保证金率
    TThostFtdcRatioType LongMarginRatio;
    ///空头保证金率
    TThostFtdcRatioType ShortMarginRatio;
    ///是否使用大额单边保证金算法
    TThostFtdcMaxMarginSideAlgorithmType MaxMarginSideAlgorithm;
    ///基础商品代码
    TThostFtdcInstrumentIDType UnderlyingInstrID;
    ///执行价
    TThostFtdcPriceType StrikePrice;
    ///期权类型
    TThostFtdcOptionsTypeType OptionsType;
    ///合约基础商品乘数
    TThostFtdcUnderlyingMultipleType UnderlyingMultiple;
    ///组合类型
    TThostFtdcCombinationTypeType CombinationType;
};

2. vnpy中的合约信息ContractData,没有最大交易手数:

@dataclass
class ContractData(BaseData):
    """
    Contract data contains basic information about each contract traded.
    """

    symbol: str
    exchange: Exchange
    name: str
    product: Product
    size: float
    pricetick: float

    min_volume: float = 1           # minimum trading volume of the contract —— 这个就是最小交易手数
    stop_supported: bool = False    # whether server supports stop order
    net_position: bool = False      # whether gateway uses net position volume
    history_data: bool = False      # whether gateway provides bar history data

    option_strike: float = 0
    option_underlying: str = ""     # vt_symbol of underlying contract
    option_type: OptionType = None
    option_listed: datetime = None
    option_expiry: datetime = None
    option_portfolio: str = ""
    option_index: str = ""          # for identifying options with same strike price

3. 目前CTA策略中还没有大单拆分设计

需要自行设计,因连续的两条交易指令send_order()之间必须有时间间隔,否则前一条会被覆盖,交易所只会收收到后一条。
也就是说,你必须设计一个独立的大单委托管理机制,可以设计类似算法交易的vwap、iceberg这类的东西来支持你的交易策略。

老秦 wrote:

非常感谢。那么对这种涨停停牌的合约,其状态是 '1' NO_TRADING = "非交易" 吗?

如果盘中收到,是因为涨停停牌或者跌停停牌通知,过几分钟再次通知重新交易的。

答复:

  1. 其实vnpy系统中自带的本地停止单和我的条件单一样都是运行在内存中的,只要一关机就没有了。
  2. 如果希望再次启动系统,运行策略,还能再次调出停止单和条件单,需要比较复杂的设计。因为要启动旧的条件单,就必须再次初始化、启动其所关联的策略,而这个时间是不受条件单控制的。
  3. 要实现这样的要求也是可以的,需要比较周全的考虑,等我有时间了再设计一个可以缓存、重新加载和鉴别条件单是否有效的恢复条件单功能,当然此功能也对本地停止单有效。

这个很正常。当一个品种的合约的状态信息都一致的时候,就只播发品种的状态信息,如果个别的状态与品种不一致的时候,就在播发具体合约的状态信息。
例如一个合约发生了涨停板或者跌停板则会休市一定时间,此时该品种的其他合约是正常的,那么此时您就会只收到一个合约的状态变化信息。
所以,要找一个合约的状态信息,首先匹配合约的vt_symbol,找不到的时候在找品种的vt_symbol。

少林寺猫猫 wrote:

jerrychen wrote:

我個人是寫一個watch_dog的app。只要timeout就把所有gateway重啟

能否分享一下这个功能,因为ctp经常隔夜断联,需要重新连接!你的重启是在引擎里重启?

方法有二:

  1. 可以通过一楼帖子中MdAPI和TdAPI的onFrontConnected()和 onFrontDisconnected()感知连接的通断,在CtpGateway内部实现断开后的自动重连
  2. 可以用脚本定时启动整个系统和策略,再定时关闭策略和系统,只在交易时段才启动和连接网关,这样也是比较可靠的做法。

jerrychen wrote:

VNPY真的太陽春了。而且內定綁死RQDATA。對於國外用戶根本是個超大障礙!
我卡在RQDATA這邊卡好久

vnpy可以提供多种DataFeed的选择,例如:

老秦 wrote:

合约中有两个字段: 多头保证金率 "LongMarginRatioByMoney", 空头保证金率 ="ShortMarginRatioByMoney"。可是交易所网站通常发布的是一个保证金率,是不是意味着这两个保证金率取值相同?有见过不同的吗?

通常是一样的,但也应该分别计算,不可存在侥幸心理。

你说的对,检查了一下,确实是遗漏了对class CtaManager修改的代码。
我已经修改了一楼的帖子,你查看下吧。

少林寺猫猫 wrote:

hxxjava wrote:

少林寺猫猫 wrote:

老师您好,实践上UI会缺少了右边的条件管理器窗口。是不是class CtaManager(QtWidgets.QWidget):部分的修改代码没贴上来的原因哦?

不是的,相关的代码在StrategyManager中,一楼的帖子中有。

实在抱歉,老师。我看了很久StrategyManager的代码,也测试过几次,都只是显示左侧的条件单管理器内容,而没有右侧的条件单管理器内容。我开始尝试在class CtaManager中参照停止单的写法,直接调用了ConditionOrderMonitor类,就显示了右侧的条件单管理器内容,但是部分功能会有异常报错。如双击撤单有问题等。

CTA策略管理器的右侧共分三栏:停止单列表,条件单列表和 策略运行日志列表。
怎么会有左侧的呢?你截图看看。

少林寺猫猫 wrote:

老师您好,实践上UI会缺少了右边的条件管理器窗口。是不是class CtaManager(QtWidgets.QWidget):部分的修改代码没贴上来的原因哦?

不是的,相关的代码在StrategyManager中,一楼的帖子中有。

© 2015-2022 微信 18391752892
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】