ranjianlin wrote:
为什么不在停止单的基础上来修改了?增加执行价格,设定价、市场价或者极限价呢? 这样会不会简单一点。
答复:
您细细比较停止单和我的条件单的区别就知道为什么了。
- 在没有条件单的情况下,想买的价格比盘口价低会等待,可以使用停止单来实现;想想买的价格比盘口价高不会等待,立即执行。
- 在有条件单的情况下,想买的价格比盘口价低会等待,可以使用停止单来实现;想想买的价格比盘口价高也会等待。
哈哈哈哈baab695eb19449d0 wrote:
hxxjava老哥你好,
最近面临代码管理的问题:
直接修改源代码怕不能更新;
将文件复制到出来额外编写会有导包比较乱,以及编辑器无法定位代码的问题;
重写覆盖会有多文件引用需要全部重命名的问题.想向你请教一下,你的系统改动这么多,代码是怎么管理的?
用git吧。
zeal的官网网址:https://zealdocs.org/
Windows,Linux和BSD各种平台,选择属于自己需要的平台。我选择的是免安装的Portable的压缩包:
windows平台的Portable压缩包下载之后无需安装,所以设置和文档都存放在应用目录下,比较方便。下载之后直接解压到指定目录下,找到运行其中zeal.exe就可以了。
浏览离线文档既可以在zeal内部浏览器显示感兴趣的主题,也可以选择外边浏览器。至此你再也不用为查询python语法、函数解释和用法而烦恼了。
黄sir wrote:
大佬请教两个问题:
1、是不是开市的时候,服务器才会返回相应的值
2、向远程服务器询问保证金和手续费的时候,不是每次询问都会有返回的,需要多次调用对吗?
答复:
只要交易服务器登录成功,每次询问保证金和手续费都会有返回的,无需在交易时段。
但只能够单个品种的查询,可能返回多个结果(单个品种的或者品种+单个合约的)。
而且在交易时段查询,询问保证金和手续费会占用上行流控,如果查询太多太密集,
可能影响委托申请的响应,因为上行流控每秒最多6次。
vnpy中多采用各种应用系统的策略进行交易的,虽然也有各种日志和提示出现,但平常总是静悄悄的。
如果你想了解系统和策略的运行情况,可以查看各种运行日志,例如MainWindow的日志,委托列表,成交的列表,账户列表。想要查询你的策略运行情况,可以查看你的策略管理器的变量输出,等等。可是人总不能一直盯着屏幕,那样太累了。
如果能够有个声音和语音播报各种交易活动,用户会及时得到提醒。例如:
在vnpy\usertools目录下添加文件sound_player.py,内容如下:
"""
多线程音乐和文本播放器,介绍如下:
特点:
既可以播放wav,mp3 等格式,有可以对文本进行播放。
使用消息引擎创建该音乐播放器,多线程并行播放声音,不会因为播放声音而阻塞业务流程。
假设其他应用或者策略中需要播放声音, sound_name为字符型的声音文件名称,使用方法有两种:
方法1: 先获取消息引擎event_engine(注:与SoundPlayer实例成交时使用的消息引擎是
相同的),那么可以这样播放:
event_engine.put(Event(EVENT_SOUND,"Connected.wav"))
event_engine.put(Event(EVENT_SPEEK,"您收到一条委托单"))
方法2: 将SoundPlayer多play_sound()接口安装到MainEngine到实例main_engine,那么
可以先回去获取main_engine,然后这样播放:
event_engine.play_sound("Connected.wav")
event_engine.speek_text("您收到一条委托单")
作者:hxxjava 时间:2023-2-14,情人节————献给心爱的人!
修改:增加回测与实盘的区分功能,使得只在实盘环境才播放声音和文本。
修改:hxxjava 时间:2023-2-28
依赖库:pyttsx3, 安装:pip install pyttsx3
"""
from typing import Any
from pathlib import Path
from threading import Thread
from vnpy.trader.engine import EventEngine,Event
from winsound import PlaySound,SND_FILENAME
import pyttsx3
EVENT_SOUND = "eSound."
EVENT_SPEEK = "eSpeak."
class SoundPlayer():
"""
多线程声音播放器
"""
def __new__(cls, *args, **kwargs):
""" singleton constructor """
if not hasattr(cls, "_instance"):
cls._instance = super(SoundPlayer, cls).__new__(cls)
return cls._instance
def __init__(self,event_engine:EventEngine,switch:bool=True):
""" 初始化函数 """
self.event_engine = event_engine
# control play sound file
self.switch = switch
self.register_event()
def register_event(self):
""" """
self.event_engine.register(EVENT_SOUND,self.process_sound_event)
self.event_engine.register(EVENT_SPEEK,self.process_speak_event)
def set_switch(self,switch:bool=True):
""" set the swith which control play sound file """
self.switch = switch
def _get_sound_path(self,sound_name: str):
"""
Get path for sound file with sound name.
"""
this_file_path:Path = Path(__file__).parent
sound_path:Path = this_file_path.joinpath("sounds", sound_name)
return str(sound_path)
def process_sound_event(self,event:Event):
""" EVENT消息处理过程 """
wavname,is_testing = event.data['wavname'],event.data['is_testing']
if self.switch == True and is_testing == False:
filename = self._get_sound_path(wavname)
thread = Thread(target=self._play_sound,kwargs=({"filename":filename}),daemon=True)
thread.start()
def process_speak_event(self,event:Event):
""" EVENT消息处理过程 """
santence,is_testing = event.data['santence'],event.data['is_testing']
if self.switch == True and is_testing == False:
santence:str = event.data
thread = Thread(target=self._do_speak,kwargs=({"santence":santence}),daemon=True)
thread.start()
def _play_sound(self,filename:str):
""" 音乐文件播放线程执行过程 """
PlaySound(filename,SND_FILENAME)
def _do_speak(self,santence:str):
""" 文本播放线程执行过程 """
print(santence)
speaker = pyttsx3.init()
speaker.say(santence)
speaker.runAndWait()
def play_sound(self,sound_name:str,is_testing:bool=False):
"""
用户音乐播放接口。
参数:
sound_name:传入声音文件名
is_testing:回测=True;实盘=False(默认)
"""
self.event_engine.put(Event(EVENT_SOUND,{"wavname":sound_name,"is_testing":is_testing}))
def speak_text(self,santence:str,is_testing:bool=False):
"""
用户文字播放接口。
参数:
santence:传入声音文件名
is_testing:回测=True;实盘=False(默认)
"""
self.event_engine.put(Event(EVENT_SPEEK,{"santence":santence,"is_testing":is_testing}))
在vnpy\trader\engine.py中做如下修改:
from vnpy.usertools.sound_player import SoundPlayer
self.sound_player = SoundPlayer(event_engine,True) # test sound player
def add_function(self) -> None:
"""Add query function to main engine."""
... ...
self.main_engine.play_sound = self.sound_player.play_sound
self.main_engine.speak_text = self.sound_player.speak_text
这样你的MainEngine就有了可以音乐和语音功能了。
class sound_player规定音频文件存放在vnpy\usertools\sounds\目录下,当然你也可以修改代码中规定的目录,放在自己喜欢的目录下。
文件可以是wav、mp3格式的音乐文件均可,可以自己录制。
取一些有意义的文件名,如connected.wav代表网络连接成功,disconnection.wav代表网络断开,自己发挥吧,方便自己在自己vnpy系统中用函数调用。
本来本人有一套音乐文件的,可是论坛里没有文件上传功能,所以无法共享给大家,如果需要可以私信我。
下面用连接网关成功和连接断开,分别给出音乐和语音播放的示例:
def process_connect_event(self, event: Event) -> None: # hxxjava add
""" CTP接口连接消息处理 """
gateway:GatewayData = event.data
self.main_engine.play_sound("Connected.wav")
def process_disconnect_event(self, event: Event) -> None: # hxxjava add
""" CTP接口断开消息处理 """
gateway:GatewayData = event.data
self.main_engine.play_sound("ConnectionLost.wav")
# 增加引用
from vnpy.usertools.sound_player import EVENT_SOUND,EVENT_SPEEK
def on_order(self, order: OrderData):
"""
Callback of new order data update.
"""
# 当策略收到委托单时播放提示音乐
event_engine:EventEngine = self.cta_engine.event_engine
is_testing = self.cta_engine.get_engine_type() != EngineType.LIVE
event = Event(EVENT_SOUND,
{"wavname":"order.wav","is_testing":is_testing}
)
event_engine.put(event)
def on_trade(self, trade: TradeData):
"""
Callback of new trade data update.
"""
# 当策略收到成交单时播放提示音乐
event_engine:EventEngine = self.cta_engine.event_engine
is_testing = self.cta_engine.get_engine_type() != EngineType.LIVE
event = Event(EVENT_SOUND, {"wavname":"traded.wav","is_testing":is_testing} )
event_engine.put(event)
def process_connect_event(self, event: Event) -> None: # hxxjava add
""" CTP接口连接消息处理 """
gateway:GatewayData = event.data
self.main_engine.speak_text(f"感谢您,连接{gateway.name}的{gateway.type}接口成功!")
def process_disconnect_event(self, event: Event) -> None: # hxxjava add
""" CTP接口断开消息处理 """
gateway:GatewayData = event.data
self.main_engine.speak_text(f"请注意:{gateway.name}的{gateway.type}接口已断开!")
假设你的策略中实现了on_order()和on_trade()这两个回调函数:
def on_order(self, order: OrderData):
"""
Callback of new order data update.
"""
event_engine:EventEngine = self.cta_engine.event_engine
is_testing = self.cta_engine.get_engine_type() != EngineType.LIVE
event = Event(EVENT_SPEEK,
{"santance":f"策略{self.strategy_name}收到委托单,价格{order.price},手数{order.volume},已经成交{order.traded}",
"is_testing":is_testing})
event_engine.put(event)
def on_trade(self, trade: TradeData):
"""
Callback of new trade data update.
"""
event_engine:EventEngine = self.cta_engine.event_engine
is_testing = self.cta_engine.get_engine_type() != EngineType.LIVE
event = Event(EVENT_SPEEK,
{"santance":f"策略{self.strategy_name}收到{trade.vt_symbol}成交单,成交价{trade.price}手数{trade.volume}.",
"is_testing":is_testing})
event_engine.put(event)
实盘中用户策略是可以通过应用应用引擎获得vnpy系统的MainEngine的,这样就可以使用 play_sound() 和 speak_text()函数来播放音乐和语音了。但是,在策略中使用这个两个播放函数,应该考虑到回测时不要有声音的。应该根据应用引擎的不同,在策略中使用 play_sound() 和 speak_text()时,将参数is_testing设置为True,这样策略回测就不会有音乐和语音了。
这里以CTA策略模板CtaTemplate为例,演示如何将音乐和语音播放功能封装到各种应用的策略中,其他应用系统的模板可以参考以下的做法去封装,就不再一一讲解。
# 在引用部分增加对音乐和语音播器的引用
from vnpy.usertools.sound_player import SoundPlayer
class CtaTemplate(ABC):
""""""
author: str = ""
parameters: list = []
variables: list = []
def __init__(
self,
cta_engine: Any,
strategy_name: str,
vt_symbol: str,
setting: dict,
) -> None:
""""""
... ... # 原来的初始化代码
# 音乐和语音播放器 hxxjava add
self.sound_player:SoundPlayer = SoundPlayer(self.cta_engine.event_engine)
def play_sound(self,sound_name:str):
""" 播放音乐 hxxjava add """
if self.cta_engine.get_engine_type() == EngineType.LIVE:
self.sound_player.play_sound(sound_name)
def speak_text(self,santence:str):
""" 播放语音 hxxjava add """
if self.cta_engine.get_engine_type() == EngineType.LIVE:
self.sound_player.speak_text(santence)
经过上面对CtaTemplate的修改,用户策略中就可以像下面的语句一样直接调用音乐和语音播放了,更加简便。
def on_init(self):
"""
Callback when strategy is inited.
"""
self.write_log("策略初始化开始")
self.load_bar(10)
self.speak_text(f"策略{self.strategy_name}开始初始化")
fy758496805 wrote:
大佬
在过程中 出现了一个错误File "E:\VNPY\lib\site-packages\vnpy\trader\ui\mainwindow.py", line 276, in open_widget
2023-02-23 09:33:34 widget = widget_class(self.main_engine, self.event_engine)
File "E:\VNPY\lib\site-packages\vnpy_ctabacktester\ui\widget.py", line 100, in init
2023-02-23 09:33:35 self.init_ui()
File "E:\VNPY\lib\site-packages\vnpy_ctabacktester\ui\widget.py", line 259, in init_ui
self.candle_dialog: CandleChartDialog = CandleChartDialog()
File "E:\VNPY\lib\site-packages\vnpy_ctabacktester\ui\widget.py", line 1257, in init
self.init_ui()
File "E:\VNPY\lib\site-packages\vnpy_ctabacktester\ui\widget.py", line 1270, in init_ui
self.trade_item: TradeItem = TradeItem(plot=candle_plot, manager=manager)
File "E:\VNPY\lib\site-packages\vnpy\usertools\chart_items.py", line 1041, in init
super().init(plot=plot, manager=manager, size=15, pxMode=True, pen=pg.mkPen(None),
File "E:\VNPY\lib\site-packages\vnpy\usertools\chart_items.py", line 980, in init
self.plot.addItem(self)
AttributeError: 'NoneType' object has no attribute 'addItem'是不是继承类的时候有问题呀?
你好像在策略中用了K线图表了,但是需要区分是用在实盘还是回测环境,回测时显示ChartWidget给谁看呀?
把你的策略这样写:
def __init__(self, ... ...): # 你的策略的初始化函数
... ...
self.chart:CandleChartDialog = None
... ...
def on_start(self): # 策略的启动函数
is_live = self.cta_engine.get_engine_type() == EngineType.LIVE
if is_live and self.show_chart and not self.chart:
# 实盘,配置了显示图表同时未成交的情况下,创建图表——测试环境不会创建图表
self.chart = CandleChartDialog(
strategy = self,
... ... # 你的参数
)
self.chart.update_dir_history(self.bars)
self.chart.show()
cabinet wrote:
请问大神,能否告知下,使用sqlite数据库如何修改代码呢,万分感谢!
都是用peewee写的,代码我都已经分享了,只需要换下数据源,一个是MySQL另外一个是SQLite而已。
如果不懂peewee可以网上搜下,先找例子在jupyter notebook中练练手,明白了才好修改vnpy_sqlite下的数据库。
不同的数据库提供的接口都是一样的,都是实现vnpy.trader.database里规定的接口。
CTA策略模块原来是有停止单的,本人后来又在添加了条件单功能。使用中很多用户反应有这样的问题,那就是策略已经发出过停止单或条件单,但是还未触发,但是因为某种原因策略被关机了,再次启动该策略时发现之前发出过停止单或条件单没有了,非常不方便。如果能够在策略再次启动的时候,把历史的停止单或条件单回复出来就好了。
那么如何把历史的停止单或条件单回复出来呢?把策略运行时曾经发出的停止单或条件单保存到文件或者数据库,在策略再次启动时,从文件或者数据库读取出来,恢复到CTA策略管理器的停止单或条件单列表,让它们继续运行就可以了。
这就有选择的问题:
包括如下:
在vnpy_ctastrategy命令下新建一个文件utitlity.py,其内容如下:
"""
实现停止单字典和条件单字典的存取功能,功能如下:
- 保存内存中的停止单字典到json文件
- 从json文件读取停止单字典
- 保存内存中的条件单字典到json文件
- 从json文件读取条件单字典
作者:hxxjava
时间:2023-2-12
"""
import json
from datetime import datetime
from vnpy.trader.constant import Direction,Offset
from vnpy_ctastrategy.base import (
StopOrder,
StopOrderStatus,
ConditionOrder,
Condition,
ExecutePrice,
CondOrderStatus,
)
from vnpy.trader.utility import get_file_path,get_folder_path,save_json
class StopOrderEncoder(json.JSONEncoder):
"""
停止单相关类型的编码器————用来保存json文件
"""
def default(self, obj):
d = {}
d['__class__'] = obj.__class__.__name__
if isinstance(obj,Direction):
d['_value_'] = obj.value
elif isinstance(obj,Offset):
d['_value_'] = obj.value
elif isinstance(obj,StopOrderStatus):
d['_value_'] = obj.value
elif isinstance(obj, datetime):
d['_value_'] = obj.strftime("%Y-%m-%d %H:%M:%S.%f")
elif isinstance(obj,StopOrder):
d.update(obj.__dict__)
else:
d['__module__'] = obj.__module__
d.update(obj.__dict__)
return d
class StopOrderDecoder(json.JSONDecoder):
"""
停止单相关类型的译码器————用来从json文件读取
"""
def __init__(self):
json.JSONDecoder.__init__(self, object_hook=self.dict2obj)
def dict2obj(self, d):
if '__class__' in d:
class_name = d.pop('__class__')
if class_name == 'Direction':
value = d['_value_']
instance = Direction(value)
elif class_name == 'Offset':
value = d['_value_']
instance = Offset(value)
elif class_name == 'StopOrderStatus':
value = d['_value_']
instance = StopOrderStatus(value)
elif class_name == 'datetime':
value = d['_value_']
instance = datetime.strptime(value, '%Y-%m-%d %H:%M:%S.%f')
elif class_name == 'StopOrder':
instance = StopOrder(**d)
else:
module_name = d.pop('__module__')
module = __import__(module_name)
class_ = getattr(module, class_name)
args = dict((key,value) for key, value in d.items())
instance = class_(**args)
else:
instance = d
return instance
class CondOrderEncoder(json.JSONEncoder):
"""
条件单相关类型的编码器————用来保存json文件
"""
def default(self, obj):
d = {}
d['__class__'] = obj.__class__.__name__
if isinstance(obj,Direction):
d['_value_'] = obj.value
elif isinstance(obj,Offset):
d['_value_'] = obj.value
elif isinstance(obj,Condition):
d['_value_'] = obj.value
elif isinstance(obj,ExecutePrice):
d['_value_'] = obj.value
elif isinstance(obj,CondOrderStatus):
d['_value_'] = obj.value
elif isinstance(obj, datetime):
d['_value_'] = obj.strftime("%Y-%m-%d %H:%M:%S.%f")
elif isinstance(obj,ConditionOrder):
d.update(obj.__dict__)
else:
d['__module__'] = obj.__module__
d.update(obj.__dict__)
return d
class CondOrderDecoder(json.JSONDecoder):
"""
条件单相关类型的译码器————用来从json文件读取
"""
def __init__(self):
json.JSONDecoder.__init__(self, object_hook=self.dict2obj)
def dict2obj(self, d):
if '__class__' in d:
class_name = d.pop('__class__')
if class_name == 'Direction':
value = d['_value_']
instance = Direction(value)
elif class_name == 'Offset':
value = d['_value_']
instance = Offset(value)
elif class_name == 'Condition':
value = d['_value_']
instance = Condition(value)
elif class_name == 'ExecutePrice':
value = d['_value_']
instance = ExecutePrice(value)
elif class_name == 'CondOrderStatus':
value = d['_value_']
instance = CondOrderStatus(value)
elif class_name == 'datetime':
value = d['_value_']
instance = datetime.strptime(value, '%Y-%m-%d %H:%M:%S.%f')
elif class_name == 'ConditionOrder':
instance = ConditionOrder(**d)
else:
module_name = d.pop('__module__')
module = __import__(module_name)
class_ = getattr(module, class_name)
args = dict((key,value) for key, value in d.items())
instance = class_(**args)
else:
instance = d
return instance
def save_stop_order(filename: str,data: dict) -> None:
"""
Save StopOrder dict into {.vntrader}\stop_orders\{filename}.json.
"""
path = get_folder_path("stop_orders")
path_file = path.joinpath(filename)
with open(path_file, mode="w+",encoding="UTF-8") as f:
json.dump(
data,
f,
indent=4,
cls=StopOrderEncoder,
ensure_ascii=False
)
def load_stop_order(filename: str) -> dict:
"""
Load StopOrder dict from {.vntrader}\stop_orders\{filename}.json.
"""
path = get_folder_path("stop_orders")
path_file = path.joinpath(filename)
filepath: Path = get_file_path(path_file)
if filepath.exists():
with open(filepath, mode="r", encoding="UTF-8") as f:
data: dict = json.load(f,cls=StopOrderDecoder)
return data
else:
save_json(filepath, {})
return {}
def save_condition_order(filename: str,data: dict) -> None:
"""
Save ConditionOrder dict into {.vntrader}\cond_orders\{filename}.json.
"""
path = get_folder_path("cond_orders")
path_file = path.joinpath(filename)
with open(path_file, mode="w+",encoding="UTF-8") as f:
json.dump(
data,
f,
indent=4,
cls=CondOrderEncoder,
ensure_ascii=False
)
def load_condition_order(filename: str) -> dict:
"""
Load ConditionOrder dict from {.vntrader}\cond_orders\{filename}.json.
"""
path = get_folder_path("cond_orders")
path_file = path.joinpath(filename)
filepath: Path = get_file_path(path_file)
if filepath.exists():
with open(filepath, mode="r", encoding="UTF-8") as f:
data: dict = json.load(f,cls=CondOrderDecoder)
return data
else:
save_json(filepath, {})
return {}
在文件cta_strategy\engine.py的class MyCtaEngine下面增加下面的代码。class MyCtaEngine已经在比停止单更好用的条件单——ConditionOrder一文中分享给大家来,虽然这次贴出其完整代码,但这里只介绍与停止单和条件单的保存与恢复有关的内容。
在cta_strategy\engine.py的引用部分增加这些代码:
from .utility import save_stop_order, load_stop_order, save_condition_order, load_condition_order # hxxjava add
class MyCtaEngine下面增加下面的代码:
class MyCtaEngine(CtaEngine):
"""
CTA策略引擎,对CtaEngine的功能进行扩展。
功能:
1. 订阅集合竞价tick数据,并且转发给各个已经初始化的CTA策略;
2. 订阅交易状态消息数据,并且转发给各个已经初始化的CTA策略;
3. 条件单的功能:包括发送、监视、更新和取消条件单的功能。
4. 定时保存已经初始化策略的停止单和条件单到json文件。
5. 提供历史策略的停止单和条件单的查询接口。
"""
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super().__init__(main_engine,event_engine)
self.condition_orders:Dict[str,ConditionOrder] = {} # strategy_name: ConditionOrder
self.triggered_condition_orders:List[ConditionOrder] = [] # 已经触发点条件单,为流控设计
self.seconds = 0
self.save_orders_interval = 10
def register_event(self):
""""""
super().register_event()
self.event_engine.register(EVENT_AUCTION_TICK, self.process_auction_tick_event)
self.event_engine.register(EVENT_STATUS, self.process_status_event)
self.event_engine.register(EVENT_ALL_PENDING_ORDER, self.process_pending_order_event)
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
def process_pending_order_event(self,event:Event):
""" 集合竞价消息处理 hxxjava add """
pending_orders:PendingOrders = event.data
strategies:List[CtaTemplate] = self.symbol_strategy_map.get(pending_orders.vt_symbol,[])
if not strategies:
return
for strategy in strategies:
if strategy.inited:
# 执行全挂单消息推送
self.call_strategy_func(strategy, strategy.on_pending_orders, pending_orders)
def process_auction_tick_event(self,event:Event):
""" 集合竞价消息处理 hxxjava add """
tick:TickData = event.data
strategies:List[CtaTemplate] = self.symbol_strategy_map.get(tick.vt_symbol,[])
if not strategies:
return
for strategy in strategies:
if strategy.inited:
# 执行策略的集合竞价消息处理
self.call_strategy_func(strategy, strategy.on_auction_tick, tick)
def process_status_event(self,event:Event):
""" 交易状态消息处理 hxxjava add """
status:StatusData = event.data
strategies:List[CtaTemplate] = []
# step1: find strategies related to this status data
vt_instrument0 = get_vt_instrument(status.vt_symbol)
if vt_instrument0 == status.vt_symbol:
# 交易品种的交易状态
for vt_symbol in self.symbol_strategy_map.keys():
vt_instrument = get_vt_instrument(vt_symbol)
if vt_instrument == vt_instrument0:
# 交易品种的交易状态属于策略交易的合约
strategies.extend(self.symbol_strategy_map[vt_symbol])
else:
# 单独合约的交易状态
strategies.extend(self.symbol_strategy_map.get(status.vt_symbol,[]))
if not strategies:
return
# step 2: push status data to all relate strategies
for strategy in strategies:
if strategy.inited:
# 执行策略的集合竞价消息处理
self.call_strategy_func(strategy, strategy.on_status, status)
def process_tick_event(self,event:Event):
""" 用tick的价格检查条件单 """
super().process_tick_event(event)
tick:TickData = event.data
all_condition_orders = [order for order in self.condition_orders.values() \
if order.vt_symbol == tick.vt_symbol and order.status == CondOrderStatus.WAITING]
for order in all_condition_orders:
# 检查条件单是否满足条件
self.check_condition_order(order,tick)
def check_condition_order(self,order:ConditionOrder,tick:TickData):
""" 检查条件单是否满足条件 """
strategy = self.strategies.get(order.strategy_name,None)
if not strategy or not strategy.trading:
return False
price = tick.last_price
is_be = order.condition == Condition.BE and price >= order.price
is_le = order.condition == Condition.LE and price <= order.price
is_bt = order.condition == Condition.BT and price > order.price
is_lt = order.condition == Condition.LT and price < order.price
if is_be or is_le or is_bt or is_lt:
# 满足触发条件
if order.execute_price == ExecutePrice.MARKET:
# 取市场价
price = tick.last_price
elif order.execute_price == ExecutePrice.EXTREME:
# 取极限价
price = tick.limit_up if order.direction == Direction.LONG else tick.limit_down
else:
# 取设定价
price = order.price
# 执行委托
order_ids = strategy.send_order(
direction = order.direction,
offset=order.offset,
price=price,
volume=order.volume
)
if order_ids:
order.trigger_time = tick.datetime
order.status = CondOrderStatus.TRIGGERED
order.vt_orderids = order_ids
self.call_strategy_func(strategy,strategy.on_condition_order,order)
self.put_cond_order_event(order)
def find_condition_order(self,vt_orderid:str):
""" 根据委托单号查询所属条件单 """
corder:ConditionOrder = None
for order in self.condition_orders.values():
if vt_orderid in order.vt_orderids:
corder = order
break
return corder
def process_trade_event(self, event: Event):
""" 委托单推送处理 """
super().process_trade_event(event)
trade:TradeData = event.data
vt_orderid = trade.vt_orderid
corder = self.find_condition_order(vt_orderid)
if corder:
# 该成交单属于某个条件单
strategy = self.strategies.get(corder.strategy_name,None)
if strategy and strategy.trading:
# 找到了该条件单属实策略实例且正在交易中
# 累计条件单的成交量
corder.traded += trade.volume
# 推送该条件单给策略
self.call_strategy_func(strategy,strategy.on_condition_order,corder)
# 刷新条件单列表控件
self.event_engine.put(Event(EVENT_CONDITION_ORDER,corder))
def send_condition_order(self,order:ConditionOrder):
""" """
strategy = self.strategies.get(order.strategy_name,None)
if not strategy or not strategy.trading:
return False
if order.cond_orderid not in self.condition_orders:
self.condition_orders[order.cond_orderid] = order
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
return False
def cancel_condition_order(self,cond_orderid:str):
""" """
order:ConditionOrder = self.condition_orders.get(cond_orderid,None)
if not order:
return False
order.status = CondOrderStatus.CANCELLED
self.put_cond_order_event(order)
return True
def cancel_all_condition_orders(self,strategy_name:str):
""" """
for order in self.condition_orders.values():
if order.strategy_name == strategy_name and order.status == CondOrderStatus.WAITING:
order.status = CondOrderStatus.CANCELLED
self.put_cond_order_event(order)
return True
def put_cond_order_event(self, cond_order: ConditionOrder) -> None:
"""
Put an event to update condition order status.
"""
event: Event = Event(EVENT_CONDITION_ORDER, cond_order)
self.event_engine.put(event)
def save_stop_orders(self,strategy_name:str,active_only:bool=False):
""" 保存加载历史停止单到json文件 """
count:int = 0
strategy:CtaTemplate = self.strategies.get(strategy_name,None)
if not strategy:
return count
stop_orders = {}
for order_id,stop_order in self.stop_orders.items():
if stop_order.strategy_name == strategy_name:
if active_only and stop_order.status == StopOrderStatus.WAITING:
stop_orders[order_id] = stop_order
count += 1
else:
stop_orders[order_id] = stop_order
count += 1
file_name = f"{strategy_name}.json"
save_stop_order(file_name,stop_orders)
return count
def load_stop_orders(self,strategy_name:str,active_only:bool=True):
""" 从json文件加载历史停止单 """
file_name = f"{strategy_name}.json"
stop_orders:Dict[str,StopOrder] = load_stop_order(file_name)
if not stop_orders:
return
if not active_only:
loaded_orders = stop_orders
else:
loaded_orders = {}
for id,stop_order in stop_orders.items():
if stop_order.status == StopOrderStatus.WAITING:
loaded_orders[id] = stop_order
if loaded_orders:
self.stop_orders.update(loaded_orders)
# 更新GUI中加载的停止单列表
for stop_order in loaded_orders.values():
self.put_stop_order_event(stop_order)
def save_condition_orders(self,strategy_name:str,active_only:bool=True):
""" 保存加载历史条件单到json文件 """
count:int = 0
strategy:CtaTemplate = self.strategies.get(strategy_name,None)
if not strategy:
return count
cond_orders = {}
for order_id,cond_order in self.condition_orders.items():
if cond_order.strategy_name == strategy_name:
if active_only and cond_order.status == CondOrderStatus.WAITING:
cond_orders[order_id] = cond_order
count += 1
else:
cond_orders[order_id] = cond_order
count += 1
file_name = f"{strategy_name}.json"
save_condition_order(file_name,cond_orders)
return count
def load_condition_orders(self,strategy_name:str,active_only:bool=True):
""" 从json文件加载历史条件单 """
file_name = f"{strategy_name}.json"
cond_orders:Dict[str,ConditionOrder] = load_condition_order(file_name)
if not cond_orders:
return
if not active_only:
loaded_orders = cond_orders
else:
loaded_orders = {}
for id,cond_order in cond_orders.items():
if cond_order.status == CondOrderStatus.WAITING:
loaded_orders[id] = cond_order
if loaded_orders:
self.condition_orders.update(loaded_orders)
# 更新GUI中加载的条件单列表
for cond_order in loaded_orders.values():
self.put_cond_order_event(cond_order)
def process_timer_event(self,event:Event) -> None:
# 定时保存策略的
self.seconds += 1
if self.seconds % self.save_orders_interval:
return
if self.get_engine_type() != EngineType.LIVE:
# 只有实盘引擎才保存停止单和条件单,回测引擎则不保存
return
for strategy in self.strategies.values():
if strategy.inited:
cnt1 = self.save_stop_orders(strategy.strategy_name)
cnt2 = self.save_condition_orders(strategy.strategy_name)
# print(f"保存了策略 {strategy.strategy_name} 的 {cnt1} 个停止单,{cnt2} 个条件。")
这里主要介绍在CtaTemplate中增加点与加载历史停止单和条件单相关的成员变量:
其中:
class CtaTemplate的其他代码见本人之前的帖子中的代码:比停止单更好用的条件单——ConditionOrder。
class CtaTemplate(ABC):
""""""
author: str = ""
parameters: list = []
variables: list = []
def __init__(
self,
cta_engine: Any,
strategy_name: str,
vt_symbol: str,
setting: dict,
) -> None:
""""""
self.cta_engine: Any = cta_engine
self.strategy_name: str = strategy_name
self.vt_symbol: str = vt_symbol
self.inited: bool = False
self.trading: bool = False
self.pos: int = 0
# 是否在启动之后加载历史停止单和条件单
self.history_order:bool = False # hxxjava add
self.active_only:bool = False # hxxjava add
# Copy a new variables list here to avoid duplicate insert when multiple
# strategy instances are created with the same strategy class.
self.variables = copy(self.variables)
self.variables.insert(0, "inited")
self.variables.insert(1, "trading")
self.variables.insert(2, "pos")
self.update_setting(setting)
# 其他代码省略 ... ...
class DemoStrategy(CtaTemplate):
""" 示例策略 """
author = "hxxjava"
capital : float = 200000.0 # 交易资金
max_loss_ratio : int = 6 # 每次开仓最大亏损比例
max_open_times : int = 3 # 每次开仓最大亏损比例
dir_interval : str = '1m' # 方向周期单位,只能够是:'1m','1h','d'或'w'中的一个
dir_window : int = 30 # 方向周期窗口
op_interval : str = '1m' # 操作周期单位,只能够是:'1m','1h','d'或'w'中的一个
op_window : int = 3 # 操作周期窗口
load_days : int = 10 # 加载历史行情的天数
OpenSelect:str = "逆转价"
show_chart: bool = True # 是否需要显示图表
dir_trend: str = ""
op_trend: str = ""
parameters = [
"capital",
"max_loss_ratio",
"max_open_times",
"dir_interval",
"dir_window",
"op_interval",
"op_window",
"load_days",
"OpenSelect",
"show_chart",
"history_order", # 启动时是否加载停止单和条件单选项
"active_only", # 是否只加载仍然有效的停止单和条件单选项,
]
long_pos:float = 0 # 持有多仓
short_pos:float = 0 # 持有空仓
variables = [
"dir_trend",
"op_trend",
"long_pos",
"short_pos"
]
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
""""""
super(GsjyStrategy, self).__init__(cta_engine, strategy_name, vt_symbol, setting)
self.history_order = True # 启动时载历史停止单和条件单
self.active_only = False # 加载又有点历史停止单和条件单
# 其他的代码省略
def on_start(self):
"""
Callback when strategy is started.
"""
is_live = self.cta_engine.get_engine_type() == EngineType.LIVE
if is_live and self.history_order:
# 只有实盘才保存策略的历史停止单和条件单
from vnpy_ctastrategy.engine import MyCtaEngine
cta_engine:MyCtaEngine = self.cta_engine
cta_engine.load_stop_orders(self.strategy_name,self.active_only)
cta_engine.load_condition_orders(self.strategy_name,self.active_only)
self.write_log("加载历史停止单和条件单已执行。")
# 其他的代码省略
在用户策略未启动的情况下,还可以设置有关加载停止单和条件单委托的选项,如图所示:
下图是策略执行了条件单之后被关闭,再次重新启动之后回复的条件单。当然,停止单也是可以的实现恢复历史的,大家可以去试。
以条件单为例,如下图所示,条件单通常保存在用户目录下的.vntrader\cond_orders{策略名称}.json文件中:
{
"0213082700975": {
"__class__": "ConditionOrder",
"strategy_name": "gs-rb2305",
"vt_symbol": "rb2305.SHFE",
"direction": {
"__class__": "Direction",
"_value_": "多"
},
"offset": {
"__class__": "Offset",
"_value_": "开"
},
"price": 4072.0,
"volume": 4.0,
"condition": {
"__class__": "Condition",
"_value_": ">"
},
"execute_price": {
"__class__": "ExecutePrice",
"_value_": "设定价"
},
"create_time": {
"__class__": "datetime",
"_value_": "2023-02-13 08:27:00.975422"
},
"trigger_time": null,
"cond_orderid": "0213082700975",
"traded": 0.0,
"vt_orderids": [],
"status": {
"__class__": "CondOrderStatus",
"_value_": "已撤销"
},
"before_trigger": null,
"after_traded": null
},
"0213090325941": {
"__class__": "ConditionOrder",
"strategy_name": "gs-rb2305",
"vt_symbol": "rb2305.SHFE",
"direction": {
"__class__": "Direction",
"_value_": "空"
},
"offset": {
"__class__": "Offset",
"_value_": "开"
},
"price": 4063.0,
"volume": 4.0,
"condition": {
"__class__": "Condition",
"_value_": "<="
},
"execute_price": {
"__class__": "ExecutePrice",
"_value_": "极限价"
},
"create_time": {
"__class__": "datetime",
"_value_": "2023-02-13 09:03:25.941102"
},
"trigger_time": {
"__class__": "datetime",
"_value_": "2023-02-13 09:03:25.500000"
},
"cond_orderid": "0213090325941",
"traded": 0.0,
"vt_orderids": [
"CTP.12_-2076558284_1"
],
"status": {
"__class__": "CondOrderStatus",
"_value_": "已触发"
},
"before_trigger": null,
"after_traded": null
}
}
K线图表中主图与副图都有信息板,当我们用选择某个K线时,它用文字的形式表达每个K线修改的信息以及指标数值,它很有用。但是有时因为主图或者副图的指标过多,抑或是显示的信息过多,它会覆盖主图或副图的左上角或者右上角开始的很大一片区域,很影响我们的观看效果,尤其是遇到下跌行情时,K线从主图的左上角开始,到主图的右下角结束,那么这个时候你无论如何也看不到左上角的K线,因为它们被信息板遮住了。怎么办?隐藏信息板是个好办法。
修改vnpy\chart\widget.py中的class ChartCursor,代码如下:
class ChartCursor(QtCore.QObject):
""""""
def __init__(
self,
widget: ChartWidget,
manager: BarManager,
plots: Dict[str, pg.GraphicsObject],
item_plot_map: Dict[ChartItem, pg.GraphicsObject]
) -> None:
""""""
super().__init__()
self._widget: ChartWidget = widget
self._manager: BarManager = manager
self._plots: Dict[str, pg.GraphicsObject] = plots
self._item_plot_map: Dict[ChartItem, pg.GraphicsObject] = item_plot_map
self._x: int = 0
self._y: int = 0
self._plot_name: str = ""
self._info_visibles:dict[str,bool] = {} # hxxjava add 2023-2-10
self._init_ui()
self._connect_signal()
def _init_ui(self) -> None:
""""""
self._init_line()
self._init_label()
self._init_info()
def _init_line(self) -> None:
"""
Create line objects.
"""
self._v_lines: Dict[str, pg.InfiniteLine] = {}
self._h_lines: Dict[str, pg.InfiniteLine] = {}
self._views: Dict[str, pg.ViewBox] = {}
pen: QtGui.QPen = pg.mkPen(WHITE_COLOR)
for plot_name, plot in self._plots.items():
v_line: pg.InfiniteLine = pg.InfiniteLine(angle=90, movable=False, pen=pen)
h_line: pg.InfiniteLine = pg.InfiniteLine(angle=0, movable=False, pen=pen)
view: pg.ViewBox = plot.getViewBox()
for line in [v_line, h_line]:
line.setZValue(0)
line.hide()
view.addItem(line)
self._v_lines[plot_name] = v_line
self._h_lines[plot_name] = h_line
self._views[plot_name] = view
def _init_label(self) -> None:
"""
Create label objects on axis.
"""
self._y_labels: Dict[str, pg.TextItem] = {}
for plot_name, plot in self._plots.items():
label: pg.TextItem = pg.TextItem(
plot_name, fill=CURSOR_COLOR, color=BLACK_COLOR)
label.hide()
label.setZValue(2)
label.setFont(NORMAL_FONT)
plot.addItem(label, ignoreBounds=True)
self._y_labels[plot_name] = label
self._x_label: pg.TextItem = pg.TextItem(
"datetime", fill=CURSOR_COLOR, color=BLACK_COLOR)
self._x_label.hide()
self._x_label.setZValue(2)
self._x_label.setFont(NORMAL_FONT)
plot.addItem(self._x_label, ignoreBounds=True)
def _init_info(self) -> None:
"""
"""
self._infos: Dict[str, pg.TextItem] = {}
for plot_name, plot in self._plots.items():
info: pg.TextItem = pg.TextItem(
"info",
color=CURSOR_COLOR,
border=CURSOR_COLOR,
fill=BLACK_COLOR
)
info.hide()
info.setZValue(2)
info.setFont(NORMAL_FONT)
plot.addItem(info) # , ignoreBounds=True)
self._infos[plot_name] = info
def _connect_signal(self) -> None:
"""
Connect mouse move signal to update function.
"""
self._widget.scene().sigMouseMoved.connect(self._mouse_moved)
self._widget.scene().sigMouseClicked.connect(self._mouse_clicked) # hxxjava add 2023-2-10
def isInfoVisible(self,plot_name:str):
""" 获取信息板的隐显 hxxjava add 2023-2-10 """
if plot_name not in self._info_visibles:
self._info_visibles[plot_name] = True
return self._info_visibles[plot_name]
def setInfoVisible(self,plot_name:str,visible:bool):
""" 设置信息板隐显 hxxjava add 2023-2-10 """
self._info_visibles[plot_name] = visible
def _mouse_clicked(self,evt): # hxxjava add 2023-2-10
""" 用鼠标左键+CTRL键隐显信息板 2023-2-10 """
button = evt.button()
modifiers = evt.modifiers()
if button == QtCore.Qt.LeftButton and modifiers == QtCore.Qt.ControlModifier:
if self._plot_name in self._infos:
text_info = self._infos.get(self._plot_name,None)
old_value = self.isInfoVisible(self._plot_name)
self.setInfoVisible(self._plot_name,not old_value)
text_info.setVisible(not old_value)
def _mouse_moved(self, evt: tuple) -> None:
"""
Callback function when mouse is moved.
"""
if not self._manager.get_count():
return
# First get current mouse point
pos: tuple = evt
for plot_name, view in self._views.items():
rect = view.sceneBoundingRect()
if rect.contains(pos):
mouse_point = view.mapSceneToView(pos)
self._x = to_int(mouse_point.x())
self._y = mouse_point.y()
self._plot_name = plot_name
break
# Then update cursor component
self._update_line()
self._update_label()
self.update_info()
def _update_line(self) -> None:
""""""
for v_line in self._v_lines.values():
v_line.setPos(self._x)
v_line.show()
for plot_name, h_line in self._h_lines.items():
if plot_name == self._plot_name:
h_line.setPos(self._y)
h_line.show()
else:
h_line.hide()
def _update_label(self) -> None:
""""""
bottom_plot: pg.PlotItem = list(self._plots.values())[-1]
axis_width = bottom_plot.getAxis("right").width()
axis_height = bottom_plot.getAxis("bottom").height()
axis_offset: QtCore.QPointF = QtCore.QPointF(axis_width, axis_height)
bottom_view: pg.ViewBox = list(self._views.values())[-1]
bottom_right = bottom_view.mapSceneToView(
bottom_view.sceneBoundingRect().bottomRight() - axis_offset
)
for plot_name, label in self._y_labels.items():
if plot_name == self._plot_name:
label.setText(str(self._y))
label.show()
label.setPos(bottom_right.x(), self._y)
else:
label.hide()
dt: datetime = self._manager.get_datetime(self._x)
if dt:
self._x_label.setText(dt.strftime("%Y-%m-%d %H:%M:%S"))
self._x_label.show()
self._x_label.setPos(self._x, bottom_right.y())
self._x_label.setAnchor((0, 0))
def update_info(self) -> None:
""""""
buf: dict = {}
for item, plot in self._item_plot_map.items():
item_info_text: str = item.get_info_text(self._x)
if plot not in buf:
buf[plot] = item_info_text
else:
if item_info_text:
buf[plot] += ("\n\n" + item_info_text)
for plot_name, plot in self._plots.items():
plot_info_text: str = buf[plot]
info: pg.TextItem = self._infos[plot_name]
info.setText(plot_info_text)
if self.isInfoVisible(plot_name): # hxxjava add 2023-2-10
info.show()
view: pg.ViewBox = self._views[plot_name]
top_left = view.mapSceneToView(view.sceneBoundingRect().topLeft())
info.setPos(top_left)
def move_right(self) -> None:
"""
Move cursor index to right by 1.
"""
if self._x == self._manager.get_count() - 1:
return
self._x += 1
self._update_after_move()
def move_left(self) -> None:
"""
Move cursor index to left by 1.
"""
if self._x == 0:
return
self._x -= 1
self._update_after_move()
def _update_after_move(self) -> None:
"""
Update cursor after moved by left/right.
"""
bar: BarData = self._manager.get_bar(self._x)
self._y = bar.close_price
self._update_line()
self._update_label()
def clear_all(self) -> None:
"""
Clear all data.
"""
self._x = 0
self._y = 0
self._plot_name = ""
for line in list(self._v_lines.values()) + list(self._h_lines.values()):
line.hide()
for label in list(self._y_labels.values()) + [self._x_label]:
label.hide()
CTP接口返回到合约信息包括:
struct CThostFtdcInstrumentField
{
///合约代码
TThostFtdcInstrumentIDType InstrumentID;
///交易所代码
TThostFtdcExchangeIDType ExchangeID;
///合约名称
TThostFtdcInstrumentNameType InstrumentName;
///合约在交易所的代码
TThostFtdcExchangeInstIDType ExchangeInstID;
///产品代码
TThostFtdcInstrumentIDType ProductID;
///产品类型
TThostFtdcProductClassType ProductClass;
///交割年份
TThostFtdcYearType DeliveryYear;
///交割月
TThostFtdcMonthType DeliveryMonth;
///市价单最大下单量
TThostFtdcVolumeType MaxMarketOrderVolume;
///市价单最小下单量
TThostFtdcVolumeType MinMarketOrderVolume;
///限价单最大下单量
TThostFtdcVolumeType MaxLimitOrderVolume;
///限价单最小下单量
TThostFtdcVolumeType MinLimitOrderVolume;
///合约数量乘数
TThostFtdcVolumeMultipleType VolumeMultiple;
///最小变动价位
TThostFtdcPriceType PriceTick;
///创建日
TThostFtdcDateType CreateDate;
///上市日
TThostFtdcDateType OpenDate;
///到期日
TThostFtdcDateType ExpireDate;
///开始交割日
TThostFtdcDateType StartDelivDate;
///结束交割日
TThostFtdcDateType EndDelivDate;
///合约生命周期状态
TThostFtdcInstLifePhaseType InstLifePhase;
///当前是否交易
TThostFtdcBoolType IsTrading;
///持仓类型
TThostFtdcPositionTypeType PositionType;
///持仓日期类型
TThostFtdcPositionDateTypeType PositionDateType;
///多头保证金率
TThostFtdcRatioType LongMarginRatio;
///空头保证金率
TThostFtdcRatioType ShortMarginRatio;
///是否使用大额单边保证金算法
TThostFtdcMaxMarginSideAlgorithmType MaxMarginSideAlgorithm;
///基础商品代码
TThostFtdcInstrumentIDType UnderlyingInstrID;
///执行价
TThostFtdcPriceType StrikePrice;
///期权类型
TThostFtdcOptionsTypeType OptionsType;
///合约基础商品乘数
TThostFtdcUnderlyingMultipleType UnderlyingMultiple;
///组合类型
TThostFtdcCombinationTypeType CombinationType;
};
@dataclass
class ContractData(BaseData):
"""
Contract data contains basic information about each contract traded.
"""
symbol: str
exchange: Exchange
name: str
product: Product
size: float
pricetick: float
min_volume: float = 1 # minimum trading volume of the contract —— 这个就是最小交易手数
stop_supported: bool = False # whether server supports stop order
net_position: bool = False # whether gateway uses net position volume
history_data: bool = False # whether gateway provides bar history data
option_strike: float = 0
option_underlying: str = "" # vt_symbol of underlying contract
option_type: OptionType = None
option_listed: datetime = None
option_expiry: datetime = None
option_portfolio: str = ""
option_index: str = "" # for identifying options with same strike price
需要自行设计,因连续的两条交易指令send_order()之间必须有时间间隔,否则前一条会被覆盖,交易所只会收收到后一条。
也就是说,你必须设计一个独立的大单委托管理机制,可以设计类似算法交易的vwap、iceberg这类的东西来支持你的交易策略。
这个很正常。当一个品种的合约的状态信息都一致的时候,就只播发品种的状态信息,如果个别的状态与品种不一致的时候,就在播发具体合约的状态信息。
例如一个合约发生了涨停板或者跌停板则会休市一定时间,此时该品种的其他合约是正常的,那么此时您就会只收到一个合约的状态变化信息。
所以,要找一个合约的状态信息,首先匹配合约的vt_symbol,找不到的时候在找品种的vt_symbol。
jerrychen wrote:
VNPY真的太陽春了。而且內定綁死RQDATA。對於國外用戶根本是個超大障礙!
我卡在RQDATA這邊卡好久
vnpy可以提供多种DataFeed的选择,例如:
老秦 wrote:
合约中有两个字段: 多头保证金率 "LongMarginRatioByMoney", 空头保证金率 ="ShortMarginRatioByMoney"。可是交易所网站通常发布的是一个保证金率,是不是意味着这两个保证金率取值相同?有见过不同的吗?
通常是一样的,但也应该分别计算,不可存在侥幸心理。
你说的对,检查了一下,确实是遗漏了对class CtaManager修改的代码。
我已经修改了一楼的帖子,你查看下吧。
少林寺猫猫 wrote:
hxxjava wrote:
少林寺猫猫 wrote:
老师您好,实践上UI会缺少了右边的条件管理器窗口。是不是class CtaManager(QtWidgets.QWidget):部分的修改代码没贴上来的原因哦?
不是的,相关的代码在StrategyManager中,一楼的帖子中有。
实在抱歉,老师。我看了很久StrategyManager的代码,也测试过几次,都只是显示左侧的条件单管理器内容,而没有右侧的条件单管理器内容。我开始尝试在class CtaManager中参照停止单的写法,直接调用了ConditionOrderMonitor类,就显示了右侧的条件单管理器内容,但是部分功能会有异常报错。如双击撤单有问题等。
CTA策略管理器的右侧共分三栏:停止单列表,条件单列表和 策略运行日志列表。
怎么会有左侧的呢?你截图看看。
少林寺猫猫 wrote:
老师您好,实践上UI会缺少了右边的条件管理器窗口。是不是class CtaManager(QtWidgets.QWidget):部分的修改代码没贴上来的原因哦?
不是的,相关的代码在StrategyManager中,一楼的帖子中有。