最近发现quantaxis的1分钟数据7万条数据耗时0:00:00.001997
+
但是vnpy的database_manager.load_bar_data,提取7万条数据要10秒钟,所以想着手改造一下,
+
但是我发现quantaxis用的是pymongo的collection.insert_many()方法存数据,collection.find()方法读取数据,在vnpy的数据库源代码却找不到相关的方法,所以想问下各位大佬vnpy的MongoDB连接是不是基于pymongo写的?有没有可能优化后达到quantaxis的提取速度?
+
def load_bar_data(
self,
symbol: str,
exchange: Exchange,
interval: Interval,
start: datetime,
end: datetime,
collection_name: str = None,
) -> Sequence[BarData]:
if collection_name is None:
s = DbBarData.objects(
symbol=symbol,
exchange=exchange.value,
interval=interval.value,
datetime__gte=start,
datetime__lte=end,
)
else:
with switch_collection(DbBarData, collection_name):
s = DbBarData.objects(
symbol=symbol,
exchange=exchange.value,
interval=interval.value,
datetime__gte=start,
datetime__lte=end,
)
data = [db_bar.to_bar() for db_bar in s]
return data
vnpy代码上面没有pymongo的影子。
大佬们,我是装了发行版2.1.7在C盘,同时我在D盘搞了个源代码文件夹vnpy2.1.7(里面也有vnpy文件夹),pycharm的解释器路径设置的是c盘vnpy的发行版安装的那个python.exe。
+
+
现在我在D盘的vnpy2.1.7文件夹,把里面的vnpy文件夹代码改了很多(包括MongoDB分表储存、成交额因子等等记不清了)
+
我在D盘的vnpy2.1.7文件夹下写的代码
from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine
from vnpy.gateway.ctp import CtpGateway
from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.cta_strategy.base import EVENT_CTA_LOG
上面这些代码引用的是C盘的发行版里面的vnpy库,还是D盘的VNPY2.1.7文件夹下面的vnpy文件件代码?
不太理解这个引用机制,请教下各位大佬!
搞清楚了。。是ctp-gatway生成的
我看很多地方离有注册如下事件:
def register_event(self):
""""""
# self.event_engine.register(EVENT_TICK, self.process_tick_event)
self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)
我理解这里是注册了一个EVENT_CONTRACT事件对应的处理函数,但是我想搞清楚EVENT_CONTRACT是哪个模块的哪个代码文件推出来的?
老大 不好意思 才看到
的22楼
xiaohe:是database_mongo.py里to_bar函数对datetime的处理导致的,改成datetime=DB_TZ.localize(self.datetime)就可以了。这个问题会在下个版本修复的。
+
我看到2.1.8中已修复这个读取问题!
datetime=DB_TZ.localize(self.datetime),
interval=d.interval.value
问题是不是出在这里 去常量那里增加一个“5min”?
Traceback (most recent call last):
File "D:/vnpy-2.1.7/examples/csv_to_mongodb/csv_to_mongodb.py", line 92, in <module>
move_df_to_mongodb(imported_data,品种代码)
File "D:/vnpy-2.1.7/examples/csv_to_mongodb/csv_to_mongodb.py", line 51, in move_df_to_mongodb
database_manager.save_bar_data(bars,collection_name)
File "D:\vnpy-2.1.7\vnpy\trader\database\database_mongo.py", line 342, in save_bar_data
symbol=d.symbol, interval=d.interval.value, datetime=d.datetime
AttributeError: 'str' object has no attribute 'value'
pandas的dataframe(从csv读取而来),在处理过程中,原来导入1min的是增加一列,这个写法运行无误
imported_data['interval'] = Interval.MINUTE
+
现在想导入5min,以下写法均报错
imported_data['interval'] = 5*Interval.MINUTE
imported_data['interval'] = “5min”
请假该如何写这里的代码?感谢!(读取还没试,因为没保存进去)
看了许久,没研究清楚下面这个代码中哪里订阅合约?
+
我看所有的示例策略中都没有 合约 这个参数,示例策略中也没有订阅过程,也就是策略和合约是完全分离的。。。疑惑
+
def run_child():
"""
Running in the child process.
"""
SETTINGS["log.file"] = True
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
main_engine.add_gateway(CtpGateway)
cta_engine = main_engine.add_app(CtaStrategyApp)
main_engine.write_log("主引擎创建成功")
log_engine = main_engine.get_engine("log")
event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
main_engine.write_log("注册日志事件监听")
main_engine.connect(ctp_setting, "CTP")
main_engine.write_log("连接CTP接口")
sleep(10)
cta_engine.init_engine()
main_engine.write_log("CTA策略初始化完成")
cta_engine.init_all_strategies()
sleep(60) # Leave enough time to complete strategy initialization
main_engine.write_log("CTA策略全部初始化")
cta_engine.start_all_strategies()
main_engine.write_log("CTA策略全部启动")
while True:
sleep(10)
trading = check_trading_period()
if not trading:
print("关闭子进程")
main_engine.close()
sys.exit(0)
尽量帮忙用代码指导,感激不尽!
+
需求:
合约交易时间:9:30-11:30及13:00-15:00
+
交易时间段内每分钟结束后的第5秒,触发一个事件。
+
上面这个需求不想通过on bar 里面的sleep5秒来实现(因为这样本质上还是bar事件逻辑,而非时间事件逻辑),我希望能实现策略能实现定时触发一个处理函数(时间事件逻辑)
+
感谢!!!!
大佬牛,有没有代码可以分享~
这样的写法挺好,注意resample只能重采样等分1天的分钟级别,不过对于函数里的base偏移理解的不是很到位,已经存在futurewarning,不过不影响使用
+
+
目前我只用到1分钟聚合5分钟,别的还没测试
`def QA_data_futuremin_resample20201205(
min_data,
type_='5min',
exchange_id=Exchange.CFFEX
):
"""期货分钟线采样成大周期
分钟线采样成子级别的分钟线
future:
vol ==> trade
amount X
期货一般两种模式:
中金所 股指期货: 9:30 - 11:30/ 13:00 -15:00
其他期货: -1 21:00: 2:30 / 9:00 - 11:30 / 13:30-15:00
输入demo
open high low close open_interest volume
datetime
2020-11-24 09:33:00+08:00 3465.9598 3466.3631 3463.9408 3465.0981 68138.0 588.0
2020-11-24 09:34:00+08:00 3465.5208 3470.3931 3464.5840 3469.7117 67968.0 676.0
2020-11-24 09:35:00+08:00 3469.9932 3470.6966 3466.8064 3468.5448 67851.0 427.0
2020-11-24 09:36:00+08:00 3468.4432 3470.1731 3463.9361 3464.2188 67684.0 431.0
2020-11-24 09:37:00+08:00 3464.0211 3464.3037 3456.2692 3457.5784 67481.0 532.0
2020-11-24 09:38:00+08:00 3457.8283 3457.8737 3451.8422 3453.3229 67163.0 603.0
2020-11-24 09:39:00+08:00 3453.7302 3456.1880 3453.2948 3455.7669 66960.0 358.0
2020-11-24 09:40:00+08:00 3455.5569 3455.6181 3450.4431 3450.4431 66642.0 568.0
"""
CONVERSION = {
# 'code': 'first',
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last',
# 'datetime': 'last',
'open_interest': 'last',
'volume': 'sum'
}
min_data = min_data.loc[:, list(CONVERSION.keys())]
idx = min_data.index
if exchange_id == Exchange.CFFEX:
part_1 = min_data.iloc[idx.indexer_between_time('9:30', '11:30')]
part_1_res = part_1.resample(
type_,
base=30,
closed='right',
loffset=type_
).apply(CONVERSION)
# part_2 = min_data.iloc[idx.indexer_between_time('13:00', '15:00')]
part_2 = min_data.iloc[idx.indexer_between_time('13:00', '15:15')]#为了适配中金所国债期货交易时间20201205
part_2_res = part_2.resample(
type_,
base=0,
closed='right',
loffset=type_
).agg(CONVERSION)
return pd.concat(
[part_1_res,
part_2_res]
# ).dropna().sort_index().reset_index().set_index(['datetime','code'])
).dropna().sort_index()
else:
part_1 = min_data.iloc[np.append(
idx.indexer_between_time('0:00',
'11:30'),
idx.indexer_between_time('0:00',
'11:30')
)]
part_1_res = part_1.resample(
type_,
base=0,
closed='right',
loffset=type_
).apply(CONVERSION)
part_2 = min_data.iloc[idx.indexer_between_time('13:30', '15:00')]
part_2_res = part_2.resample(
type_,
base=30,
closed='right',
loffset=type_
).agg(CONVERSION)
part_3 = min_data.iloc[idx.indexer_between_time('21:00', '23:59')]
part_3_res = part_3.resample(
type_,
base=0,
closed='right',
loffset=type_
).agg(CONVERSION)
return pd.concat(
[part_1_res,
part_2_res,
part_3_res]
# ).dropna().sort_index().reset_index().set_index(['datetime','code'])
).dropna().sort_index()`
用Python的交易员 wrote:
基本正常吧,MongoDB差不多就这个性能了
+
+
+
+
大佬,请教一下,哪个数据库更快呢
大佬牛!
改成datetime=DB_TZ.localize(self.datetime)
已解决问题!
从MongoDB取数据出来 1年的1分钟bardata近6万条,取出来的时间大约10秒钟,请教一下各位大佬这是不是MongoDB的性能问题,如果想减少这个读取时间,就要换其他数据库?有什么推荐么?
备注:MongoDB数据已经分表储存。
多问一下:从MongoDB取数据出来 1年的1分钟bardata近6万条,取出来的时间大约10秒钟,这是不是MongoDB的性能问题,如果想提升这个读取时间,就要换数据库?
备注:MongoDB数据已经分表储存。
https://www.cnblogs.com/foxracle/p/3258034.html
也许这是MongoDB的问题