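I recently noticed that QUANTAXIS reads 70,000 rows of 1-minute data in 0:00:00.001997.

vnpy's database_manager.load_bar_data, however, takes about 10 seconds to pull the same 70,000 rows, so I'd like to rework it.

QUANTAXIS stores data with pymongo's collection.insert_many() and reads it with collection.find(), but I can't find either method in vnpy's database source code. So my questions are: is vnpy's MongoDB connection built on pymongo? And could it be optimized to reach QUANTAXIS's read speed?
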
def load_bar_data(
    self,
    symbol: str,
    exchange: Exchange,
    interval: Interval,
    start: datetime,
    end: datetime,
    collection_name: str = None,
) -> Sequence[BarData]:
    if collection_name is None:
        s = DbBarData.objects(
            symbol=symbol,
            exchange=exchange.value,
            interval=interval.value,
            datetime__gte=start,
            datetime__lte=end,
        )
    else:
        with switch_collection(DbBarData, collection_name):
            s = DbBarData.objects(
                symbol=symbol,
                exchange=exchange.value,
                interval=interval.value,
                datetime__gte=start,
                datetime__lte=end,
            )
    data = [db_bar.to_bar() for db_bar in s]
    return data
There is no trace of pymongo in the vnpy code above.
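A note on the question above: the `DbBarData.objects(...)` and `switch_collection` calls look like mongoengine, an object-document mapper that itself sits on top of pymongo, so pymongo is probably underneath even though it is never imported directly. If that is right, much of the 10 seconds may be spent building one document object and one BarData per row rather than inside MongoDB. Also, pymongo's `collection.find()` returns a lazy cursor, so a 2 ms timing may only cover creating the cursor, not fetching the rows. The sketch below builds the raw filter that would correspond to the keyword query; `build_bar_filter` is a hypothetical helper and the field names are assumed to match the DbBarData keywords.

```python
from datetime import datetime
from typing import Any, Dict


def build_bar_filter(
    symbol: str,
    exchange: str,
    interval: str,
    start: datetime,
    end: datetime,
) -> Dict[str, Any]:
    """Build a raw MongoDB filter equivalent to the keyword query above.

    Field names are assumed to match the DbBarData keyword arguments.
    """
    return {
        "symbol": symbol,
        "exchange": exchange,
        "interval": interval,
        # datetime__gte / datetime__lte correspond to $gte / $lte
        "datetime": {"$gte": start, "$lte": end},
    }


query = build_bar_filter(
    "IH99", "CFFEX", "1m", datetime(2019, 6, 1), datetime(2019, 6, 30)
)
# With a pymongo collection object (not created here) this could be used as:
#   rows = list(collection.find(query, {"_id": 0}))
# Wrapping find() in list() forces the fetch, which matters when timing it.
print(query)
```

Timing `list(collection.find(query))` against `load_bar_data` on the same range would show how much of the gap comes from object construction.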
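Hi all, I installed the 2.1.7 release on the C: drive, and I also have a source folder vnpy2.1.7 on the D: drive (which contains a vnpy folder as well). PyCharm's interpreter path points to the python.exe installed by the vnpy release on C:.

I have now changed a lot of the code in the vnpy folder inside vnpy2.1.7 on D: (storing MongoDB data in separate collections, a turnover factor, and other things I can't recall).

This is the code I wrote under the vnpy2.1.7 folder on D::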
from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine
from vnpy.gateway.ctp import CtpGateway
from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.cta_strategy.base import EVENT_CTA_LOG
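Do the imports above resolve to the vnpy library in the C: release, or to the code in the vnpy folder under vnpy2.1.7 on D:?
I don't really understand how this import mechanism works. Any pointers appreciated!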
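For what it's worth, Python resolves `import vnpy` by searching `sys.path` in order, and when a script is run directly its own directory comes first. A `vnpy` folder next to the script would therefore normally shadow the copy in site-packages, whichever python.exe is used. A quick way to check rather than guess is to ask Python which file it would load. The helper name `locate_package` is made up for this sketch.

```python
import importlib.util
from typing import Optional


def locate_package(name: str) -> Optional[str]:
    """Return the file path Python would load for `name`, or None if not found."""
    spec = importlib.util.find_spec(name)
    if spec is None:
        return None
    return spec.origin


# "json" is used here only because it ships with Python; replace it with
# "vnpy" to see whether the C: or the D: copy would be imported.
print(locate_package("json"))
```

Printing `vnpy.__file__` after the import gives the same information.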
I see events registered like this in many places:
def register_event(self):
    """"""
    # self.event_engine.register(EVENT_TICK, self.process_tick_event)
    self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)
As I understand it, this registers a handler for the EVENT_CONTRACT event, but I'd like to know which module and which source file actually pushes EVENT_CONTRACT.
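I have a pandas DataFrame (read from a CSV). When importing 1-minute data I add a column like this, which runs without errors:
imported_data['interval'] = Interval.MINUTE

Now I want to import 5-minute data, and both of the following raise errors:
imported_data['interval'] = 5*Interval.MINUTE
imported_data['interval'] = "5min"
How should this be written? Thanks! (I haven't tried reading yet, since nothing has been saved.)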
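A possible explanation, hedged: in the vnpy 2.x versions I know of, `Interval` is a plain Enum with members such as MINUTE ("1m"), HOUR, DAILY and WEEKLY, and no 5-minute member. An Enum member cannot be multiplied by an integer, and a bare string has no `.value` attribute if the saving code expects one. The class below is a stand-in written for this sketch, not vnpy's own, and its values are assumed.

```python
from enum import Enum


class Interval(Enum):
    """Stand-in for vnpy's Interval; values assumed from vnpy 2.x."""
    MINUTE = "1m"
    HOUR = "1h"
    DAILY = "d"
    WEEKLY = "w"


def try_multiply() -> str:
    """Show what happens when an Enum member is multiplied by an int."""
    try:
        5 * Interval.MINUTE
    except TypeError as exc:
        return f"TypeError: {exc}"
    return "no error"


result = try_multiply()
print(result)
# A plain string carries no .value attribute, unlike an Enum member.
print(hasattr("5min", "value"), hasattr(Interval.MINUTE, "value"))
```

A common alternative is to store only 1-minute bars and aggregate them to 5 minutes when loading.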
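I've been staring at the code below for a long time and still can't work out where it subscribes to contracts.

None of the example strategies takes a contract parameter, and none of them contains a subscription step, so strategies and contracts seem to be completely decoupled... I'm confused.
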
def run_child():
    """
    Running in the child process.
    """
    SETTINGS["log.file"] = True
    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    main_engine.add_gateway(CtpGateway)
    cta_engine = main_engine.add_app(CtaStrategyApp)
    main_engine.write_log("主引擎创建成功")
    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    main_engine.write_log("注册日志事件监听")
    main_engine.connect(ctp_setting, "CTP")
    main_engine.write_log("连接CTP接口")
    sleep(10)
    cta_engine.init_engine()
    main_engine.write_log("CTA策略初始化完成")
    cta_engine.init_all_strategies()
    sleep(60)  # Leave enough time to complete strategy initialization
    main_engine.write_log("CTA策略全部初始化")
    cta_engine.start_all_strategies()
    main_engine.write_log("CTA策略全部启动")
    while True:
        sleep(10)
        trading = check_trading_period()
        if not trading:
            print("关闭子进程")
            main_engine.close()
            sys.exit(0)
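Please answer with code where possible. Much appreciated!

Requirement:
Contract trading hours: 9:30-11:30 and 13:00-15:00

Within trading hours, trigger an event at the 5th second after each minute ends.

I don't want to do this with a 5-second sleep inside on_bar (that is still bar-event logic at heart, not time-event logic). I'd like the strategy to call a handler on a timer (time-event logic).

Thanks!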
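One possible direction: vnpy's EventEngine emits a timer event (EVENT_TIMER) roughly once per second, so a handler registered for it can check the wall clock and decide whether to act. The sketch below covers only that time check. It assumes "the 5th second after each minute ends" means hh:mm:05 where the minute that just closed lies inside a session; `should_fire` and `SESSIONS` are names invented here.

```python
from datetime import datetime, time, timedelta

# Trading sessions from the requirement, as [start, end) pairs.
SESSIONS = [(time(9, 30), time(11, 30)), (time(13, 0), time(15, 0))]


def should_fire(now: datetime) -> bool:
    """Return True at hh:mm:05 if the minute that just closed was in session."""
    if now.second != 5:
        return False
    # Start of the minute that ended at hh:mm:00.
    closed_minute_start = (
        now.replace(second=0, microsecond=0) - timedelta(minutes=1)
    ).time()
    return any(start <= closed_minute_start < end for start, end in SESSIONS)


print(should_fire(datetime(2020, 12, 1, 9, 31, 5)))
print(should_fire(datetime(2020, 12, 1, 9, 30, 5)))
```

In a timer handler this would be called with `datetime.now()`. Because the timer is only approximately one second, remembering the last minute that fired guards against a duplicate trigger.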
`def QA_data_futuremin_resample20201205(
        min_data,
        type_='5min',
        exchange_id=Exchange.CFFEX
):
    """Resample futures minute bars to a longer period.
    Resamples minute bars into minute bars of another level.
    future:
    vol ==> trade
    amount X
    Futures generally follow one of two session patterns:
    CFFEX stock index futures: 9:30 - 11:30/ 13:00 -15:00
    Other futures: -1 21:00: 2:30 / 9:00 - 11:30 / 13:30-15:00
    Example input:
                                    open       high        low      close  open_interest  volume
    datetime
    2020-11-24 09:33:00+08:00  3465.9598  3466.3631  3463.9408  3465.0981        68138.0   588.0
    2020-11-24 09:34:00+08:00  3465.5208  3470.3931  3464.5840  3469.7117        67968.0   676.0
    2020-11-24 09:35:00+08:00  3469.9932  3470.6966  3466.8064  3468.5448        67851.0   427.0
    2020-11-24 09:36:00+08:00  3468.4432  3470.1731  3463.9361  3464.2188        67684.0   431.0
    2020-11-24 09:37:00+08:00  3464.0211  3464.3037  3456.2692  3457.5784        67481.0   532.0
    2020-11-24 09:38:00+08:00  3457.8283  3457.8737  3451.8422  3453.3229        67163.0   603.0
    2020-11-24 09:39:00+08:00  3453.7302  3456.1880  3453.2948  3455.7669        66960.0   358.0
    2020-11-24 09:40:00+08:00  3455.5569  3455.6181  3450.4431  3450.4431        66642.0   568.0
    """
    CONVERSION = {
        # 'code': 'first',
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last',
        # 'datetime': 'last',
        'open_interest': 'last',
        'volume': 'sum'
    }
    min_data = min_data.loc[:, list(CONVERSION.keys())]
    idx = min_data.index
    if exchange_id == Exchange.CFFEX:
        part_1 = min_data.iloc[idx.indexer_between_time('9:30', '11:30')]
        part_1_res = part_1.resample(
            type_,
            base=30,
            closed='right',
            loffset=type_
        ).apply(CONVERSION)
        # part_2 = min_data.iloc[idx.indexer_between_time('13:00', '15:00')]
        # Extended to 15:15 to cover CFFEX treasury bond futures trading hours (20201205)
        part_2 = min_data.iloc[idx.indexer_between_time('13:00', '15:15')]
        part_2_res = part_2.resample(
            type_,
            base=0,
            closed='right',
            loffset=type_
        ).agg(CONVERSION)
        return pd.concat(
            [part_1_res,
             part_2_res]
            # ).dropna().sort_index().reset_index().set_index(['datetime','code'])
        ).dropna().sort_index()
    else:
        part_1 = min_data.iloc[np.append(
            idx.indexer_between_time('0:00',
                                     '11:30'),
            idx.indexer_between_time('0:00',
                                     '11:30')
        )]
        part_1_res = part_1.resample(
            type_,
            base=0,
            closed='right',
            loffset=type_
        ).apply(CONVERSION)
        part_2 = min_data.iloc[idx.indexer_between_time('13:30', '15:00')]
        part_2_res = part_2.resample(
            type_,
            base=30,
            closed='right',
            loffset=type_
        ).agg(CONVERSION)
        part_3 = min_data.iloc[idx.indexer_between_time('21:00', '23:59')]
        part_3_res = part_3.resample(
            type_,
            base=0,
            closed='right',
            loffset=type_
        ).agg(CONVERSION)
        return pd.concat(
            [part_1_res,
             part_2_res,
             part_3_res]
            # ).dropna().sort_index().reset_index().set_index(['datetime','code'])
        ).dropna().sort_index()`
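Reading one year of 1-minute bar data (nearly 60,000 bars) out of MongoDB takes about 10 seconds. Is this a MongoDB performance problem? If I want to cut the read time, do I have to switch to another database? Any recommendations?
Note: the MongoDB data is already split across separate collections.
1.
First, the backtest code sets the parameters on the engine as follows: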
#%%
engine = BacktestingEngine()
engine.set_parameters(
    vt_symbol="IH99.CFFEX",
    interval="1m",
    start=datetime(2019, 6, 1),
    end=datetime(2019, 6, 30),
    rate=0.3/10000,
    slippage=0.2*5,
    size=300,
    pricetick=0.2,
    capital=1_000_000,
    collection_name="IH99"
)
engine.add_strategy(AtrRsiStrategy, {})

2.
on_bar in the AtrRsiStrategy strategy is as follows:
def on_bar(self, bar: BarData):
    """
    Callback of new bar data update.
    """
    self.cancel_all()
    df01 = load_select_bar_data_from_mongodb(
        self.symbol, self.exchange, self.interval,
        (bar.datetime - timedelta(days=3)),
        bar.datetime, self.collection_name)
    print(df01)
    self.put_event()
Note: load_select_bar_data_from_mongodb is a custom function that reads bar data from MongoDB and builds a DataFrame; it needs symbol and the other parameters passed in.

3.
The backtest fails with the following error:
2020-11-30 22:50:43.167187 Traceback (most recent call last):
  File "D:\vnpy-2.1.7\vnpy\app\cta_strategy\backtesting.py", line 315, in run_backtesting
    self.callback(data)
  File "D:\vnpy-2.1.7\vnpy\app\cta_strategy\strategies\atr_rsi_strategy.py", line 129, in on_bar
    df01=load_select_bar_data_from_mongodb(self.symbol,self.exchange,self.interval,(bar.datetime-timedelta(days=3)),
AttributeError: 'AtrRsiStrategy' object has no attribute 'symbol'

4.
So my question is: how can the strategy receive the attributes set through engine.set_parameters on the backtesting engine?
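A hedged observation on the traceback: as far as I know, a vnpy CTA strategy is constructed with `vt_symbol` (for example "IH99.CFFEX") and a reference to the engine as `self.cta_engine`, not with separate `symbol`, `exchange` or `interval` attributes, which would explain the AttributeError. One option is to derive the pieces from `self.vt_symbol` inside the strategy. `split_vt_symbol` below is a hypothetical helper written for this sketch; I believe vnpy ships a similar utility, but check your version.

```python
from typing import Tuple


def split_vt_symbol(vt_symbol: str) -> Tuple[str, str]:
    """Split 'IH99.CFFEX' into ('IH99', 'CFFEX').

    rsplit is used so that a dot inside the symbol part is preserved.
    """
    symbol, exchange = vt_symbol.rsplit(".", 1)
    return symbol, exchange


symbol, exchange = split_vt_symbol("IH99.CFFEX")
print(symbol, exchange)
```

Custom values such as collection_name could probably be passed through the settings dict given to `add_strategy`, assuming the strategy lists them in its `parameters`.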
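Workaround: set the bar data timezone offset to 7 hours 54 minutes, i.e. utc_86 = timezone(timedelta(hours=7,minutes=54)). The minutes of the data read back are then correct.
The code that imports the CSV into MongoDB is as follows: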
from vnpy.trader.constant import (Exchange, Interval)
import pandas as pd
from vnpy.trader.database import database_manager
from vnpy.trader.object import (BarData,TickData)
from datetime import datetime, timedelta, timezone
# China is UTC+8, which corresponds to hours=8
# utc_8 = timezone(timedelta(hours=8))
utc_86 = timezone(timedelta(hours=7,minutes=54))  # workaround
# datetime=row.datetime.replace(tzinfo=utc_8)
import pytz
tz = pytz.timezone('Asia/Shanghai')
print('tz=',tz)


# Wrapper function
def move_df_to_mongodb(imported_data:pd.DataFrame,collection_name:str):
    bars = []
    start = None
    count = 0
    for row in imported_data.itertuples():
        bar = BarData(
            symbol=row.symbol,
            exchange=row.exchange,
            # datetime=tz.localize(row.datetime),
            datetime=row.datetime.replace(tzinfo=utc_86),
            interval=row.interval,
            volume=row.volume,
            open_price=row.open,
            high_price=row.high,
            low_price=row.low,
            close_price=row.close,
            open_interest=row.open_interest,
            gateway_name="DB",
        )
        bars.append(bar)
        # do some statistics
        count += 1
        if not start:
            start = bar.datetime
            print('start=',start)
    end = bar.datetime
    # insert into database
    database_manager.save_bar_data(bars,collection_name)
    print(f'Insert Bar: {count} from {start} - {end}')


if __name__ == "__main__":
    # Read the CSV file to import; the file is GBK-encoded
    imported_data = pd.read_csv('IH99_20101127_20201127_2.csv',encoding='gbk')
    # Set the `exchange` column to Exchange.CFFEX for every row
    imported_data['exchange'] = Exchange.CFFEX
    # Add an `interval` column whose value is Interval.MINUTE for every row
    imported_data['interval'] = Interval.MINUTE
    # Columns that must be of float type
    float_columns = ['open','high','low','close','volume','open_interest']
    for col in float_columns:
        imported_data[col] = imported_data[col].astype('float')
    # Timestamp format
    # %Y%m%d %H:%M:%S means timestamps in the CSV must look like 20200501 08:32:30
    datetime_format = '%Y%m%d %H:%M:%S'
    imported_data['datetime'] = pd.to_datetime(imported_data['datetime'],format=datetime_format)
    # 品种代码 = instrument code
    品种代码='IH9902'
    imported_data['symbol'] = 品种代码
    # The 成交额 (turnover) column is not used, so its name is left unchanged
    # imported_data.columns = ['exchange','symbol','datetime','open','high','low','close','volume','成交额','open_interest','interval']
    # imported_data = imported_data.rename(index=str,
    #                                      columns={"时间": "datetime",
    #                                               "KQ.i@CFFEX.T.high": "high",
    #                                               "KQ.i@CFFEX.T.low": "low",
    #                                               "KQ.i@CFFEX.T.close": "close",
    #                                               "KQ.i@CFFEX.T.volume": "volume",
    #                                               "KQ.i@CFFEX.T.close_oi": "open_interest",
    #                                               })
    # Select the columns to display
    # imported_data = imported_data[["datetime","open", "high", "low", "close", "open_interest", "volume"]]
    print('//',imported_data.head(1).append(imported_data.tail(1)),"//")
    move_df_to_mongodb(imported_data,品种代码)
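
I'm not sure whether the screenshot is legible.
The problem is that my database has no 9:25 bar for IH99, yet the data read back through the datamanager method carries a 9:25 timestamp. It should be 9:31.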
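A guess at the cause, which I can't confirm from the post: a 6-minute shift is what appears when a pytz `Asia/Shanghai` zone is attached with `datetime.replace(tzinfo=...)`, because pytz then applies the zone's historical local mean time (+08:06) instead of +08:00. With pytz the documented way is `tz.localize(naive_dt)`. The standard-library sketch below attaches a fixed +08:00 offset and checks that a round trip through UTC leaves the minute unchanged; the helper names are invented for the example.

```python
from datetime import datetime, timedelta, timezone

# Fixed UTC+8 offset; China currently observes no DST, so this matches
# Asia/Shanghai for recent data.
CHINA_TZ = timezone(timedelta(hours=8))


def attach_china_tz(naive: datetime) -> datetime:
    """Attach UTC+8 to a naive local timestamp without shifting the clock time."""
    return naive.replace(tzinfo=CHINA_TZ)


def round_trip_via_utc(aware: datetime) -> datetime:
    """Convert to UTC and back, as a database that stores UTC would."""
    as_utc = aware.astimezone(timezone.utc)
    return as_utc.astimezone(CHINA_TZ)


bar_time = attach_china_tz(datetime(2020, 11, 24, 9, 31))
restored = round_trip_via_utc(bar_time)
print(bar_time.isoformat(), restored.isoformat())
```

If this guess is right, the 7 hours 54 minutes offset works only because it cancels the 6-minute error, and a plain +08:00 on both the write and read paths would be the cleaner fix.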
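Hi all,
In the CTA examples, on_init(self) calls self.load_bar(10), which loads ten days of bar data.
Now, inside on_bar(self, bar: BarData), I want to fetch the previous two months of 1-minute bar data. Which API function should I use for that?
I also want to convert those two months of 1-minute bars into a DataFrame. Any advice appreciated!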
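BarGenerator is driven by tick data, so if a contract receives no ticks for a long time, minute bars go missing. How can this be solved? It has to work in real time (at minute granularity), not as an after-hours fix.
How should BarGenerator be modified, or is there another way to handle this?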
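One possible approach, sketched under assumptions: close bars from a clock instead of waiting for the next tick, and when a minute passes with no tick, emit a flat zero-volume bar at the last close. The sketch below shows only the gap-filling step. `SimpleBar` and `fill_missing_minutes` are stand-ins written for the example and are not part of vnpy.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List


@dataclass
class SimpleBar:
    """Minimal stand-in for a 1-minute bar (not vnpy's BarData)."""
    dt: datetime
    open_price: float
    high_price: float
    low_price: float
    close_price: float
    volume: float = 0.0


def fill_missing_minutes(last_bar: SimpleBar, up_to: datetime) -> List[SimpleBar]:
    """Create flat, zero-volume bars for each minute after last_bar and before up_to."""
    filled = []
    t = last_bar.dt + timedelta(minutes=1)
    while t < up_to:
        c = last_bar.close_price
        filled.append(SimpleBar(t, c, c, c, c, 0.0))
        t += timedelta(minutes=1)
    return filled


last = SimpleBar(datetime(2020, 11, 24, 9, 31), 9.0, 11.0, 8.0, 10.0, 5.0)
gap = fill_missing_minutes(last, datetime(2020, 11, 24, 9, 34))
print([b.dt.strftime("%H:%M") for b in gap])
```

The helper does not know about trading sessions, so the caller would need to avoid filling across the lunch break or overnight.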
def run(self):
    """"""
    while self.active:
        try:
            task = self.queue.get(timeout=1)
            task_type, data = task
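Source: vnpy/app/data_recorder/engine.py
I have a question about the multithreading here:
1. After a task is taken from the queue, the code that handles it runs to completion before the next task is taken.
2. Regardless of whether the current task has finished, the next task is taken and processed once timeout=1 elapses.
Which of these is correct? Any other pointers are welcome. Thanks!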
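Assuming `self.queue` is a standard-library `queue.Queue`, `get(timeout=1)` only limits how long the call waits while the queue is empty, after which it raises `queue.Empty`. It does not interrupt a task that is already being processed, so in a loop shaped like the one above the first reading matches how the standard library behaves. The small demo below makes each task take longer than the timeout and shows that tasks are still handled one at a time, in order.

```python
import queue
import threading
import time
from typing import List

processed: List[str] = []


def worker(q: "queue.Queue[str]", stop: threading.Event) -> None:
    """Process tasks one at a time; the timeout only bounds the wait on an empty queue."""
    while not stop.is_set():
        try:
            task = q.get(timeout=0.1)
        except queue.Empty:
            continue  # nothing arrived within the timeout; loop and wait again
        time.sleep(0.3)  # simulate work that takes longer than the timeout
        processed.append(task)
        q.task_done()


q: "queue.Queue[str]" = queue.Queue()
stop = threading.Event()
for name in ("a", "b", "c"):
    q.put(name)

t = threading.Thread(target=worker, args=(q, stop))
t.start()
q.join()      # returns once every task has been marked done
stop.set()
t.join()
print(processed)
```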
def register_event(self):
    """"""
    self.event_engine.register(EVENT_TICK, self.process_tick_event)
    self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)
    self.event_engine.register(EVENT_SPREAD_DATA, self.process_spread_event)
Source: vnpy/app/data_recorder/engine.py
What data triggers EVENT_SPREAD_DATA, and what kind of event is it?
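vnpy's 1-minute recording uses the time of the current tick as the bar's start time. For example, when I record a futures contract from 9:00 to 11:30, the bars written to the database are stamped 9:00, 9:01, 9:02, ..., 11:29, whereas RiceQuant data is stamped 9:01, 9:02, 9:03, ..., 11:30.
How can I modify the BarGenerator source so that its bar timestamps match RiceQuant's? Thanks!
Here is the BarGenerator source: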
class BarGenerator:
    """
    For:
    1. generating 1 minute bar data from tick data
    2. generating x minute bar/x hour bar data from 1 minute data
    Notice:
    1. for x minute bar, x must be able to divide 60: 2, 3, 5, 6, 10, 15, 20, 30
    2. for x hour bar, x can be any number
    """

    def __init__(
        self,
        on_bar: Callable,
        window: int = 0,
        on_window_bar: Callable = None,
        interval: Interval = Interval.MINUTE
    ):
        """Constructor"""
        self.bar: BarData = None
        self.on_bar: Callable = on_bar
        self.interval: Interval = interval
        self.interval_count: int = 0
        self.window: int = window
        self.window_bar: BarData = None
        self.on_window_bar: Callable = on_window_bar
        self.last_tick: TickData = None
        self.last_bar: BarData = None

    def update_tick(self, tick: TickData) -> None:
        """
        Update new tick data into generator.
        """
        new_minute = False
        # Filter tick data with 0 last price
        if not tick.last_price:
            return
        # Filter tick data with less intraday trading volume (i.e. older timestamp)
        if self.last_tick and tick.volume and tick.volume < self.last_tick.volume:
            return
        if not self.bar:
            new_minute = True
        elif (self.bar.datetime.minute != tick.datetime.minute) or (self.bar.datetime.hour != tick.datetime.hour):
            self.bar.datetime = self.bar.datetime.replace(
                second=0, microsecond=0
            )
            self.on_bar(self.bar)
            new_minute = True
        if new_minute:
            self.bar = BarData(
                symbol=tick.symbol,
                exchange=tick.exchange,
                interval=Interval.MINUTE,
                datetime=tick.datetime,
                gateway_name=tick.gateway_name,
                open_price=tick.last_price,
                high_price=tick.last_price,
                low_price=tick.last_price,
                close_price=tick.last_price,
                open_interest=tick.open_interest
            )
        else:
            self.bar.high_price = max(self.bar.high_price, tick.last_price)
            self.bar.low_price = min(self.bar.low_price, tick.last_price)
            self.bar.close_price = tick.last_price
            self.bar.open_interest = tick.open_interest
            self.bar.datetime = tick.datetime
        if self.last_tick:
            volume_change = tick.volume - self.last_tick.volume
            self.bar.volume += max(volume_change, 0)
        self.last_tick = tick

    def update_bar(self, bar: BarData) -> None:
        """
        Update 1 minute bar into generator
        """
        # If not inited, create window bar object
        if not self.window_bar:
            # Generate timestamp for bar data
            if self.interval == Interval.MINUTE:
                dt = bar.datetime.replace(second=0, microsecond=0)
            else:
                dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
            self.window_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                datetime=dt,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price
            )
        # Otherwise, update high/low price into window bar
        else:
            self.window_bar.high_price = max(
                self.window_bar.high_price, bar.high_price)
            self.window_bar.low_price = min(
                self.window_bar.low_price, bar.low_price)
        # Update close price/volume into window bar
        self.window_bar.close_price = bar.close_price
        self.window_bar.volume += int(bar.volume)
        self.window_bar.open_interest = bar.open_interest
        # Check if window bar completed
        finished = False
        if self.interval == Interval.MINUTE:
            # x-minute bar
            if not (bar.datetime.minute + 1) % self.window:
                finished = True
        elif self.interval == Interval.HOUR:
            if self.last_bar and bar.datetime.hour != self.last_bar.datetime.hour:
                # 1-hour bar
                if self.window == 1:
                    finished = True
                # x-hour bar
                else:
                    self.interval_count += 1
                    if not self.interval_count % self.window:
                        finished = True
                        self.interval_count = 0
        if finished:
            self.on_window_bar(self.window_bar)
            self.window_bar = None
        # Cache last bar object
        self.last_bar = bar
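Reading the source above, update_tick truncates the bar's timestamp to second 0 of the minute in which it opened, just before calling on_bar. One hedged way to get end-of-minute stamps (9:01 for the 9:00 to 9:01 bar) would be to add one minute at that point. The sketch below shows only that arithmetic; `to_bar_end_time` is a hypothetical helper, not part of vnpy.

```python
from datetime import datetime, timedelta


def to_bar_end_time(bar_start: datetime) -> datetime:
    """Convert a bar's start-of-minute timestamp into its end-of-minute timestamp."""
    return bar_start.replace(second=0, microsecond=0) + timedelta(minutes=1)


print(to_bar_end_time(datetime(2020, 11, 24, 9, 0, 37)))
```

Note that update_bar decides window completion with `(bar.datetime.minute + 1) % self.window`, which assumes start-time stamps, so that check (and the window bar's own timestamp) would also need adjusting if end-stamped bars are fed into it.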
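As the title says: in videos by other creators, vnpy's global configuration includes database-related settings, but the global configuration I currently see differs from theirs.
Mine:
The video creator's:
Hi all, I don't see a demo in the vnpy repo for recording market data in a separate process (someone on the forum seemed to say there is one...). Could anyone share one? Thanks!
My machine runs 64-bit Windows 10. I've read the earlier troubleshooting posts, but nobody there hit this error.
Running python -m vnstation
returns: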
(VN Studio) C:\vnstudio>python -m vnstation
Traceback (most recent call last):
  File "C:\vnstudio\lib\runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "C:\vnstudio\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "C:\vnstudio\lib\site-packages\vnstation\__main__.py", line 4, in <module>
    cli()
  File "C:\vnstudio\lib\site-packages\click\core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "C:\vnstudio\lib\site-packages\click\core.py", line 782, in main
    rv = self.invoke(ctx)
  File "C:\vnstudio\lib\site-packages\click\core.py", line 1236, in invoke
    return Command.invoke(self, ctx)
  File "C:\vnstudio\lib\site-packages\click\core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "C:\vnstudio\lib\site-packages\click\core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "C:\vnstudio\lib\site-packages\click\decorators.py", line 21, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "C:\vnstudio\lib\site-packages\vnstation\cli.py", line 15, in cli
    run()
  File "C:\vnstudio\lib\site-packages\click\core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "C:\vnstudio\lib\site-packages\click\core.py", line 782, in main
    rv = self.invoke(ctx)
  File "C:\vnstudio\lib\site-packages\click\core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "C:\vnstudio\lib\site-packages\click\core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "C:\vnstudio\lib\site-packages\vnstation\cli.py", line 20, in run
    import vnstation.run
  File "C:\vnstudio\lib\site-packages\vnstation\run.py", line 4, in <module>
    from vnpy.trader.ui import QtGui, create_qapp
  File "C:\vnstudio\lib\site-packages\vnpy\trader\ui\__init__.py", line 11, in <module>
    from .mainwindow import MainWindow
  File "C:\vnstudio\lib\site-packages\vnpy\trader\ui\mainwindow.py", line 14, in <module>
    from .widget import (
  File "C:\vnstudio\lib\site-packages\vnpy\trader\ui\widget.py", line 19, in <module>
    from ..engine import MainEngine
  File "C:\vnstudio\lib\site-packages\vnpy\trader\engine.py", line 42, in <module>
    from .setting import SETTINGS
  File "C:\vnstudio\lib\site-packages\vnpy\trader\setting.py", line 30, in <module>
    "database.timezone": get_localzone().zone,
  File "C:\vnstudio\lib\site-packages\tzlocal\win32.py", line 93, in get_localzone
    _cache_tz = pytz.timezone(get_localzone_name())
  File "C:\vnstudio\lib\site-packages\tzlocal\win32.py", line 84, in get_localzone_name
    raise pytz.UnknownTimeZoneError('Can not find timezone ' + tzkeyname)
pytz.exceptions.UnknownTimeZoneError: 'Can not find timezone '