VeighNa量化社区
你的开源社区量化交易平台
shunyuzhiqian's Avatar
Member
离线
70 帖子
声望: 0

最近发现quantaxis的1分钟数据7万条数据耗时0:00:00.001997
+
但是vnpy的database_manager.load_bar_data,提取7万条数据要10秒钟,所以想着手改造一下,
+
但是我发现quantaxis用的是pymongo的collection.insert_many()方法存数据,collection.find()方法读取数据,在vnpy的数据库源代码却找不到相关的方法,所以想问下各位大佬vnpy的MongoDB连接是不是基于pymongo写的?有没有可能优化后达到quantaxis的提取速度?
+

    def load_bar_data(
            self,
            symbol: str,
            exchange: Exchange,
            interval: Interval,
            start: datetime,
            end: datetime,
            collection_name: str = None,
    ) -> Sequence[BarData]:
        if collection_name is None:
            s = DbBarData.objects(
                symbol=symbol,
                exchange=exchange.value,
                interval=interval.value,
                datetime__gte=start,
                datetime__lte=end,
            )
        else:
            with switch_collection(DbBarData, collection_name):
                s = DbBarData.objects(
                    symbol=symbol,
                    exchange=exchange.value,
                    interval=interval.value,
                    datetime__gte=start,
                    datetime__lte=end,
                )
        data = [db_bar.to_bar() for db_bar in s]
        return data

vnpy代码上面没有pymongo的影子。

puff wrote:

注册timer事件,或者on_tick里面数tick根数。
+
+
puff wrote:
注册timer事件,或者on_tick里面数tick根数。
+
+
道理懂,,但是不会写,,能不能帮忙写一个每秒钟打印当前时间的demo,感谢!

大佬们,我是装了发行版2.1.7在C盘,同时我在D盘搞了个源代码文件夹vnpy2.1.7(里面也有vnpy文件夹),pycharm的解释器路径设置的是c盘vnpy的发行版安装的那个python.exe。
+
+
现在我在D盘的vnpy2.1.7文件夹,把里面的vnpy文件夹代码改了很多(包括MongoDB分表储存、成交额因子等等记不清了)
+
我在D盘的vnpy2.1.7文件夹下写的代码

from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine

from vnpy.gateway.ctp import CtpGateway
from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.cta_strategy.base import EVENT_CTA_LOG

上面这些代码引用的是C盘的发行版里面的vnpy库,还是D盘的VNPY2.1.7文件夹下面的vnpy文件件代码?
不太理解这个引用机制,请教下各位大佬!

搞清楚了。。是ctp-gatway生成的

我看很多地方离有注册如下事件:

    def register_event(self):
        """"""
        # self.event_engine.register(EVENT_TICK, self.process_tick_event)
        self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)

我理解这里是注册了一个EVENT_CONTRACT事件对应的处理函数,但是我想搞清楚EVENT_CONTRACT是哪个模块的哪个代码文件推出来的?

上弦之月 wrote:

@大王 我回测用redis序列化数据,不用hdf5
+
月神,请教一下hdf5持久化和json方式保存的区别是什么?

老大 不好意思 才看到

详见
https://www.vnpy.com/forum/topic/5155-datamanagermo-kuai-du-qu-mongodbdu-dao-liao-shu-ju-ku-li-suo-mei-you-de-shi-jian-chuo?page=2#pid18228

的22楼

xiaohe:是database_mongo.py里to_bar函数对datetime的处理导致的,改成datetime=DB_TZ.localize(self.datetime)就可以了。这个问题会在下个版本修复的。
+
我看到2.1.8中已修复这个读取问题!

datetime=DB_TZ.localize(self.datetime),

interval=d.interval.value

问题是不是出在这里 去常量那里增加一个“5min”?

Traceback (most recent call last): File "D:/vnpy-2.1.7/examples/csv_to_mongodb/csv_to_mongodb.py", line 92, in <module> move_df_to_mongodb(imported_data,品种代码) File "D:/vnpy-2.1.7/examples/csv_to_mongodb/csv_to_mongodb.py", line 51, in move_df_to_mongodb database_manager.save_bar_data(bars,collection_name) File "D:\vnpy-2.1.7\vnpy\trader\database\database_mongo.py", line 342, in save_bar_data symbol=d.symbol, interval=d.interval.value, datetime=d.datetime AttributeError: 'str' object has no attribute 'value'

pandas的dataframe(从csv读取而来),在处理过程中,原来导入1min的是增加一列,这个写法运行无误

 imported_data['interval'] = Interval.MINUTE

+
现在想导入5min,以下写法均报错

imported_data['interval'] = 5*Interval.MINUTE
imported_data['interval'] = “5min”

请假该如何写这里的代码?感谢!(读取还没试,因为没保存进去)

看了许久,没研究清楚下面这个代码中哪里订阅合约?
+
我看所有的示例策略中都没有 合约 这个参数,示例策略中也没有订阅过程,也就是策略和合约是完全分离的。。。疑惑
+

def run_child():
    """
    Running in the child process.
    """
    SETTINGS["log.file"] = True

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    main_engine.add_gateway(CtpGateway)
    cta_engine = main_engine.add_app(CtaStrategyApp)
    main_engine.write_log("主引擎创建成功")

    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    main_engine.write_log("注册日志事件监听")

    main_engine.connect(ctp_setting, "CTP")
    main_engine.write_log("连接CTP接口")

    sleep(10)

    cta_engine.init_engine()
    main_engine.write_log("CTA策略初始化完成")

    cta_engine.init_all_strategies()
    sleep(60)   # Leave enough time to complete strategy initialization
    main_engine.write_log("CTA策略全部初始化")

    cta_engine.start_all_strategies()
    main_engine.write_log("CTA策略全部启动")

    while True:
        sleep(10)

        trading = check_trading_period()
        if not trading:
            print("关闭子进程")
            main_engine.close()
            sys.exit(0)

尽量帮忙用代码指导,感激不尽!
+
需求:
合约交易时间:9:30-11:30及13:00-15:00
+
交易时间段内每分钟结束后的第5秒,触发一个事件。
+
上面这个需求不想通过on bar 里面的sleep5秒来实现(因为这样本质上还是bar事件逻辑,而非时间事件逻辑),我希望能实现策略能实现定时触发一个处理函数(时间事件逻辑)
+
感谢!!!!

大佬牛,有没有代码可以分享~

这样的写法挺好,注意resample只能重采样等分1天的分钟级别,不过对于函数里的base偏移理解的不是很到位,已经存在futurewarning,不过不影响使用
+
+
目前我只用到1分钟聚合5分钟,别的还没测试

`def QA_data_futuremin_resample20201205(
        min_data,
        type_='5min',
        exchange_id=Exchange.CFFEX
):
    """期货分钟线采样成大周期
    分钟线采样成子级别的分钟线
    future:
    vol ==> trade
    amount X
    期货一般两种模式:
    中金所 股指期货: 9:30 - 11:30/ 13:00 -15:00
    其他期货: -1 21:00: 2:30  /  9:00 - 11:30 / 13:30-15:00


    输入demo
                open       high        low      close  open_interest  volume
    datetime
    2020-11-24 09:33:00+08:00  3465.9598  3466.3631  3463.9408  3465.0981        68138.0   588.0
    2020-11-24 09:34:00+08:00  3465.5208  3470.3931  3464.5840  3469.7117        67968.0   676.0
    2020-11-24 09:35:00+08:00  3469.9932  3470.6966  3466.8064  3468.5448        67851.0   427.0
    2020-11-24 09:36:00+08:00  3468.4432  3470.1731  3463.9361  3464.2188        67684.0   431.0
    2020-11-24 09:37:00+08:00  3464.0211  3464.3037  3456.2692  3457.5784        67481.0   532.0
    2020-11-24 09:38:00+08:00  3457.8283  3457.8737  3451.8422  3453.3229        67163.0   603.0
    2020-11-24 09:39:00+08:00  3453.7302  3456.1880  3453.2948  3455.7669        66960.0   358.0
    2020-11-24 09:40:00+08:00  3455.5569  3455.6181  3450.4431  3450.4431        66642.0   568.0

    """
    CONVERSION = {
        # 'code': 'first',
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last',
        # 'datetime': 'last',
        'open_interest': 'last',
        'volume': 'sum'
    }
    min_data = min_data.loc[:, list(CONVERSION.keys())]
    idx = min_data.index
    if exchange_id == Exchange.CFFEX:
        part_1 = min_data.iloc[idx.indexer_between_time('9:30', '11:30')]
        part_1_res = part_1.resample(
            type_,
            base=30,
            closed='right',
            loffset=type_
        ).apply(CONVERSION)
        # part_2 = min_data.iloc[idx.indexer_between_time('13:00', '15:00')]
        part_2 = min_data.iloc[idx.indexer_between_time('13:00', '15:15')]#为了适配中金所国债期货交易时间20201205
        part_2_res = part_2.resample(
            type_,
            base=0,
            closed='right',
            loffset=type_
        ).agg(CONVERSION)
        return pd.concat(
            [part_1_res,
             part_2_res]
        # ).dropna().sort_index().reset_index().set_index(['datetime','code'])
        ).dropna().sort_index()
    else:
        part_1 = min_data.iloc[np.append(
            idx.indexer_between_time('0:00',
                                     '11:30'),
            idx.indexer_between_time('0:00',
                                     '11:30')
        )]
        part_1_res = part_1.resample(
            type_,
            base=0,
            closed='right',
            loffset=type_
        ).apply(CONVERSION)
        part_2 = min_data.iloc[idx.indexer_between_time('13:30', '15:00')]
        part_2_res = part_2.resample(
            type_,
            base=30,
            closed='right',
            loffset=type_
        ).agg(CONVERSION)
        part_3 = min_data.iloc[idx.indexer_between_time('21:00', '23:59')]
        part_3_res = part_3.resample(
            type_,
            base=0,
            closed='right',
            loffset=type_
        ).agg(CONVERSION)
        return pd.concat(
            [part_1_res,
             part_2_res,
             part_3_res]
        # ).dropna().sort_index().reset_index().set_index(['datetime','code'])
        ).dropna().sort_index()`

用Python的交易员 wrote:

基本正常吧,MongoDB差不多就这个性能了
+
+
+
+
大佬,请教一下,哪个数据库更快呢

大佬牛!

改成datetime=DB_TZ.localize(self.datetime)

已解决问题!

从MongoDB取数据出来 1年的1分钟bardata近6万条,取出来的时间大约10秒钟,请教一下各位大佬这是不是MongoDB的性能问题,如果想减少这个读取时间,就要换其他数据库?有什么推荐么?

备注:MongoDB数据已经分表储存。

多问一下:从MongoDB取数据出来 1年的1分钟bardata近6万条,取出来的时间大约10秒钟,这是不是MongoDB的性能问题,如果想提升这个读取时间,就要换数据库?

备注:MongoDB数据已经分表储存。

© 2015-2022 微信 18391752892
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】