zeal的官网网址:https://zealdocs.org/
Windows,Linux和BSD各种平台,选择属于自己需要的平台。我选择的是免安装的Portable的压缩包:
windows平台的Portable压缩包下载之后无需安装,所以设置和文档都存放在应用目录下,比较方便。下载之后直接解压到指定目录下,找到运行其中zeal.exe就可以了。
浏览离线文档既可以在zeal内部浏览器显示感兴趣的主题,也可以选择外边浏览器。至此你再也不用为查询python语法、函数解释和用法而烦恼了。
vnpy中多采用各种应用系统的策略进行交易的,虽然也有各种日志和提示出现,但平常总是静悄悄的。
如果你想了解系统和策略的运行情况,可以查看各种运行日志,例如MainWindow的日志,委托列表,成交的列表,账户列表。想要查询你的策略运行情况,可以查看你的策略管理器的变量输出,等等。可是人总不能一直盯着屏幕,那样太累了。
如果能够有个声音和语音播报各种交易活动,用户会及时得到提醒。例如:
在vnpy\usertools目录下添加文件sound_player.py,内容如下:
"""
多线程音乐和文本播放器,介绍如下:
特点:
既可以播放wav,mp3 等格式,有可以对文本进行播放。
使用消息引擎创建该音乐播放器,多线程并行播放声音,不会因为播放声音而阻塞业务流程。
假设其他应用或者策略中需要播放声音, sound_name为字符型的声音文件名称,使用方法有两种:
方法1: 先获取消息引擎event_engine(注:与SoundPlayer实例成交时使用的消息引擎是
相同的),那么可以这样播放:
event_engine.put(Event(EVENT_SOUND,"Connected.wav"))
event_engine.put(Event(EVENT_SPEEK,"您收到一条委托单"))
方法2: 将SoundPlayer多play_sound()接口安装到MainEngine到实例main_engine,那么
可以先回去获取main_engine,然后这样播放:
event_engine.play_sound("Connected.wav")
event_engine.speek_text("您收到一条委托单")
作者:hxxjava 时间:2023-2-14,情人节————献给心爱的人!
修改:增加回测与实盘的区分功能,使得只在实盘环境才播放声音和文本。
修改:hxxjava 时间:2023-2-28
依赖库:pyttsx3, 安装:pip install pyttsx3
"""
from typing import Any
from pathlib import Path
from threading import Thread
from vnpy.trader.engine import EventEngine,Event
from winsound import PlaySound,SND_FILENAME
import pyttsx3
EVENT_SOUND = "eSound."
EVENT_SPEEK = "eSpeak."
class SoundPlayer():
"""
多线程声音播放器
"""
def __new__(cls, *args, **kwargs):
""" singleton constructor """
if not hasattr(cls, "_instance"):
cls._instance = super(SoundPlayer, cls).__new__(cls)
return cls._instance
def __init__(self,event_engine:EventEngine,switch:bool=True):
""" 初始化函数 """
self.event_engine = event_engine
# control play sound file
self.switch = switch
self.register_event()
def register_event(self):
""" """
self.event_engine.register(EVENT_SOUND,self.process_sound_event)
self.event_engine.register(EVENT_SPEEK,self.process_speak_event)
def set_switch(self,switch:bool=True):
""" set the swith which control play sound file """
self.switch = switch
def _get_sound_path(self,sound_name: str):
"""
Get path for sound file with sound name.
"""
this_file_path:Path = Path(__file__).parent
sound_path:Path = this_file_path.joinpath("sounds", sound_name)
return str(sound_path)
def process_sound_event(self,event:Event):
""" EVENT消息处理过程 """
wavname,is_testing = event.data['wavname'],event.data['is_testing']
if self.switch == True and is_testing == False:
filename = self._get_sound_path(wavname)
thread = Thread(target=self._play_sound,kwargs=({"filename":filename}),daemon=True)
thread.start()
def process_speak_event(self,event:Event):
""" EVENT消息处理过程 """
santence,is_testing = event.data['santence'],event.data['is_testing']
if self.switch == True and is_testing == False:
santence:str = event.data
thread = Thread(target=self._do_speak,kwargs=({"santence":santence}),daemon=True)
thread.start()
def _play_sound(self,filename:str):
""" 音乐文件播放线程执行过程 """
PlaySound(filename,SND_FILENAME)
def _do_speak(self,santence:str):
""" 文本播放线程执行过程 """
print(santence)
speaker = pyttsx3.init()
speaker.say(santence)
speaker.runAndWait()
def play_sound(self,sound_name:str,is_testing:bool=False):
"""
用户音乐播放接口。
参数:
sound_name:传入声音文件名
is_testing:回测=True;实盘=False(默认)
"""
self.event_engine.put(Event(EVENT_SOUND,{"wavname":sound_name,"is_testing":is_testing}))
def speak_text(self,santence:str,is_testing:bool=False):
"""
用户文字播放接口。
参数:
santence:传入声音文件名
is_testing:回测=True;实盘=False(默认)
"""
self.event_engine.put(Event(EVENT_SPEEK,{"santence":santence,"is_testing":is_testing}))
在vnpy\trader\engine.py中做如下修改:
from vnpy.usertools.sound_player import SoundPlayer
self.sound_player = SoundPlayer(event_engine,True) # test sound player
def add_function(self) -> None:
"""Add query function to main engine."""
... ...
self.main_engine.play_sound = self.sound_player.play_sound
self.main_engine.speak_text = self.sound_player.speak_text
这样你的MainEngine就有了可以音乐和语音功能了。
class sound_player规定音频文件存放在vnpy\usertools\sounds\目录下,当然你也可以修改代码中规定的目录,放在自己喜欢的目录下。
文件可以是wav、mp3格式的音乐文件均可,可以自己录制。
取一些有意义的文件名,如connected.wav代表网络连接成功,disconnection.wav代表网络断开,自己发挥吧,方便自己在自己vnpy系统中用函数调用。
本来本人有一套音乐文件的,可是论坛里没有文件上传功能,所以无法共享给大家,如果需要可以私信我。
下面用连接网关成功和连接断开,分别给出音乐和语音播放的示例:
def process_connect_event(self, event: Event) -> None: # hxxjava add
""" CTP接口连接消息处理 """
gateway:GatewayData = event.data
self.main_engine.play_sound("Connected.wav")
def process_disconnect_event(self, event: Event) -> None: # hxxjava add
""" CTP接口断开消息处理 """
gateway:GatewayData = event.data
self.main_engine.play_sound("ConnectionLost.wav")
# 增加引用
from vnpy.usertools.sound_player import EVENT_SOUND,EVENT_SPEEK
def on_order(self, order: OrderData):
"""
Callback of new order data update.
"""
# 当策略收到委托单时播放提示音乐
event_engine:EventEngine = self.cta_engine.event_engine
is_testing = self.cta_engine.get_engine_type() != EngineType.LIVE
event = Event(EVENT_SOUND,
{"wavname":"order.wav","is_testing":is_testing}
)
event_engine.put(event)
def on_trade(self, trade: TradeData):
"""
Callback of new trade data update.
"""
# 当策略收到成交单时播放提示音乐
event_engine:EventEngine = self.cta_engine.event_engine
is_testing = self.cta_engine.get_engine_type() != EngineType.LIVE
event = Event(EVENT_SOUND, {"wavname":"traded.wav","is_testing":is_testing} )
event_engine.put(event)
def process_connect_event(self, event: Event) -> None: # hxxjava add
""" CTP接口连接消息处理 """
gateway:GatewayData = event.data
self.main_engine.speak_text(f"感谢您,连接{gateway.name}的{gateway.type}接口成功!")
def process_disconnect_event(self, event: Event) -> None: # hxxjava add
""" CTP接口断开消息处理 """
gateway:GatewayData = event.data
self.main_engine.speak_text(f"请注意:{gateway.name}的{gateway.type}接口已断开!")
假设你的策略中实现了on_order()和on_trade()这两个回调函数:
def on_order(self, order: OrderData):
"""
Callback of new order data update.
"""
event_engine:EventEngine = self.cta_engine.event_engine
is_testing = self.cta_engine.get_engine_type() != EngineType.LIVE
event = Event(EVENT_SPEEK,
{"santance":f"策略{self.strategy_name}收到委托单,价格{order.price},手数{order.volume},已经成交{order.traded}",
"is_testing":is_testing})
event_engine.put(event)
def on_trade(self, trade: TradeData):
"""
Callback of new trade data update.
"""
event_engine:EventEngine = self.cta_engine.event_engine
is_testing = self.cta_engine.get_engine_type() != EngineType.LIVE
event = Event(EVENT_SPEEK,
{"santance":f"策略{self.strategy_name}收到{trade.vt_symbol}成交单,成交价{trade.price}手数{trade.volume}.",
"is_testing":is_testing})
event_engine.put(event)
实盘中用户策略是可以通过应用应用引擎获得vnpy系统的MainEngine的,这样就可以使用 play_sound() 和 speak_text()函数来播放音乐和语音了。但是,在策略中使用这个两个播放函数,应该考虑到回测时不要有声音的。应该根据应用引擎的不同,在策略中使用 play_sound() 和 speak_text()时,将参数is_testing设置为True,这样策略回测就不会有音乐和语音了。
这里以CTA策略模板CtaTemplate为例,演示如何将音乐和语音播放功能封装到各种应用的策略中,其他应用系统的模板可以参考以下的做法去封装,就不再一一讲解。
# 在引用部分增加对音乐和语音播器的引用
from vnpy.usertools.sound_player import SoundPlayer
class CtaTemplate(ABC):
""""""
author: str = ""
parameters: list = []
variables: list = []
def __init__(
self,
cta_engine: Any,
strategy_name: str,
vt_symbol: str,
setting: dict,
) -> None:
""""""
... ... # 原来的初始化代码
# 音乐和语音播放器 hxxjava add
self.sound_player:SoundPlayer = SoundPlayer(self.cta_engine.event_engine)
def play_sound(self,sound_name:str):
""" 播放音乐 hxxjava add """
if self.cta_engine.get_engine_type() == EngineType.LIVE:
self.sound_player.play_sound(sound_name)
def speak_text(self,santence:str):
""" 播放语音 hxxjava add """
if self.cta_engine.get_engine_type() == EngineType.LIVE:
self.sound_player.speak_text(santence)
经过上面对CtaTemplate的修改,用户策略中就可以像下面的语句一样直接调用音乐和语音播放了,更加简便。
def on_init(self):
"""
Callback when strategy is inited.
"""
self.write_log("策略初始化开始")
self.load_bar(10)
self.speak_text(f"策略{self.strategy_name}开始初始化")
CTA策略模块原来是有停止单的,本人后来又在添加了条件单功能。使用中很多用户反应有这样的问题,那就是策略已经发出过停止单或条件单,但是还未触发,但是因为某种原因策略被关机了,再次启动该策略时发现之前发出过停止单或条件单没有了,非常不方便。如果能够在策略再次启动的时候,把历史的停止单或条件单回复出来就好了。
那么如何把历史的停止单或条件单回复出来呢?把策略运行时曾经发出的停止单或条件单保存到文件或者数据库,在策略再次启动时,从文件或者数据库读取出来,恢复到CTA策略管理器的停止单或条件单列表,让它们继续运行就可以了。
这就有选择的问题:
包括如下:
在vnpy_ctastrategy命令下新建一个文件utitlity.py,其内容如下:
"""
实现停止单字典和条件单字典的存取功能,功能如下:
- 保存内存中的停止单字典到json文件
- 从json文件读取停止单字典
- 保存内存中的条件单字典到json文件
- 从json文件读取条件单字典
作者:hxxjava
时间:2023-2-12
"""
import json
from datetime import datetime
from vnpy.trader.constant import Direction,Offset
from vnpy_ctastrategy.base import (
StopOrder,
StopOrderStatus,
ConditionOrder,
Condition,
ExecutePrice,
CondOrderStatus,
)
from vnpy.trader.utility import get_file_path,get_folder_path,save_json
class StopOrderEncoder(json.JSONEncoder):
"""
停止单相关类型的编码器————用来保存json文件
"""
def default(self, obj):
d = {}
d['__class__'] = obj.__class__.__name__
if isinstance(obj,Direction):
d['_value_'] = obj.value
elif isinstance(obj,Offset):
d['_value_'] = obj.value
elif isinstance(obj,StopOrderStatus):
d['_value_'] = obj.value
elif isinstance(obj, datetime):
d['_value_'] = obj.strftime("%Y-%m-%d %H:%M:%S.%f")
elif isinstance(obj,StopOrder):
d.update(obj.__dict__)
else:
d['__module__'] = obj.__module__
d.update(obj.__dict__)
return d
class StopOrderDecoder(json.JSONDecoder):
"""
停止单相关类型的译码器————用来从json文件读取
"""
def __init__(self):
json.JSONDecoder.__init__(self, object_hook=self.dict2obj)
def dict2obj(self, d):
if '__class__' in d:
class_name = d.pop('__class__')
if class_name == 'Direction':
value = d['_value_']
instance = Direction(value)
elif class_name == 'Offset':
value = d['_value_']
instance = Offset(value)
elif class_name == 'StopOrderStatus':
value = d['_value_']
instance = StopOrderStatus(value)
elif class_name == 'datetime':
value = d['_value_']
instance = datetime.strptime(value, '%Y-%m-%d %H:%M:%S.%f')
elif class_name == 'StopOrder':
instance = StopOrder(**d)
else:
module_name = d.pop('__module__')
module = __import__(module_name)
class_ = getattr(module, class_name)
args = dict((key,value) for key, value in d.items())
instance = class_(**args)
else:
instance = d
return instance
class CondOrderEncoder(json.JSONEncoder):
"""
条件单相关类型的编码器————用来保存json文件
"""
def default(self, obj):
d = {}
d['__class__'] = obj.__class__.__name__
if isinstance(obj,Direction):
d['_value_'] = obj.value
elif isinstance(obj,Offset):
d['_value_'] = obj.value
elif isinstance(obj,Condition):
d['_value_'] = obj.value
elif isinstance(obj,ExecutePrice):
d['_value_'] = obj.value
elif isinstance(obj,CondOrderStatus):
d['_value_'] = obj.value
elif isinstance(obj, datetime):
d['_value_'] = obj.strftime("%Y-%m-%d %H:%M:%S.%f")
elif isinstance(obj,ConditionOrder):
d.update(obj.__dict__)
else:
d['__module__'] = obj.__module__
d.update(obj.__dict__)
return d
class CondOrderDecoder(json.JSONDecoder):
"""
条件单相关类型的译码器————用来从json文件读取
"""
def __init__(self):
json.JSONDecoder.__init__(self, object_hook=self.dict2obj)
def dict2obj(self, d):
if '__class__' in d:
class_name = d.pop('__class__')
if class_name == 'Direction':
value = d['_value_']
instance = Direction(value)
elif class_name == 'Offset':
value = d['_value_']
instance = Offset(value)
elif class_name == 'Condition':
value = d['_value_']
instance = Condition(value)
elif class_name == 'ExecutePrice':
value = d['_value_']
instance = ExecutePrice(value)
elif class_name == 'CondOrderStatus':
value = d['_value_']
instance = CondOrderStatus(value)
elif class_name == 'datetime':
value = d['_value_']
instance = datetime.strptime(value, '%Y-%m-%d %H:%M:%S.%f')
elif class_name == 'ConditionOrder':
instance = ConditionOrder(**d)
else:
module_name = d.pop('__module__')
module = __import__(module_name)
class_ = getattr(module, class_name)
args = dict((key,value) for key, value in d.items())
instance = class_(**args)
else:
instance = d
return instance
def save_stop_order(filename: str,data: dict) -> None:
"""
Save StopOrder dict into {.vntrader}\stop_orders\{filename}.json.
"""
path = get_folder_path("stop_orders")
path_file = path.joinpath(filename)
with open(path_file, mode="w+",encoding="UTF-8") as f:
json.dump(
data,
f,
indent=4,
cls=StopOrderEncoder,
ensure_ascii=False
)
def load_stop_order(filename: str) -> dict:
"""
Load StopOrder dict from {.vntrader}\stop_orders\{filename}.json.
"""
path = get_folder_path("stop_orders")
path_file = path.joinpath(filename)
filepath: Path = get_file_path(path_file)
if filepath.exists():
with open(filepath, mode="r", encoding="UTF-8") as f:
data: dict = json.load(f,cls=StopOrderDecoder)
return data
else:
save_json(filepath, {})
return {}
def save_condition_order(filename: str,data: dict) -> None:
"""
Save ConditionOrder dict into {.vntrader}\cond_orders\{filename}.json.
"""
path = get_folder_path("cond_orders")
path_file = path.joinpath(filename)
with open(path_file, mode="w+",encoding="UTF-8") as f:
json.dump(
data,
f,
indent=4,
cls=CondOrderEncoder,
ensure_ascii=False
)
def load_condition_order(filename: str) -> dict:
"""
Load ConditionOrder dict from {.vntrader}\cond_orders\{filename}.json.
"""
path = get_folder_path("cond_orders")
path_file = path.joinpath(filename)
filepath: Path = get_file_path(path_file)
if filepath.exists():
with open(filepath, mode="r", encoding="UTF-8") as f:
data: dict = json.load(f,cls=CondOrderDecoder)
return data
else:
save_json(filepath, {})
return {}
在文件cta_strategy\engine.py的class MyCtaEngine下面增加下面的代码。class MyCtaEngine已经在比停止单更好用的条件单——ConditionOrder一文中分享给大家来,虽然这次贴出其完整代码,但这里只介绍与停止单和条件单的保存与恢复有关的内容。
在cta_strategy\engine.py的引用部分增加这些代码:
from .utility import save_stop_order, load_stop_order, save_condition_order, load_condition_order # hxxjava add
class MyCtaEngine下面增加下面的代码:
class MyCtaEngine(CtaEngine):
"""
CTA策略引擎,对CtaEngine的功能进行扩展。
功能:
1. 订阅集合竞价tick数据,并且转发给各个已经初始化的CTA策略;
2. 订阅交易状态消息数据,并且转发给各个已经初始化的CTA策略;
3. 条件单的功能:包括发送、监视、更新和取消条件单的功能。
4. 定时保存已经初始化策略的停止单和条件单到json文件。
5. 提供历史策略的停止单和条件单的查询接口。
"""
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super().__init__(main_engine,event_engine)
self.condition_orders:Dict[str,ConditionOrder] = {} # strategy_name: ConditionOrder
self.triggered_condition_orders:List[ConditionOrder] = [] # 已经触发点条件单,为流控设计
self.seconds = 0
self.save_orders_interval = 10
def register_event(self):
""""""
super().register_event()
self.event_engine.register(EVENT_AUCTION_TICK, self.process_auction_tick_event)
self.event_engine.register(EVENT_STATUS, self.process_status_event)
self.event_engine.register(EVENT_ALL_PENDING_ORDER, self.process_pending_order_event)
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
def process_pending_order_event(self,event:Event):
""" 集合竞价消息处理 hxxjava add """
pending_orders:PendingOrders = event.data
strategies:List[CtaTemplate] = self.symbol_strategy_map.get(pending_orders.vt_symbol,[])
if not strategies:
return
for strategy in strategies:
if strategy.inited:
# 执行全挂单消息推送
self.call_strategy_func(strategy, strategy.on_pending_orders, pending_orders)
def process_auction_tick_event(self,event:Event):
""" 集合竞价消息处理 hxxjava add """
tick:TickData = event.data
strategies:List[CtaTemplate] = self.symbol_strategy_map.get(tick.vt_symbol,[])
if not strategies:
return
for strategy in strategies:
if strategy.inited:
# 执行策略的集合竞价消息处理
self.call_strategy_func(strategy, strategy.on_auction_tick, tick)
def process_status_event(self,event:Event):
""" 交易状态消息处理 hxxjava add """
status:StatusData = event.data
strategies:List[CtaTemplate] = []
# step1: find strategies related to this status data
vt_instrument0 = get_vt_instrument(status.vt_symbol)
if vt_instrument0 == status.vt_symbol:
# 交易品种的交易状态
for vt_symbol in self.symbol_strategy_map.keys():
vt_instrument = get_vt_instrument(vt_symbol)
if vt_instrument == vt_instrument0:
# 交易品种的交易状态属于策略交易的合约
strategies.extend(self.symbol_strategy_map[vt_symbol])
else:
# 单独合约的交易状态
strategies.extend(self.symbol_strategy_map.get(status.vt_symbol,[]))
if not strategies:
return
# step 2: push status data to all relate strategies
for strategy in strategies:
if strategy.inited:
# 执行策略的集合竞价消息处理
self.call_strategy_func(strategy, strategy.on_status, status)
def process_tick_event(self,event:Event):
""" 用tick的价格检查条件单 """
super().process_tick_event(event)
tick:TickData = event.data
all_condition_orders = [order for order in self.condition_orders.values() \
if order.vt_symbol == tick.vt_symbol and order.status == CondOrderStatus.WAITING]
for order in all_condition_orders:
# 检查条件单是否满足条件
self.check_condition_order(order,tick)
def check_condition_order(self,order:ConditionOrder,tick:TickData):
""" 检查条件单是否满足条件 """
strategy = self.strategies.get(order.strategy_name,None)
if not strategy or not strategy.trading:
return False
price = tick.last_price
is_be = order.condition == Condition.BE and price >= order.price
is_le = order.condition == Condition.LE and price <= order.price
is_bt = order.condition == Condition.BT and price > order.price
is_lt = order.condition == Condition.LT and price < order.price
if is_be or is_le or is_bt or is_lt:
# 满足触发条件
if order.execute_price == ExecutePrice.MARKET:
# 取市场价
price = tick.last_price
elif order.execute_price == ExecutePrice.EXTREME:
# 取极限价
price = tick.limit_up if order.direction == Direction.LONG else tick.limit_down
else:
# 取设定价
price = order.price
# 执行委托
order_ids = strategy.send_order(
direction = order.direction,
offset=order.offset,
price=price,
volume=order.volume
)
if order_ids:
order.trigger_time = tick.datetime
order.status = CondOrderStatus.TRIGGERED
order.vt_orderids = order_ids
self.call_strategy_func(strategy,strategy.on_condition_order,order)
self.put_cond_order_event(order)
def find_condition_order(self,vt_orderid:str):
""" 根据委托单号查询所属条件单 """
corder:ConditionOrder = None
for order in self.condition_orders.values():
if vt_orderid in order.vt_orderids:
corder = order
break
return corder
def process_trade_event(self, event: Event):
""" 委托单推送处理 """
super().process_trade_event(event)
trade:TradeData = event.data
vt_orderid = trade.vt_orderid
corder = self.find_condition_order(vt_orderid)
if corder:
# 该成交单属于某个条件单
strategy = self.strategies.get(corder.strategy_name,None)
if strategy and strategy.trading:
# 找到了该条件单属实策略实例且正在交易中
# 累计条件单的成交量
corder.traded += trade.volume
# 推送该条件单给策略
self.call_strategy_func(strategy,strategy.on_condition_order,corder)
# 刷新条件单列表控件
self.event_engine.put(Event(EVENT_CONDITION_ORDER,corder))
def send_condition_order(self,order:ConditionOrder):
""" """
strategy = self.strategies.get(order.strategy_name,None)
if not strategy or not strategy.trading:
return False
if order.cond_orderid not in self.condition_orders:
self.condition_orders[order.cond_orderid] = order
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
return False
def cancel_condition_order(self,cond_orderid:str):
""" """
order:ConditionOrder = self.condition_orders.get(cond_orderid,None)
if not order:
return False
order.status = CondOrderStatus.CANCELLED
self.put_cond_order_event(order)
return True
def cancel_all_condition_orders(self,strategy_name:str):
""" """
for order in self.condition_orders.values():
if order.strategy_name == strategy_name and order.status == CondOrderStatus.WAITING:
order.status = CondOrderStatus.CANCELLED
self.put_cond_order_event(order)
return True
def put_cond_order_event(self, cond_order: ConditionOrder) -> None:
"""
Put an event to update condition order status.
"""
event: Event = Event(EVENT_CONDITION_ORDER, cond_order)
self.event_engine.put(event)
def save_stop_orders(self,strategy_name:str,active_only:bool=False):
""" 保存加载历史停止单到json文件 """
count:int = 0
strategy:CtaTemplate = self.strategies.get(strategy_name,None)
if not strategy:
return count
stop_orders = {}
for order_id,stop_order in self.stop_orders.items():
if stop_order.strategy_name == strategy_name:
if active_only and stop_order.status == StopOrderStatus.WAITING:
stop_orders[order_id] = stop_order
count += 1
else:
stop_orders[order_id] = stop_order
count += 1
file_name = f"{strategy_name}.json"
save_stop_order(file_name,stop_orders)
return count
def load_stop_orders(self,strategy_name:str,active_only:bool=True):
""" 从json文件加载历史停止单 """
file_name = f"{strategy_name}.json"
stop_orders:Dict[str,StopOrder] = load_stop_order(file_name)
if not stop_orders:
return
if not active_only:
loaded_orders = stop_orders
else:
loaded_orders = {}
for id,stop_order in stop_orders.items():
if stop_order.status == StopOrderStatus.WAITING:
loaded_orders[id] = stop_order
if loaded_orders:
self.stop_orders.update(loaded_orders)
# 更新GUI中加载的停止单列表
for stop_order in loaded_orders.values():
self.put_stop_order_event(stop_order)
def save_condition_orders(self,strategy_name:str,active_only:bool=True):
""" 保存加载历史条件单到json文件 """
count:int = 0
strategy:CtaTemplate = self.strategies.get(strategy_name,None)
if not strategy:
return count
cond_orders = {}
for order_id,cond_order in self.condition_orders.items():
if cond_order.strategy_name == strategy_name:
if active_only and cond_order.status == CondOrderStatus.WAITING:
cond_orders[order_id] = cond_order
count += 1
else:
cond_orders[order_id] = cond_order
count += 1
file_name = f"{strategy_name}.json"
save_condition_order(file_name,cond_orders)
return count
def load_condition_orders(self,strategy_name:str,active_only:bool=True):
""" 从json文件加载历史条件单 """
file_name = f"{strategy_name}.json"
cond_orders:Dict[str,ConditionOrder] = load_condition_order(file_name)
if not cond_orders:
return
if not active_only:
loaded_orders = cond_orders
else:
loaded_orders = {}
for id,cond_order in cond_orders.items():
if cond_order.status == CondOrderStatus.WAITING:
loaded_orders[id] = cond_order
if loaded_orders:
self.condition_orders.update(loaded_orders)
# 更新GUI中加载的条件单列表
for cond_order in loaded_orders.values():
self.put_cond_order_event(cond_order)
def process_timer_event(self,event:Event) -> None:
# 定时保存策略的
self.seconds += 1
if self.seconds % self.save_orders_interval:
return
if self.get_engine_type() != EngineType.LIVE:
# 只有实盘引擎才保存停止单和条件单,回测引擎则不保存
return
for strategy in self.strategies.values():
if strategy.inited:
cnt1 = self.save_stop_orders(strategy.strategy_name)
cnt2 = self.save_condition_orders(strategy.strategy_name)
# print(f"保存了策略 {strategy.strategy_name} 的 {cnt1} 个停止单,{cnt2} 个条件。")
这里主要介绍在CtaTemplate中增加点与加载历史停止单和条件单相关的成员变量:
其中:
class CtaTemplate的其他代码见本人之前的帖子中的代码:比停止单更好用的条件单——ConditionOrder。
class CtaTemplate(ABC):
""""""
author: str = ""
parameters: list = []
variables: list = []
def __init__(
self,
cta_engine: Any,
strategy_name: str,
vt_symbol: str,
setting: dict,
) -> None:
""""""
self.cta_engine: Any = cta_engine
self.strategy_name: str = strategy_name
self.vt_symbol: str = vt_symbol
self.inited: bool = False
self.trading: bool = False
self.pos: int = 0
# 是否在启动之后加载历史停止单和条件单
self.history_order:bool = False # hxxjava add
self.active_only:bool = False # hxxjava add
# Copy a new variables list here to avoid duplicate insert when multiple
# strategy instances are created with the same strategy class.
self.variables = copy(self.variables)
self.variables.insert(0, "inited")
self.variables.insert(1, "trading")
self.variables.insert(2, "pos")
self.update_setting(setting)
# 其他代码省略 ... ...
class DemoStrategy(CtaTemplate):
""" 示例策略 """
author = "hxxjava"
capital : float = 200000.0 # 交易资金
max_loss_ratio : int = 6 # 每次开仓最大亏损比例
max_open_times : int = 3 # 每次开仓最大亏损比例
dir_interval : str = '1m' # 方向周期单位,只能够是:'1m','1h','d'或'w'中的一个
dir_window : int = 30 # 方向周期窗口
op_interval : str = '1m' # 操作周期单位,只能够是:'1m','1h','d'或'w'中的一个
op_window : int = 3 # 操作周期窗口
load_days : int = 10 # 加载历史行情的天数
OpenSelect:str = "逆转价"
show_chart: bool = True # 是否需要显示图表
dir_trend: str = ""
op_trend: str = ""
parameters = [
"capital",
"max_loss_ratio",
"max_open_times",
"dir_interval",
"dir_window",
"op_interval",
"op_window",
"load_days",
"OpenSelect",
"show_chart",
"history_order", # 启动时是否加载停止单和条件单选项
"active_only", # 是否只加载仍然有效的停止单和条件单选项,
]
long_pos:float = 0 # 持有多仓
short_pos:float = 0 # 持有空仓
variables = [
"dir_trend",
"op_trend",
"long_pos",
"short_pos"
]
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
""""""
super(GsjyStrategy, self).__init__(cta_engine, strategy_name, vt_symbol, setting)
self.history_order = True # 启动时载历史停止单和条件单
self.active_only = False # 加载又有点历史停止单和条件单
# 其他的代码省略
def on_start(self):
"""
Callback when strategy is started.
"""
is_live = self.cta_engine.get_engine_type() == EngineType.LIVE
if is_live and self.history_order:
# 只有实盘才保存策略的历史停止单和条件单
from vnpy_ctastrategy.engine import MyCtaEngine
cta_engine:MyCtaEngine = self.cta_engine
cta_engine.load_stop_orders(self.strategy_name,self.active_only)
cta_engine.load_condition_orders(self.strategy_name,self.active_only)
self.write_log("加载历史停止单和条件单已执行。")
# 其他的代码省略
在用户策略未启动的情况下,还可以设置有关加载停止单和条件单委托的选项,如图所示:
下图是策略执行了条件单之后被关闭,再次重新启动之后回复的条件单。当然,停止单也是可以的实现恢复历史的,大家可以去试。
以条件单为例,如下图所示,条件单通常保存在用户目录下的.vntrader\cond_orders{策略名称}.json文件中:
{
"0213082700975": {
"__class__": "ConditionOrder",
"strategy_name": "gs-rb2305",
"vt_symbol": "rb2305.SHFE",
"direction": {
"__class__": "Direction",
"_value_": "多"
},
"offset": {
"__class__": "Offset",
"_value_": "开"
},
"price": 4072.0,
"volume": 4.0,
"condition": {
"__class__": "Condition",
"_value_": ">"
},
"execute_price": {
"__class__": "ExecutePrice",
"_value_": "设定价"
},
"create_time": {
"__class__": "datetime",
"_value_": "2023-02-13 08:27:00.975422"
},
"trigger_time": null,
"cond_orderid": "0213082700975",
"traded": 0.0,
"vt_orderids": [],
"status": {
"__class__": "CondOrderStatus",
"_value_": "已撤销"
},
"before_trigger": null,
"after_traded": null
},
"0213090325941": {
"__class__": "ConditionOrder",
"strategy_name": "gs-rb2305",
"vt_symbol": "rb2305.SHFE",
"direction": {
"__class__": "Direction",
"_value_": "空"
},
"offset": {
"__class__": "Offset",
"_value_": "开"
},
"price": 4063.0,
"volume": 4.0,
"condition": {
"__class__": "Condition",
"_value_": "<="
},
"execute_price": {
"__class__": "ExecutePrice",
"_value_": "极限价"
},
"create_time": {
"__class__": "datetime",
"_value_": "2023-02-13 09:03:25.941102"
},
"trigger_time": {
"__class__": "datetime",
"_value_": "2023-02-13 09:03:25.500000"
},
"cond_orderid": "0213090325941",
"traded": 0.0,
"vt_orderids": [
"CTP.12_-2076558284_1"
],
"status": {
"__class__": "CondOrderStatus",
"_value_": "已触发"
},
"before_trigger": null,
"after_traded": null
}
}
K线图表中主图与副图都有信息板,当我们用选择某个K线时,它用文字的形式表达每个K线修改的信息以及指标数值,它很有用。但是有时因为主图或者副图的指标过多,抑或是显示的信息过多,它会覆盖主图或副图的左上角或者右上角开始的很大一片区域,很影响我们的观看效果,尤其是遇到下跌行情时,K线从主图的左上角开始,到主图的右下角结束,那么这个时候你无论如何也看不到左上角的K线,因为它们被信息板遮住了。怎么办?隐藏信息板是个好办法。
修改vnpy\chart\widget.py中的class ChartCursor,代码如下:
class ChartCursor(QtCore.QObject):
""""""
def __init__(
self,
widget: ChartWidget,
manager: BarManager,
plots: Dict[str, pg.GraphicsObject],
item_plot_map: Dict[ChartItem, pg.GraphicsObject]
) -> None:
""""""
super().__init__()
self._widget: ChartWidget = widget
self._manager: BarManager = manager
self._plots: Dict[str, pg.GraphicsObject] = plots
self._item_plot_map: Dict[ChartItem, pg.GraphicsObject] = item_plot_map
self._x: int = 0
self._y: int = 0
self._plot_name: str = ""
self._info_visibles:dict[str,bool] = {} # hxxjava add 2023-2-10
self._init_ui()
self._connect_signal()
def _init_ui(self) -> None:
""""""
self._init_line()
self._init_label()
self._init_info()
def _init_line(self) -> None:
"""
Create line objects.
"""
self._v_lines: Dict[str, pg.InfiniteLine] = {}
self._h_lines: Dict[str, pg.InfiniteLine] = {}
self._views: Dict[str, pg.ViewBox] = {}
pen: QtGui.QPen = pg.mkPen(WHITE_COLOR)
for plot_name, plot in self._plots.items():
v_line: pg.InfiniteLine = pg.InfiniteLine(angle=90, movable=False, pen=pen)
h_line: pg.InfiniteLine = pg.InfiniteLine(angle=0, movable=False, pen=pen)
view: pg.ViewBox = plot.getViewBox()
for line in [v_line, h_line]:
line.setZValue(0)
line.hide()
view.addItem(line)
self._v_lines[plot_name] = v_line
self._h_lines[plot_name] = h_line
self._views[plot_name] = view
def _init_label(self) -> None:
"""
Create label objects on axis.
"""
self._y_labels: Dict[str, pg.TextItem] = {}
for plot_name, plot in self._plots.items():
label: pg.TextItem = pg.TextItem(
plot_name, fill=CURSOR_COLOR, color=BLACK_COLOR)
label.hide()
label.setZValue(2)
label.setFont(NORMAL_FONT)
plot.addItem(label, ignoreBounds=True)
self._y_labels[plot_name] = label
self._x_label: pg.TextItem = pg.TextItem(
"datetime", fill=CURSOR_COLOR, color=BLACK_COLOR)
self._x_label.hide()
self._x_label.setZValue(2)
self._x_label.setFont(NORMAL_FONT)
plot.addItem(self._x_label, ignoreBounds=True)
def _init_info(self) -> None:
"""
"""
self._infos: Dict[str, pg.TextItem] = {}
for plot_name, plot in self._plots.items():
info: pg.TextItem = pg.TextItem(
"info",
color=CURSOR_COLOR,
border=CURSOR_COLOR,
fill=BLACK_COLOR
)
info.hide()
info.setZValue(2)
info.setFont(NORMAL_FONT)
plot.addItem(info) # , ignoreBounds=True)
self._infos[plot_name] = info
def _connect_signal(self) -> None:
"""
Connect mouse move signal to update function.
"""
self._widget.scene().sigMouseMoved.connect(self._mouse_moved)
self._widget.scene().sigMouseClicked.connect(self._mouse_clicked) # hxxjava add 2023-2-10
def isInfoVisible(self,plot_name:str):
""" 获取信息板的隐显 hxxjava add 2023-2-10 """
if plot_name not in self._info_visibles:
self._info_visibles[plot_name] = True
return self._info_visibles[plot_name]
def setInfoVisible(self,plot_name:str,visible:bool):
""" 设置信息板隐显 hxxjava add 2023-2-10 """
self._info_visibles[plot_name] = visible
def _mouse_clicked(self,evt): # hxxjava add 2023-2-10
""" 用鼠标左键+CTRL键隐显信息板 2023-2-10 """
button = evt.button()
modifiers = evt.modifiers()
if button == QtCore.Qt.LeftButton and modifiers == QtCore.Qt.ControlModifier:
if self._plot_name in self._infos:
text_info = self._infos.get(self._plot_name,None)
old_value = self.isInfoVisible(self._plot_name)
self.setInfoVisible(self._plot_name,not old_value)
text_info.setVisible(not old_value)
def _mouse_moved(self, evt: tuple) -> None:
"""
Callback function when mouse is moved.
"""
if not self._manager.get_count():
return
# First get current mouse point
pos: tuple = evt
for plot_name, view in self._views.items():
rect = view.sceneBoundingRect()
if rect.contains(pos):
mouse_point = view.mapSceneToView(pos)
self._x = to_int(mouse_point.x())
self._y = mouse_point.y()
self._plot_name = plot_name
break
# Then update cursor component
self._update_line()
self._update_label()
self.update_info()
def _update_line(self) -> None:
""""""
for v_line in self._v_lines.values():
v_line.setPos(self._x)
v_line.show()
for plot_name, h_line in self._h_lines.items():
if plot_name == self._plot_name:
h_line.setPos(self._y)
h_line.show()
else:
h_line.hide()
def _update_label(self) -> None:
""""""
bottom_plot: pg.PlotItem = list(self._plots.values())[-1]
axis_width = bottom_plot.getAxis("right").width()
axis_height = bottom_plot.getAxis("bottom").height()
axis_offset: QtCore.QPointF = QtCore.QPointF(axis_width, axis_height)
bottom_view: pg.ViewBox = list(self._views.values())[-1]
bottom_right = bottom_view.mapSceneToView(
bottom_view.sceneBoundingRect().bottomRight() - axis_offset
)
for plot_name, label in self._y_labels.items():
if plot_name == self._plot_name:
label.setText(str(self._y))
label.show()
label.setPos(bottom_right.x(), self._y)
else:
label.hide()
dt: datetime = self._manager.get_datetime(self._x)
if dt:
self._x_label.setText(dt.strftime("%Y-%m-%d %H:%M:%S"))
self._x_label.show()
self._x_label.setPos(self._x, bottom_right.y())
self._x_label.setAnchor((0, 0))
def update_info(self) -> None:
""""""
buf: dict = {}
for item, plot in self._item_plot_map.items():
item_info_text: str = item.get_info_text(self._x)
if plot not in buf:
buf[plot] = item_info_text
else:
if item_info_text:
buf[plot] += ("\n\n" + item_info_text)
for plot_name, plot in self._plots.items():
plot_info_text: str = buf[plot]
info: pg.TextItem = self._infos[plot_name]
info.setText(plot_info_text)
if self.isInfoVisible(plot_name): # hxxjava add 2023-2-10
info.show()
view: pg.ViewBox = self._views[plot_name]
top_left = view.mapSceneToView(view.sceneBoundingRect().topLeft())
info.setPos(top_left)
def move_right(self) -> None:
"""
Move cursor index to right by 1.
"""
if self._x == self._manager.get_count() - 1:
return
self._x += 1
self._update_after_move()
def move_left(self) -> None:
"""
Move cursor index to left by 1.
"""
if self._x == 0:
return
self._x -= 1
self._update_after_move()
def _update_after_move(self) -> None:
"""
Update cursor after moved by left/right.
"""
bar: BarData = self._manager.get_bar(self._x)
self._y = bar.close_price
self._update_line()
self._update_label()
def clear_all(self) -> None:
"""
Clear all data.
"""
self._x = 0
self._y = 0
self._plot_name = ""
for line in list(self._v_lines.values()) + list(self._h_lines.values()):
line.hide()
for label in list(self._y_labels.values()) + [self._x_label]:
label.hide()
详细的请见中国人大网2000年12月17日发布的进入证券交易所参与集中竞价交易必须具备什么资格?
由此可以知道:
答案是肯定的,可以。
商品期货:
白天盘品种集合竞价时间是每个交易日08:55--09:00.其中08:55--08:59是指令申报时间,08:59--09:00是指令 撮合时间:
夜盘品种集合竞价时间是每个交易日20:55--21:00.其中20:55--20: 59是指令 申报时间,20:59--21:00是指令 撮合时间,夜盘品种白天不再进行集合竞价。
国债期货:
集合竞价时间是每个交易日09:10--09:15.其中09:10--09: 14是指令申报时间,09:14--09: 15是指令撮合时间。
这里要提醒大家如果您委托单子,但是系统显示的是“没开盘”,那么这是因为您是在非集合竞价报单时间段委托。
作为普通投资者,只要合法连接了CTP接口,您就同时登录了行您选择的经纪商的行情服务器和交易服务器,注意:经纪商的而不是交易所的,也就是您开户并且做了CTP入网认证的那个证券公司的。
行情服务器会实时通知各个合约的交易状态信息。 当您所交易的合约的交易状态为集合竞价状态时,您就可以同手动或者策略自动使用CtpGateway的send_order(),cancel_order()函数,调用交易接口(CtpTdApi)进行委托下单和委托撤单,但是委托下单的结果必须等到集合竞价撮合之后才会得到成交与否。
但是与连续竞价期间的委托不同,在集合竞价撮合结束前,是没有任何成交结果推送给客户端的,而绝大部分策略是依靠行情变化来生成交易信号的,这是最主要的困难!
当然非行情驱动的策略可以在没有行情变化到情况下,自动实现在集合竞价阶段实现委托下单,但是这样做的意义不大。因为由基本面信息、黑天鹅、自然灾害、战争等突发事件等非行情输入驱动的策略,通常是长线策略或者是需要由行情来证实的策略,无需急迫地在集合竞价阶段来参与,为什么不在今天的集合竞价结果出来之后在做打算?
当然,在解决了交易信号生成的情况下,用户策略是可以在集合竞价状态实现自动委托下单的,至于此时如何生成交易信号是另外一个话题了。
提到通达信函数XMA,人们最常见到的词汇是“未来函数”、“欺骗”、“陷阱”、“坑”......等等不好的字眼,仿佛XMA函数是个捉摸不定的未来函数,是你亏损的根源!
其实大家对这个函数不了解,如果你了解了它的实现机理,它的优点和不得已的缺点,扬长避短,是完全可以使用的。
未来叙述的方便,先指标一个供MA与XMA计算的数组:
位置:[ A B C D E F H I J K L M N O P Q R S T U ]
数据:[ 1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18. 19. 20.]
分析周期N=5
位置:[ A B C D E F H I J K L M N O P Q R S T U ]
MA5 [nan nan nan nan 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18.]
位置:[ A B C D E F H I J K L M N O P Q R S T U ]
XMA5:[ 2. 2.5 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18. 18.5 19. ]
位置:[ A B C D E F H I J K L M N O P Q R S T U W(21) X(22)]
XMA5:[ nan nan nan nan 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18. 19. 20. ]
它们的相同点是均值算法都是一样的,都是N个数的算术平均值,但是均值代表的位置是不同的。
N日MA均值是必须满足N个数据,不足时为nan,其所代表的位置是第N日的均值;
N日XMA均值是必须满足N//2+1个数据,不足时为没有意义,其所代表的位置是第N//2+1日的均值,它的计算是用前N//2个数数据、当前日和后N//2个数数据。当数据个数多于N//2个但不够N数的时候,是有几个数算几个数的均线。
以N=5为XMA例:
MA具有确定性,不会受到下一个数据的变化的影响,而其计算出来的结果一定是滞后的。
从这里我们可以看到,XMA其实中历史数据个数大于N的时候,它的结果能够代表人们通常理解的均值,但是因为它用的数据包含当前位置之后N//2日的数据,因为使用预测的数据,XMA随着计算位置的不同,其计算的数据范围发生了变化。
以下为用python实现XMA函数代码,已经与通达信的自带函数XMA函数做了严格的验证,数据相同的情况下,XMA的均值是完全相同的,也就是说是可靠移植的,可以信赖的!
如果有怀疑,可以自行验证的。
def XMA(src:np.ndarray,N:int,array=False) -> np.ndarray:
""" XMA函数的python实现 """
data_len = len(src)
half_len : int = (N // 2)+(1 if N % 2 else 0)
if data_len < half_len:
out = np.array([np.nan for i in range(data_len)],dtype=float)
if array:
return out
return out[-half_len:] if len(out) else np.array([],dtype=float)
head = np.array([talib.MA(src[0:ilen],ilen)[-1] for ilen in range(half_len,N)])
out = head
if data_len >= N:
body = talib.MA(src,N)[N-1:]
out = np.append(out,body)
tail = np.array([talib.MA(src[-ilen:],ilen)[-1] for ilen in range(N-1,half_len-1,-1)])
out = np.append(out,tail)
if array:
return out
return out[-half_len:]
class DynaArrayManager(ArrayManager):
"""
DynaArrayManager是对ArrayManager的扩展,解决它无法用于临时K线的指标计算问题。
作者:hxxjava
"""
def __init__(self, size: int = 100) -> None:
super().__init__(size)
self.bar_datetimes:List[datetime] = []
def update_bar(self, bar: BarData) -> None:
if not self.bar_datetimes or self.bar_datetimes[-1] < bar.datetime:
self.bar_datetimes.append(bar.datetime)
super().update_bar(bar)
else:
"""
Only Update all arrays in array manager with temporary bar data.
"""
self.open_array[-1] = bar.open_price
self.high_array[-1] = bar.high_price
self.low_array[-1] = bar.low_price
self.close_array[-1] = bar.close_price
self.volume_array[-1] = bar.volume
self.turnover_array[-1] = bar.turnover
self.open_interest_array[-1] = bar.open_interest
# ... ... 其他无关XMA部分略去
def xma(self,select:str="C",N:int=3,array: bool = False) -> np.ndarray:
""" XMA函数的 """
if select.upper() == "C":
src = self.close
elif select.upper() == "O":
src = self.open
elif select.upper() == "H":
src = self.high
elif select.upper() == "L":
src = self.low
else:
assert(False)
data_len = len(src)
half_len : int = (N // 2)+(1 if N % 2 else 0)
if data_len < half_len:
out = np.array([np.nan for i in range(data_len)],dtype=float)
if array:
return out
return out[-half_len:] if len(out) else np.array([],dtype=float)
head = np.array([talib.MA(src[0:ilen],ilen)[-1] for ilen in range(half_len,N)])
out = head
if data_len >= N:
body = talib.MA(src,N)[N-1:]
out = np.append(out,body)
tail = np.array([talib.MA(src[-ilen:],ilen)[-1] for ilen in range(N-1,half_len-1,-1)])
out = np.append(out,tail)
if array:
return out
return out[-half_len:]
class XmaItem(CandleItem):
""""""
def __init__(self, manager: BarManager,xma_window:int=10):
""""""
super().__init__(manager)
self.line_pen: QtGui.QPen = pg.mkPen(color=(100, 100, 255), width=2)
self.xma_window = xma_window
self.dyna_am = DynaArrayManager(xma_window)
self.xma_data: Dict[int, float] = {}
def update_history(self, history: List[BarData]) -> None:
""" reimpliment of update_history """
for bar in history:
self.dyna_am.update_bar(bar)
super().update_history(history)
def update_bar(self, bar: BarData) -> None:
""" reimpliment of update_bar """
self.dyna_am.update_bar(bar)
super().update_bar(bar)
def _get_xma_value(self, ix: int) -> float:
""" """
max_ix = self._manager.get_count() - 1
if ix < 0 or ix > max_ix:
return np.nan
# When initialize, calculate all rsi value
if not self.xma_data:
bars = self._manager.get_all_bars()
close_data = [bar.close_price for bar in bars]
sma_array = XMA(np.array(close_data), self.xma_window,array=True)
for n, value in enumerate(sma_array):
self.xma_data[n] = value
# Return if already calcualted
if ix != max_ix and ix in self.xma_data:
return self.xma_data[ix]
if self.dyna_am.inited:
values = self.dyna_am.xma(select='C',N=self.xma_window)
vlen = len(values)
# print(f"vlen={vlen},values={list(values)}")
start_ix = ix-vlen+1
tail_idxs = [(start_ix+i,values[i]) for i in range(vlen)]
for idx,xma in tail_idxs:
self.xma_data[idx] = xma
return self.xma_data[ix]
return np.nan
def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
""" """
# Create objects
picture = QtGui.QPicture()
painter = QtGui.QPainter(picture)
# Draw XMA Line
xma_value = self._get_xma_value(ix)
last_xma_value = self._get_xma_value(ix - 1)
if not (np.isnan(xma_value) or np.isnan(last_xma_value)):
# Set painter color
painter.setPen(self.line_pen)
# draw XMA line
start_point = QtCore.QPointF(ix-1, last_xma_value)
end_point = QtCore.QPointF(ix, xma_value)
painter.drawLine(start_point, end_point)
# Finish
painter.end()
return picture
def paint(self, painter: QtGui.QPainter, opt: QtWidgets.QStyleOptionGraphicsItem, w: QtWidgets.QWidget) -> None:
"""
Reimplement the paint method of parent class.
This function is called by external QGraphicsView.
"""
# return super().paint(painter, opt, w)
rect = opt.exposedRect
half = (self.xma_window // 2)
min_ix: int = int(rect.left()) - half
min_ix: int = max(min_ix,0)
max_ix: int = int(rect.right())
max_ix: int = min(max_ix, len(self._bar_picutures))
rect_area: tuple = (min_ix, max_ix)
if (
self._to_update
or rect_area != self._rect_area
or not self._item_picuture
):
self._to_update = False
self._rect_area = rect_area
# print(f"XmaItem _draw_item_picture:{min_ix}--{max_ix}")
self._draw_item_picture(min_ix, max_ix)
self._item_picuture.play(painter)
def _draw_item_picture(self, min_ix: int, max_ix: int) -> None:
"""
Draw the picture of item in specific range.
"""
self._item_picuture = QtGui.QPicture()
painter: QtGui.QPainter = QtGui.QPainter(self._item_picuture)
for ix in range(min_ix, max_ix):
bar_picture: QtGui.QPicture = self._bar_picutures[ix]
# if bar_picture is None:
bar: BarData = self._manager.get_bar(ix)
bar_picture = self._draw_bar_picture(ix, bar)
self._bar_picutures[ix] = bar_picture
bar_picture.play(painter)
painter.end()
def get_info_text(self, ix: int) -> str:
""""""
if ix in self.xma_data:
xma_value = self.xma_data[ix]
text = f"XMA({self.xma_window}): {xma_value:.2f}"
else:
text = "XMA({self.xma_window}) -"
return text
尾部不断变化的XMA曲线:
绘制K线图表时有两种选择,一种是不显示未结束的临时K线,另外一种是显示未结束的临时K线。
不知道您有没有发现,当您选择显示未结束的临时K线时,麻烦也就来了,计算机显得非常的慢!
什么原因?是因为python不适合做此类的图表显示吗?还是指标添加的太多?
非也,是因为我们图表控件的写作思路出了问题!
答案是:主要的原因处在临时K线的指标的计算上。让我们仔细地分析下......
无论什么指标,历史K线的指标的计算是一次的,即每根K线只计算一次,这是一定的!
但是计算临时K线的指标却不同,原理上是每个tick都可能改变K线的开、高、低、收和成交量等的值,那么收到一个tick都需要重新计算一下该临时K线的指标,收到的tick越快,显示的指标越多,计算的次数就越多,计算量就越大,这也是一定的!
目前我们的指标在计算历史K线的指标时,是把历史数据一次性的调用talib的相关函数, 把一个巨大数组进行计算一次性地出来,然后把每个指标对应地保存到一个按K线索引为键值对字典中,方便绘制图形的时候查询的。
目前vnpy就没有考虑计算临时K线的指标显示,更不用说考虑如何高效地计算计算临时K线的指标了。我探索了临时K线的显示了,做法是对于没有计算过的K线,查询self._manager中的bar的,将最后的若干bar的需要的属性值查询出来,然后用talib的相关函数来计算出此时临时K线的指标,然后保存到指标字典的对应键值下,方便绘制图形的时候查询的。不过正是这个做法,让临时K线的指标的计算效率极低!
临时K线的指标的计算效率低,与反复不断地查询和复制self._manager中的bar的属性有关,其实准备数据的时间比调用talib函数的时间长多了!
怎么办?把准备数据的时间分散到平时K线更新的函数中,用一个数组管理器来管理这些数据。也就是用存储空间换取一次性准备大量数据的时间。这样就可以提升临时K线的指标的计算效率了。
vnpy的ArrayManager很好,计算效率也不错。可是它不是为动态变化的临时K线设计的。因为每次调用update_bar它都会整体把数据平移一个位置,这对历史K线是可以的,但对应临时K线就不可以。我们希望它在更新临时K线时不要移动位置,只更新尾部(索引为-1)就OK了。
为此我们需要对其进行扩展,设计一个可以适应临时K线的动态数组管理器,我把它叫做DynaArrayManager。
DynaArrayManager的代码如下:
class DynaArrayManager(ArrayManager):
"""
DynaArrayManager是对ArrayManager的扩展,解决它无法用于临时K线的指标计算问题。
作者:hxxjava
"""
def __init__(self, size: int = 100) -> None:
super().__init__(size)
self.bar_datetimes:List[datetime] = []
def update_bar(self, bar: BarData) -> None:
if not self.bar_datetimes or self.bar_datetimes[-1] < bar.datetime:
self.bar_datetimes.append(bar.datetime)
super().update_bar(bar)
else:
"""
Only Update all arrays in array manager with temporary bar data.
"""
self.open_array[-1] = bar.open_price
self.high_array[-1] = bar.high_price
self.low_array[-1] = bar.low_price
self.close_array[-1] = bar.close_price
self.volume_array[-1] = bar.volume
self.turnover_array[-1] = bar.turnover
self.open_interest_array[-1] = bar.open_interest
def dmi(self,N:int=14,M:int=7,array: bool = False)-> Union[Tuple[np.ndarray, np.ndarray, np.ndarray,np.ndarray], Tuple[float, float, float,float]]:
"""
Directional Movement Indicator:
TR := SUM(MAX(MAX(HIGH-LOW,ABS(HIGH-REF(CLOSE,1))),ABS(LOW-REF(CLOSE,1))),N);
HD := HIGH-REF(HIGH,1); // 创新高量
LD := REF(LOW,1)-LOW; // 创新低量
DMP:= SUM(IFELSE(HD>0 && HD>LD,HD,0),N); // N日创新高量累计,赢家通吃——做多力量
DMM:= SUM(IFELSE(LD>0 && LD>HD,LD,0),N); // N日创新低量累计,赢家通吃——做空力量
PDI: DMP*100/TR,LINETHICK2; //做多力量
MDI: DMM*100/TR,LINETHICK2; //做空力量
ADX: MA(ABS(MDI-PDI)/(MDI+PDI)*100,M),LINETHICK2; // ADX:多空力量差值M日平滑
ADXR:(ADX+REF(ADX,M))/2,LINETHICK2; // ADR:ADX与M日前ADX的均值
"""
TR = SUM(talib.TRANGE(self.high,self.low,self.close),N)
# TR = SUM(MAX(MAX(self.high-self.low,ABS(self.high-REF(self.close,1))),ABS(self.low-REF(self.close,1))),N)
HD = self.high - REF(self.high,1)
LD = REF(self.low,1) - self.low
DMP = SUM(IIF((HD>0)&(HD>LD),HD,0),N)
DMM = SUM(IIF((LD>0)&(LD>HD),LD,0),N)
PDI = DMP*100/TR;
MDI = DMM*100/TR;
ADX = talib.MA(np.abs(MDI-PDI)/(MDI+PDI)*100,M) # ADX:多空力量差值M日平滑
ADXR = (ADX+REF(ADX,M))/2 # ADR:ADX与M日前ADX的均值
if array:
return PDI,MDI,ADX,ADXR
return PDI[-1],MDI[-1],ADX[-1],ADXR[-1]
def macd3(self, fast_period: int, slow_period: int, signal_period: int, array: bool = False) -> Union[Tuple[np.ndarray, np.ndarray, np.ndarray,np.ndarray], Tuple[float, float, float,float]]:
"""
MACD having three lines:(diff,dea,slow_dea) and macd histgram
"""
diff, dea, macd = talib.MACD(
self.close, fast_period, slow_period, signal_period
)
slow_dea = talib.EMA(dea,signal_period)
if array:
return diff, dea, macd, slow_dea
return diff[-1], dea[-1], macd[-1],slow_dea[-1]
下面是两个典型的ChartItem的实现方法,其中都用到了DynaArrayManager,并且都重新实现了ChartItem的update_history()和update_bar()两个方法,这两个方法就是为了用存储空间换取一次性准备大量数据的时间。
另外还要注意_get_macd_value()和_get_dmi_value()的后半段,它们的作用就是为了适应临时K线的指标计算的特点而特别这样写的。
class Macd3Item(ChartItem):
""" 三根线的MACD """
def __init__(self, manager: BarManager,short_window:int=12,long_window:int=26,M:int=9):
""""""
super().__init__(manager)
self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=1)
self.red_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=1)
self.green_pen: QtGui.QPen = pg.mkPen(color=(0, 255, 0), width=1)
self.magetan_pen: QtGui.QPen = pg.mkPen(color=(255, 0,255),width=1,style = QtCore.Qt.DashLine)
self._values_ranges: Dict[Tuple[int, int], Tuple[float, float]] = {}
self.short_window = short_window
self.long_window = long_window
self.M = M
self.dyna_am = DynaArrayManager(max(short_window,long_window) + 2*(M-1))
self.macd_data: Dict[int, List[float,float,float]] = {}
def update_history(self, history: List[BarData]) -> None:
""" reimpliment of update_history """
for bar in history:
self.dyna_am.update_bar(bar)
super().update_history(history)
def update_bar(self, bar: BarData) -> None:
""" reimpliment of update_bar """
self.dyna_am.update_bar(bar)
super().update_bar(bar)
def _get_macd_value(self, ix: int) -> List:
""""""
max_ix = self._manager.get_count() - 1
invalid_value = [np.nan,np.nan,np.nan,np.nan]
if ix < 0 or ix > max_ix:
return invalid_value
# When initialize, calculate all macd value
if not self.macd_data:
bars:List[BarData] = self._manager.get_all_bars()
close_prices = [bar.close_price for bar in bars]
diffs,deas,macds = talib.MACD(np.array(close_prices),
fastperiod=self.short_window,
slowperiod=self.long_window,
signalperiod=self.M)
slow_deas = talib.EMA(deas,self.M)
for n in range(0,len(diffs)):
self.macd_data[n] = [diffs[n],deas[n],macds[n],slow_deas[n]]
# Return if already calcualted
if ix != max_ix and ix in self.macd_data:
return self.macd_data[ix]
if self.dyna_am.inited:
diff,dea,macd,slow_dea = self.dyna_am.macd3(self.short_window,self.long_window,self.M)
self.macd_data[ix] = [diff,dea,macd,slow_dea]
return [diff,dea,macd,slow_dea]
return [np.nan,np.nan,np.nan,np.nan]
def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
""""""
# # Create objects
picture = QtGui.QPicture()
painter = QtGui.QPainter(picture)
if ix < 1:
return picture
macd_value = self._get_macd_value(ix)
last_macd_value = self._get_macd_value(ix - 1)
# # Draw macd lines
if np.isnan(macd_value[0]) or np.isnan(last_macd_value[0]):
# print("略过macd lines0")
pass
else:
end_point0 = QtCore.QPointF(ix, macd_value[0])
start_point0 = QtCore.QPointF(ix - 1, last_macd_value[0])
painter.setPen(self.white_pen)
painter.drawLine(start_point0, end_point0)
if np.isnan(macd_value[1]) or np.isnan(last_macd_value[1]):
# print("略过macd lines1")
pass
else:
end_point1 = QtCore.QPointF(ix, macd_value[1])
start_point1 = QtCore.QPointF(ix - 1, last_macd_value[1])
painter.setPen(self.yellow_pen)
painter.drawLine(start_point1, end_point1)
if np.isnan(macd_value[3]) or np.isnan(last_macd_value[3]):
pass
else:
end_point2 = QtCore.QPointF(ix, macd_value[3])
start_point2 = QtCore.QPointF(ix - 1, last_macd_value[3])
painter.setPen(self.magetan_pen)
painter.drawLine(start_point2, end_point2)
if not np.isnan(macd_value[2]):
if (macd_value[2]>0):
painter.setPen(self.red_pen)
painter.setBrush(pg.mkBrush(255,0,0))
else:
painter.setPen(self.green_pen)
painter.setBrush(pg.mkBrush(0,255,0))
painter.drawRect(QtCore.QRectF(ix-0.3,0,0.6,macd_value[2]))
else:
# print("略过macd lines2")
pass
painter.end()
return picture
def boundingRect(self) -> QtCore.QRectF:
""""""
min_y, max_y = self.get_y_range()
# print(f"{self.short_window,self.long_window,self.M} min_y, max_y={min_y, max_y}")
rect = QtCore.QRectF(
0,
min_y,
len(self._bar_picutures),
max_y
)
return rect
def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
"""
获得4个指标在y轴方向的范围
hxxjava 修改,2022-12-14
当显示范围改变时,min_ix,max_ix的值不为None,当显示范围不变时,min_ix,max_ix的值不为None,
"""
if not self.macd_data:
# 如果Ofcd数据没有计算过
return (-100,100)
this_range = (min_ix,max_ix)
if this_range == (None,None):
# 如果是查询全范围
min_ix,max_ix = self._rect_area # 显示索引范围
max_ix = min(max_ix,len(self.macd_data)) - 1 # 数据索引范围
this_range = min_ix,max_ix
if this_range in self._values_ranges:
# 查询范围已经存在,直接返回已经计算过的y范围值
result = self._values_ranges[this_range]
return result
# 查询范围不存在,重新计算y范围值
macd_list = list(self.macd_data.values())[min_ix:max_ix + 1]
ndarray = np.array(macd_list)
if ndarray.shape[0] == 0:
return (-100,100)
# 求值范围内的的MACD值的最大和最小值
max_price = np.nanmax(ndarray)
min_price = np.nanmin(ndarray)
if np.isnan(max_price) or np.isnan(max_price):
return (-100,100)
# 保存y方向范围,同时返回结果
result = (min_price, max_price)
self._values_ranges[this_range] = result
return result
def get_info_text(self, ix: int) -> str:
""" """
barscount = len(self._manager._bars) # hxxjava debug
if ix in self.macd_data:
[diff,dea,macd,slow_dea] = self.macd_data[ix]
words = [
f"Macd3{(self.short_window,self.long_window,self.M)}:",
f"diff {diff:.3f}",
f"dea {dea:.3f}",
f"slow_dea={slow_dea:.3f}",
f"macd {macd:.3f}",
]
text = "\n".join(words)
else:
text = "diff - \ndea - \nslow_dea - \nmacd -"
return text
class DmiItem(ChartItem):
""" """
def __init__(self, manager: BarManager,N:int=14,M:int=7):
""""""
super().__init__(manager)
self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=1)
self.magenta_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 255), width=1)
self.red_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=1)
self.green_pen: QtGui.QPen = pg.mkPen(color=(0, 255, 0), width=1)
self.ref_pen: QtGui.QPen = pg.mkPen(color=(127, 127, 127,127), width=1, style = QtCore.Qt.DashLine)
self._values_ranges: Dict[Tuple[int, int], Tuple[float, float]] = {}
self.N = N
self.M = M
self.dyna_am = DynaArrayManager(2*max(N,M))
self.dmi_data: Dict[int, Tuple[float,float,float,float]] = {} # (PDI,MDI,ADX,ADXR)
def update_history(self, history: List[BarData]) -> None:
""" reimpliment of update_history """
for bar in history:
self.dyna_am.update_bar(bar)
super().update_history(history)
def update_bar(self, bar: BarData) -> None:
""" reimpliment of update_bar """
self.dyna_am.update_bar(bar)
super().update_bar(bar)
def _get_dmi_value(self, ix: int) -> Tuple[float,float,float,float]:
""""""
max_ix = self._manager.get_count()-1
invalid_data = (np.nan,np.nan,np.nan,np.nan)
if ix < 0 or ix > max_ix:
print(f"DmiItem{ix},invalid_data1")
return invalid_data
# When initialize, calculate all macd value
if not self.dmi_data:
bars:List[BarData] = self._manager.get_all_bars()
highs = np.array([bar.high_price for bar in bars])
lows = np.array([bar.low_price for bar in bars])
closes = np.array([bar.close_price for bar in bars])
pdi,mdi,adx,adxr = DMI(high=highs,low=lows,close=closes,N=self.N,M=self.M,array=True)
for n in range(0,len(adx)):
self.dmi_data[n] = (pdi[n],mdi[n],adx[n],adxr[n])
# Return if already calcualted
if ix != max_ix and ix in self.dmi_data:
return self.dmi_data[ix]
if self.dyna_am.inited:
pdi,mdi,adx,adxr = self.dyna_am.dmi(N=self.N,M=self.M)
self.dmi_data[ix] = [pdi,mdi,adx,adxr]
return [pdi,mdi,adx,adxr]
return invalid_data
def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
""""""
# # Create objects
picture = QtGui.QPicture()
painter = QtGui.QPainter(picture)
if ix > self.N + self.M:
# 画参考线
painter.setPen(self.ref_pen)
for ref in [20.0,50,80]:
painter.drawLine(QtCore.QPointF(ix-0.5,ref),QtCore.QPointF(ix+0.5,ref))
# 画4根线
dmi_value = self._get_dmi_value(ix)
last_dmi_value = self._get_dmi_value(ix - 1)
pens = [self.white_pen,self.yellow_pen,self.magenta_pen,self.green_pen]
for i in range(4):
end_point0 = QtCore.QPointF(ix, dmi_value[i])
start_point0 = QtCore.QPointF(ix - 1, last_dmi_value[i])
painter.setPen(pens[i])
painter.drawLine(start_point0, end_point0)
# 多空颜色标示
pdi,mdi = dmi_value[0],dmi_value[1]
if not(np.isnan(pdi) or np.isnan(mdi)):
if abs(pdi - mdi) > 1e-2:
painter.setPen(pg.mkPen(color=(168, 0, 0) if pdi > mdi else (0, 168, 0),width=3))
painter.drawLine(QtCore.QPointF(ix,pdi),QtCore.QPointF(ix,mdi))
painter.end()
return picture
def boundingRect(self) -> QtCore.QRectF:
""""""
min_y, max_y = self.get_y_range()
rect = QtCore.QRectF(
0,
min_y,
len(self._bar_picutures),
max_y
)
return rect
def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
""" """
return (0.0,100.0)
def get_info_text(self, ix: int) -> str:
""" """
if ix in self.dmi_data:
[pdi,mdi,adx,adxr] = self.dmi_data.get(ix,None)
words = [
f"DMI{self.N,self.M}:",
f"PDI {pdi:.2f}",
f"MDI {mdi:.2f}",
f"ADX {adx:.2f}",
f"ADXR {adxr:.2f}",
]
text = "\n".join(words)
else:
text = f"DMI{self.N,self.M}:-"
return text
这样写作的,再加载您的K线图表时,你会发现即使显示是需要显示临时K线指标,显示的速度也会快多了!!!
tick数据模拟器可以从数据库读取历史tick数据,以一定的速度将其按时间顺序放入消息队列中,以此实现历史行情回放。
它可以用来复盘或者调试策略,加快策略的开发进度。
tick数据模拟器回放的tick数据不是凭空生造的,它是真实的历史行情。通常先用vnpy中的DataRecorder录制您感兴趣的tick数据,它会把tick数据记录到vnpy数据库中,
然后就可以利用tick数据模拟器在休市时间进行回放了。
"""
tick数据模拟器
作者:hxxjava
时间:2022-12-10
功能:从数据库读取历史tick数据,以一定的速度将其按时间顺序放入消息队列中,以此实现历史行情回放。
"""
from typing import List,Dict
from datetime import datetime,timedelta
from time import sleep
from vnpy.event.engine import Event,EventEngine
from vnpy.trader.engine import MainEngine,BaseEngine
from vnpy.trader.constant import InstrumentStatus
from vnpy.trader.object import TickData,StatusData
from vnpy.trader.event import EVENT_TICK,EVENT_STATUS
from vnpy.trader.utility import extract_vt_symbol,CHINA_TZ
from vnpy.trader.database import get_database
from vnpy.trader.app import BaseApp
from threading import Thread
EVENT_TICKSIM_START = "eTickSimStart."
EVENT_TICKSIM_PAUSE = "eTickSimPause."
EVENT_TICKSIM_STOP = "eTickSimStop."
EVENT_TICKSIM_FINISHED = "eTickSimFinished."
TICK_SIM_ENGINE_NAME = "tick_sim"
class TickSimEngine(BaseEngine):
""" tick数据模拟器 """
def __init__(self, main_engine: MainEngine, event_engine: EventEngine) -> None:
""" constructor """
super(TickSimEngine, self).__init__(main_engine, event_engine, TICK_SIM_ENGINE_NAME)
self.ticks:List[TickData] = []
self.db = get_database()
self.speed = 1.0
self._active = False
self._pause = False
self.gateway_name = "CTP"
self.register_event()
print(f"tick数据模拟器已经安装。")
def register_event(self) -> None:
""" """
# self.event_engine.register(EVENT_TICK,self.process_tick_event)
self.event_engine.register(EVENT_TICKSIM_START,self.process_ticksim_start_event)
self.event_engine.register(EVENT_TICKSIM_PAUSE,self.process_ticksim_pause_event)
self.event_engine.register(EVENT_TICKSIM_STOP,self.process_ticksim_stop_event)
self.event_engine.register(EVENT_TICKSIM_FINISHED,self.process_ticksim_finished)
def load_ticks(self) -> None:
""" """
symbol,exchange = extract_vt_symbol(self.vt_symbol)
self.ticks = self.db.load_tick_data(symbol=symbol,exchange=exchange,start=self.start_time,end=self.end_time)
print(f"TickSimulator load {len(self.ticks)} ticks")
def process_tick_event(self,event:Event) -> None:
""" """
tick:Dict = event.data
print(f"recieve:{tick}")
def process_ticksim_start_event(self,event:Event) -> None:
""" """
data:Dict = event.data
print(f"I got EVENT_TICKSIM_START:{data}")
self.start_simulator(**data)
def process_ticksim_pause_event(self,event:Event) -> None:
""" """
self._pause = not self._pause
print(f"I got EVENT_TICKSIM_PAUSE:{data}")
def process_ticksim_stop_event(self,event:Event) -> None:
""" """
self._active = True
print(f"I got EVENT_TICKSIM_STOP:{data}")
def process_ticksim_finished(self,event:Event) -> None:
""" """
# 送出收盘交易状态
tick:TickData = event.data
next_second:datetime = tick.datetime + timedelta(seconds=1)
next_second.replace(second=0,microsecond=0)
status = StatusData(
gateway_name=self.gateway_name,
symbol=tick.symbol,
exchange=tick.exchange,
settlement_group_id="100",
instrument_status=InstrumentStatus.CLOSE,
trading_segment_sn = 100,
enter_time = next_second.strftime("%H:%M"),
)
self.event_engine.put(Event(EVENT_STATUS,data=status))
self.stop()
print(f"finish_time:{tick.datetime}")
def start(self) -> None:
""" """
self._active = True
self.thread = Thread(target=self.run)
self.thread.start()
def stop(self) -> None:
""" """
self._active = False
self._pause = False
self.speed = 1.0
self.ticks.clear()
def run(self) -> None:
""" 仿真线程函数 """
while self._active:
# 遍历发送
if not self._pause:
if self.ticks:
tick = self.ticks.pop(0)
tick.gateway_name = self.gateway_name
event = Event(EVENT_TICK,tick)
# print(f"EVENT_TICK event data = {tick}")
self.event_engine.put(event)
if len(self.ticks) == 0:
print(f"all ticks are send out !")
self.event_engine.put(Event(EVENT_TICKSIM_FINISHED,data=tick))
sleep(0.5/self.speed)
print(f"TickSimulator is stoped !")
def start_simulator(self,vt_symbol:str,start_time:datetime,end_time:datetime,speed:float=1.0,gateway_name:str='CTP') -> None:
""" 启动tick模拟器 """
self.vt_symbol = vt_symbol
self.start_time = start_time
self.end_time = end_time
self.speed = speed
self.gateway_name = gateway_name
self.load_ticks()
self.start()
这里给出最简单的使用方法,具体您可以发挥想象力。
""" """
event_engine = EventEngine()
main_engine = MainEngine(event_engine=event_engine)
main_engine.add_engine(TickSimEngine)
tick_sim_engine:TickSimEngine = main_engine.get_engine(TICK_SIM_ENGINE_NAME)
end_time = datetime.now().replace(tzinfo=CHINA_TZ)
start_time = datetime(2022,12,9,22,50,0).replace(tzinfo=CHINA_TZ)
tick_sim_engine.start_simulator(
vt_symbol='p2301.DCE', # 回放合约
start_time=start_time, # 开始时间
end_time=end_time, # 结束时间
speed=10.0 # 回放速度
)
data = {
"vt_symbol":'p2301.DCE',
"start_time":datetime(2022,12,9,22,50,0).replace(tzinfo=CHINA_TZ),
"end_time": datetime.now().replace(tzinfo=CHINA_TZ),
"speed":10,
"gateway_name":"CTP"
} # 意义同上
event_engine.put(Event(EVENT_TICKSIM_START,data))
event_engine.put(Event(EVENT_TICKSIM_PAUSE))
event_engine.put(Event(EVENT_TICKSIM_STOP))
无论3.2~3.5中的代码在哪里,event_engine必须和3.1加载tick数据模器时是同一个消息引擎。
K线图表通常有主图和多个副图组成,它是我们观察合约 K线和指标的工具。但有时往往因为要显示的指标副图太多,去观察效果受到太大的影响。我们可以通过关闭全部或者是一个富途的方式来让显示变得更为简洁和细致。最方便的方式是使用鼠标来进行操作,而很多市面上的第三方软件也多采用此方法。可是看看维恩派的 K线图表因为没有考虑用户使用时候的这个需求,想实现这个功能还是需要费一番心思的。
那么怎么做呢?初步设想是:
修改文件vnpy\chart\widget.py
在引用部分加入下面内容:
from typing import Callable # hxxjava add
PlotItem的扩展类MyPlotItem:
class MyPlotItem(pg.PlotItem):
"""
MyPlotItem:re-implement of PlotItem.
hxxjava add
"""
def __init__(self,name:str,on_mouseDblClicked:Callable=None,*args,**kwargs) -> None:
""" add name attribute and on_mouseDblClicked callback for PlotItem """
super().__init__(*args,**kwargs)
self.name = name
self.on_mouseDblClicked = on_mouseDblClicked
def mouseDoubleClickEvent(self, event: QtGui.QMouseEvent) -> None:
""" re-implement PlotItem's mouseDoubleClickEvent """
super().mouseDoubleClickEvent(event)
if self.on_mouseDblClicked:
self.on_mouseDblClicked(self)
为class ChartWidget添加下面的鼠标双击的下面的回调函数。需要说明的是这里有个约定:容纳主图的PlotItem名称必须为"candle",其他的名称认为是副图,无需纠结是不是太过特别订制了。
def on_mouseDblClicked(self,plot:pg.PlotItem):
"""
所有内含图表被双击的回调函数。 hxxjava add
"""
if plot.name == "candle":
# 选择所有副图
others = [pi for pi in self._plots.values() if pi.name != plot.name]
else:
# 选择其他副图
others = [pi for pi in self._plots.values() if pi.name not in [plot.name,'candle']]
# 求当前幅图的高度变化量
delta_h = 0
for pi in others:
delta_h += pi.height()*(1 if pi.isVisible() else -1)
plot.setFixedHeight(plot.height()+delta_h)
# 隐藏/显示未变双击的图表
for pi in others:
pi.setVisible(not pi.isVisible())
修改class ChartWidget的add_plot()函数,代码如下:
def add_plot(
self,
plot_name: str,
minimum_height: int = 80,
maximum_height: int = None,
hide_x_axis: bool = False
) -> None:
"""
Add plot area.
"""
# Create plot object
# plot: pg.PlotItem = pg.PlotItem(axisItems={'bottom': self._get_new_x_axis()})
plot: pg.PlotItem = MyPlotItem(axisItems={'bottom': self._get_new_x_axis()},name=plot_name,on_mouseDblClicked=self.on_mouseDblClicked)
plot.setMenuEnabled(False)
plot.setClipToView(True)
plot.hideAxis('left')
plot.showAxis('right')
plot.setDownsampling(mode='peak')
plot.setRange(xRange=(0, 1), yRange=(0, 1))
plot.hideButtons()
plot.setMinimumHeight(minimum_height)
if maximum_height:
plot.setMaximumHeight(maximum_height)
if hide_x_axis:
plot.hideAxis("bottom")
if not self._first_plot:
self._first_plot = plot
# Connect view change signal to update y range function
view: pg.ViewBox = plot.getViewBox()
view.sigXRangeChanged.connect(self._update_y_range)
view.setMouseEnabled(x=True, y=True) # hxxjava change,old---view.setMouseEnabled(x=True, y=False)
# Set right axis
right_axis: pg.AxisItem = plot.getAxis('right')
right_axis.setWidth(60)
right_axis.tickFont = NORMAL_FONT
# Connect x-axis link
if self._plots:
first_plot: pg.PlotItem = list(self._plots.values())[0]
plot.setXLink(first_plot)
# Store plot object in dict
self._plots[plot_name] = plot
# Add plot onto the layout
self._layout.nextRow()
self._layout.addItem(plot)
完成了第2部分的代码后,您就可以使用ChartWidget像以往一样创建你的K线图表了。
双击其中的MACD副图后,K线图表窗口MACD副图会占据其他副图的显示区域,再次双击该MACD副图,K线图表窗口中的其他副图就再次显示出来了。
双击其中的的delta副图后,K线图表窗口delta副图会占据其他副图的显示区域,再次双击该delta副图,K线图表窗口中的其他副图就再次显示出来了。
实盘中,如果你用K线图表对你的CTA策略生成的K线进行显示,你会发现明明已经休市了或者收盘了,最新的日K线、30分钟、5分钟K线本应该能够有了,可是迟迟见不到这些K线的生成。
以国内期货举例:
目前vnpy的BarGenerator没有出错,但出现这种情况有违常理!
为什么会出现情况?原因是因为你的BarGenerator的生成K线的机制导致的,因为这些大K线是由1分钟K线合成的,就是说它们依赖1分钟K线的生成。
目前1分钟K线只是单纯由tick数据推动的,当收到下一分钟tick才判断当前1分钟K线的结束。如果遇到了中间休市时间或者收盘时间,网关接口就不再有新的tick推送,这样最后1分钟K线也就一直呆在BarGenerator中,无法推动5分钟、30分钟、日等大K线的生成,这就是目前BarGenerator的问题所在。
交易所用合约交易状态通知交易客户端交易合约的状态已经发生了变化。它表明交易合约当前的交易时间段,在每个交易时间的开始和结束时推送,时间为分钟的开始。这个信息正好可以用于BarGenerator结束各个休市和收盘前1分钟的K线生成,进而一举解决比1分钟大的K线的生成。
套用一句宋丹丹的话,要把大象关进冰箱总共分三步:第一步把冰箱门打开,第二步把冰箱塞进冰箱中,第三步把冰箱门关上!
让BarGenerator在休市和收市时及时生成K线分五步:
在vnpy_ctastrategy\engine.py中增加下面的内容:
class MyCtaEngine(CtaEngine):
"""
CTA策略引擎,对CtaEngine的功能进行扩展。
功能:
1. 订阅集合竞价tick数据,并且转发给各个已经初始化的CTA策略;
2. 订阅交易状态消息数据,并且转发给各个已经初始化的CTA策略;
3. 条件单的功能:包括发送、监视、更新和取消条件单的功能。
"""
condition_filename = "condition_order.json" # 历史条件单存储文件
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super().__init__(main_engine,event_engine)
self.condition_orders:Dict[str,ConditionOrder] = {} # strategy_name: ConditionOrder
self.triggered_condition_orders:List[ConditionOrder] = [] # 已经触发点条件单,为流控设计
def load_active_condtion_orders(self):
""" """
return {}
def register_event(self):
""""""
super().register_event()
self.event_engine.register(EVENT_AUCTION_TICK, self.process_auction_tick_event)
self.event_engine.register(EVENT_STATUS, self.process_status_event)
def process_auction_tick_event(self,event:Event):
""" 集合竞价消息处理 hxxjava add """
tick:TickData = event.data
strategies:List[CtaTemplate] = self.symbol_strategy_map[tick.vt_symbol]
if not strategies:
return
for strategy in strategies:
if strategy.inited:
# 执行策略的集合竞价消息处理
self.call_strategy_func(strategy, strategy.on_auction_tick, tick)
def process_status_event(self,event:Event):
""" 交易状态消息处理 hxxjava add """
status:StatusData = event.data
strategies:List[CtaTemplate] = []
# step1: find strategies related to this status data
vt_instrument0 = get_vt_instrument(status.vt_symbol)
if vt_instrument0 == status.vt_symbol:
# 交易品种的交易状态
for vt_symbol in self.symbol_strategy_map.keys():
vt_instrument = get_vt_instrument(vt_symbol)
if vt_instrument == vt_instrument0:
# 交易品种的交易状态属于策略交易的合约
strategies.extend(self.symbol_strategy_map[vt_symbol])
else:
# 单独合约的交易状态
strategies.extend(self.symbol_strategy_map.get(status.vt_symbol,[]))
if not strategies:
return
# step 2: push status data to all relate strategies
for strategy in strategies:
if strategy.inited:
# 执行策略的集合竞价消息处理
self.call_strategy_func(strategy, strategy.on_status, status)
def process_tick_event(self,event:Event):
""" 用tick的价格检查条件单 """
super().process_tick_event(event)
tick:TickData = event.data
all_condition_orders = [order for order in self.condition_orders.values() \
if order.vt_symbol == tick.vt_symbol and order.status == CondOrderStatus.WAITING]
for order in all_condition_orders:
# 检查条件单是否满足条件
self.check_condition_order(order,tick)
def check_condition_order(self,order:ConditionOrder,tick:TickData):
""" 检查条件单是否满足条件 """
strategy = self.strategies.get(order.strategy_name,None)
if not strategy or not strategy.trading:
return False
price = tick.last_price
is_be = order.condition == Condition.BE and price >= order.price
is_le = order.condition == Condition.LE and price <= order.price
is_bt = order.condition == Condition.BT and price > order.price
is_lt = order.condition == Condition.LT and price < order.price
if is_be or is_le or is_bt or is_lt:
# 满足触发条件
if order.execute_price == ExecutePrice.MARKET:
# 取市场价
price = tick.last_price
elif order.execute_price == ExecutePrice.EXTREME:
# 取极限价
price = tick.limit_up if order.direction == Direction.LONG else tick.limit_down
else:
# 取设定价
price = order.price
# 执行委托
order_ids = strategy.send_order(
direction = order.direction,
offset=order.offset,
price=price,
volume=order.volume
)
if order_ids:
order.trigger_time = tick.datetime
order.status = CondOrderStatus.TRIGGERED
order.vt_orderids = order_ids
self.call_strategy_func(strategy,strategy.on_condition_order,order)
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
def find_condition_order(self,vt_orderid:str):
""" 根据委托单号查询所属条件单 """
corder:ConditionOrder = None
for order in self.condition_orders.values():
if vt_orderid in order.vt_orderids:
corder = order
break
return corder
def process_trade_event(self, event: Event):
""" 委托单推送处理 """
super().process_trade_event(event)
trade:TradeData = event.data
vt_orderid = trade.vt_orderid
corder = self.find_condition_order(vt_orderid)
if corder:
# 该成交单属于某个条件单
strategy = self.strategies.get(corder.strategy_name,None)
if strategy and strategy.trading:
# 找到了该条件单属实策略实例且正在交易中
# 累计条件单的成交量
corder.traded += trade.volume
# 推送该条件单给策略
self.call_strategy_func(strategy,strategy.on_condition_order,corder)
# 刷新条件单列表控件
self.event_engine.put(Event(EVENT_CONDITION_ORDER,corder))
def send_condition_order(self,order:ConditionOrder):
""" """
strategy = self.strategies.get(order.strategy_name,None)
if not strategy or not strategy.trading:
return False
if order.cond_orderid not in self.condition_orders:
self.condition_orders[order.cond_orderid] = order
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
return False
def cancel_condition_order(self,cond_orderid:str):
""" """
order:ConditionOrder = self.condition_orders.get(cond_orderid,None)
if not order:
return False
order.status = CondOrderStatus.CANCELLED
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
def cancel_all_condition_orders(self,strategy_name:str):
""" """
for order in self.condition_orders.values():
if order.strategy_name == strategy_name and order.status == CondOrderStatus.WAITING:
order.status = CondOrderStatus.CANCELLED
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
在vnpy_ctastrategy\template.py中增加一个on_status()虚函数,它是接收交易状态信息数据推送的接口,on_status()代码如下:
class CtaTemplate(ABC):
""""""
author: str = ""
parameters: list = []
variables: list = []
def __init__(
self,
cta_engine: Any,
strategy_name: str,
vt_symbol: str,
setting: dict,
) -> None:
""""""
self.cta_engine: Any = cta_engine
self.strategy_name: str = strategy_name
self.vt_symbol: str = vt_symbol
self.inited: bool = False
self.trading: bool = False
self.pos: int = 0
# Copy a new variables list here to avoid duplicate insert when multiple
# strategy instances are created with the same strategy class.
self.variables = copy(self.variables)
self.variables.insert(0, "inited")
self.variables.insert(1, "trading")
self.variables.insert(2, "pos")
self.update_setting(setting)
def update_setting(self, setting: dict) -> None:
"""
Update strategy parameter wtih value in setting dict.
"""
for name in self.parameters:
if name in setting:
setattr(self, name, setting[name])
@classmethod
def get_class_parameters(cls) -> dict:
"""
Get default parameters dict of strategy class.
"""
class_parameters: dict = {}
for name in cls.parameters:
class_parameters[name] = getattr(cls, name)
return class_parameters
def get_parameters(self) -> dict:
"""
Get strategy parameters dict.
"""
strategy_parameters: dict = {}
for name in self.parameters:
strategy_parameters[name] = getattr(self, name)
return strategy_parameters
def get_variables(self) -> dict:
"""
Get strategy variables dict.
"""
strategy_variables: dict = {}
for name in self.variables:
strategy_variables[name] = getattr(self, name)
return strategy_variables
def get_data(self) -> dict:
"""
Get strategy data.
"""
strategy_data: dict = {
"strategy_name": self.strategy_name,
"vt_symbol": self.vt_symbol,
"class_name": self.__class__.__name__,
"author": self.author,
"parameters": self.get_parameters(),
"variables": self.get_variables(),
}
return strategy_data
@virtual
def on_init(self) -> None:
"""
Callback when strategy is inited.
"""
pass
@virtual
def on_inited(self): # hxxjava add
"""
Callback when strategy is inited.
"""
pass
@virtual
def on_start(self):
"""
Callback when strategy is started.
"""
pass
@virtual
def on_stop(self) -> None:
"""
Callback when strategy is stopped.
"""
pass
@virtual
def on_auction_tick(self, tick: TickData):
"""
Callback of new tick data update. # hxxjava add for auction tick
"""
pass
@virtual
def on_status(self, status: StatusData=None):
"""
Callback of new status data update. # hxxjava add for trading status
"""
pass
@virtual
def on_tick(self, tick: TickData) -> None:
"""
Callback of new tick data update.
"""
pass
@virtual
def on_bar(self, bar: BarData) -> None:
"""
Callback of new bar data update.
"""
pass
@virtual
def on_trade(self, trade: TradeData) -> None:
"""
Callback of new trade data update.
"""
pass
@virtual
def on_order(self, order: OrderData) -> None:
"""
Callback of new order data update.
"""
pass
@virtual
def on_stop_order(self, stop_order: StopOrder) -> None:
"""
Callback of stop order update.
"""
pass
def buy(
self,
price: float,
volume: float,
stop: bool = False,
lock: bool = False,
net: bool = False
) -> list:
"""
Send buy order to open a long position.
"""
return self.send_order(
Direction.LONG,
Offset.OPEN,
price,
volume,
stop,
lock,
net
)
def sell(
self,
price: float,
volume: float,
stop: bool = False,
lock: bool = False,
net: bool = False
) -> list:
"""
Send sell order to close a long position.
"""
return self.send_order(
Direction.SHORT,
Offset.CLOSE,
price,
volume,
stop,
lock,
net
)
def short(
self,
price: float,
volume: float,
stop: bool = False,
lock: bool = False,
net: bool = False
) -> list:
"""
Send short order to open as short position.
"""
return self.send_order(
Direction.SHORT,
Offset.OPEN,
price,
volume,
stop,
lock,
net
)
def cover(
self,
price: float,
volume: float,
stop: bool = False,
lock: bool = False,
net: bool = False
) -> list:
"""
Send cover order to close a short position.
"""
return self.send_order(
Direction.LONG,
Offset.CLOSE,
price,
volume,
stop,
lock,
net
)
def send_order(
self,
direction: Direction,
offset: Offset,
price: float,
volume: float,
stop: bool = False,
lock: bool = False,
net: bool = False
) -> list:
"""
Send a new order.
"""
if self.trading:
vt_orderids: list = self.cta_engine.send_order(
self, direction, offset, price, volume, stop, lock, net
)
return vt_orderids
else:
return []
def cancel_order(self, vt_orderid: str) -> None:
"""
Cancel an existing order.
"""
if self.trading:
self.cta_engine.cancel_order(self, vt_orderid)
def cancel_all(self) -> None:
"""
Cancel all orders sent by strategy.
"""
if self.trading:
self.cta_engine.cancel_all(self)
def write_log(self, msg: str) -> None:
"""
Write a log message.
"""
self.cta_engine.write_log(msg, self)
def get_engine_type(self) -> EngineType:
"""
Return whether the cta_engine is backtesting or live trading.
"""
return self.cta_engine.get_engine_type()
def get_pricetick(self) -> float:
"""
Return pricetick data of trading contract.
"""
return self.cta_engine.get_pricetick(self)
def load_bar(
self,
days: int,
interval: Interval = Interval.MINUTE,
callback: Callable = None,
use_database: bool = False
) -> None:
"""
Load historical bar data for initializing strategy.
"""
if not callback:
callback: Callable = self.on_bar
bars: List[BarData] = self.cta_engine.load_bar(
self.vt_symbol,
days,
interval,
callback,
use_database
)
for bar in bars:
callback(bar)
def load_tick(self, days: int) -> None:
"""
Load historical tick data for initializing strategy.
"""
ticks: List[TickData] = self.cta_engine.load_tick(self.vt_symbol, days, self.on_tick)
for tick in ticks:
self.on_tick(tick)
def put_event(self) -> None:
"""
Put an strategy data event for ui update.
"""
if self.inited:
self.cta_engine.put_strategy_event(self)
def send_email(self, msg) -> None:
"""
Send email to default receiver.
"""
if self.inited:
self.cta_engine.send_email(msg, self)
def sync_data(self) -> None:
"""
Sync strategy variables value into disk storage.
"""
if self.trading:
self.cta_engine.sync_strategy_data(self)
def get_trading_hours(self):
"""
Return trading_hours of trading hours. # hxxjava add
"""
return self.cta_engine.get_trading_hours(self)
def get_actual_days(self,last_time:datetime,days:int): # hxxjava add
"""
得到从last_time开始往前days天的实际天数。
"""
if days <= 0:
return 0
th = TradingHours(self.get_trading_hours())
# 找到有效的最后时间
till_time = last_time
while not th.get_trade_hours(till_time):
till_time = timedelta(minutes=1)
availble_days = 0
#找到有些多开始时间
from_time = till_time
while availble_days <= days:
from_time -= timedelta(days=1)
if th.get_trade_hours(from_time):
availble_days += 1
actual_days = (last_time-from_time).days
# print(f"till_time={till_time},from_time{from_time},days={days},acutal_days={actual_days}")
return actual_days
def get_contract(self):
"""
Return trading_hours of trading contract. # hxxjava add
"""
return self.cta_engine.get_contract(self)
@virtual
def on_condition_order(self, cond_order: ConditionOrder):
"""
Callback of condition order update.
"""
pass
def send_condition_order(self,order:ConditionOrder): # hxxjava add
""" """
if not self.trading:
return False
return self.cta_engine.send_condition_order(order)
def cancel_condition_order(self,cond_orderid:str): # hxxjava add
""" """
return self.cta_engine.cancel_condition_order(cond_orderid)
def cancel_all_condition_orders(self): # hxxjava add
""" """
return self.cta_engine.cancel_all_condition_orders(self)
def send_margin_ratio_request(self): # hxxjava add
""" """
main_engine = self.cta_engine.main_engine
contract:ContractData = self.get_contract()
symbol,exchange = extract_vt_symbol(self.vt_symbol)
req = MarginRequest(symbol=symbol,exchange=exchange)
main_engine.send_margin_ratio_request(req,contract.gateway_name)
def send_commission_request(self): # hxxjava add
""" """
main_engine = self.cta_engine.main_engine
contract:ContractData = self.get_contract()
symbol,exchange = extract_vt_symbol(self.vt_symbol)
req = CommissionRequest(symbol=symbol,exchange=exchange)
main_engine.send_commission_request(req,contract.gateway_name)
修改vnpy\trader\utility.py中的BarGenerator,为其添加下面的update_status()函数:
class BarGenerator:
... ...
def update_status(self,status:StatusData=None):
""" """
# if status:
# hh,mm = status.enter_time.split(':')
# st_time = datetime.now().replace(hour=int(hh),minute=int(mm),second=0,microsecond=0,tzinfo=CHINA_TZ)
if self.bar:
# 只要接收到交易状态信息,一定是整分钟,立即推送当前分钟bar
self.on_bar(self.bar)
self.bar = None
在您的CTA策略中增加一个on_status()函数,这里只给出一个CTA策略与本文主题相关部分的代码,其他部分用省略号表示,一般的代码如下:
class XxxStrategy(CtaTemplate):
""" XXX交易策略 """
self.window = 30
... ...
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
""""""
super(GsjyStrategy, self).__init__(cta_engine, strategy_name, vt_symbol, setting)
... ...
self.bg : BarGenerator:BarGenerator(self.on_bar,self.window,self.on_xmin_bar)
... ...
def on_inited(self):
"""
Callback after strategy is inited.
"""
self.bg.update_status() # 用来强制生成最后的1分钟K线
self.write_log("策略初始化结束。")
def on_auction_tick(self, tick: TickData):
"""
Callback of auction tick data update.
"""
self.bg.update_auction_tick(tick)
def on_status(self,status:StatusData=None): #
"""
收到合约交易状态信息时,更新所有的K线生成器 。
注意:合约交易状态信息推送只会在策略初始化后才执行!
"""
self.bg.update_status(status=status)
def on_tick(self, tick: TickData):
"""
Callback of new tick data update.
"""
self.bg.update_tick(tick)
def on_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
self.bg.update_bar(bar)
def on_xmin_bar(self, bar: BarData):
""" """
# 这里应该是利用大周期bar进行出入场交易信号计算和委托交易指令发出的代码
pass
... ...
修改vnpy_ctastrategy__init__.py中的CtaStrategyApp,将其engine_class更改为扩展的MyCtaEngine:
class CtaStrategyApp(BaseApp):
""""""
app_name = APP_NAME
app_module = __module__
app_path = Path(__file__).parent
display_name = "CTA策略"
# engine_class = CtaEngine
engine_class = MyCtaEngine # hxxjava add
widget_name = "CtaManager"
icon_name = str(app_path.joinpath("ui", "cta.ico"))
在ChartWidget基础上创建的指标是不能改变主图或副图的参数的,如何做才能给让指标传递不同参数?
以MacdItem为例,步骤如下:
MacdItem的实现见:为K线图表添砖加瓦——解决MACD绘图部件显示范围BUG,这里就不再贴代码。
def add_item(
self,
item_class: Type[ChartItem],
item_name: str,
plot_name: str,
**kwargs # hxxjava add
) -> None:
"""
Add chart item.
"""
# item: ChartItem = item_class(self._manager)
item: ChartItem = item_class(self._manager) if not kwargs else item_class(self._manager,**kwargs) # hxxjava change
self._items[item_name] = item
plot: pg.PlotItem = self._plots.get(plot_name)
plot.addItem(item)
self._item_plot_map[item] = plot
class MyChartWidget(QtWidgets.QFrame):
"""
订单流K线图表控件,
"""
def __init__(self,title:str="K线图表"):
""""""
super().__init__()
self.init_ui()
self.setWindowTitle(title)
def init_ui(self):
""""""
self.resize(1400, 800)
# Create chart widget
self.chart = ChartWidget()
self.chart.add_plot("candle", hide_x_axis=True)
self.chart.add_plot("fast_macd", maximum_height=100,hide_x_axis=True)
self.chart.add_plot("slow_macd", maximum_height=100,hide_x_axis=True)
self.chart.add_plot("volume", maximum_height=100,hide_x_axis=False)
self.chart.add_item(CandleItem, "candle", "candle")
self.chart.add_item(MacdItem, "fast_macd", "fast_macd",short_window=6,long_window=19,M=9) # 相当于MACD(6,19,9)
self.chart.add_item(MacdItem, "slow_macd", "slow_macd",short_window=19,long_window=39,M=9) # 相当于MACD(19,39,9)
self.chart.add_item(VolumeItem, "volume", "volume")
self.chart.add_cursor()
最近经常有朋友问MacdItem绘图部件显示范围有问题,因为老是有任务在手上,一直没有时间修改,现修改过了,测试没有问题,代码如下:
class MacdItem(ChartItem):
""""""
def __init__(self, manager: BarManager,short_window:int=12,long_window:int=26,M:int=9):
""""""
super().__init__(manager)
self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=1)
self.red_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=1)
self.green_pen: QtGui.QPen = pg.mkPen(color=(0, 255, 0), width=1)
self._values_ranges: Dict[Tuple[int, int], Tuple[float, float]] = {}
self.last_range:Tuple[int, int] = (-1,-1) # 最新显示K线索引范围
self.short_window = short_window
self.long_window = long_window
self.M = M
self.macd_data: Dict[int, List[float,float,float]] = {}
def get_macd_value(self, ix: int) -> List:
""""""
if ix < 0:
return (0.0,0.0,0.0)
# When initialize, calculate all macd value
if not self.macd_data:
bars = self._manager.get_all_bars()
close_data = [bar.close_price for bar in bars]
diffs,deas,macds = talib.MACD(np.array(close_data),
fastperiod=self.short_window,
slowperiod=self.long_window,
signalperiod=self.M)
for n in range(0,len(diffs)):
self.macd_data[n] = [diffs[n],deas[n],macds[n]]
# Return if already calcualted
if ix in self.macd_data:
return self.macd_data[ix]
# Else calculate new value
close_data = []
for n in range(ix-self.long_window-self.M+1, ix + 1):
bar = self._manager.get_bar(n)
close_data.append(bar.close_price)
diffs,deas,macds = talib.MACD(np.array(close_data),
fastperiod=self.short_window,
slowperiod=self.long_window,
signalperiod=self.M)
diff,dea,macd = diffs[-1],deas[-1],macds[-1]
self.macd_data[ix] = [diff,dea,macd]
return [diff,dea,macd]
def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
""""""
macd_value = self.get_macd_value(ix)
last_macd_value = self.get_macd_value(ix - 1)
# # Create objects
picture = QtGui.QPicture()
painter = QtGui.QPainter(picture)
# # Draw macd lines
if np.isnan(macd_value[0]) or np.isnan(last_macd_value[0]):
# print("略过macd lines0")
pass
else:
end_point0 = QtCore.QPointF(ix, macd_value[0])
start_point0 = QtCore.QPointF(ix - 1, last_macd_value[0])
painter.setPen(self.white_pen)
painter.drawLine(start_point0, end_point0)
if np.isnan(macd_value[1]) or np.isnan(last_macd_value[1]):
# print("略过macd lines1")
pass
else:
end_point1 = QtCore.QPointF(ix, macd_value[1])
start_point1 = QtCore.QPointF(ix - 1, last_macd_value[1])
painter.setPen(self.yellow_pen)
painter.drawLine(start_point1, end_point1)
if not np.isnan(macd_value[2]):
if (macd_value[2]>0):
painter.setPen(self.red_pen)
painter.setBrush(pg.mkBrush(255,0,0))
else:
painter.setPen(self.green_pen)
painter.setBrush(pg.mkBrush(0,255,0))
painter.drawRect(QtCore.QRectF(ix-0.3,0,0.6,macd_value[2]))
else:
# print("略过macd lines2")
pass
painter.end()
return picture
def boundingRect(self) -> QtCore.QRectF:
""""""
min_y, max_y = self.get_y_range()
rect = QtCore.QRectF(
0,
min_y,
len(self._bar_picutures),
max_y
)
return rect
def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
"""
获得3个指标在y轴方向的范围
hxxjava 修改,2022-11-15
当显示范围改变时,min_ix,max_ix的值不为None,当显示范围不变时,min_ix,max_ix的值不为None,
"""
if not self.macd_data:
# 如果MACD三个数据没有计算过
return (-100,100)
last_range = (min_ix,max_ix)
if last_range == (None,None):
# 本次索引范围未改变
if self.last_range in self._values_ranges:
# 如果y方向范围已经保存,读取y方向范围
result = self._values_ranges[self.last_range]
return adjust_range(result)
else:
# 如果y方向范围没有保存,从macd_data重新计算y方向范围
min_ix,max_ix = 0,len(self.macd_data)-1
macd_list = list(self.macd_data.values())[min_ix:max_ix + 1]
ndarray = np.array(macd_list)
max_price = np.nanmax(ndarray)
min_price = np.nanmin(ndarray)
# 保存y方向范围,同时返回结果
result = (min_price, max_price)
if result == (np.nan,np.nan):
# 前面的未计算出有效值会出现这种情况
return (-100,100)
self.last_range = (min_ix,max_ix)
self._values_ranges[self.last_range] = result
return adjust_range(result)
""" 以下为显示范围变化时 """
if last_range in self._values_ranges:
# 该范围已经保存过y方向范围
# 取得y方向范围,返回结果
self.last_range = last_range
result = self._values_ranges[last_range]
return adjust_range(result)
# 该范围没有保存过y方向范围,从macd_data重新计算y方向范围
min_ix = max(0,min_ix)
max_ix = min(max_ix,len(self.macd_data)-1)
macd_list = list(self.macd_data.values())[min_ix:max_ix+1]
ndarray = np.array(macd_list)
max_price = np.nanmax(ndarray)
min_price = np.nanmin(ndarray)
# 取得y方向范围,返回结果
result = (min_price, max_price)
if result == (np.nan,np.nan):
# 前面的未计算出有效值会出现这种情况
return (-100,100)
# print(f"result1={result}")
self.last_range = (min_ix,max_ix)
self._values_ranges[self.last_range] = result
self.min_ix,self.max_ix = last_range
return adjust_range(result)
def get_info_text(self, ix: int) -> str:
""" """
barscount = len(self._manager._bars) # hxxjava debug
if ix in self.macd_data:
[diff,dea,macd] = self.macd_data[ix]
words = [
f"diff {diff:.3f}"," ",
f"dea {dea:.3f}"," ",
f"macd {macd:.3f}",
f"barscount={ix,barscount}"
]
text = "\n".join(words)
else:
text = "diff - \ndea - \nmacd -"
return text
本人在对郑商所行情数据中时间秒以下的部分进行补充的修改一文中,给出来对郑商所行情数据中时间秒以下的部分进行了补充,ctp_gateway也已经按照预想的那样工作了。之后启动DataRecorder对多个合约进行tick数据的录制。可是在价差tick数据的时候却发现对有夜盘的合约进行录制的时间,如果在23:00以后还有tick数据推送的话,会出现日期错误,具体的表现是:2022-10-19 23:00:00以后的tick数据中的datetime字段的日期部分居然是2022-10-20!也就是说我昨天晚上就录到了今天晚上23:00:00以后tick数据(当然是未来时间),这是错误的!
打开mysql数据库(我用的是mysql数据库):
select symbol,datetime from dbtickdata where time(datetime) > "22:59:59.5";
+--------+-------------------------+
| symbol | datetime |
+--------+-------------------------+
| i2301 | 2022-10-19 22:59:59.522 |
| i2301 | 2022-10-19 23:00:00.013 |
| i2301 | 2022-10-19 23:00:00.038 |
| i2301 | 2022-10-20 23:00:00.038 | ---- 日期错误,应该是2022-10-19
| p2301 | 2022-10-19 23:00:00.025 |
| p2301 | 2022-10-20 23:00:00.025 | ---- 日期错误,应该是2022-10-19
| rb2301 | 2022-10-19 22:59:59.500 |
| rb2301 | 2022-10-19 23:00:00.000 |
| TA301 | 2022-10-19 22:59:59.500 |
| TA301 | 2022-10-19 22:59:59.750 |
| TA301 | 2022-10-19 22:59:59.875 |
+--------+-------------------------+
注意:我发现此问题的时间是2022-10-20 16:00!
经过仔细研读ctp_gateway.py的CtpMdApi的onRtnDepthMarketData(),具体的实现见下面的代码,发现对大商所合约tick的datetime错误导致的。
按照ctp接口规范文档知道,大商所合约的深度行情中没有日期字段,需要客户端收到后自行用本地日期去替代。
CtpMdApi中用定时器维护了一个self.current_date的本地日期成员,其主要作用就是做为了补齐大商所合约的深度行情日期和时间之用的。
因为CtpMdApi每次重新连接之后,再次订阅一些合约的行情的时候,必然会推送一条该合约在交易所中最后更新的tick时间给订阅方。只要在每天8:59(不含)之前重新连接大商所的行情服务器,就一定会收到一个日期为当日,时间为该合约最后一次推送的tick,而其中的补足日期是不对的!
请看看上面的证据中,发生日期错误的全部是大商所的合约,分析正确 !
def onRtnDepthMarketData(self, data: dict) -> None:
"""行情数据推送"""
# 过滤没有时间戳的异常行情数据
if not data["UpdateTime"]:
return
# 过滤还没有收到合约数据前的行情推送
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map.get(symbol, None)
if not contract:
return
# 对大商所的交易日字段取本地日期
if not data["ActionDay"] or contract.exchange == Exchange.DCE:
date_str: str = self.current_date
else:
date_str: str = data["ActionDay"]
# 这里错了,按照这样的字符串"%Y%m%d %H:%M:%S.%f",会错把tick.datetime的微秒当成0.1秒计数
# timestamp: str = f"{date_str} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}"
timestamp: str = f"{date_str} {data['UpdateTime']}." + str(data['UpdateMillisec']*1000).zfill(6) # hxxjava edit
dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
dt: datetime = CHINA_TZ.localize(dt)
# hxxjava add start
if contract.exchange == Exchange.CZCE:
# 对郑商所配置的秒以下的部分进行特别处理
key = f"{symbol}.{contract.exchange.value}"
# 读取上次的tick时间
last_time:datetime = self.czce_last_times.get(key,None)
if last_time:
# 取得上次时间的整秒时间
dt1 = last_time - timedelta(microseconds = last_time.microsecond)
if dt == dt1:
# 如果有收到一次秒内的tick,计算应该添加到秒以下的部分
next_second = dt1 + timedelta(seconds=1)
delta = next_second - last_time
microsecond = (delta.seconds*1000000 + delta.microseconds) / 2
# 时间调整为上次tick时间到下一秒到中间值
dt = last_time + timedelta(microseconds = microsecond)
# 更新最新tick时间字典
self.czce_last_times[key] = dt
# hxxjava add end
tick: TickData = TickData(
symbol=symbol,
exchange=contract.exchange,
datetime=dt,
name=contract.name,
volume=data["Volume"],
turnover=data["Turnover"],
open_interest=data["OpenInterest"],
last_price=adjust_price(data["LastPrice"]),
limit_up=data["UpperLimitPrice"],
limit_down=data["LowerLimitPrice"],
open_price=adjust_price(data["OpenPrice"]),
high_price=adjust_price(data["HighestPrice"]),
low_price=adjust_price(data["LowestPrice"]),
pre_close=adjust_price(data["PreClosePrice"]),
bid_price_1=adjust_price(data["BidPrice1"]),
ask_price_1=adjust_price(data["AskPrice1"]),
bid_volume_1=data["BidVolume1"],
ask_volume_1=data["AskVolume1"],
gateway_name=self.gateway_name
)
if data["BidVolume2"] or data["AskVolume2"]:
tick.bid_price_2 = adjust_price(data["BidPrice2"])
tick.bid_price_3 = adjust_price(data["BidPrice3"])
tick.bid_price_4 = adjust_price(data["BidPrice4"])
tick.bid_price_5 = adjust_price(data["BidPrice5"])
tick.ask_price_2 = adjust_price(data["AskPrice2"])
tick.ask_price_3 = adjust_price(data["AskPrice3"])
tick.ask_price_4 = adjust_price(data["AskPrice4"])
tick.ask_price_5 = adjust_price(data["AskPrice5"])
tick.bid_volume_2 = data["BidVolume2"]
tick.bid_volume_3 = data["BidVolume3"]
tick.bid_volume_4 = data["BidVolume4"]
tick.bid_volume_5 = data["BidVolume5"]
tick.ask_volume_2 = data["AskVolume2"]
tick.ask_volume_3 = data["AskVolume3"]
tick.ask_volume_4 = data["AskVolume4"]
tick.ask_volume_5 = data["AskVolume5"]
self.gateway.on_tick(tick)
方法1. 问题已经找到,怎么保证这个日期一定是正确的,可不是一件简单的事情,因为它牵涉到交易时间段和节假日的计算问题!
方法2. 有一个简单的办法,既然正确日期不好计算,可以对大商所的tick与当前本地时间进行比较,如果tick的datetime是未来时间,直接该tick抛弃掉,这样也不会有太大影响。
打开vnpy_ctpgateway.py文件,对class CtpMdApi进行如下修改,具体的修改请查找 '# hxxjava'就可以找到修改的语句了。
1、CtpMdApi中增加self.current_time,用来记录本地时间
class CtpMdApi(MdApi):
""""""
def __init__(self, gateway: CtpGateway) -> None:
"""构造函数"""
super().__init__()
self.gateway: CtpGateway = gateway
self.gateway_name: str = gateway.gateway_name
self.reqid: int = 0
self.connect_status: bool = False
self.login_status: bool = False
self.subscribed: Set = set()
self.userid: str = ""
self.password: str = ""
self.brokerid: str = ""
# self.current_date: str = datetime.now().strftime("%Y%m%d") # hxxjava comments
self.current_time: datetime = datetime.now() # hxxjava adds
self.current_date: str = self.current_time.strftime("%Y%m%d") # hxxjava adds
self.czce_last_times:Dict[str,datetime] = {} # hxxjava add
2、修改CtpMdApi的update_date()函数
def update_date(self) -> None:
"""更新当前日期"""
# self.current_date = datetime.now().strftime("%Y%m%d") # hxxjava comments
self.current_time = datetime.now() # hxxjava adds
self.current_date = self.current_time.strftime("%Y%m%d") # hxxjava adds
3、修改CtpMdApi的深度行情推送函数onRtnDepthMarketData()
def onRtnDepthMarketData(self, data: dict) -> None:
"""行情数据推送"""
# 过滤没有时间戳的异常行情数据
if not data["UpdateTime"]:
return
# 过滤还没有收到合约数据前的行情推送
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map.get(symbol, None)
if not contract:
return
# 对大商所的交易日字段取本地日期
if not data["ActionDay"] or contract.exchange == Exchange.DCE:
date_str: str = self.current_date
else:
date_str: str = data["ActionDay"]
# 这里错了,按照这样的字符串"%Y%m%d %H:%M:%S.%f",会错把tick.datetime的微秒当成0.1秒计数
# timestamp: str = f"{date_str} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}"
timestamp: str = f"{date_str} {data['UpdateTime']}." + str(data['UpdateMillisec']*1000).zfill(6) # hxxjava edit
dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
dt: datetime = CHINA_TZ.localize(dt)
# hxxjava add start
if contract.exchange == Exchange.CZCE:
# 对郑商所配置的秒以下的部分进行特别处理
key = f"{symbol}.{contract.exchange.value}"
# 读取上次的tick时间
last_time:datetime = self.czce_last_times.get(key,None)
if last_time:
# 取得上次时间的整秒时间
dt1 = last_time - timedelta(microseconds = last_time.microsecond)
if dt == dt1:
# 如果有收到一次秒内的tick,计算应该添加到秒以下的部分
next_second = dt1 + timedelta(seconds=1)
delta = next_second - last_time
microsecond = (delta.seconds*1000000 + delta.microseconds) / 2
# 时间调整为上次tick时间到下一秒到中间值
dt = last_time + timedelta(microseconds = microsecond)
# 更新最新tick时间字典
self.czce_last_times[key] = dt
elif contract.exchange == Exchange.DCE:
# 对郑商所配置的秒以下的部分进行特别处理
if dt > CHINA_TZ.localize(self.current_time) + timedelta(seconds = 10):
# 如果大商所的补足本地日期的dt超前每2秒更新一次的本地时间2秒,判断为无数的行情数据,做丢弃处理。
# 但是为了降低对本地时间与交易所时间同步要求,放宽为10秒或者更多都可以
return
# hxxjava add end
tick: TickData = TickData(
symbol=symbol,
exchange=contract.exchange,
datetime=dt,
name=contract.name,
volume=data["Volume"],
turnover=data["Turnover"],
open_interest=data["OpenInterest"],
last_price=adjust_price(data["LastPrice"]),
limit_up=data["UpperLimitPrice"],
limit_down=data["LowerLimitPrice"],
open_price=adjust_price(data["OpenPrice"]),
high_price=adjust_price(data["HighestPrice"]),
low_price=adjust_price(data["LowestPrice"]),
pre_close=adjust_price(data["PreClosePrice"]),
bid_price_1=adjust_price(data["BidPrice1"]),
ask_price_1=adjust_price(data["AskPrice1"]),
bid_volume_1=data["BidVolume1"],
ask_volume_1=data["AskVolume1"],
gateway_name=self.gateway_name
)
if data["BidVolume2"] or data["AskVolume2"]:
tick.bid_price_2 = adjust_price(data["BidPrice2"])
tick.bid_price_3 = adjust_price(data["BidPrice3"])
tick.bid_price_4 = adjust_price(data["BidPrice4"])
tick.bid_price_5 = adjust_price(data["BidPrice5"])
tick.ask_price_2 = adjust_price(data["AskPrice2"])
tick.ask_price_3 = adjust_price(data["AskPrice3"])
tick.ask_price_4 = adjust_price(data["AskPrice4"])
tick.ask_price_5 = adjust_price(data["AskPrice5"])
tick.bid_volume_2 = data["BidVolume2"]
tick.bid_volume_3 = data["BidVolume3"]
tick.bid_volume_4 = data["BidVolume4"]
tick.bid_volume_5 = data["BidVolume5"]
tick.ask_volume_2 = data["AskVolume2"]
tick.ask_volume_3 = data["AskVolume3"]
tick.ask_volume_4 = data["AskVolume4"]
tick.ask_volume_5 = data["AskVolume5"]
self.gateway.on_tick(tick)
在使用vnpy中的DataRecorder进行tick情录制的时候,本人发现郑商所的tick中时间在一秒内会收到两个tick,但是同一秒内的tick的时间是相同的。这会给后端的CTA策略、价差交易策略登录模块在合成K的时带来不必要的困扰,因为通常合成K线的BarGenerator通常对tick的时间进行有效性判断的时候,会把时间重复的tick做抛弃处理,这会再次郑商所的品种的tick被使用的量减半。如果重复的tick也用的话,这样倒是不会减半,可是在防止垃圾数据时又无法杜绝旧的数据在网关断网或者重新再次连接时,接口回再次推送最后的tick数据,而这个tick数据之前已经被实时推送给客户段了。所有在ctp_gateway中对郑商所行情数据中时间秒以下的部分进行补充是非常必要的!
本修改只需要对vnpy_ctp\ctp_gateway.py进行修改就可以了,修改步骤如下:
def __init__(self, gateway: CtpGateway) -> None:
"""构造函数"""
super().__init__()
self.gateway: CtpGateway = gateway
self.gateway_name: str = gateway.gateway_name
self.reqid: int = 0
self.connect_status: bool = False
self.login_status: bool = False
self.subscribed: Set = set()
self.userid: str = ""
self.password: str = ""
self.brokerid: str = ""
self.current_date: str = datetime.now().strftime("%Y%m%d")
self.czce_last_times:Dict[str,datetime] = {} # 郑商所最新tick时间字典 hxxjava add
def onRtnDepthMarketData(self, data: dict) -> None:
"""行情数据推送"""
# 过滤没有时间戳的异常行情数据
if not data["UpdateTime"]:
return
# 过滤还没有收到合约数据前的行情推送
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map.get(symbol, None)
if not contract:
return
# 对大商所的交易日字段取本地日期
if not data["ActionDay"] or contract.exchange == Exchange.DCE:
date_str: str = self.current_date
else:
date_str: str = data["ActionDay"]
# 这里错了,按照这样的字符串"%Y%m%d %H:%M:%S.%f",会错把tick.datetime的微秒当成0.1秒计数
# timestamp: str = f"{date_str} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}"
timestamp: str = f"{date_str} {data['UpdateTime']}." + str(data['UpdateMillisec']*1000).zfill(6) # hxxjava edit
dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
dt: datetime = CHINA_TZ.localize(dt)
# hxxjava add start
if contract.exchange == Exchange.CZCE:
# 对郑商所配置的秒以下的部分进行特别处理
key = f"{symbol}.{contract.exchange.value}"
# 读取上次的tick时间
last_time:datetime = self.czce_last_times.get(key,None)
if last_time:
# 取得上次时间的整秒时间
dt1 = last_time - timedelta(microseconds = last_time.microsecond)
if dt == dt1:
# 如果有收到一次秒内的tick,计算应该添加到秒以下的部分
next_second = dt1 + timedelta(seconds=1)
delta = next_second - last_time
microsecond = (delta.seconds*1000000 + delta.microseconds) / 2
# 时间调整为上次tick时间到下一秒到中间值
dt = last_time + timedelta(microseconds = microsecond)
# 更新最新tick时间字典
self.czce_last_times[key] = dt
# hxxjava add end
tick: TickData = TickData(
symbol=symbol,
exchange=contract.exchange,
datetime=dt,
name=contract.name,
volume=data["Volume"],
turnover=data["Turnover"],
open_interest=data["OpenInterest"],
last_price=adjust_price(data["LastPrice"]),
limit_up=data["UpperLimitPrice"],
limit_down=data["LowerLimitPrice"],
open_price=adjust_price(data["OpenPrice"]),
high_price=adjust_price(data["HighestPrice"]),
low_price=adjust_price(data["LowestPrice"]),
pre_close=adjust_price(data["PreClosePrice"]),
bid_price_1=adjust_price(data["BidPrice1"]),
ask_price_1=adjust_price(data["AskPrice1"]),
bid_volume_1=data["BidVolume1"],
ask_volume_1=data["AskVolume1"],
gateway_name=self.gateway_name
)
if data["BidVolume2"] or data["AskVolume2"]:
tick.bid_price_2 = adjust_price(data["BidPrice2"])
tick.bid_price_3 = adjust_price(data["BidPrice3"])
tick.bid_price_4 = adjust_price(data["BidPrice4"])
tick.bid_price_5 = adjust_price(data["BidPrice5"])
tick.ask_price_2 = adjust_price(data["AskPrice2"])
tick.ask_price_3 = adjust_price(data["AskPrice3"])
tick.ask_price_4 = adjust_price(data["AskPrice4"])
tick.ask_price_5 = adjust_price(data["AskPrice5"])
tick.bid_volume_2 = data["BidVolume2"]
tick.bid_volume_3 = data["BidVolume3"]
tick.bid_volume_4 = data["BidVolume4"]
tick.bid_volume_5 = data["BidVolume5"]
tick.ask_volume_2 = data["AskVolume2"]
tick.ask_volume_3 = data["AskVolume3"]
tick.ask_volume_4 = data["AskVolume4"]
tick.ask_volume_5 = data["AskVolume5"]
self.gateway.on_tick(tick)
这是本人录制的郑商所合约TA301的结果:
都说国内期货的行情推送最多是2次,其实是错误的,可能超过2次 !看上图就知道,一秒3次一个经常存在的,所以不能够固定递增0.5秒,而应该向本文中的办法,每次加0.5,0.25,0.125......秒的比较好,既不重复有可以超过2次。
在行情接口与策略和应用之间建起一个tick过滤器——TickFilter,对tick数据进行过滤。
声明:本文基于【CTP接口规范6.3.15_API接口说明】做出的修改。
4.1 定义相关的常量和数据类
在vnpy\trader\constant.py中增加下面的合约交易状态InstrumentStatus常量类型定义:
class InstrumentStatus(Enum):
"""
合约交易状态类型 hxxjava debug
"""
BEFORE_TRADING = "开盘前"
NO_TRADING = "非交易"
CONTINOUS = "连续交易"
AUCTION_ORDERING = "集合竞价报单"
AUCTION_BALANCE = "集合竞价价格平衡"
AUCTION_MATCH = "集合竞价撮合"
CLOSE = "收盘"
# 有效交易状态
VALID_TRADE_STATUSES = [
InstrumentStatus.CONTINOUS,
InstrumentStatus.AUCTION_ORDERING,
InstrumentStatus.AUCTION_BALANCE,
InstrumentStatus.AUCTION_MATCH
]
# 集合竞价交易状态
AUCTION_STATUS = [
InstrumentStatus.AUCTION_ORDERING,
InstrumentStatus.AUCTION_BALANCE,
InstrumentStatus.AUCTION_MATCH
]
class StatusEnterReason(Enum):
"""
品种进入交易状态原因类型 hxxjava debug
"""
AUTOMATIC = "自动切换"
MANUAL = "手动切换"
FUSE = "熔断"
在vnpy\trader\object.py中增加下面的交易状态数据类StatusData:
@dataclass
class StatusData(BaseData):
"""
hxxjava debug
"""
symbol:str
exchange : Exchange
settlement_group_id : str = ""
instrument_status : InstrumentStatus = None
trading_segment_sn : int = None
enter_time : str = ""
enter_reason : str = ""
exchange_inst_id : str = ""
def __post_init__(self):
""" """
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
def belongs_to(self,vt_symbol:str):
symbol,exchange_str = vt_symbol.split(".")
instrument = left_alphas(symbol).upper()
return (self.symbol.upper() == instrument) and (self.exchange.value == exchange_str)
在vnpy\trader\event.py中增加交易状态消息类型
EVENT_STATUS = "eStatus" # hxxjava debug
EVENT_ORIGIN_TICK = "eOriginTick." # hxxjava debug
EVENT_AUCTION_TICK = "eAuctionTick." # hxxjava debug
在vnpy\trader\gateway.py中合约状态接口,修改tick推送接口:
from .event import EVENT_ORIGIN_TICK,EVENT_STATUS # hxxjava add
from .object import StatusData # hxxjava add
def on_tick(self, tick: TickData) -> None:
"""
Tick event push.
Tick event of a specific vt_symbol is also pushed.
"""
self.on_event(EVENT_ORIGIN_TICK, tick) # hxxjava add
# self.on_event(EVENT_TICK, tick)
# self.on_event(EVENT_TICK + tick.vt_symbol, tick)
def on_status(self, status: StatusData) -> None: # hxxjava debug
"""
Instrument Status event push.
"""
self.on_event(EVENT_STATUS, status)
self.on_event(EVENT_STATUS + status.vt_symbol, status)
修改vnpy_cpt\ctp_gateway.py:
from vnpy.trader.constant import InstrumentStatus,StatusEnterReason # hxxjava debug
rom vnpy.trader.object import StatusData, # hxxjava debug
# 品种状态进入原因映射 hxxjava debug
INSTRUMENTSTATUS_CTP2VT: Dict[str, InstrumentStatus] = {
"0": InstrumentStatus.BEFORE_TRADING,
"1": InstrumentStatus.NO_TRADING,
"2": InstrumentStatus.CONTINOUS,
"3": InstrumentStatus.AUCTION_ORDERING,
"4": InstrumentStatus.AUCTION_BALANCE,
"5": InstrumentStatus.AUCTION_MATCH,
"6": InstrumentStatus.CLOSE,
"7": InstrumentStatus.CLOSE
}
# 品种状态进入原因映射 hxxjava debug
ENTERREASON_CTP2VT: Dict[str, StatusEnterReason] = {
"1": StatusEnterReason.AUTOMATIC,
"2": StatusEnterReason.MANUAL,
"3": StatusEnterReason.FUSE
}
def onRtnInstrumentStatus(self,data:dict):
"""
当接收到合约品种状态信息 # hxxjava debug
"""
if data:
# print(f"【data={data}】")
status = StatusData(
symbol = data["InstrumentID"],
exchange = EXCHANGE_CTP2VT[data["ExchangeID"]],
settlement_group_id = data["SettlementGroupID"],
instrument_status = INSTRUMENTSTATUS_CTP2VT[data["InstrumentStatus"]],
trading_segment_sn = data["TradingSegmentSN"],
enter_time = data["EnterTime"],
enter_reason = ENTERREASON_CTP2VT[data["EnterReason"]],
exchange_inst_id = data["ExchangeInstID"],
gateway_name=self.gateway_name
)
# print(f"status={status}")
self.gateway.on_status(status)
from vnpy.trader.event import EVENT_AUCTION_TICK # hxxjava add
class MyCtaEngine(CtaEngine):
""" """
condition_filename = "condition_order.json" # 历史条件单存储文件
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super().__init__(main_engine,event_engine)
self.condition_orders:Dict[str,ConditionOrder] = {} # strategy_name: ConditionOrder
self.triggered_condition_orders:List[ConditionOrder] = [] # 已经触发点条件单,为流控设计
def load_active_condtion_orders(self):
""" """
return {}
def register_event(self):
""""""
super().register_event()
self.event_engine.register(EVENT_AUCTION_TICK, self.process_auction_tick_event)
def process_auction_tick_event(self,event:Event):
""" 集合竞价消息处理 """
tick:TickData = event.data
strategies = self.symbol_strategy_map[tick.vt_symbol]
if not strategies:
return
for strategy in strategies:
if strategy.inited:
# 执行策略的集合竞价消息处理
self.call_strategy_func(strategy, strategy.on_auction_tick, tick)
def process_tick_event(self,event:Event):
""" 用tick的价格检查条件单 """
super().process_tick_event(event)
tick:TickData = event.data
all_condition_orders = [order for order in self.condition_orders.values() \
if order.vt_symbol == tick.vt_symbol and order.status == CondOrderStatus.WAITING]
for order in all_condition_orders:
# 检查条件单是否满足条件
self.check_condition_order(order,tick)
def check_condition_order(self,order:ConditionOrder,tick:TickData):
""" 检查条件单是否满足条件 """
strategy = self.strategies.get(order.strategy_name,None)
if not strategy or not strategy.trading:
return False
price = tick.last_price
is_be = order.condition == Condition.BE and price >= order.price
is_le = order.condition == Condition.LE and price <= order.price
is_bt = order.condition == Condition.BT and price > order.price
is_lt = order.condition == Condition.LT and price < order.price
if is_be or is_le or is_bt or is_lt:
# 满足触发条件
if order.execute_price == ExecutePrice.MARKET:
# 取市场价
price = tick.last_price
elif order.execute_price == ExecutePrice.EXTREME:
# 取极限价
price = tick.limit_up if order.direction == Direction.LONG else tick.limit_down
else:
# 取设定价
price = order.price
# 执行委托
order_ids = strategy.send_order(
direction = order.direction,
offset=order.offset,
price=price,
volume=order.volume
)
if order_ids:
order.trigger_time = tick.datetime
order.status = CondOrderStatus.TRIGGERED
order.vt_orderids = order_ids
self.call_strategy_func(strategy,strategy.on_condition_order,order)
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
def find_condition_order(self,vt_orderid:str):
""" 根据委托单号查询所属条件单 """
corder:ConditionOrder = None
for order in self.condition_orders.values():
if vt_orderid in order.vt_orderids:
corder = order
break
return corder
def process_trade_event(self, event: Event):
""" 委托单推送处理 """
super().process_trade_event(event)
trade:TradeData = event.data
vt_orderid = trade.vt_orderid
corder = self.find_condition_order(vt_orderid)
if corder:
# 该成交单属于某个条件单
strategy = self.strategies.get(corder.strategy_name,None)
if strategy and strategy.trading:
# 找到了该条件单属实策略实例且正在交易中
# 累计条件单的成交量
corder.traded += trade.volume
# 推送该条件单给策略
self.call_strategy_func(strategy,strategy.on_condition_order,corder)
# 刷新条件单列表控件
self.event_engine.put(Event(EVENT_CONDITION_ORDER,corder))
def send_condition_order(self,order:ConditionOrder):
""" """
strategy = self.strategies.get(order.strategy_name,None)
if not strategy or not strategy.trading:
return False
if order.cond_orderid not in self.condition_orders:
self.condition_orders[order.cond_orderid] = order
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
return False
def cancel_condition_order(self,cond_orderid:str):
""" """
order:ConditionOrder = self.condition_orders.get(cond_orderid,None)
if not order:
return False
order.status = CondOrderStatus.CANCELLED
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
def cancel_all_condition_orders(self,strategy_name:str):
""" """
for order in self.condition_orders.values():
if order.strategy_name == strategy_name and order.status == CondOrderStatus.WAITING:
order.status = CondOrderStatus.CANCELLED
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
__init__.py
中的CtaStrategyApp做如下修改class CtaStrategyApp(BaseApp):
""""""
app_name = APP_NAME
app_module = __module__
app_path = Path(__file__).parent
display_name = "CTA策略"
# engine_class = CtaEngine
engine_class = MyCtaEngine # hxxjava add
widget_name = "CtaManager"
icon_name = str(app_path.joinpath("ui", "cta.ico"))
修改vnpy_ctastrategy\CtaTemplate.py如下,为CtaTemplate增加on_auction_tick():
@virtual
def on_auction_tick(self, tick: TickData):
"""
Callback of new tick data update. # hxxjava add for auction tick
"""
pass
为class BaseDatabase增加下面两个接口函数:
@abstractmethod
def save_last_tick(self, ticks: List[TickData]) -> bool:
"""
Save last tick data into database. # hxxjava add
"""
pass
@abstractmethod
def load_last_tick(
self,
gateway_name : str,
exchange: Exchange = None,
symbol: str = None
) -> List[TickData]:
"""
Load last tick data from database. # hxxjava add
"""
pass
class MyDateTimeField(DateTimeField):
def get_modifiers(self):
return [6]
class DbLastTick(Model): # hxxjava add
""" 最新TICK数据表映射对象 """
id = AutoField()
gateway_name: str = CharField()
symbol: str = CharField()
exchange: str = CharField()
datetime: datetime = MyDateTimeField()
name: str = CharField()
volume: float = FloatField()
turnover: float = FloatField()
open_interest: float = FloatField()
last_price: float = FloatField()
last_volume: float = FloatField()
limit_up: float = FloatField()
limit_down: float = FloatField()
open_price: float = FloatField()
high_price: float = FloatField()
low_price: float = FloatField()
pre_close: float = FloatField()
bid_price_1: float = FloatField()
bid_price_2: float = FloatField(null=True)
bid_price_3: float = FloatField(null=True)
bid_price_4: float = FloatField(null=True)
bid_price_5: float = FloatField(null=True)
ask_price_1: float = FloatField()
ask_price_2: float = FloatField(null=True)
ask_price_3: float = FloatField(null=True)
ask_price_4: float = FloatField(null=True)
ask_price_5: float = FloatField(null=True)
bid_volume_1: float = FloatField()
bid_volume_2: float = FloatField(null=True)
bid_volume_3: float = FloatField(null=True)
bid_volume_4: float = FloatField(null=True)
bid_volume_5: float = FloatField(null=True)
ask_volume_1: float = FloatField()
ask_volume_2: float = FloatField(null=True)
ask_volume_3: float = FloatField(null=True)
ask_volume_4: float = FloatField(null=True)
ask_volume_5: float = FloatField(null=True)
localtime: datetime = DateTimeField(null=True)
class Meta:
database = db
indexes = ((("gateway_name","symbol", "exchange", "datetime"), True),)
class MysqlDatabase的初始化做如下修改:
def __init__(self) -> None:
""""""
self.db = db
self.db.connect()
self.db.create_tables([DbContractData, DbBarData, DbTickData, DbLastTick, DbBarOverview]) # hxxjava add DbLastTick,DbContractData
再为class MysqlDatabase添加下面两个函数:
def save_last_tick(self, ticks: List[TickData]) -> bool:
"""
Save last tick data into database. # hxxjava add
"""
vt_symbols = [t.vt_symbol for t in ticks]
# 删除ticks列表中包含合约的旧的tick记录
d: ModelDelete = DbLastTick.delete().where(
(DbLastTick.symbol+'.'+DbLastTick.exchange in vt_symbols)
)
count = d.execute()
# print(f"delete {count} last ticks")
# 构造最新的ticks列表数据
data = []
for t in ticks:
tick:TickData = deepcopy(t) # hxxjava change
tick.datetime = tick.datetime
d = tick.__dict__
d["exchange"] = d["exchange"].value
d.pop("vt_symbol")
data.append(d)
# print(tick.symbol,tick.exchange,tick.datetime.strftime('%Y-%m-%d %H:%M:%S %f'))
# 使用upsert操作将数据更新到数据库中
with self.db.atomic():
for c in chunked(data, 50):
DbLastTick.insert_many(c).on_conflict_replace().execute()
return True
def load_last_tick(
self,
gateway_name : str,
exchange: Exchange = None,
symbol: str = None
) -> List[TickData]:
"""
Load last tick data from database. # hxxjava add
"""
try:
# 从DbLastTick查询符合条件的最新tick记录
s: ModelSelect = (
DbLastTick.select().where(
(DbLastTick.gateway_name == gateway_name)
& (exchange is None or DbLastTick.exchange == exchange.value)
& (symbol is None or DbLastTick.symbol == symbol)
).order_by(DbLastTick.gateway_name,DbLastTick.datetime)
)
# 利用最新tick记录构造ticks列表
ticks: List[TickData] = []
for db_tick in s:
tick:TickData = TickData(
symbol=db_tick.symbol,
exchange=Exchange(db_tick.exchange),
datetime=to_china_tz(db_tick.datetime),
name=db_tick.name,
volume=db_tick.volume,
turnover=db_tick.turnover,
open_interest=db_tick.open_interest,
last_price=db_tick.last_price,
last_volume=db_tick.last_volume,
limit_up=db_tick.limit_up,
limit_down=db_tick.limit_down,
open_price=db_tick.open_price,
high_price=db_tick.high_price,
low_price=db_tick.low_price,
pre_close=db_tick.pre_close,
bid_price_1=db_tick.bid_price_1,
bid_price_2=db_tick.bid_price_2,
bid_price_3=db_tick.bid_price_3,
bid_price_4=db_tick.bid_price_4,
bid_price_5=db_tick.bid_price_5,
ask_price_1=db_tick.ask_price_1,
ask_price_2=db_tick.ask_price_2,
ask_price_3=db_tick.ask_price_3,
ask_price_4=db_tick.ask_price_4,
ask_price_5=db_tick.ask_price_5,
bid_volume_1=db_tick.bid_volume_1,
bid_volume_2=db_tick.bid_volume_2,
bid_volume_3=db_tick.bid_volume_3,
bid_volume_4=db_tick.bid_volume_4,
bid_volume_5=db_tick.bid_volume_5,
ask_volume_1=db_tick.ask_volume_1,
ask_volume_2=db_tick.ask_volume_2,
ask_volume_3=db_tick.ask_volume_3,
ask_volume_4=db_tick.ask_volume_4,
ask_volume_5=db_tick.ask_volume_5,
localtime=db_tick.localtime,
gateway_name=db_tick.gateway_name
)
ticks.append(tick)
return ticks
except:
# 当DbLastTick表不存在的时候,会发生错误
return []
在vnpy.usertools下创建tickfilter.py文件,其内容如下:
"""
本文件主要实现tick数据过滤器——TickFilter。
tick数据过滤器的功能:
1. 过滤重复tick,保证已经参与K线合成的tick不会再次被系统使用
2. 过滤无效tick,抛弃不在交易状态下的tick
3. 识别集合竞价tick,为使用tick的应用或用户策略处理集合竞价tick提供支持
作者:hxxjava
日期:2022-06-16
修改日期: 修改原因:
"""
from typing import Dict,List,Tuple
from threading import Thread
from vnpy.event import Event,EVENT_TIMER,EventEngine
from vnpy.trader.constant import InstrumentStatus,VALID_TRADE_STATUSES
from vnpy.trader.object import TickData,StatusData
from vnpy.trader.event import (
EVENT_ORIGIN_TICK,
EVENT_AUCTION_TICK,
EVENT_TICK,
EVENT_STATUS
)
from vnpy.trader.database import get_database
from vnpy.trader.utility import extract_vt_symbol
def left_alphas(instr:str):
"""
得到字符串左边的字符部分
"""
ret_str = ''
for s in instr:
if s.isalpha():
ret_str += s
else:
break
return ret_str
def get_vt_instrument(vt_symbol:str):
"""
从完整合约代码转换到完整品种代码
"""
symbol,exchange = extract_vt_symbol(vt_symbol)
instrument = left_alphas(symbol)
return f"{instrument}.{exchange.value}"
class TickFilter():
""" tick数据过滤器 """
CHECK_INTERVAL:int = 5 # 更新到数据库间隔
def __init__(self,event_engine:EventEngine,gateway_name:str):
""" tick数据过滤器初始化 """
self.event_engine = event_engine
self.gateway_name = gateway_name
self.db = get_database()
# 最新tick字典 {(gateway_name,vt_symbol),(update,tick)}
self.last_ticks:Dict[Tuple[str,str],Tuple[bool,TickData]] = {}
# 品种及合约状态字典 { vt_symbol : StatusData }
self.statuses:Dict[str,StatusData] = {}
self.second_cnt = 0
self.load_last_ticks()
self.register_event()
# print(f"TickFilter {gateway_name}")
def load_last_ticks(self):
"""
加载属于网关名称为self.gateway_name的最新tick列表
"""
last_ticks:List[TickData] = self.db.load_last_tick(gateway_name=self.gateway_name)
for tick in last_ticks:
self.last_ticks[(tick.gateway_name,tick.vt_symbol)] = (False,tick)
# print(f"load {len(last_ticks)} last ticks")
def register_event(self):
""" 注册消息 """
self.event_engine.register(EVENT_ORIGIN_TICK,self.process_tick_event)
self.event_engine.register(EVENT_STATUS,self.process_status_event)
self.event_engine.register(EVENT_TIMER,self.check_last_ticks)
def process_tick_event(self,event:Event):
""" 对原始tick进行过滤 """
tick:TickData = event.data
# 检查tick合约的经验状态是否位有效交易状态
status:StatusData = self.statuses.get(tick.vt_symbol,None)
if not status:
vt_instrument = get_vt_instrument(tick.vt_symbol)
status = self.statuses.get(vt_instrument,None)
if not status:
# 未收到交易状态,返回
return
if status.instrument_status not in VALID_TRADE_STATUSES:
# 不在有效交易状态,返回
return
key = (tick.gateway_name,tick.vt_symbol)
_,oldtick = self.last_ticks.get(key,(None,None))
valid_tick = False
if not oldtick:
# 没有该合约的历史tick
self.last_ticks[key] = (True,tick)
valid_tick = True
elif tick.datetime > oldtick.datetime:
#
self.last_ticks[key] = (True,tick)
valid_tick = True
else:
print(f"【特别tick = {tick}】")
if valid_tick == True:
# 如果是有效的tick
if status.instrument_status != InstrumentStatus.CONTINOUS:
# 发送集合竞价tic消息到系统中
self.event_engine.put(Event(EVENT_AUCTION_TICK,tick))
self.event_engine.put(Event(EVENT_AUCTION_TICK + tick.vt_symbol, tick))
else:
# 发送连续竞价tic消息到系统中
self.event_engine.put(Event(EVENT_TICK,tick))
self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, tick))
def process_status_event(self, event: Event):
""" 交易状态通知消息处理 """
status:StatusData = event.data
self.statuses[status.vt_symbol] = status
# print(f"【{status.gateway_name} {status}】")
def check_last_ticks(self,event:Event) -> None:
""" 原始tick过滤器 """
self.second_cnt += 1
if self.second_cnt % self.CHECK_INTERVAL == 0:
# 如果到了定时间隔
# 查询所有更新的tick
changed_ticks = []
for key,(update,tick) in self.last_ticks.items():
if update:
changed_ticks.append(tick)
self.last_ticks[key] = (False,tick)
if changed_ticks:
# 如果存在更新的tick,保存到数据库
t = Thread(target=self.db.save_last_tick,kwargs=({"ticks":changed_ticks}),daemon=True)
t.start()
# print(f"{self.second_cnt}: status count={len(self.statuses)} save {len(changed_ticks)} ticks")
修改vnpy\trader\engine.py
from vnpy.usertools.tickfilter import TickFilter # hxxjava add
在MainEngine的初始化函数def init(self, event_engine: EventEngine = None)中增加如下内容:
self.tick_filters:Dict[str,TickFilter] = {} # hxxjava add
修改其add_gateway(),内容如下:
def add_gateway(self, gateway_class: Type[BaseGateway], gateway_name: str = "") -> BaseGateway:
"""
Add gateway.
"""
# Use default name if gateway_name not passed
if not gateway_name:
gateway_name = gateway_class.default_name
gateway = gateway_class(self.event_engine, gateway_name)
self.gateways[gateway_name] = gateway
# Add gateway supported exchanges into engine
for exchange in gateway.exchanges:
if exchange not in self.exchanges:
self.exchanges.append(exchange)
# add a tick data filter for the gateway # hxxjava add
if gateway_name not in self.tick_filters:
self.tick_filters[gateway_name] = TickFilter(self.event_engine,gateway_name)
return gateway
只要你能够从网关行情接口实时得到合约的交易状态推送,把网关的行情接口做出类似的修改,这套方法同样是可用的。tickfilter的代码可以不用修改直接使用。
def __init__(
self,
on_bar: Callable,
window: int = 0,
on_window_bar: Callable = None,
interval: Interval = Interval.MINUTE
):
""" Constructor """
... ... # 其他代码省略
self.auction_tick:TickData = None
self.last_tick: TickData = None
def update_auction_tick(self,tick:TickData):
""" 更新集合竞价tick """
self.auction_tick = tick
def update_tick(self, tick: TickData) -> None:
"""
Update new tick data into generator.
"""
new_minute = False
if self.auction_tick:
# 合约集合竞价tick到当前tick
tick.high_price = max(tick.high_price,self.auction_tick.high_price)
tick.low_price = min(tick.low_price,self.auction_tick.low_price)
# 构造最新tick,以便把集合竞价的成交量和成交额合成到1分钟bar中
self.last_tick = deepcopy(self.auction_tick)
# 成交量和成交额每天从0开始单调递增
self.last_tick.volume = 0.0
self.last_tick.turnover = 0.0
# 用完集合竞价tick就丢弃
self.auction_tick = None
... ... # 其他代码省略
def on_auction_tick(self, tick: TickData):
"""
集合竞价tick处理
"""
self.bg.update_auction_tick(tick) # 假设self.bg是已经创建过的bar生成器
def update_tick(self, tick: TickData) -> None:
"""
Update new tick data into generator.
"""
new_minute = False
# Filter tick data with 0 last price
if not tick.last_price:
return
# Filter tick data with older timestamp
if self.last_tick and tick.datetime < self.last_tick.datetime:
return
if not self.bar:
new_minute = True
elif (
(self.bar.datetime.minute != tick.datetime.minute)
or (self.bar.datetime.hour != tick.datetime.hour)
):
self.bar.datetime = self.bar.datetime.replace(
second=0, microsecond=0
)
self.on_bar(self.bar)
new_minute = True
if new_minute:
self.bar = BarData(
symbol=tick.symbol,
exchange=tick.exchange,
interval=Interval.MINUTE,
datetime=tick.datetime,
gateway_name=tick.gateway_name,
open_price=tick.last_price,
high_price=tick.last_price,
low_price=tick.last_price,
close_price=tick.last_price,
open_interest=tick.open_interest
)
else:
self.bar.high_price = max(self.bar.high_price, tick.last_price)
if tick.high_price > self.last_tick.high_price:
self.bar.high_price = max(self.bar.high_price, tick.high_price)
self.bar.low_price = min(self.bar.low_price, tick.last_price)
if tick.low_price < self.last_tick.low_price:
self.bar.low_price = min(self.bar.low_price, tick.low_price)
self.bar.close_price = tick.last_price
self.bar.open_interest = tick.open_interest
self.bar.datetime = tick.datetime
if self.last_tick:
# a: 成交量累计
volume_change = tick.volume - self.last_tick.volume
self.bar.volume += max(volume_change, 0)
# b: 成交额累计
turnover_change = tick.turnover - self.last_tick.turnover
self.bar.turnover += max(turnover_change, 0)
# c: 记录最新tick
self.last_tick = tick
交易所播报的tick这成交量和成交额是从每个交易日的集合竞价阶段开始累计的,它们都有单调递增的特性。
也就是说volume和turnover在一个交易日内只会增加,不会减少。
答案是没有的!
![]() |
---|
图1 |
此时,前一天的最后收到的tick都volume为v2,当前天的集合竞价或者一个收到的tick都volume为v3,v3<v2。
![]() |
---|
图2 |
此时,前一天的最后收到的tick都volume为v2,当前天的集合竞价或者一个收到的tick都volume为v3,v3>v2。
这种情况会发生吗?会的 !前一天的交易特别清淡,导致成交量非常小,而当前天因为收到什么特别因素刺激,开盘就非常火爆,光是集合竞价的成交量就比前一天的整体的成交量都大,就会出现此图所示的情况。
a处代码中的volume_change 是什么?它代表了从上一个tick到当前tick发生的成交量。
1分钟K线到成交量就是对这个volume_change的累计。但是有一个例外,当volume_change < 0时,累加的值就只能够为0了。
那么什么情况下会发生volume_change < 0呢?答案是发生在如图1所示的跨日的情况下!
可是还会发生如图2所示的跨日的情况,此时volume_change > 0,那么volume_change 就不应该是v3-v2,而应该是v3 !
所以a处成交量累计的代码是错误的,同理b处对成交额累计的代码也是错误的!
策略中使用到交易合约的合约信息是非常困难发生的事情,例如使用其中的合约乘数、最小交易交易单位、最小价格变动等 ... ... 。
如果您的策略中使用到交易合约的合约信息时,当您在休市的时候无法连接Gateway,那么你在回测的时候将无法进行下去。
怎么办,坐等交易所开市吗?本帖就讨论这样一个问题。
vnpy\trader\object.py中ContractData是这样定义的:
@dataclass
class ContractData(BaseData):
"""
Contract data contains basic information about each contract traded.
"""
symbol: str
exchange: Exchange
name: str
product: Product
size: float
pricetick: float
open_date:datetime = None # 上市日 open date hxxjava add
expire_date:datetime = None # 到期日 expire date hxxjava add
min_volume: float = 1 # minimum trading volume of the contract
stop_supported: bool = False # whether server supports stop order
net_position: bool = False # whether gateway uses net position volume
history_data: bool = False # whether gateway provides bar history data
option_strike: float = 0
option_underlying: str = "" # vt_symbol of underlying contract
option_type: OptionType = None
option_listed: datetime = None
option_expiry: datetime = None
option_portfolio: str = ""
option_index: str = "" # for identifying options with same strike price
def __post_init__(self):
""""""
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
修改vnpy_ctp\gateway\ctp_gateway.py中的CtpTdApi接口onRspQryInstrument():
def onRspQryInstrument(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""合约查询回报"""
product: Product = PRODUCT_CTP2VT.get(data["ProductClass"], None)
if product:
contract: ContractData = ContractData(
symbol=data["InstrumentID"],
exchange=EXCHANGE_CTP2VT[data["ExchangeID"]],
name=data["InstrumentName"],
product=product,
size=data["VolumeMultiple"],
pricetick=data["PriceTick"],
open_date=datetime.strptime(data["OpenDate"], "%Y%m%d"), # hxxjava add
expire_date=datetime.strptime(data["ExpireDate"], "%Y%m%d"), # hxxjava add
gateway_name=self.gateway_name
)
# 期权相关
if contract.product == Product.OPTION:
# 移除郑商所期权产品名称带有的C/P后缀
if contract.exchange == Exchange.CZCE:
contract.option_portfolio = data["ProductID"][:-1]
else:
contract.option_portfolio = data["ProductID"]
contract.option_underlying = data["UnderlyingInstrID"]
contract.option_type = OPTIONTYPE_CTP2VT.get(data["OptionsType"], None)
contract.option_strike = data["StrikePrice"]
contract.option_index = str(data["StrikePrice"])
contract.option_listed = datetime.strptime(data["OpenDate"], "%Y%m%d")
contract.option_expiry = datetime.strptime(data["ExpireDate"], "%Y%m%d")
self.gateway.on_contract(contract)
symbol_contract_map[contract.symbol] = contract
if last:
self.contract_inited = True
self.gateway.write_log("合约信息查询成功")
for data in self.order_data:
self.onRtnOrder(data)
self.order_data.clear()
for data in self.trade_data:
self.onRtnTrade(data)
self.trade_data.clear()
在vnpy\trader\database.py中为BaseDatabase增加两个与合约信息相关的接口:
@abstractmethod
def save_constract_data(self, constracts: List[ContractData]) -> bool:
"""
Save constract data into database. hxxjava add
"""
pass
@abstractmethod
def load_contract_data(self,vt_symbol:str="") -> List[ContractData]:
"""
Load constract data from database. hxxjava add
"""
pass
本人使用的是MySql Server,所以就在vnpy_mysql\mysql_database.py中扩展这两个接口:
from vnpy.trader.constant import Exchange, Interval, Product # hxxjava add Product
class DbContractData(Model): # hxxjava add
"""K线数据表映射对象"""
id = AutoField()
symbol: str = CharField()
exchange: str = CharField()
name : str = CharField()
product : str = CharField()
size : float = FloatField()
pricetick : float = FloatField()
open_date: datetime = DateTimeField(default=None)
expire_date: datetime = DateTimeField(default=None)
min_volume : float = FloatField(default=1)
stop_supported : bool = BooleanField(default=False)
net_position : bool = BooleanField(default=False)
history_data : bool = BooleanField(default=False)
option_strike : float = FloatField(default=0)
option_underlying: str = CharField(default="")
option_type: str = CharField(default="")
option_listed: datetime = DateTimeField(default=None)
option_expiry: datetime = DateTimeField(default=None)
option_portfolio: str = CharField(default="")
option_index: str = CharField(default="")
gateway_name:str = CharField()
class Meta:
database = db
indexes = ((("open_date","exchange","symbol"), True),)
class MysqlDatabase(BaseDatabase):
"""Mysql数据库接口"""
def __init__(self) -> None:
""""""
self.db = db
self.db.connect()
self.db.create_tables([DbContractData, DbBarData, DbTickData, DbBarOverview]) # hxxjava add DbContractData
def save_constract_data(self, contracts: List[ContractData]) -> bool:
"""
Save constract data into database. hxxjava add
"""
# 将constracts数据转换为字典
data = []
for contract in contracts:
copy_c = deepcopy(contract)
d = copy_c.__dict__
d["exchange"] = d["exchange"].value
d["product"] = d["product"].value
d.pop("vt_symbol")
data.append(d)
# 使用upsert操作将数据更新到数据库中
with self.db.atomic():
for c in chunked(data, 50):
DbContractData.insert_many(c).on_conflict_replace().execute()
return True
def load_contract_data(self,vt_symbol:str="") -> List[ContractData]:
"""
Load constract data from database. hxxjava add
"""
symbol,exchange = "",""
if vt_symbol:
symbol,exchange = vt_symbol.split('.')
s: ModelSelect = DbContractData.select().where(
(not symbol or DbContractData.symbol == symbol)
& (not symbol or DbContractData.exchange == exchange)
).order_by(DbContractData.open_date,DbContractData.exchange,DbContractData.symbol)
contracts: List[ContractData] = []
for db_c in s:
# 取出四个时间
open_date = datetime.fromtimestamp(db_c.open_date.timestamp(), DB_TZ)
expire_date = datetime.fromtimestamp(db_c.expire_date.timestamp(), DB_TZ)
option_listed = None
option_expiry = None
product = Product(db_c.product)
if product == Product.OPTION:
option_listed = datetime.fromtimestamp(db_c.option_listed.timestamp(), DB_TZ)
option_expiry = datetime.fromtimestamp(db_c.option_expiry.timestamp(), DB_TZ)
contract = ContractData(
symbol = db_c.symbol,
exchange = Exchange(db_c.exchange),
name = db_c.name,
product = Product(db_c.product),
size = db_c.size,
pricetick = db_c.pricetick,
open_date = open_date,
expire_date = expire_date,
min_volume = db_c.min_volume,
stop_supported = db_c.stop_supported,
net_position = db_c.net_position,
history_data = db_c.history_data,
option_strike = db_c.option_strike,
option_underlying = db_c.option_underlying,
option_type = db_c.option_type,
option_listed = option_listed,
option_expiry = option_expiry,
option_portfolio = db_c.option_portfolio,
option_index = db_c.option_index,
gateway_name = db_c.gateway_name,
)
contracts.append(contract)
return contracts
修改vnpy\trader\engine.py做如下修改:
from threading import Thread # hxxjava add
from copy import deepcopy # hxxjava add
from .database import get_database # hxxjava add
class OmsEngine(BaseEngine):
"""
Provides order management system function.
"""
contract_file = "contracts.json" # hxxjava add
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super(OmsEngine, self).__init__(main_engine, event_engine, "oms")
self.ticks: Dict[str, TickData] = {}
self.orders: Dict[str, OrderData] = {}
self.trades: Dict[str, TradeData] = {}
self.positions: Dict[str, PositionData] = {}
self.accounts: Dict[str, AccountData] = {}
self.contracts: Dict[str, ContractData] = {}
self.quotes: Dict[str, QuoteData] = {}
self.active_orders: Dict[str, OrderData] = {}
self.active_quotes: Dict[str, QuoteData] = {}
self.db = get_database()
self.load_contracts() # hxxjava add 启动时就从数据库读取所有合约信息
self.add_function()
self.register_event()
def process_account_event(self, event: Event) -> None:
""""""
account: AccountData = event.data
self.accounts[account.vt_accountid] = account
self.save_contracts_changed() # hxxjava add
def process_contract_event(self, event: Event) -> None:
""""""
contract: ContractData = event.data
# self.contracts[contract.vt_symbol] = contract
# hxxjava change
if contract.vt_symbol not in self.contracts:
self.contracts[contract.vt_symbol] = contract
self.contracts_changed.append(contract)
def save_contracts_changed(self):
""" 启动线程保存所有新增的合约 """
# 复制所有新增的合约
contracts = [deepcopy(contract) for contract in self.contracts_changed]
self.contracts_changed = []
if contracts:
# 如果有新增的合约,启动线程保存
t = Thread(target=self.save_contracts,kwargs=({"contracts":contracts}))
t.start()
def save_contracts(self,contracts:List): # hxxjava add
""" save contracts into database """
self.db.save_constract_data(contracts)
print(f"一共保存了{len(contracts)}个合约 !")
def load_contracts(self): # hxxjava add
""" save all contracts into a json file """
contracts = self.db.load_contract_data()
for c in contracts:
self.contracts[c.vt_symbol] = c
self.contracts_changed:List[ContractData] = []
print(f"一共读取了{len(contracts)}个合约 !")
# print(f"self.contracts={self.contracts}个合约 !")
修改vnpy_ctastrategy\engine.py中的CtaEngine,添加get_contract():
def get_contract(self, strategy: CtaTemplate) -> Optional[ContractData]: # hxxjava add
"""
Get strategy's contract data.
"""
return self.main_engine.get_contract(strategy.vt_symbol)
修改vnpy_ctastrategy\template.py中的CtaTemplate,添加get_contract():
def get_contract(self):
"""
Return trading_hours of trading contract. # hxxjava add
"""
return self.cta_engine.get_contract(self)
经过什么这么复杂的步骤,就赋予了您的CTA策略一种直接读取交易合约的合约信息的能力。
如何使用呢?非常简单,这里给出一段用户策略的例子代码:
""" 得到交易合约的合约信息 """
contract:ContractData = self.get_contract()
size_n = contract.size # 合约乘数
price_tick = contract.price_tick # 最小价格变动
所有可自由交易标的行情变动都是随机过程的累积,本质上就是马尔可夫过程,注意:一定要是可自由交易,不信请往下看。
import numpy as np
from matplotlib import pyplot as plt
N = 10*600
List1 = list(np.random.randn(N))
data3m = []
data30m = []
tt3m = 0.0
for i in range(N):
tt3m += List1[i]
if i % 10==0:
data30m.append(tt3m)
data3m.append(tt3m)
plt.figure(figsize=(20,6))
plt.plot(data3m)
plt.show()
plt.figure(figsize=(20,6))
plt.plot(data30m)
它所形成的曲线是那么自然,优美,和真实的行情只差一个合约名称 ... ...
当你想以比市场价更高的价格买,或者以比市场价更低的价格卖时,使用send_order()是会立即执行的,但是用停止单却可以做到这一点,这是停止单的优点。
但是实际使用中停止单也是有缺点的:
这是本人给它取的名字,它其实是本人以前提到的交易线(TradeLine)的改进和增强。
它主要就是为解决停止单上述缺点而设计的,当然应该具备上述优点。
在vnpy_ctastrategy\base.py中增加如下代码:
class Condition(Enum): # hxxjava add
""" 条件单的条件 """
BT = ">"
LT = "<"
BE = ">="
LE = "<="
class ExecutePrice(Enum): # hxxjava add
""" 执行价格 """
SETPRICE = "设定价"
MARKET = "市场价"
EXTREME = "极限价"
class CondOrderStatus(Enum): # hxxjava add
""" 条件单状态 """
WAITING = "等待中"
CANCELLED = "已撤销"
TRIGGERED = "已触发"
@dataclass
class ConditionOrder: # hxxjava add
""" 条件单 """
strategy_name: str
vt_symbol: str
direction: Direction
offset: Offset
price: float
volume: float
condition:Condition
execute_price:ExecutePrice = ExecutePrice.SETPRICE
create_time: datetime = datetime.now()
trigger_time: datetime = None
cond_orderid: str = "" # 条件单编号
status: CondOrderStatus = CondOrderStatus.WAITING
def __post_init__(self):
""" """
if not self.cond_orderid:
self.cond_orderid = datetime.now().strftime("%m%d%H%M%S%f")[:13]
EVENT_CONDITION_ORDER = "eConditionOrder" # hxxjava add
修改vnpy_ctastrategy\ui\widget.py中的class CtaManager,代码如下:
class CtaManager(QtWidgets.QWidget):
""""""
signal_log: QtCore.Signal = QtCore.Signal(Event)
signal_strategy: QtCore.Signal = QtCore.Signal(Event)
def __init__(self, main_engine: MainEngine, event_engine: EventEngine) -> None:
""""""
super().__init__()
self.main_engine: MainEngine = main_engine
self.event_engine: EventEngine = event_engine
self.cta_engine: CtaEngine = main_engine.get_engine(APP_NAME)
self.managers: Dict[str, StrategyManager] = {}
self.init_ui()
self.register_event()
self.cta_engine.init_engine()
self.update_class_combo()
def init_ui(self) -> None:
""""""
self.setWindowTitle("CTA策略")
# Create widgets
self.class_combo: QtWidgets.QComboBox = QtWidgets.QComboBox()
add_button: QtWidgets.QPushButton = QtWidgets.QPushButton("添加策略")
add_button.clicked.connect(self.add_strategy)
init_button: QtWidgets.QPushButton = QtWidgets.QPushButton("全部初始化")
init_button.clicked.connect(self.cta_engine.init_all_strategies)
start_button: QtWidgets.QPushButton = QtWidgets.QPushButton("全部启动")
start_button.clicked.connect(self.cta_engine.start_all_strategies)
stop_button: QtWidgets.QPushButton = QtWidgets.QPushButton("全部停止")
stop_button.clicked.connect(self.cta_engine.stop_all_strategies)
clear_button: QtWidgets.QPushButton = QtWidgets.QPushButton("清空日志")
clear_button.clicked.connect(self.clear_log)
roll_button: QtWidgets.QPushButton = QtWidgets.QPushButton("移仓助手")
roll_button.clicked.connect(self.roll)
self.scroll_layout: QtWidgets.QVBoxLayout = QtWidgets.QVBoxLayout()
self.scroll_layout.addStretch()
scroll_widget: QtWidgets.QWidget = QtWidgets.QWidget()
scroll_widget.setLayout(self.scroll_layout)
self.scroll_area: QtWidgets.QScrollArea = QtWidgets.QScrollArea()
self.scroll_area.setWidgetResizable(True)
self.scroll_area.setWidget(scroll_widget)
self.log_monitor: LogMonitor = LogMonitor(self.main_engine, self.event_engine)
self.stop_order_monitor: StopOrderMonitor = StopOrderMonitor(
self.main_engine, self.event_engine
)
self.strategy_combo = QtWidgets.QComboBox()
self.strategy_combo.setMinimumWidth(200)
find_button = QtWidgets.QPushButton("查找")
find_button.clicked.connect(self.find_strategy)
# hxxjava add
self.condition_order_monitor = ConditionOrderMonitor(self.cta_engine)
# Set layout
hbox1: QtWidgets.QHBoxLayout = QtWidgets.QHBoxLayout()
hbox1.addWidget(self.class_combo)
hbox1.addWidget(add_button)
hbox1.addStretch()
hbox1.addWidget(self.strategy_combo)
hbox1.addWidget(find_button)
hbox1.addStretch()
hbox1.addWidget(init_button)
hbox1.addWidget(start_button)
hbox1.addWidget(stop_button)
hbox1.addWidget(clear_button)
hbox1.addWidget(roll_button)
grid = QtWidgets.QGridLayout()
# grid.addWidget(self.scroll_area, 0, 0, 2, 1)
grid.addWidget(self.scroll_area, 0, 0, 3, 1) # hxxjava change 3 rows , 1 column
grid.addWidget(self.stop_order_monitor, 0, 1)
grid.addWidget(self.condition_order_monitor, 1, 1) # hxxjava add
# grid.addWidget(self.log_monitor, 1, 1)
grid.addWidget(self.log_monitor, 2, 1) # hxxjava change
vbox: QtWidgets.QVBoxLayout = QtWidgets.QVBoxLayout()
vbox.addLayout(hbox1)
vbox.addLayout(grid)
self.setLayout(vbox)
def update_class_combo(self) -> None:
""""""
names = self.cta_engine.get_all_strategy_class_names()
names.sort()
self.class_combo.addItems(names)
def update_strategy_combo(self) -> None:
""""""
names = list(self.managers.keys())
names.sort()
self.strategy_combo.clear()
self.strategy_combo.addItems(names)
def register_event(self) -> None:
""""""
self.signal_strategy.connect(self.process_strategy_event)
self.event_engine.register(
EVENT_CTA_STRATEGY, self.signal_strategy.emit
)
def process_strategy_event(self, event) -> None:
"""
Update strategy status onto its monitor.
"""
data = event.data
strategy_name: str = data["strategy_name"]
if strategy_name in self.managers:
manager: StrategyManager = self.managers[strategy_name]
manager.update_data(data)
else:
manager: StrategyManager = StrategyManager(self, self.cta_engine, data)
self.scroll_layout.insertWidget(0, manager)
self.managers[strategy_name] = manager
self.update_strategy_combo()
def remove_strategy(self, strategy_name) -> None:
""""""
manager: StrategyManager = self.managers.pop(strategy_name)
manager.deleteLater()
self.update_strategy_combo()
def add_strategy(self) -> None:
""""""
class_name: str = str(self.class_combo.currentText())
if not class_name:
return
parameters: dict = self.cta_engine.get_strategy_class_parameters(class_name)
editor: SettingEditor = SettingEditor(parameters, class_name=class_name)
n: int = editor.exec_()
if n == editor.Accepted:
setting: dict = editor.get_setting()
vt_symbol: str = setting.pop("vt_symbol")
strategy_name: str = setting.pop("strategy_name")
self.cta_engine.add_strategy(
class_name, strategy_name, vt_symbol, setting
)
def find_strategy(self) -> None:
""""""
strategy_name = self.strategy_combo.currentText()
manager = self.managers[strategy_name]
self.scroll_area.ensureWidgetVisible(manager)
def clear_log(self) -> None:
""""""
self.log_monitor.setRowCount(0)
def show(self) -> None:
""""""
self.showMaximized()
def roll(self) -> None:
""""""
dialog: RolloverTool = RolloverTool(self)
dialog.exec_()
在vnpy_ctastrategy\ui\widget.py中增加如下代码:
class ConditionOrderMonitor(BaseMonitor): # hxxjava add
"""
Monitor for condition order.
"""
event_type = EVENT_CONDITION_ORDER
data_key = "cond_orderid"
sorting = True
headers = {
"cond_orderid": {
"display": "条件单号",
"cell": BaseCell,
"update": False,
},
"vt_symbol": {"display": "本地代码", "cell": BaseCell, "update": False},
"direction": {"display": "方向", "cell": EnumCell, "update": False},
"offset": {"display": "开平", "cell": EnumCell, "update": False},
"price": {"display": "触发价", "cell": BaseCell, "update": False},
"volume": {"display": "数量", "cell": BaseCell, "update": False},
"condition": {"display": "触发条件", "cell": EnumCell, "update": False},
"execute_price": {"display": "执行价", "cell": EnumCell, "update": False},
"create_time": {"display": "生成时间", "cell": TimeCell, "update": False},
"trigger_time": {"display": "触发时间", "cell": TimeCell, "update": False},
"status": {"display": "状态", "cell": EnumCell, "update": True},
"strategy_name": {"display": "策略名称", "cell": BaseCell, "update": False},
}
def __init__(self,cta_engine : MyCtaEngine):
""""""
super().__init__(cta_engine.main_engine, cta_engine.event_engine)
self.cta_engine = cta_engine
def init_ui(self):
"""
Connect signal.
"""
super().init_ui()
self.setToolTip("双击单元格可停止条件单")
self.itemDoubleClicked.connect(self.stop_condition_order)
def stop_condition_order(self, cell):
"""
Stop algo if cell double clicked.
"""
order = cell.get_data()
if order:
self.cta_engine.cancel_condition_order(order.cond_orderid)
修改策略管理器StrategyManager的代码如下:
class StrategyManager(QtWidgets.QFrame):
"""
Manager for a strategy
"""
def __init__(
self, cta_manager: CtaManager, cta_engine: CtaEngine, data: dict
):
""""""
super(StrategyManager, self).__init__()
self.cta_manager = cta_manager
self.cta_engine = cta_engine
self.strategy_name = data["strategy_name"]
self._data = data
self.tradetool : TradingWidget = None # hxxjava add
self.init_ui()
def init_ui(self):
""""""
self.setFixedHeight(300)
self.setFrameShape(self.Box)
self.setLineWidth(1)
self.init_button = QtWidgets.QPushButton("初始化")
self.init_button.clicked.connect(self.init_strategy)
self.start_button = QtWidgets.QPushButton("启动")
self.start_button.clicked.connect(self.start_strategy)
self.start_button.setEnabled(False)
self.stop_button = QtWidgets.QPushButton("停止")
self.stop_button.clicked.connect(self.stop_strategy)
self.stop_button.setEnabled(False)
self.trade_button = QtWidgets.QPushButton("交易") # hxxjava add
self.trade_button.clicked.connect(self.show_tradetool) # hxxjava add
self.trade_button.setEnabled(False) # hxxjava add
self.edit_button = QtWidgets.QPushButton("编辑")
self.edit_button.clicked.connect(self.edit_strategy)
self.remove_button = QtWidgets.QPushButton("移除")
self.remove_button.clicked.connect(self.remove_strategy)
strategy_name = self._data["strategy_name"]
vt_symbol = self._data["vt_symbol"]
class_name = self._data["class_name"]
author = self._data["author"]
label_text = (
f"{strategy_name} - {vt_symbol} ({class_name} by {author})"
)
label = QtWidgets.QLabel(label_text)
label.setAlignment(QtCore.Qt.AlignCenter)
self.parameters_monitor = DataMonitor(self._data["parameters"])
self.variables_monitor = DataMonitor(self._data["variables"])
hbox = QtWidgets.QHBoxLayout()
hbox.addWidget(self.init_button)
hbox.addWidget(self.start_button)
hbox.addWidget(self.trade_button) # hxxjava add
hbox.addWidget(self.stop_button)
hbox.addWidget(self.edit_button)
hbox.addWidget(self.remove_button)
# hxxjava change to self.vbox,old is vbox
self.vbox = QtWidgets.QVBoxLayout()
self.vbox.addWidget(label)
self.vbox.addLayout(hbox)
self.vbox.addWidget(self.parameters_monitor)
self.vbox.addWidget(self.variables_monitor)
self.setLayout(self.vbox)
def update_data(self, data: dict):
""""""
self._data = data
self.parameters_monitor.update_data(data["parameters"])
self.variables_monitor.update_data(data["variables"])
# Update button status
variables = data["variables"]
inited = variables["inited"]
trading = variables["trading"]
if not inited:
return
self.init_button.setEnabled(False)
if trading:
self.start_button.setEnabled(False)
self.trade_button.setEnabled(True) # hxxjava
self.stop_button.setEnabled(True)
self.edit_button.setEnabled(False)
self.remove_button.setEnabled(False)
else:
self.start_button.setEnabled(True)
self.trade_button.setEnabled(False) # hxxjava
self.stop_button.setEnabled(False)
self.edit_button.setEnabled(True)
self.remove_button.setEnabled(True)
def init_strategy(self):
""""""
self.cta_engine.init_strategy(self.strategy_name)
def start_strategy(self):
""""""
self.cta_engine.start_strategy(self.strategy_name)
def show_tradetool(self): # hxxjava add
""" 为策略显示交易工具 """
if not self.tradetool:
strategy = self.cta_engine.strategies.get(self.strategy_name,None)
if strategy and strategy.trading:
self.tradetool = TradingWidget(strategy,self.cta_engine.event_engine)
self.vbox.addWidget(self.tradetool)
else:
is_visible = self.tradetool.isVisible()
self.tradetool.setVisible(not is_visible)
def stop_strategy(self):
""""""
self.cta_engine.stop_strategy(self.strategy_name)
def edit_strategy(self):
""""""
strategy_name = self._data["strategy_name"]
parameters = self.cta_engine.get_strategy_parameters(strategy_name)
editor = SettingEditor(parameters, strategy_name=strategy_name)
n = editor.exec_()
if n == editor.Accepted:
setting = editor.get_setting()
self.cta_engine.edit_strategy(strategy_name, setting)
def remove_strategy(self):
""""""
result = self.cta_engine.remove_strategy(self.strategy_name)
# Only remove strategy gui manager if it has been removed from engine
if result:
self.cta_manager.remove_strategy(self.strategy_name)
创建vnpy\usertools\trading_widget.py文件,其中内容:
"""
条件单交易组件
作者:hxxjava
日线:2022-5-10
"""
from vnpy.trader.ui import QtCore, QtWidgets, QtGui
from vnpy.trader.constant import Direction,Offset
from vnpy.trader.event import EVENT_TICK
from vnpy.event.engine import Event,EventEngine
from vnpy_ctastrategy.base import Condition,CondOrderStatus,ExecutePrice,ConditionOrder
from vnpy_ctastrategy.template import CtaTemplate
class TradingWidget(QtWidgets.QWidget):
"""
CTA strategy manual trading widget.
"""
signal_tick = QtCore.pyqtSignal(Event)
def __init__(self, strategy: CtaTemplate, event_engine: EventEngine):
""""""
super().__init__()
self.strategy: CtaTemplate = strategy
self.event_engine: EventEngine = event_engine
self.vt_symbol: str = strategy.vt_symbol
self.price_digits: int = 0
self.init_ui()
self.register_event()
def init_ui(self) -> None:
""""""
# 交易方向:多/空
self.direction_combo = QtWidgets.QComboBox()
self.direction_combo.addItems(
[Direction.LONG.value, Direction.SHORT.value])
# 开平选择:开/平
self.offset_combo = QtWidgets.QComboBox()
self.offset_combo.addItems([offset.value for offset in Offset])
# 条件类型
conditions = [Condition.BE,Condition.LE,Condition.BT,Condition.LT]
self.condition_combo = QtWidgets.QComboBox()
self.condition_combo.addItems(
[condition.value for condition in conditions])
double_validator = QtGui.QDoubleValidator()
double_validator.setBottom(0)
self.price_line = QtWidgets.QLineEdit()
self.price_line.setValidator(double_validator)
self.exit_line = QtWidgets.QLineEdit()
self.exit_line.setValidator(double_validator)
self.volume_line = QtWidgets.QLineEdit()
self.volume_line.setValidator(double_validator)
self.price_check = QtWidgets.QCheckBox()
self.price_check.setToolTip("设置价格随行情更新")
execute_prices = [ExecutePrice.SETPRICE,ExecutePrice.MARKET,ExecutePrice.EXTREME]
self.execute_price_combo = QtWidgets.QComboBox()
self.execute_price_combo.addItems(
[execute_price.value for execute_price in execute_prices])
send_button = QtWidgets.QPushButton("发出")
send_button.clicked.connect(self.send_condition_order)
hbox = QtWidgets.QHBoxLayout()
hbox.addWidget(QtWidgets.QLabel(f"合约:{self.vt_symbol}"))
hbox.addWidget(QtWidgets.QLabel("方向"))
hbox.addWidget(self.direction_combo)
hbox.addWidget(QtWidgets.QLabel("开平"))
hbox.addWidget(self.offset_combo)
hbox.addWidget(QtWidgets.QLabel("条件"))
hbox.addWidget(self.condition_combo)
hbox.addWidget(QtWidgets.QLabel("触发价"))
hbox.addWidget(self.price_line)
hbox.addWidget(self.price_check)
hbox.addWidget(QtWidgets.QLabel("数量"))
hbox.addWidget(self.volume_line)
hbox.addWidget(QtWidgets.QLabel("执行价"))
hbox.addWidget(self.execute_price_combo)
hbox.addWidget(send_button)
# Overall layout
self.setLayout(hbox)
def register_event(self) -> None:
""""""
self.signal_tick.connect(self.process_tick_event)
self.event_engine.register(EVENT_TICK, self.signal_tick.emit)
def process_tick_event(self, event: Event) -> None:
""""""
tick = event.data
if tick.vt_symbol != self.vt_symbol:
return
if self.price_check.isChecked():
self.price_line.setText(f"{tick.last_price}")
def send_condition_order(self) -> bool:
"""
Send new order manually.
"""
try:
direction = Direction(self.direction_combo.currentText())
offset = Offset(self.offset_combo.currentText())
condition = Condition(self.condition_combo.currentText())
price = float(self.price_line.text())
volume = float(self.volume_line.text())
execute_price = ExecutePrice(self.execute_price_combo.currentText())
order = ConditionOrder(
strategy_name = self.strategy.strategy_name,
vt_symbol=self.vt_symbol,
direction=direction,
offset=offset,
price=price,
volume=volume,
condition=condition,
execute_price=execute_price
)
self.strategy.send_condition_order(order=order)
print(f"发出条件单 : vt_symbol={self.vt_symbol},success ! {order}")
return True
except:
print(f"发出条件单 : vt_symbol={self.vt_symbol},input error !")
return False
在vnpy_ctastrategy\engine.py中对CtaEngine进行如下扩展:
class MyCtaEngine(CtaEngine):
""" """
condition_filename = "condition_order.json" # 历史条件单存储文件
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super().__init__(main_engine,event_engine)
self.condition_orders:Dict[str,ConditionOrder] = {} # strategy_name: dict
def load_active_condtion_orders(self):
""" """
return {}
def process_tick_event(self,event:Event):
""" 用tick的价格检查条件单 """
super().process_tick_event(event)
tick:TickData = event.data
all_condition_orders = [order for order in self.condition_orders.values() \
if order.vt_symbol == tick.vt_symbol and order.status == CondOrderStatus.WAITING]
for order in all_condition_orders:
# 检查条件单是否满足条件
self.check_condition_order(order,tick)
def check_condition_order(self,order:ConditionOrder,tick:TickData):
""" 检查条件单是否满足条件 """
strategy = self.strategies.get(order.strategy_name,None)
if not strategy or not strategy.trading:
return False
price = tick.last_price
is_be = order.condition == Condition.BE and price >= order.price
is_le = order.condition == Condition.LE and price <= order.price
is_bt = order.condition == Condition.BT and price > order.price
is_lt = order.condition == Condition.LT and price < order.price
if is_be or is_le or is_bt or is_lt:
# 满足触发条件
if order.execute_price == ExecutePrice.MARKET:
# 取市场价
price = tick.last_price
elif order.execute_price == ExecutePrice.EXTREME:
# 取极限价
price = tick.limit_up if order.direction == Direction.LONG else tick.limit_down
else:
# 取设定价
price = order.price
# 执行委托
order_ids = strategy.send_order(
direction = order.direction,
offset=order.offset,
price=price,
volume=order.volume
)
if order_ids:
order.trigger_time = tick.datetime
order.status = CondOrderStatus.TRIGGERED
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
def send_condition_order(self,order:ConditionOrder):
""" """
strategy = self.strategies.get(order.strategy_name,None)
if not strategy or not strategy.trading:
return False
if order.cond_orderid not in self.condition_orders:
self.condition_orders[order.cond_orderid] = order
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
return False
def cancel_condition_order(self,cond_orderid:str):
""" """
order:ConditionOrder = self.condition_orders.get(cond_orderid,None)
if not order:
return False
order.status = CondOrderStatus.CANCELLED
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
def cancel_all_condition_orders(self,strategy_name:str):
""" """
for order in self.condition_orders.values():
if order.strategy_name == strategy_name and order.status == CondOrderStatus.WAITING:
order.status = CondOrderStatus.CANCELLED
self.call_strategy_func(strategy,strategy.on_condition_order)
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
对vnpy_ctastrategy__init__.py中的CtaTemplate进行如下修改:
from .engine import MyCtaEngine # hxxjava add
class CtaStrategyApp(BaseApp):
""""""
app_name = APP_NAME
app_module = __module__
app_path = Path(__file__).parent
display_name = "CTA策略"
# engine_class = CtaEngine
engine_class = MyCtaEngine # hxxjava add
widget_name = "CtaManager"
icon_name = str(app_path.joinpath("ui", "cta.ico"))
对vnpy_ctastrategy\template.py中的CtaTemplate进行如下扩展:
@virtual
def on_condition_order(self, cond_order: ConditionOrder):
"""
Callback of condition order update.
"""
pass
def send_condition_order(self,order:ConditionOrder): # hxxjava add
""" """
if not self.trading:
return False
return self.cta_engine.send_condition_order(order)
def cancel_condition_order(self,cond_orderid:str): # hxxjava add
""" """
return self.cta_engine.cancel_condition_order(cond_orderid)
def cancel_all_condition_orders(self): # hxxjava add
""" """
return self.cta_engine.cancel_all_condition_orders(self)
1)CTA策略中的条件单被触发点回调通知:
def on_condition_order(self, cond_order: ConditionOrder):
"""
Callback of condition order update.
"""
print(f"条件单已经执行,cond_order = {cond_order}")
2)发起条件单
cond_order = ConditionOrder(... ...)
self.send_condition_order(cond_order)
3)取消条件单
self.cancel_condition_order(cond_orderid)
4)取消策略的所有条件单
self.cancel_all_condition_orders()
如果您在启动vntrader的时候勾选了【ChartWizard 实时K线图表模块】,您会简单主界面上vnpy系统提供的K线图表功能图标,进入该功能模块后就可以输入本地代码,新建K线图表了。
使用了该功能之后,你会发现它有如下缺点:
这样一个太简单的K线图表是远远满足了交易者对K线图表的需求的,有多少人使用就可想而知了。
绝大多数交易策略都是基于K线来实现的。可是很少部分是只在1分钟K线的基础上运行的,可能是n分钟,n小时,n天...,只能提供一分钟的K线图是不够用的。
所以应该提供用户如下的选择:
用户之所以想看K线图,可能是想看看自己策略的算法是否正确,这一般都是使用了一个或者多个运行在窗口K线上指标计算的值计算的入场和出场信号。
这也是可以显示的,而这种指标不可能全部是系统自带的指标显示控件能够涵盖的,所以应该有方法让用户自己增加自己的指标显示部件。
所以应该提供下面功能: