xiaoshouchuan wrote:
有个问题,对于这种用到昨日日k数据的策略,主力合约更换的话怎么处理?
可以看看 K线的种类总结,或许你就可以找到答案。
好像不可以,因为:
@dataclass
class BarData(BaseData):
"""
Candlestick bar data of a certain trading period.
"""
symbol: str
exchange: Exchange
datetime: datetime
interval: Interval = None
volume: float = 0
open_interest: float = 0
open_price: float = 0
high_price: float = 0
low_price: float = 0
close_price: float = 0
def __post_init__(self):
""""""
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
表示一个BarData类型的时间间隔的单位是interval,它了类型是Interval,
class Interval(Enum):
"""
Interval of bar data.
"""
MINUTE = "1m"
HOUR = "1h"
DAILY = "d"
WEEKLY = "w"
这其中没有单位为秒的定义
class Interval(Enum):
"""
Interval of bar data.
"""
SECOND = "1s" # !!!
MINUTE = "1m"
HOUR = "1h"
DAILY = "d"
WEEKLY = "w"
之后可能改动的就多了去了,需要改动BarGenerator,CtaTemplate等等的,没有细想了.......,你自己往下想吧
arnego wrote:
为什么合成比如5分钟,有的时候是在5分钟最后,有的时候是在下一分钟开始?
有的时候收盘最后扔了单子,然后被拒了,然后有时候开盘会补回,有时候补回?
我的理解是有时候开盘那一瞬间才完成上一根bar?
因为:
1)tick推动1分钟合成bar的生成,然后1分钟合成bar推到5分钟合成bar的生成。
2)这导致你在5分钟合成bar的on_5min_bar()【加入你策略中就叫着个名字】中做的交易不是由时间来触发的,而是取决于新的5分钟合成K线的第一个tick数据是何时出现的。
3)如果新的5分钟合成bar的第一个tick数据来的早,你的策略就会结束旧的5分钟合成bar,然后使用它作为参数早执行交易,如果来的晚就晚执行。道理就是如此。
马俊 wrote:
我有米筐账户,会不会是和类似的策略冲突,明天我再试试
如果你加了print(f"len of bars={len(bars)}")的话,通常应该是>500的数,因为米筐的历史1分钟bar至少给出1日以上的数据,我本意是取得刚刚过去的1根1分钟K线数据,可是接口办不到,这会浪费一些时间,可是这也是没有办法的事情。米筐接口在这一点上做的不太好,不够精细,这也是他们的缺点。
马俊 wrote:
class BarGenerator代码就是复制楼主的,报的是 self.bar=bars[-1]list index out of range , 应该就是 bars = rqdata_client.query_history(req)没读到数据吧?
回复马俊:
1)如果你有米筐账户的话,应该不会出错。
2)我的策略正在使用这个class BarGenerator,没有出错 。
3)你在 self.bar=bars[-1]之前加入这样一句print(f"len of bars={len(bars)}") ,打印出来看看是否没有读取到米筐的历史1分钟bar,然后看看,然后再判断是什么问题。
4)看来应该考虑没有米筐账户的情况,其实没有没有米筐账户的话,self.load_bar(10)是读取不到10日历史bar的,那只能利用tick来慢慢合成1分钟bar了,此时我的代码也需要考虑这种情况了。
解决的办法就是:
bars = rqdata_client.query_history(req)
if bars:
self.bar = bars[-1]
print(f"【first bar time = {bar_datetime} history bar time = {self.bar.datetime},bars count={len(bars)}】")
你可以自己在vnpy中本地会持仓进行逐日盯市计算。方法是保存每日的持仓历史,根据合约信息,计算每日持仓的进行保证金及当日盈亏结算。
可是计算结果和你开户的柜台逐日盯市计算进行比较,你会发现总是不一样,虽然差别不太大! 原因在哪里呢?经过排除,原来问题出于使用了合约的当日收盘价上,而正确的应该是使用期货结算价来计算当前持仓的保证金及当日盈亏。
结算价是当天交易结束后,对未平仓合约进行当日交易保证金及当日盈亏结算的基准价。
我国郑州商品交易所、大连商品交易所和上海期货交易所规定:当日结算价取某一期货合约当日成交价格按照成交量的加权平均价;当日无成交价格的,以上一交易日的结算价作为当日结算价。
中国金融期货交易所规定:当日结算价是指某一期货合约最后一小时成交价格按照成交量的加权平均价。 交收日按最后两小时的算术平均价计结算价。
期货收盘价是一天交易的最后一个价格,它是由收盘前1分钟所有买卖盘集中撮合而成。
1 SimNow模拟行情接口的对行情数据的推送不做延迟
2 但是SimNow模拟行情接口可能因为登录用户数量庞大,在下发这些从交易所得到的行情数据时,可能出现累积延迟导致收市后行情数据仍然没有发送结束的现象是可能存在的。
但是出现高达7-10分钟左右的累积延迟,还是让人感到震惊!
不过好在这是模拟账户,不是正式生产账户。
下周一用同样的方法测试下正式生产账户,应该没有这个问题。
xiaohe wrote:
可能的原因是:
- 在2.1.3加上全球时间戳之后,中国地区vnpy的时间戳应该是 +08:06。你的时间如果用replace(tzinfo=None)处理过后应该会少6分钟差别;
- 米筐的数据是9:01到下午3:00的,而vnpy是9:00到下午2:59的。可参考https://www.vnpy.com/forum/topic/1706-qing-wen-hui-ce-shi-shi-yong-9:01dao-3:00-huan-shi-cong-9:00dao-xia-wu-2:59de-shu-ju
应该不是全球时间戳的问题,因为调试输出的时间中的时间戳已经是+08:00的,而且过收市时间快10分钟了仍然能够从CTP接口收到tick推送,就与时间戳无关了。
【first bar time = 2020-10-16 14:52:00+08:00 history bar time = 2020-10-16 14:59:00+08:00,bars count=555】合成的第一个bar的时间比米筐的最新bar的时间也是晚了7分钟
【tick.datetime = 2020-10-16 14:53:00+08:00 is_first_bar=False】 此时实际时间已经15:01了,tick的时间晚了7分钟
【tick.datetime = 2020-10-16 14:54:00+08:00 is_first_bar=False】
【tick.datetime = 2020-10-16 14:55:00+08:00 is_first_bar=False】
【tick.datetime = 2020-10-16 14:56:00+08:00 is_first_bar=False】
【tick.datetime = 2020-10-16 14:57:00+08:00 is_first_bar=False】
【tick.datetime = 2020-10-16 14:58:00+08:00 is_first_bar=False】
【tick.datetime = 2020-10-16 14:59:00+08:00 is_first_bar=False】
【tick.datetime = 2020-10-16 15:00:00+08:00 is_first_bar=False】 此时实际时间已经15:10了,tick的时间晚了约10分钟
一般15:00国内的期货交易所就休市了,应该立即就没有tick推送了,可是就连vnpy上仍然可以见到ag2012.SHFE的tick在不断变化,和Demo_Strategy策略调试输出的信息也是吻合的。很显然tick时间比滞后实际时间大概7~10分钟
说明一下,本贴中的bar数据和K线数据其实是一回事,有时候是方便读代码就按照代码说,有时候为了尊崇人们的习惯来说,不必在意。
代码是这样的:
from typing import Any
from vnpy.app.cta_strategy import (
CtaTemplate,
BarGenerator,
ArrayManager
)
from vnpy.trader.object import (
BarData,
TickData
)
from vnpy.trader.constant import Interval
class DemoStrategy(CtaTemplate):
""" 一个演示策略 """
author = "hxxjava"
fast_window = 10
slow_window = 20
fast_ma0 = 0
fast_ma1 = 0
slow_ma0 = 0
slow_ma1 = 0
parameters = [
"fast_window",
"slow_window"
]
variables = [
"fast_ma0",
"fast_ma1",
"slow_ma0",
"slow_ma1",
]
def __init__(
self,
cta_engine: Any,
strategy_name: str,
vt_symbol: str,
setting: dict
):
"""构造函数"""
super().__init__(cta_engine,strategy_name,vt_symbol,setting)
self.bg = BarGenerator(
on_bar=self.on_bar,
window=7,
on_window_bar=on_7min_bar,
interval=Interval.Minute)
self.am = ArrayManager()
def on_init(self):
""""""
self.write_log("策略初始化")
# account_data = self.cta_engine.get_account()
self.load_bar(10)
def on_start(self):
"""策略启动"""
self.write_log("策略启动")
def on_stop(self):
""" 策略停止 """
self.write_log(" 策略停止 ")
def on_tick(self,tick:TickData):
""" Tick更新 """
self.bg.update_tick(tick)
def on_bar(self, bar: BarData):
"""K线更新"""
self.bg.update_bar(bar)
def on_7min_bar(self, bar: BarData):
"""K线更新"""
am = self.am
am.update_bar(bar)
if not am.inited:
return
""" 计算均线 """
fast_ma = am.sma(self.fast_window,True)
self.fast_ma0 = fast_ma[-1]
self.fast_ma1 = fast_ma[-2]
slow_ma = am.sma(self.slow_window,True)
self.slow_ma0 = slow_ma[-1]
self.slow_ma1 = slow_ma[-2]
""" 定义金叉和死叉 """
cross_over = (self.fast_ma0>= self.fast_ma1 and
self.slow_ma0<self.slow_ma1)
cross_below = (self.slow_ma0>self.slow_ma1 and
self.slow_ma0<=self.slow_ma1)
if cross_over:
price = bar.close_price + 5
if not self.pos:
self.buy(price,1)
elif self.pos < 0:
self.cover(price,1)
self.buy(price,1)
elif cross_below:
price = bar.close_price - 5
if not self.pos:
self.short(price,1)
elif self.pos>0:
self.sell(price,1)
self.short(price,1)
# 更新图形界面
self.put_event()
这个策略是演示如何利用1分钟K线合成7分钟K线,然后在on_7min_bar()里面利用7分钟K线计算快慢两根移动均线,
然后更加快慢移动均线的金叉和死叉信号来进行多空的开仓和平仓操作,如此实现一个自动策略买卖交易。
在构造函数init()中创建BarGenerator类型self.bg和管理bar的ArrayManager类型的self.am
这里的重点是self.load_bar(10),该函数是策略的父类CtaTemplate的函数,代码是这样的:
def load_bar(
self,
days: int,
interval: Interval = Interval.MINUTE,
callback: Callable = None,
use_database: bool = False
):
"""
Load historical bar data for initializing strategy.
"""
if not callback:
callback = self.on_bar
self.cta_engine.load_bar(
self.vt_symbol,
days,
interval,
callback,
use_database
)
self.cta_engine.load_bar()位于vnpy\app\cta_strategy.py中的CtaEngine类中,代码是这样的:
def load_bar(
self,
vt_symbol: str,
days: int,
interval: Interval,
callback: Callable[[BarData], None],load_bar
use_database: bool
):
""""""
symbol, exchange = extract_vt_symbol(vt_symbol)
end = datetime.now(get_localzone())
start = end - timedelta(days)
bars = []
# Pass gateway and RQData if use_database set to True
if not use_database:
# Query bars from gateway if available
contract = self.main_engine.get_contract(vt_symbol)
if contract and contract.history_data:
req = HistoryRequest(
symbol=symbol,
exchange=exchange,
interval=interval,
start=start,
end=end
)
bars = self.main_engine.query_history(req, contract.gateway_name)
# Try to query bars from RQData, if not found, load from database.
else:
bars = self.query_bar_from_rq(symbol, exchange, interval, start, end)
if not bars:
bars = database_manager.load_bar_data(
symbol=symbol,
exchange=exchange,
interval=interval,
start=start,
end=end,
)
for bar in bars:
callback(bar)
因为在策略中使用这样的语句self.load_bar(10),所以use_database参数为默认值False,可是我们知道目前CTP接口是不支持历史数据查询的,所以contract and contract.history_data的条件为假,导致bars 为空, 最终执行了:
bars = self.query_bar_from_rq(symbol, exchange, interval, start, end)
而self.query_bar_from_rq的代码是这样的:
def query_bar_from_rq(
self, symbol: str, exchange: Exchange, interval: Interval, start: datetime, end: datetime
):
"""
Query bar data from RQData.
"""
req = HistoryRequest(
symbol=symbol,
exchange=exchange,
interval=interval,
start=start,
end=end
)
data = rqdata_client.query_history(req)
return data
再看看rqdata_client.query_history(req)的代码,它把产生req的symbol,interval,start 和end各字段,转换成米筐接口可以接受的rq_symbol,rq_interval ,interval,start 和end等4个变量中,然后把end加上1天的时间【注意:这是非常重要的一个技巧,不然无法取出截止到当前交易时刻的1分钟bar!】,最后执行米筐接口函数rqdata_get_price()读取所有的10天多的bar数据,注意:是10天多的bar,而不是整10天的bar!
def query_history(self, req: HistoryRequest) -> Optional[List[BarData]]:
"""
Query history bar data from RQData.
"""
if self.symbols is None:
return None
symbol = req.symbol
exchange = req.exchange
interval = req.interval
start = req.start
end = req.end
rq_symbol = self.to_rq_symbol(symbol, exchange)
if rq_symbol not in self.symbols:
return None
rq_interval = INTERVAL_VT2RQ.get(interval)
if not rq_interval:
return None
# For adjust timestamp from bar close point (RQData) to open point (VN Trader)
adjustment = INTERVAL_ADJUSTMENT_MAP[interval]
# For querying night trading period data
end += timedelta(1)
# Only query open interest for futures contract
fields = ["open", "high", "low", "close", "volume"]
if not symbol.isdigit():
fields.append("open_interest")
df = rqdata_get_price(
rq_symbol,
frequency=rq_interval,
fields=fields,
start_date=start,
end_date=end,
adjust_type="none"
)
data: List[BarData] = []
if df is not None:
for ix, row in df.iterrows():
dt = row.name.to_pydatetime() - adjustment
dt = CHINA_TZ.localize(dt)
bar = BarData(
symbol=symbol,
exchange=exchange,
interval=interval,
datetime=dt,
open_price=row["open"],
high_price=row["high"],
low_price=row["low"],
close_price=row["close"],
volume=row["volume"],
open_interest=row.get("open_interest", 0),
gateway_name="RQ"
)
data.append(bar)
return data
同时可以知道interval的默认值为Interval.MINUTE。
至此我们可以看出,self.load_bar(10)其实就是从米筐接口获取的1分钟历史数据。
这里执行了
self.bg.update_tick(tick)
这是在调用策略的K线合成器self.bg的update_tick() 函数,这个函数是用来把tick数据按照1分钟为间隔来产生1分钟bar的,当1分钟bar合成之时再次调用策略的on_bar()。
BarGenerator的update_tick()的函数代码如下:
def update_tick(self, tick: TickData) -> None:
"""
Update new tick data into generator.
"""
new_minute = False
# Filter tick data with 0 last price
if not tick.last_price:
return
# Filter tick data with less intraday trading volume (i.e. older timestamp)
if self.last_tick and tick.volume and tick.volume < self.last_tick.volume:
return
if not self.bar:
new_minute = True
elif self.bar.datetime.minute != tick.datetime.minute:
self.bar.datetime = self.bar.datetime.replace(
second=0, microsecond=0
)
self.on_bar(self.bar)
new_minute = True
if new_minute:
self.bar = BarData(
symbol=tick.symbol,
exchange=tick.exchange,
interval=Interval.MINUTE,
datetime=tick.datetime,
gateway_name=tick.gateway_name,
open_price=tick.last_price,
high_price=tick.last_price,
low_price=tick.last_price,
close_price=tick.last_price,
open_interest=tick.open_interest
)
else:
self.bar.high_price = max(self.bar.high_price, tick.last_price)
self.bar.low_price = min(self.bar.low_price, tick.last_price)
self.bar.close_price = tick.last_price
self.bar.open_interest = tick.open_interest
self.bar.datetime = tick.datetime
if self.last_tick:
volume_change = tick.volume - self.last_tick.volume
self.bar.volume += max(volume_change, 0)
self.last_tick = tick
分析得知它开始生成self.bar的条件是:
if not self.bar:
new_minute = True
....
if new_minute:
self.bar = BarData(
symbol=tick.symbol,
exchange=tick.exchange,
interval=Interval.MINUTE,
datetime=tick.datetime,
gateway_name=tick.gateway_name,
open_price=tick.last_price,
high_price=tick.last_price,
low_price=tick.last_price,
close_price=tick.last_price,
open_interest=tick.open_interest
)
也就是说只要刚刚启动策略,就会立即生成一根新bar,而没有寻求对齐整分钟,这样会造成首个bar的合成非常可能是不完整的!
self.bg.update_bar(bar)
这个函数是用1分钟bar来合成7分钟bar的,当7分钟bar合成完成后,它会以7分钟bar为参数调用策略的on_7min_bar()。
am = self.am
am.update_bar(bar)
if not am.inited:
return
""" 计算均线 """
fast_ma = am.sma(self.fast_window,True)
self.fast_ma0 = fast_ma[-1]
self.fast_ma1 = fast_ma[-2]
slow_ma = am.sma(self.slow_window,True)
self.slow_ma0 = slow_ma[-1]
self.slow_ma1 = slow_ma[-2]
后面的代码就省略了
姑且不论策略是否可以赚钱,因为后面还要针对特定合约进行优化,这不是本帖讨论的重点!
从代码来看,一切都是那么自然,一个完美的例子!
这里分析的重点是假如我们在盘中启动策略的话,会发生什么问题,请看图:
如上图中所示:
我们知道从米筐接口读取的只有整分的K线数据,它不会提供没有走完的1分钟bar,所以如果你没有在整分钟结束的那一刻启动策略的话(做到这一点的概率太低了!),那么就一定会产生黄色的丢失部分。
因为第一根合成1分钟K线出现丢失部分,导致第一根合成1分钟K线的开、高、收、低、成交量和开仓兴趣都可能是错误的,进而导致利用1分钟K线合成的7分钟K线也是错误的,这可以说是连锁反应,当然也就会导致利用7分钟K线进行信号计算和交易查询问题!
也许你会说,有那么夸张吗?我不知道!不过这个丢失部分的时间长度在0~59.99秒之间,再说了就算是只有3秒的丢失,也可能是这1分钟中几乎全部的成交量,创新高、创新低都是有可能的,它的缺失也可能是让7分钟K线严重失真的重要原因,谁知道呢!我们这里分析目前的代码就是这样的,从原理上讲它确实是会出错的!
解决方法:
get_trading_dates() # 合约的所有的交易日
get_trading_hours() # 合约的所有的交易时间段
如果策略启动后最后一个历史1分钟bar与第一个tick数据在一个交易时间段(如9:00-10:15)中, 那么就可以判断出第一个1分钟K线出现了数据丢失,在这个第一个1分钟K线走完之时,就应该从米筐接口立即读取这个刚刚生成的历史1分钟bar,替换掉策略合成的第一个1分钟K线,其他的处理逻辑继续执行就可以了。按照第3节中的4的方法,修改BarGenerator,代码如下,可以解决问题:
class BarGenerator:
"""
For:
1. generating 1 minute bar data from tick data
2. generateing x minute bar/x hour bar data from 1 minute data
Notice:
1. for x minute bar, x must be able to divide 60: 2, 3, 5, 6, 10, 15, 20, 30
2. for x hour bar, x can be any number
"""
def __init__(
self,
on_bar: Callable,
window: int = 0,
on_window_bar: Callable = None,
interval: Interval = Interval.MINUTE
):
"""Constructor"""
self.bar: BarData = None
self.on_bar: Callable = on_bar
self.interval: Interval = interval
self.interval_count: int = 0
self.window: int = window
self.window_bar: BarData = None
self.on_window_bar: Callable = on_window_bar
self.last_tick: TickData = None
self.last_bar: BarData = None
self.is_first_bar = True # hxxjava add
def update_tick(self, tick: TickData) -> None:
"""
Update new tick data into generator.
"""
from vnpy.trader.rqdata import rqdata_client # hxxjava add
from vnpy.trader.object import HistoryRequest # hxxjava add
new_minute = False
# Filter tick data with 0 last price
if not tick.last_price:
return False
# Filter tick data with less intraday trading volume (i.e. older timestamp)
if self.last_tick and tick.volume and tick.volume < self.last_tick.volume:
return False
if not self.bar:
new_minute = True
elif self.bar.datetime.minute != tick.datetime.minute:
self.bar.datetime = self.bar.datetime.replace(
second=0, microsecond=0
)
# hxxjava add start
if self.is_first_bar:
self.is_first_bar = False
symbol,exchange = extract_vt_symbol(self.bar.vt_symbol)
bar_datetime = self.bar.datetime
req = HistoryRequest(
symbol=symbol,
exchange=exchange,
start = bar_datetime,
end=bar_datetime,
interval=Interval.MINUTE
)
bars = rqdata_client.query_history(req)
self.bar = bars[-1]
print(f"【first bar time = {bar_datetime} history bar time = {self.bar.datetime},bars count={len(bars)}】")
# hxxjava add end
self.on_bar(self.bar)
new_minute = True
if new_minute:
print(f"【tick.datetime = {tick.datetime} is_first_bar={self.is_first_bar}】")
self.bar = BarData(
symbol=tick.symbol,
exchange=tick.exchange,
interval=Interval.MINUTE,
datetime=tick.datetime,
gateway_name=tick.gateway_name,
open_price=tick.last_price,
high_price=tick.last_price,
low_price=tick.last_price,
close_price=tick.last_price,
open_interest=tick.open_interest
)
else:
self.bar.high_price = max(self.bar.high_price, tick.last_price)
self.bar.low_price = min(self.bar.low_price, tick.last_price)
self.bar.close_price = tick.last_price
self.bar.open_interest = tick.open_interest
self.bar.datetime = tick.datetime
if self.last_tick:
volume_change = tick.volume - self.last_tick.volume
self.bar.volume += max(volume_change, 0)
self.last_tick = tick
def update_bar(self, bar: BarData) -> None:
"""
Update 1 minute bar into generator
"""
# If not inited, creaate window bar object
if not self.window_bar:
# Generate timestamp for bar data
if self.interval == Interval.MINUTE:
dt = bar.datetime.replace(second=0, microsecond=0)
else:
dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
self.window_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=dt,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price
)
# Otherwise, update high/low price into window bar
else:
self.window_bar.high_price = max(
self.window_bar.high_price, bar.high_price)
self.window_bar.low_price = min(
self.window_bar.low_price, bar.low_price)
# Update close price/volume into window bar
self.window_bar.close_price = bar.close_price
self.window_bar.volume += int(bar.volume)
self.window_bar.open_interest = bar.open_interest
# Check if window bar completed
finished = False
if self.interval == Interval.MINUTE:
# x-minute bar
if not (bar.datetime.minute + 1) % self.window:
finished = True
elif self.interval == Interval.HOUR:
if self.last_bar and bar.datetime.hour != self.last_bar.datetime.hour:
# 1-hour bar
if self.window == 1:
finished = True
# x-hour bar
else:
self.interval_count += 1
if not self.interval_count % self.window:
finished = True
self.interval_count = 0
if finished:
self.on_window_bar(self.window_bar)
self.window_bar = None
# Cache last bar object
self.last_bar = bar
def generate(self) -> Optional[BarData]:
"""
Generate the bar data and call callback immediately.
"""
bar = self.bar
if self.bar:
bar.datetime = bar.datetime.replace(second=0, microsecond=0)
self.on_bar(bar)
self.bar = None
return bar
这里所说的登录期货账户时间是指:启动软件到连接CTP接口界面,已经输入正确的期货账户和密码,从点击确定开始计时,经过CTP接口的用户身份认证登录行情和交易服务器,直到客户端接收服务器行情推送的当前市场在交易合约的合约信息、已经接受的当前用户的委托单、成交单、持仓和资金账户等信息的推送为止。
注明:每种都是测试5次,取平均值
测试1和2的区别是:都是vn.py 2.1.6软件,都是CTP网关,只是使用的账户类型不同,1是模拟期货账户,2是实际期货账户;
测试2和3的区别是:都是实际期货账户,都是CTP网关,只是使用的软件不同,2是模拟vn.py 2.1.6软件,3是快期V2软件的CTP版本;
为什么三个登录期货账户时间的差别这么大呢,对使用?
期望知道这种差别的原因是什么吗,解答一下,先谢谢啦!!!
哪一位是程老师呀?
用Python的交易员 wrote:
对的,现在依赖库没法自动更新,我们之前考虑过,但是因为网络问题某些比较大的包(pands之类)很容易下载失败,导致死活都无法更新vn.py,这样用户体验更差了...
未来理想情况下,其实应该是我们维护一个类似anaconda的Python包服务,不过这个成本太高目前还负担不了
理解您的苦衷,目前的vnpy已经非常优秀,非常感谢你们的付出!
下面的代码位于vnpy\usertools\my_cta_template.py中
from typing import Any,List,Dict,Tuple
import copy
from vnpy.app.cta_strategy import (
CtaTemplate,
BarGenerator,
ArrayManager,
StopOrder,
Direction
)
from vnpy.trader.engine import MainEngine,EventEngine
from vnpy.app.cta_strategy.engine import CtaEngine
from vnpy.event.engine import Event
from vnpy.trader.object import (
LogData,
TickData,
BarData,
TradeData,
OrderData,
)
from vnpy.app.cta_strategy import StopOrder
from vnpy.app.cta_strategy.base import EngineType
from vnpy.trader.constant import Interval
from vnpy.app.cta_strategy.base import (
APP_NAME,
EVENT_CTA_LOG,
EVENT_CTA_TICK,
EVENT_CTA_HISTORY_BAR,
EVENT_CTA_BAR,
EVENT_CTA_ORDER,
EVENT_CTA_TRADE,
EVENT_CTA_STOPORDER,
EVENT_CTA_STRATEGY,
)
from vnpy.usertools.kx_chart import (
NewChartWidget,
CandleItem,
VolumeItem,
LineItem,
SmaItem,
RsiItem,
MacdItem,
)
from vnpy.usertools.my_strategy_tool import FixedBarGenerator
from vnpy.trader.engine import SamEngine
class MyCtaTemplate(CtaTemplate):
"""
一个包含可视化K线图表和策略账户的策略模板
"""
init_money:float = 100000.0 # 初始资金
kx_interval:int = 5
show_chart = False # 显示K线图表
kx_count:int = 0
def __init__(
self,
cta_engine: Any,
strategy_name: str,
vt_symbol: str,
setting: dict,
):
super().__init__(cta_engine,strategy_name,vt_symbol,setting)
self.bg = FixedBarGenerator(self.on_bar,self.kx_interval,self.on_Nmin_bar,vt_symbol=self.vt_symbol)
self.am = ArrayManager()
cta_engine:CtaEngine = self.cta_engine
self.engine_type = cta_engine.engine_type
self.even_engine = cta_engine.main_engine.event_engine
# 必须在这里声明,因为它们是实例变量
self.all_bars:List[BarData] = []
self.cur_window_bar:[BarData] = None
self.bar_updated = False
# 策略账户引擎
self.sam_engine:SamEngine = cta_engine.main_engine.get_engine('sam')
def on_init(self):
"""
Callback when strategy is inited.
"""
# 创建或者获得策略账户
if self.sam_engine.create_strategy_account(strategy_name=self.strategy_name,vt_symbols=[self.vt_symbol],
init_money=self.init_money,pickup_position=True):
self.sam_engine.notify_strategy_ui(self.strategy_name)
self.write_log(f"策略账户{self.strategy_name}创建成功!")
account_info = self.sam_engine.get_strategy_total_money(self.strategy_name)
self.write_log(f"策略账户{account_info}")
def on_tick(self, tick: TickData):
"""
Callback of new tick data update.
"""
self.bar_updated = False
self.current_tick = tick # 记录最新tick
# 再更新tick,产生1分钟K线乃至N 分钟线
self.bg.update_tick(tick)
if self.inited:
# 先产生当前临时K线
self.cur_window_bar = self.get_cur_window_bar()
if self.cur_window_bar:
# 发送当前临时K线更新消息
self.send_event(EVENT_CTA_BAR,self.cur_window_bar)
self.send_event(EVENT_CTA_TICK,tick)
def on_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
if self.inited:
self.write_log(f"I got a 1min BarData at {bar.datetime}")
self.bg.update_bar(bar)
self.bar_updated = True
def on_Nmin_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
self.all_bars.append(bar)
self.kx_count = len(self.all_bars)
if self.inited:
self.write_log(f"I got a {self.kx_interval}min BarData at {bar.datetime}")
self.send_event(EVENT_CTA_BAR,bar)
self.put_event()
def on_trade(self, trade: TradeData):
"""
Callback of new trade data update.
"""
self.send_event(EVENT_CTA_TRADE,trade)
def on_order(self, order: OrderData):
"""
Callback of new order data update.
"""
self.send_event(EVENT_CTA_ORDER,order)
def on_stop_order(self, stop_order: StopOrder):
"""
Callback of stop order update.
"""
self.send_event(EVENT_CTA_STOPORDER,stop_order)
def get_cur_window_bar(self):
window_bar = copy.deepcopy(self.bg.window_bar)
bar = self.bg.bar
if not(window_bar): # 刚产生过window_bar
return None
if self.bar_updated: # 刚更新过window_bar
return window_bar
# 上一分钟window_bar和当前bar合成出临时window bar
window_bar.high_price = max(window_bar.high_price, bar.high_price)
window_bar.low_price = min(window_bar.low_price, bar.low_price)
# Update close price/volume into window bar
window_bar.close_price = bar.close_price
window_bar.volume += int(bar.volume)
window_bar.open_interest = bar.open_interest
return window_bar
def send_event(self,event_type:str,data:Any):
"""
只在实盘引擎并且配置为显示K线图表的情况下发送小线
"""
if self.engine_type==EngineType.LIVE and self.show_chart: # "如果显示K线图表"
self.even_engine.put(Event(event_type,(self.strategy_name,data)))
def init_kx_chart(self,kx_chart:NewChartWidget=None): # 提供给外部调用
if kx_chart:
kx_chart.add_plot("candle", hide_x_axis=True)
kx_chart.add_plot("volume", maximum_height=150)
kx_chart.add_item(CandleItem, "candle", "candle")
kx_chart.add_item(VolumeItem, "volume", "volume")
MyCtaTemplate的这3个成员需要参数化:
init_money:float = 100000.0 # 初始资金
kx_interval:int = 5
show_chart = False # 显示K线图表
MyCtaTemplate的这个变量需要输出:
kx_count:int = 0
保存下面的代码到 [用户目录]\strategies\chart_strategy.py
from typing import Any,List,Dict,Tuple
from vnpy.usertools.my_cta_template import MyCtaTemplate
from vnpy.app.cta_strategy.base import (
APP_NAME,
EVENT_CTA_LOG,
EVENT_CTA_TICK,
EVENT_CTA_HISTORY_BAR,
EVENT_CTA_BAR,
EVENT_CTA_ORDER,
EVENT_CTA_TRADE,
EVENT_CTA_STOPORDER,
EVENT_CTA_STRATEGY,
)
from vnpy.usertools.kx_chart import (
NewChartWidget,
ChartItem,
CandleItem,
VolumeItem,
LineItem,
SmaItem,
RsiItem,
MacdItem,
)
import pyqtgraph as pg
from PyQt5 import QtGui
class ChartStrategy(MyCtaTemplate):
author = "hxxjava"
atr_window = 20
atr_value = 0.0
parameters = [
"init_money",
"kx_interval",
"show_chart",
"atr_window",
]
variables = [
"kx_count",
"atr_value",
]
def __init__(
self,
cta_engine: Any,
strategy_name: str,
vt_symbol: str,
setting: dict,
):
super().__init__(cta_engine,strategy_name,vt_symbol,setting)
def on_init(self):
"""
Callback when strategy is inited.
"""
super().on_init()
self.load_bar(20)
if len(self.all_bars)>0 and self.show_chart:
self.send_event(EVENT_CTA_HISTORY_BAR,self.all_bars)
def on_start(self):
""" """
self.write_log("已开始")
def on_stop(self):
""""""
self.write_log("_kx_strategy 已停止")
def init_kx_chart(self,kx_chart:NewChartWidget=None): # 提供给外部调用
# self.write_log("init_kx_chart executed !!!")
super().init_kx_chart(kx_chart)
if kx_chart:
kx_chart.add_plot("rsi", maximum_height=150)
kx_chart.add_plot("macd", maximum_height=150)
kx_chart.add_item(LineItem, "line", "candle")
kx_chart.add_item(SmaItem, "sma1", "candle")
kx_chart.add_item(SmaItem, "sma2", "candle")
sma1:SmaItem = kx_chart.get_item("sma1")
sma2:SmaItem = kx_chart.get_item("sma2")
blue_pen:QtGui.QPen = pg.mkPen(color=(100, 100, 255), width=2)
red_pen:QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=2)
sma1.set_pen(blue_pen)
sma1.set_sma_window(5)
sma2.set_pen(red_pen)
sma2.set_sma_window(20)
kx_chart.add_item(RsiItem, "rsi", "rsi")
kx_chart.add_item(MacdItem, "macd", "macd")
kx_chart.add_last_price_line()
kx_chart.add_cursor()
注意上面的策略类的成员parameters和variables的赋值内容,这是本贴的重点!!!
对CtaTemplate的扩展策略模板MyCtaTemplate,必须在继承MyCtaTemplate的最终策略中完成参数化和变量输出。
vnstation\ui\main\mainwindow.py中的MainWindow类中的init_ui()代码是这样的:
def init_ui(self):
""""""
self.setWindowTitle(f'VN Station {vnstation.__version__}')
self.resize(QtCore.QSize(1050, 800))
url = 'https://www.vnpy.com/portal'
self.browser = create_web_view()
self.browser.setUrl(QtCore.QUrl(url))
self.text_edit = QtWidgets.QTextEdit()
self.text_edit.setReadOnly(True)
self.signal_stdout.connect(self.text_edit.append)
button_height = 60
lite_button = QtWidgets.QPushButton('VN Trader Lite')
lite_button.clicked.connect(self.open_lite_clicked)
lite_button.setFixedHeight(button_height)
lite_button.setToolTip("加载了CTP接口和CTA策略模块的期货量化平台")
trader_button = QtWidgets.QPushButton('VN Trader Pro')
trader_button.clicked.connect(self.open_pro_clicked)
trader_button.setFixedHeight(button_height)
trader_button.setToolTip("根据需求灵活配置的机构级量化交易平台")
jupyter_button = QtWidgets.QPushButton('Jupyter Notebook')
jupyter_button.clicked.connect(self.open_jupyter_clicked)
jupyter_button.setFixedHeight(button_height)
jupyter_button.setToolTip("在Jupyter的交互式环境中进行量化研究")
encrypt_button = QtWidgets.QPushButton('策略加密')
encrypt_button.clicked.connect(self.encrypt_clicked)
encrypt_button.setFixedHeight(button_height)
encrypt_button.setToolTip("将Python源代码加密生成pyd二进制文件")
community_button = QtWidgets.QPushButton(u'提问求助')
community_button.clicked.connect(self.open_community_clicked)
community_button.setFixedHeight(button_height)
community_button.setToolTip("使用中的疑问请通过vn.py社区论坛求助")
update_button = QtWidgets.QPushButton("已是最新")
update_button.clicked.connect(self.update_clicked)
update_button.setFixedHeight(button_height)
update_button.setEnabled(False)
self.update_button = update_button
# hbox_top = QtWidgets.QHBoxLayout()
# hbox_top.addWidget(self.browser)
# hbox_top.addWidget(self.text_edit)
hbox = QtWidgets.QHBoxLayout()
hbox.addWidget(lite_button)
hbox.addWidget(trader_button)
hbox.addWidget(jupyter_button)
hbox.addWidget(encrypt_button)
hbox.addWidget(community_button)
hbox.addWidget(update_button)
vbox = QtWidgets.QVBoxLayout()
vbox.addWidget(self.browser)
vbox.addLayout(hbox)
widget = QtWidgets.QWidget()
widget.setLayout(vbox)
self.setCentralWidget(widget)
self.tray_icon = None
self.timer = QtCore.QTimer()
self.timer.timeout.connect(self.trigger_useronline)
self.timer.start(1000 * 60 * 25)
其中的self.browser = create_web_view()
def create_web_view(parent=None):
view = QWebEngineView(parent=parent)
view.setPage(QWebEnginePage(_web_engine_profile, view))
return view
而QWebEngineView和QWebEnginePage来自PyQt5.QtWebEngineWidgets:
from PyQt5.QtWebEngineWidgets import QWebEnginePage, QWebEngineProfile, QWebEngineView
因为可以肯定的是新安装的vnpy,在进入论坛界面是可以缩放的,执行下面的指令:
pip install --upgrade PyQt5
卸载vn.py,然后重新安装,当前为vn.py2.1.6,就OK了!
当然这种方法有点笨了些!不过相比前一种方法,时间上可能还短很多,讽刺吧?!
造成这种问题的原因是:vn.py系统更新时,没有考虑将其他的依赖模块和包做同步更新,导致很多古怪的问题的出现。
建议在生成版本更新脚本的时候,能够考虑将其他的依赖模块和包的同步更新。
我的vn.py登录后,社区论坛界面没有办法缩放,看起来非常不舒服,求指教!