相当于整体往前移动了6分钟,很费解,读取出来的每天都是往前移6分钟
gateway_name symbol exchange datetime interval volume open_interest open high low close
0 DB IH99 Exchange.CFFEX 2019-05-31 09:25:00+08:00 Interval.MINUTE 767.0 54422.0 2713.5703 2716.4436 2709.8341 2712.8189
1 DB IH99 Exchange.CFFEX 2019-05-31 09:26:00+08:00 Interval.MINUTE 359.0 54205.0 2712.7911 2716.2056 2712.6688 2713.1206
2 DB IH99 Exchange.CFFEX 2019-05-31 09:27:00+08:00 Interval.MINUTE 323.0 53981.0 2713.1068 2713.5549 2709.9648 2709.9710
3 DB IH99 Exchange.CFFEX 2019-05-31 09:28:00+08:00 Interval.MINUTE 422.0 53717.0 2710.1046 2712.3135 2708.4665 2710.7099
4 DB IH99 Exchange.CFFEX 2019-05-31 09:29:00+08:00 Interval.MINUTE 291.0 53536.0 2711.1412 2714.8780 2710.6063 2713.2508
5 DB IH99 Exchange.CFFEX 2019-05-31 09:30:00+08:00 Interval.MINUTE 626.0 53167.0 2713.3754 2714.0023 2709.8716 2711.3979
6 DB IH99 Exchange.CFFEX 2019-05-31 09:31:00+08:00 Interval.MINUTE 354.0 52976.0 2711.2408 2713.0316 2709.2843 2712.6573
7 DB IH99 Exchange.CFFEX 2019-05-31 09:32:00+08:00 Interval.MINUTE 416.0 52736.0 2712.7938 2717.9527 2711.2315 2717.3653
8 DB IH99 Exchange.CFFEX 2019-05-31 09:33:00+08:00 Interval.MINUTE 506.0 52480.0 2717.7916 2721.5836 2715.4050 2720.7167
9 DB IH99 Exchange.CFFEX 2019-05-31 09:34:00+08:00 Interval.MINUTE 519.0 52240.0 2720.8500 2725.9267 2719.0185 2724.5785
不知道图能否看清
就是我数据库里没有IH99 9:25的数据,但是通过datamanager方法读取来的数据时间戳有9:25,应该是9:31才对
2020-11-29 23:26:59.647871 开始加载历史数据
2020-11-29 23:27:00.047876 加载进度:########## [100%]
2020-11-29 23:27:00.047876 历史数据加载完成,数据量:4560
2020-11-29 23:27:00.047876 触发异常,回测终止
2020-11-29 23:27:00.051934 Traceback (most recent call last):
File "D:\vnpy-2.1.7\vnpy\app\cta_strategy\backtesting.py", line 344, in run_backtesting
self.callback(data)
File "D:\vnpy-2.1.7\vnpy\app\cta_strategy\strategies\atr_rsi_strategy.py", line 126, in on_bar
self.load_select_bar_data_from_mongodb(self.vt_symbol,self.exchange,self.interval,(bar.datetime-timedelta(days=3)),bar.datetime,self.collection_name)
AttributeError: 'AtrRsiStrategy' object has no attribute 'exchange'
这个代码写的还不够严禁 部分属性有问题
最近问题比较多,主要就是怕自己实现的方法走弯路,低效,
所以才想跟大佬们请教一下是否是简洁有效的写法,不浪费性能,感谢
关于在策略里读取历史数据并生成dataframe的方法我写了一个初稿,请各位大佬斧正
def load_select_bar_data_from_mongodb(self,symbol: str,
exchange: Exchange,
interval: Interval,
start: datetime,
end: datetime,
collection_name: str = None):
df = pd.DataFrame(database_manager.load_bar_data(symbol, exchange, interval, start, end, collection_name))
df = df.rename(index=str, columns={"open_price": "open",
"high_price": "high",
"low_price": "low",
"close_price": "close",
})
data_select_columns = ["open", "high", "low", "close", "open_interest", "volume"]
df = df.set_index('datetime')[data_select_columns]
print(df)
return df
我是准备在on_bar触发的时候调用这个函数取得想要的起止日期的dataframe
有如下两方面具体问题请请教大佬
1这个函数方法是否有必要写到比如vnpy-2.1.7\vnpy\app\cta_strategy\strategies\atr_rsi_strategy.py策略文件中 ,并设置为一个策略类的属性方法?还是说我放在类外面独立为一个单独的函数就够了?
2回测中反复从MongoDB中读取数据生成dataframe是否是一种低效的方式?有办法改进么,在同实盘保持接轨的思路下(策略是1分钟级别)?
大佬们
我看cta的例子里面是在def on_init(self):里面self.load_bar(10) ,即获取十天的bar数据
我现想在def on_bar(self, bar: BarData):函数中,向前取2个月的数据(1分钟bardata),这时候我应该用哪个函数api来获取呢?
并同时将这两个月的1分钟bardata转为dataframe形式使用,请教一下各位大佬!
用Python的交易员 wrote:
BarGenerator本身是tick驱动的,如果没有tick就合成不了了。如果硬要加上定时检查,需要额外增加定时检查的逻辑(监听EVENT_TIMER)事件,然后每秒检查当前时间和缓存K线时间戳的偏离度,超过一个水平就全部合成K线。
好的好的 谢谢大佬指导!
CTA回测引擎改造部分,我看楼主只改造了backtesting.py文件,该目录里面的engine.py里面也有loadbar,如下
def load_bar(
self,
vt_symbol: str,
days: int,
interval: Interval,
callback: Callable[[BarData], None],
use_database: bool
):
""""""
symbol, exchange = extract_vt_symbol(vt_symbol)
end = datetime.now(get_localzone())
start = end - timedelta(days)
bars = []
# Pass gateway and RQData if use_database set to True
if not use_database:
# Query bars from gateway if available
contract = self.main_engine.get_contract(vt_symbol)
if contract and contract.history_data:
req = HistoryRequest(
symbol=symbol,
exchange=exchange,
interval=interval,
start=start,
end=end
)
bars = self.main_engine.query_history(req, contract.gateway_name)
# Try to query bars from RQData, if not found, load from database.
else:
bars = self.query_bar_from_rq(symbol, exchange, interval, start, end)
if not bars:
bars = database_manager.load_bar_data(
symbol=symbol,
exchange=exchange,
interval=interval,
start=start,
end=end,
)
请教一下,这个引擎engine.py要改么?
callingpulse wrote:
2020/09/09更新
原来代码bar = BarData( symbol=row.symbol, exchange=row.exchange, datetime=row.datetime,#报错的地方,需要增加时区信息 interval=row.interval, volume=row.volume, open_price=row.open, high_price=row.high, low_price=row.low, close_price=row.close, open_interest=row.open_interest, gateway_name="DB", )运行会报错:
AttributeError: 'str' object has no attribute 'astimezone'
原因是新版本vnpy支持了时区数据,所以
datetime=row.datetime
需要加上时区信息from datetime import datetime, timedelta, timezone # 中国时区是+8,对应参数hours=8 # 日本时区是+9,hours=9 utc_8 = timezone(timedelta(hours=8)) datetime=row.datetime.replace(tzinfo=utc_8)
一个小技巧,把bar里面的数据打印出来,看看都是什么格式。
sql_manager.get_oldest_bar_data()
大佬牛!!!出错后发现真是这个问题导致的,必须加时区
BarGenerator模块是以tick数据驱动,如果一个合约长时间没有tick数据过来将导致分钟数据缺失,应该如何解决这个问题呢,要求是实时性(分钟级别),不是盘后解决哈。
请教各位大佬如何修改BarGenerator或者通过其他途径解决呢!
梗 wrote:
log内的时间没解释清楚,以第一根bar为例:
2020-11-25 19:46:00: API_STABILITY_MONITOR: 379.240000, actual time 20:59:02.225194, num_bar = 556
19:46:00 --- 这个时bar内的datatime; 20:59:02.225194 这个是当前时间
从时间看这一根bar可以看作是非交易时间推送的fake_data,这个需要策略自己做过滤么?而2020-11-25 20:59:00 和2020-11-25 20:59:00这两根bar实际获取时间分别是 21:00:02.015919和09:00:02.269108,都在交易时间之内
2020-11-25 20:59:00: API_STABILITY_MONITOR: 380.540000, actual time 21:00:02.015919, num_bar = 2
2020-11-25 21:00:00: API_STABILITY_MONITOR: 380.240000, actual time 21:01:00.911799, num_bar = 3
这两个是tick对么?这两个不用过滤吧 要保留
BELLCOUSIN wrote:
应该就是
在vnpy/gateway/ctp/ 增加以上文件: ctp_gateway_double.py
在vnpy/gateway/ctp/init.py 导入包from .ctp_gateway_double import CtpGatewayDouble
在run.py里面:
from vnpy.gateway.ctp import CtpGateway from vnpy.gateway.ctp import CtpGatewayDouble
main_engine.add_gateway(CtpGateway) main_engine.add_gateway(CtpGatewayDouble)
在ctp_setting配置文件里面. 配置下ctp_setting增加下
"次行情服务器": "***.***.***.***:****",
为啥我会重复推tick...
不是用hash过滤了吗
请大佬解答下我哪里步骤错了.
我觉得有可能是self.tick_buffer的问题
def run(self):
""""""
while self.active:
try:
task = self.queue.get(timeout=1)
task_type, data = task
代码来源:vnpy/app/data_recorder/engine.py
我想请教下大佬这里关于的多线程的问题
1、在get一个task后,完成一个task后面的代码,才会取出queue中的下一个task
2、无论当前取走的task是否完成,都会在timeout=1后继续取task出来进行处理
想问下我哪个理解是正确的?或者有其他指点请赐教,感谢大佬!
def register_event(self):
""""""
self.event_engine.register(EVENT_TICK, self.process_tick_event)
self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)
self.event_engine.register(EVENT_SPREAD_DATA, self.process_spread_event)
代码来源vnpy/app/data_recorder/engine.py
请教下EVENT_SPREAD_DATA这是什么数据引发的什么事件?
vnpy的1分钟行情记录是把当前tick的时间当做K线的起始时间的, 比如我要记录期货合约9:00到11:30的行情, 实际写入到数据库的行情的时间是9:00, 9:01, 9:02......11:29, 米筐数据是9.01,9.02,9.03,...11:30
请问如何修改BarGenerator源码能使其生成bar数据的时间戳同米筐数据保持一致呢,感谢!
下面是BarGenerator源码:
class BarGenerator:
"""
For:
1. generating 1 minute bar data from tick data
2. generateing x minute bar/x hour bar data from 1 minute data
Notice:
1. for x minute bar, x must be able to divide 60: 2, 3, 5, 6, 10, 15, 20, 30
2. for x hour bar, x can be any number
"""
def __init__(
self,
on_bar: Callable,
window: int = 0,
on_window_bar: Callable = None,
interval: Interval = Interval.MINUTE
):
"""Constructor"""
self.bar: BarData = None
self.on_bar: Callable = on_bar
self.interval: Interval = interval
self.interval_count: int = 0
self.window: int = window
self.window_bar: BarData = None
self.on_window_bar: Callable = on_window_bar
self.last_tick: TickData = None
self.last_bar: BarData = None
def update_tick(self, tick: TickData) -> None:
"""
Update new tick data into generator.
"""
new_minute = False
# Filter tick data with 0 last price
if not tick.last_price:
return
# Filter tick data with less intraday trading volume (i.e. older timestamp)
if self.last_tick and tick.volume and tick.volume < self.last_tick.volume:
return
if not self.bar:
new_minute = True
elif(self.bar.datetime.minute != tick.datetime.minute) or (self.bar.datetime.hour != tick.datetime.hour):
self.bar.datetime = self.bar.datetime.replace(
second=0, microsecond=0
)
self.on_bar(self.bar)
new_minute = True
if new_minute:
self.bar = BarData(
symbol=tick.symbol,
exchange=tick.exchange,
interval=Interval.MINUTE,
datetime=tick.datetime,
gateway_name=tick.gateway_name,
open_price=tick.last_price,
high_price=tick.last_price,
low_price=tick.last_price,
close_price=tick.last_price,
open_interest=tick.open_interest
)
else:
self.bar.high_price = max(self.bar.high_price, tick.last_price)
self.bar.low_price = min(self.bar.low_price, tick.last_price)
self.bar.close_price = tick.last_price
self.bar.open_interest = tick.open_interest
self.bar.datetime = tick.datetime
if self.last_tick:
volume_change = tick.volume - self.last_tick.volume
self.bar.volume += max(volume_change, 0)
self.last_tick = tick
def update_bar(self, bar: BarData) -> None:
"""
Update 1 minute bar into generator
"""
# If not inited, creaate window bar object
if not self.window_bar:
# Generate timestamp for bar data
if self.interval == Interval.MINUTE:
dt = bar.datetime.replace(second=0, microsecond=0)
else:
dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
self.window_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=dt,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price
)
# Otherwise, update high/low price into window bar
else:
self.window_bar.high_price = max(
self.window_bar.high_price, bar.high_price)
self.window_bar.low_price = min(
self.window_bar.low_price, bar.low_price)
# Update close price/volume into window bar
self.window_bar.close_price = bar.close_price
self.window_bar.volume += int(bar.volume)
self.window_bar.open_interest = bar.open_interest
# Check if window bar completed
finished = False
if self.interval == Interval.MINUTE:
# x-minute bar
if not (bar.datetime.minute + 1) % self.window:
finished = True
elif self.interval == Interval.HOUR:
if self.last_bar and bar.datetime.hour != self.last_bar.datetime.hour:
# 1-hour bar
if self.window == 1:
finished = True
# x-hour bar
else:
self.interval_count += 1
if not self.interval_count % self.window:
finished = True
self.interval_count = 0
if finished:
self.on_window_bar(self.window_bar)
self.window_bar = None
# Cache last bar object
self.last_bar = bar
守望长城2020-6-11-艾瑞巴蒂 wrote:
个人觉得应该在执行完main_engine.connect(CTP_SETTING, "CTP")之后,在主线程中添加阻塞,以等待connect方法完全执行, 如果不阻塞一下主线程的话, 有一定的概率有部分eContract事件会先"跑掉",做不到在执行完来自oms的process_contract_event之后紧接着执行来自RecorderEngine的process_contract_event方法, 这是有风险的,最保险的做法是在主线程中增加sleep, 阻塞主线程一些时间.
问题初步解决 不能加sleep 加了就运行不了 还没搞清楚原因
如题我看其他视频博主 vnpy中全局配置 有数据库相关的配置,目前我在使用中的全局配置和视频博主里的不一样
我的:
视频博主的:
我理解,这个行情录制代码里,新建了一个数据记录类WholeMarketRecorder,在子进程中实例了一次(但是实例的那些方法何时调用的 不清楚。。)
运行这段代码后输出如上,然后后续一直没反应,数据库中也没有数据
同时我在WholeMarketRecorder的process_contract_event方法中 写了打印合约名称,也没运行出来(说明没运行这段代码)
大佬们有空的话 方便指导下么