VeighNa Quant Community
Your open-source community quantitative trading platform

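I recently noticed that quantaxis reports 0:00:00.001997 for reading 70,000 rows of 1-minute data, whereas vnpy's database_manager.load_bar_data takes about 10 seconds to load 70,000 rows, so I would like to rework it.

However, quantaxis stores data with pymongo's collection.insert_many() and reads it with collection.find(), and I cannot find either method in vnpy's database source code. So I'd like to ask: is vnpy's MongoDB connection built on pymongo? And could it be optimized to match quantaxis's read speed?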

    def load_bar_data(
            self,
            symbol: str,
            exchange: Exchange,
            interval: Interval,
            start: datetime,
            end: datetime,
            collection_name: str = None,
    ) -> Sequence[BarData]:
        if collection_name is None:
            s = DbBarData.objects(
                symbol=symbol,
                exchange=exchange.value,
                interval=interval.value,
                datetime__gte=start,
                datetime__lte=end,
            )
        else:
            with switch_collection(DbBarData, collection_name):
                s = DbBarData.objects(
                    symbol=symbol,
                    exchange=exchange.value,
                    interval=interval.value,
                    datetime__gte=start,
                    datetime__lte=end,
                )
        data = [db_bar.to_bar() for db_bar in s]
        return data

There is no trace of pymongo in the vnpy code above.
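For context, `DbBarData.objects(...)` and `switch_collection` in the snippet above are mongoengine calls, and mongoengine is itself built on pymongo. Note also that pymongo's `collection.find()` returns a lazy cursor, so timing that call alone does not include fetching the documents. Below is a minimal sketch of reading raw documents without the per-row `to_bar()` conversion. It assumes the stored documents carry `symbol`, `exchange`, `interval` and `datetime` fields, as the query keywords above suggest; `build_bar_filter` and `load_raw_bars` are hypothetical helpers, and `FakeCollection` is only a stand-in so the sketch runs without a database.

```python
from datetime import datetime
from typing import Any, Dict, Iterator, List


def build_bar_filter(symbol: str, exchange: str, interval: str,
                     start: datetime, end: datetime) -> Dict[str, Any]:
    """Build a MongoDB filter document for one symbol and time range."""
    return {
        "symbol": symbol,
        "exchange": exchange,
        "interval": interval,
        "datetime": {"$gte": start, "$lte": end},
    }


def load_raw_bars(collection: Any, flt: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Run find() and force the lazy cursor so the fetch really happens."""
    cursor = collection.find(flt)
    return list(cursor)


class FakeCollection:
    """Stand-in for a pymongo collection (assumption: only find() is used)."""

    def __init__(self, docs: List[Dict[str, Any]]) -> None:
        self.docs = list(docs)

    def find(self, flt: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
        rng = flt["datetime"]
        return (
            d for d in self.docs
            if d["symbol"] == flt["symbol"]
            and d["exchange"] == flt["exchange"]
            and d["interval"] == flt["interval"]
            and rng["$gte"] <= d["datetime"] <= rng["$lte"]
        )


if __name__ == "__main__":
    docs = [
        {"symbol": "IH99", "exchange": "CFFEX", "interval": "1m",
         "datetime": datetime(2020, 11, 24, 9, m), "close_price": 3465.0 + m}
        for m in range(31, 41)
    ]
    flt = build_bar_filter("IH99", "CFFEX", "1m",
                           datetime(2020, 11, 24, 9, 33),
                           datetime(2020, 11, 24, 9, 35))
    print(len(load_raw_bars(FakeCollection(docs), flt)))
```

With a real pymongo collection the same filter document can be passed to `collection.find()`; an index covering `symbol`, `interval` and `datetime` usually matters more for speed than the choice of driver.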

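Hi all, I installed the 2.1.7 distribution on drive C, and I also keep a source folder vnpy2.1.7 on drive D (which contains a vnpy folder too). PyCharm's interpreter path points to the python.exe installed by the vnpy distribution on drive C.

I have since changed a lot of code in the vnpy folder inside vnpy2.1.7 on drive D (per-collection MongoDB storage, a turnover factor, and other things I no longer remember).

This is the code I wrote under the vnpy2.1.7 folder on drive D: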

from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine

from vnpy.gateway.ctp import CtpGateway
from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.cta_strategy.base import EVENT_CTA_LOG

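Do the imports above use the vnpy library from the distribution on drive C, or the code in the vnpy folder under vnpy2.1.7 on drive D?
I don't quite understand how this import mechanism works, so any guidance is appreciated!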
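One way to check is to ask the import system directly. Python searches `sys.path` in order, and when a script is run its own directory comes first, so a `vnpy` folder sitting next to the script shadows the copy in site-packages. The sketch below uses the standard library's `importlib.util.find_spec`; `locate_package` is a hypothetical helper, and `json` is used only so the example runs anywhere (pass `"vnpy"` in your environment).

```python
import importlib.util
import sys
from typing import Optional


def locate_package(name: str) -> Optional[str]:
    """Return the file a top-level `import name` would load, or None if not found."""
    spec = importlib.util.find_spec(name)
    if spec is None:
        return None
    return spec.origin


if __name__ == "__main__":
    # The first entries of sys.path win when two copies of a package exist.
    print(sys.path[:3])
    print(locate_package("json"))
```

After `import vnpy`, printing `vnpy.__file__` gives the same answer for the copy that was actually loaded.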

I see events registered like this in many places:

    def register_event(self):
        """"""
        # self.event_engine.register(EVENT_TICK, self.process_tick_event)
        self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)

My understanding is that this registers a handler for the EVENT_CONTRACT event, but I'd like to know which module and which source file pushes EVENT_CONTRACT.
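As far as I know, in vnpy the event type constants live in `vnpy/trader/event.py`, and a gateway pushes contract data through `BaseGateway.on_contract` in `vnpy/trader/gateway.py`, which wraps it in an event of type `EVENT_CONTRACT`; please verify against your copy of the source. The register/put pattern itself can be sketched as below. `TinyEventEngine` is a hypothetical, synchronous stand-in and not vnpy's threaded `EventEngine`, and the constant's value is an assumption.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, List

EVENT_CONTRACT = "eContract."  # assumed value, for illustration only


class TinyEventEngine:
    """Synchronous stand-in: handlers run immediately inside put()."""

    def __init__(self) -> None:
        self.handlers: DefaultDict[str, List[Callable[[Any], None]]] = defaultdict(list)

    def register(self, event_type: str, handler: Callable[[Any], None]) -> None:
        # Consumer side: an app declares interest in one event type.
        if handler not in self.handlers[event_type]:
            self.handlers[event_type].append(handler)

    def put(self, event_type: str, data: Any) -> None:
        # Producer side: in vnpy this role is played by the gateway.
        for handler in self.handlers[event_type]:
            handler(data)


if __name__ == "__main__":
    engine = TinyEventEngine()
    engine.register(EVENT_CONTRACT, lambda contract: print("got", contract))
    engine.put(EVENT_CONTRACT, "IH2012.CFFEX")
```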

I have a pandas DataFrame (read from a CSV). When importing 1-minute data I add a column like this, and it runs without error:

 imported_data['interval'] = Interval.MINUTE

Now I want to import 5-minute data, and both of the following raise errors:

imported_data['interval'] = 5*Interval.MINUTE
imported_data['interval'] = “5min”

How should this line be written? Thanks! (I haven't tried reading the data back yet, because nothing has been saved.)
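To my knowledge, `Interval` in vnpy 2.1.7 is a plain `Enum` with members such as `MINUTE = "1m"`, `HOUR = "1h"`, `DAILY = "d"` and `WEEKLY = "w"`, and it has no 5-minute member. That would explain both failures: an enum member cannot be multiplied, and `"5min"` is not one of its values. The sketch below reproduces this with a stand-in enum (the member values are my assumption, so check `vnpy/trader/constant.py`). A common alternative is to store 1-minute bars and aggregate them at run time with `BarGenerator(window=5)`.

```python
from enum import Enum
from typing import Optional


class Interval(Enum):
    """Stand-in mirroring the shape of vnpy's Interval (values assumed)."""
    MINUTE = "1m"
    HOUR = "1h"
    DAILY = "d"
    WEEKLY = "w"


def try_multiply() -> str:
    """Show what `5 * Interval.MINUTE` does for a plain Enum member."""
    try:
        5 * Interval.MINUTE
    except TypeError as exc:
        return type(exc).__name__
    return "no error"


def lookup(value: str) -> Optional[Interval]:
    """Return the member for a stored value, or None if no member matches."""
    try:
        return Interval(value)
    except ValueError:
        return None


if __name__ == "__main__":
    print(try_multiply())
    print(lookup("1m"), lookup("5min"))
```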

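After studying it for a long time, I still can't work out where the code below subscribes to a contract.

None of the example strategies takes a contract parameter, and none of them contains a subscription step, so strategy and contract seem to be completely decoupled, which confuses me.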

def run_child():
    """
    Running in the child process.
    """
    SETTINGS["log.file"] = True

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    main_engine.add_gateway(CtpGateway)
    cta_engine = main_engine.add_app(CtaStrategyApp)
    main_engine.write_log("主引擎创建成功")

    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    main_engine.write_log("注册日志事件监听")

    main_engine.connect(ctp_setting, "CTP")
    main_engine.write_log("连接CTP接口")

    sleep(10)

    cta_engine.init_engine()
    main_engine.write_log("CTA策略初始化完成")

    cta_engine.init_all_strategies()
    sleep(60)   # Leave enough time to complete strategy initialization
    main_engine.write_log("CTA策略全部初始化")

    cta_engine.start_all_strategies()
    main_engine.write_log("CTA策略全部启动")

    while True:
        sleep(10)

        trading = check_trading_period()
        if not trading:
            print("关闭子进程")
            main_engine.close()
            sys.exit(0)

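Please guide me with code where possible, much appreciated!

Requirement:
Contract trading hours: 9:30-11:30 and 13:00-15:00.

During trading hours, trigger an event at the 5th second after each minute ends.

I don't want to implement this with a 5-second sleep inside on_bar (that is still bar-event logic rather than time-event logic). I want the strategy to trigger a handler on a timer (time-event logic).

Thanks!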
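vnpy's `EventEngine` emits a timer event (`EVENT_TIMER`) periodically, once per second by default, so a handler registered for it can decide when to act without depending on bars. Below is a sketch of just the time check such a handler could call. `MinuteTrigger` is a hypothetical helper; the session times come from the requirement above; and it fires on the first check at or after second 5, so a slightly late timer tick does not skip a minute.

```python
from datetime import datetime, time
from typing import Optional

# Minutes that have just *ended* inside the sessions 9:30-11:30 and 13:00-15:00.
SESSIONS = [(time(9, 31), time(11, 30)), (time(13, 1), time(15, 0))]
FIRE_SECOND = 5


class MinuteTrigger:
    """Fire at most once per minute, at or after second 5, inside the sessions."""

    def __init__(self) -> None:
        self.last_fired: Optional[datetime] = None

    def check(self, now: datetime) -> bool:
        if now.second < FIRE_SECOND:
            return False
        mark = now.replace(second=0, microsecond=0)
        if not any(start <= mark.time() <= end for start, end in SESSIONS):
            return False
        if mark == self.last_fired:
            return False  # already fired for this minute
        self.last_fired = mark
        return True


if __name__ == "__main__":
    trigger = MinuteTrigger()
    for second in (4, 5, 6):
        print(second, trigger.check(datetime(2020, 12, 1, 9, 31, second)))
```

In a timer handler this would be called as `trigger.check(datetime.now())`, with the strategy logic run whenever it returns True.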

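def QA_data_futuremin_resample20201205(
        min_data,
        type_='5min',
        exchange_id=Exchange.CFFEX
):
    """Resample futures minute bars into a longer period
    Minute bars are resampled into coarser minute bars
    future:
    vol ==> trade
    amount X
    Futures generally follow one of two session patterns:
    CFFEX stock index futures: 9:30 - 11:30 / 13:00 - 15:00
    Other futures: previous day 21:00 - 2:30 / 9:00 - 11:30 / 13:30 - 15:00


    Input demo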
                open       high        low      close  open_interest  volume
    datetime
    2020-11-24 09:33:00+08:00  3465.9598  3466.3631  3463.9408  3465.0981        68138.0   588.0
    2020-11-24 09:34:00+08:00  3465.5208  3470.3931  3464.5840  3469.7117        67968.0   676.0
    2020-11-24 09:35:00+08:00  3469.9932  3470.6966  3466.8064  3468.5448        67851.0   427.0
    2020-11-24 09:36:00+08:00  3468.4432  3470.1731  3463.9361  3464.2188        67684.0   431.0
    2020-11-24 09:37:00+08:00  3464.0211  3464.3037  3456.2692  3457.5784        67481.0   532.0
    2020-11-24 09:38:00+08:00  3457.8283  3457.8737  3451.8422  3453.3229        67163.0   603.0
    2020-11-24 09:39:00+08:00  3453.7302  3456.1880  3453.2948  3455.7669        66960.0   358.0
    2020-11-24 09:40:00+08:00  3455.5569  3455.6181  3450.4431  3450.4431        66642.0   568.0

    """
    CONVERSION = {
        # 'code': 'first',
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last',
        # 'datetime': 'last',
        'open_interest': 'last',
        'volume': 'sum'
    }
    min_data = min_data.loc[:, list(CONVERSION.keys())]
    idx = min_data.index
    if exchange_id == Exchange.CFFEX:
        part_1 = min_data.iloc[idx.indexer_between_time('9:30', '11:30')]
        part_1_res = part_1.resample(
            type_,
            base=30,
            closed='right',
            loffset=type_
        ).apply(CONVERSION)
        # part_2 = min_data.iloc[idx.indexer_between_time('13:00', '15:00')]
        # Extended to 15:15 to cover CFFEX treasury bond futures trading hours (2020-12-05)
        part_2 = min_data.iloc[idx.indexer_between_time('13:00', '15:15')]
        part_2_res = part_2.resample(
            type_,
            base=0,
            closed='right',
            loffset=type_
        ).agg(CONVERSION)
        return pd.concat(
            [part_1_res,
             part_2_res]
        # ).dropna().sort_index().reset_index().set_index(['datetime','code'])
        ).dropna().sort_index()
    else:
        part_1 = min_data.iloc[idx.indexer_between_time('0:00', '11:30')]
        part_1_res = part_1.resample(
            type_,
            base=0,
            closed='right',
            loffset=type_
        ).apply(CONVERSION)
        part_2 = min_data.iloc[idx.indexer_between_time('13:30', '15:00')]
        part_2_res = part_2.resample(
            type_,
            base=30,
            closed='right',
            loffset=type_
        ).agg(CONVERSION)
        part_3 = min_data.iloc[idx.indexer_between_time('21:00', '23:59')]
        part_3_res = part_3.resample(
            type_,
            base=0,
            closed='right',
            loffset=type_
        ).agg(CONVERSION)
        return pd.concat(
            [part_1_res,
             part_2_res,
             part_3_res]
        # ).dropna().sort_index().reset_index().set_index(['datetime','code'])
        ).dropna().sort_index()

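Loading one year of 1-minute bar data (nearly 60,000 rows) from MongoDB takes about 10 seconds. Is this a MongoDB performance problem? If I want to cut the read time, do I need to switch to another database? Any recommendations?

Note: the MongoDB data is already split across separate collections.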

1. First, the backtesting code sets the engine parameters as follows:

#%%
engine = BacktestingEngine()
engine.set_parameters(

    vt_symbol="IH99.CFFEX",
    interval="1m",
    start=datetime(2019, 6, 1),
    end=datetime(2019, 6, 30),
    rate=0.3/10000,
    slippage=0.2*5,
    size=300,
    pricetick=0.2,
    capital=1_000_000,
    collection_name = "IH99"
)
engine.add_strategy(AtrRsiStrategy, {})

2. on_bar in the AtrRsiStrategy strategy is as follows:

    def on_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        self.cancel_all()

        df01=load_select_bar_data_from_mongodb(self.symbol,self.exchange,self.interval,(bar.datetime-timedelta(days=3)),
                                               bar.datetime,self.collection_name)
        print (df01)

        self.put_event()

Note: load_select_bar_data_from_mongodb is a custom function that reads bar data from MongoDB and builds a DataFrame; it needs symbol and the other parameters passed in.

3. The backtest fails with the following error:

2020-11-30 22:50:43.167187  Traceback (most recent call last):
  File "D:\vnpy-2.1.7\vnpy\app\cta_strategy\backtesting.py", line 315, in run_backtesting
    self.callback(data)
  File "D:\vnpy-2.1.7\vnpy\app\cta_strategy\strategies\atr_rsi_strategy.py", line 129, in on_bar
    df01=load_select_bar_data_from_mongodb(self.symbol,self.exchange,self.interval,(bar.datetime-timedelta(days=3)),
AttributeError: 'AtrRsiStrategy' object has no attribute 'symbol'

4. So my question is: how can the strategy receive the attributes set via engine.set_parameters on the backtesting engine?
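The traceback says the strategy has no `symbol` attribute. As far as I know, vnpy's CTA template constructs the strategy with `vt_symbol` (for example "IH99.CFFEX") and keeps it as `self.vt_symbol`, so symbol and exchange can be derived from it; extra values such as `collection_name` can travel in the setting dict passed to `engine.add_strategy`, provided the name is listed in the strategy's `parameters`. The sketch below imitates that mechanism with stand-ins. `split_vt_symbol` and `StrategySketch` are hypothetical; vnpy ships its own `extract_vt_symbol` helper in `vnpy.trader.utility`, which returns an `Exchange` member rather than a string.

```python
from typing import Any, Dict, Tuple


def split_vt_symbol(vt_symbol: str) -> Tuple[str, str]:
    """Split 'IH99.CFFEX' into ('IH99', 'CFFEX') at the last dot."""
    symbol, exchange = vt_symbol.rsplit(".", 1)
    return symbol, exchange


class StrategySketch:
    """Stand-in for a CtaTemplate subclass (assumed behavior, not vnpy code)."""

    parameters = ["collection_name"]  # names that may be set from the setting dict
    collection_name = ""

    def __init__(self, vt_symbol: str, setting: Dict[str, Any]) -> None:
        self.vt_symbol = vt_symbol
        self.symbol, self.exchange = split_vt_symbol(vt_symbol)
        # Copy only the declared parameters from the setting dict.
        for name in self.parameters:
            if name in setting:
                setattr(self, name, setting[name])


if __name__ == "__main__":
    strategy = StrategySketch("IH99.CFFEX", {"collection_name": "IH99"})
    print(strategy.symbol, strategy.exchange, strategy.collection_name)
```

Under those assumptions, the backtest call would become `engine.add_strategy(AtrRsiStrategy, {"collection_name": "IH99"})`, with `collection_name` added to the strategy's `parameters` list.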

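Workaround: set the bar data time offset to 7 hours 54 minutes, i.e. utc_86 = timezone(timedelta(hours=7,minutes=54)); the minutes in the data read back are then correct.
The code that imports the CSV into MongoDB is as follows: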

from vnpy.trader.constant import (Exchange, Interval)
import pandas as pd
from vnpy.trader.database import database_manager
from vnpy.trader.object import (BarData,TickData)
from datetime import datetime, timedelta, timezone

# China's time zone is UTC+8, which corresponds to hours=8
# utc_8 = timezone(timedelta(hours=8))
utc_86 = timezone(timedelta(hours=7,minutes=54))  # workaround
# datetime=row.datetime.replace(tzinfo=utc_8)

import pytz
tz = pytz.timezone('Asia/Shanghai')
print('tz=',tz)

# Wrapper function
def move_df_to_mongodb(imported_data:pd.DataFrame,collection_name:str):
    bars = []
    start = None
    count = 0

    for row in imported_data.itertuples():

        bar = BarData(

              symbol=row.symbol,
              exchange=row.exchange,
              # datetime=tz.localize(row.datetime),
              datetime=row.datetime.replace(tzinfo=utc_86),
              interval=row.interval,
              volume=row.volume,
              open_price=row.open,
              high_price=row.high,
              low_price=row.low,
              close_price=row.close,
              open_interest=row.open_interest,
              gateway_name="DB",

        )

        bars.append(bar)

        # do some statistics
        count += 1
        if not start:
            start = bar.datetime
            print ('start=',start)
    end = bar.datetime

    # insert into database
    database_manager.save_bar_data(bars,collection_name)
    print(f'Insert Bar: {count} from {start} - {end}')


if __name__ == "__main__":

    # Read the CSV file to import; the file is GBK-encoded
    imported_data = pd.read_csv('IH99_20101127_20201127_2.csv',encoding='gbk')
    # Set the exchange column to Exchange.CFFEX for every row
    imported_data['exchange'] = Exchange.CFFEX
    # Add an `interval` column whose value is Interval.MINUTE for every row
    imported_data['interval'] = Interval.MINUTE
    # Columns that must be of float type
    float_columns = ['open','high','low','close','volume','open_interest']
    for col in float_columns:
      imported_data[col] = imported_data[col].astype('float')
    # Specify the timestamp format
    # %Y%m%d %H:%M:%S means the timestamps in the CSV must look like 20200501 08:32:30
    datetime_format = '%Y%m%d %H:%M:%S'
    imported_data['datetime'] = pd.to_datetime(imported_data['datetime'],format=datetime_format)

    品种代码='IH9902'
    imported_data['symbol'] = 品种代码
    # The 成交额 (turnover) column is not used, so its name is left unchanged
    # imported_data.columns = ['exchange','symbol','datetime','open','high','low','close','volume','成交额','open_interest','interval']
    # imported_data = imported_data.rename(index=str,
    #                            columns={"时间": "datetime",
                                        # "KQ.i@CFFEX.T.high": "high",
                                        # "KQ.i@CFFEX.T.low": "low",
                                        # "KQ.i@CFFEX.T.close": "close",
                                        # "KQ.i@CFFEX.T.volume": "volume",
                                        # "KQ.i@CFFEX.T.close_oi": "open_interest",
                                        # })
    # Select the columns to display
    # imported_data = imported_data[["datetime","open", "high", "low", "close", "open_interest", "volume"]]
    print('//',imported_data.head(1).append(imported_data.tail(1)),"//")


    move_df_to_mongodb(imported_data,品种代码)

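[screenshot]

I'm not sure whether the screenshot is legible.

The point is that my database has no 9:25 bar for IH99, yet the data read through the datamanager method carries a 9:25 timestamp; it should be 9:31.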
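A 6-minute shift such as 9:31 turning into 9:25 is the classic symptom of attaching a pytz zone with `datetime.replace(tzinfo=...)`. The pytz documentation warns that this picks the zone's earliest entry, which for Asia/Shanghai is local mean time (UTC+8:06) rather than UTC+8:00, and that `tz.localize(dt)` should be used instead. I have not verified where this happens in your setup, so treat it as a hypothesis. The sketch below reproduces only the arithmetic, using fixed offsets from the standard library so it runs without pytz; `LMT_LIKE` is a stand-in for the offset pytz would attach.

```python
from datetime import datetime, timedelta, timezone

CST = timezone(timedelta(hours=8))                   # the intended UTC+8:00
LMT_LIKE = timezone(timedelta(hours=8, minutes=6))   # stand-in for pytz's LMT+8:06


def as_cst(dt: datetime) -> datetime:
    """Convert an aware datetime to UTC+8:00 wall-clock time."""
    return dt.astimezone(CST)


naive = datetime(2020, 11, 24, 9, 31)

# Tagged with the LMT-like offset, the same wall time is 6 minutes earlier in UTC+8.
wrong = as_cst(naive.replace(tzinfo=LMT_LIKE))
# Tagged with the correct fixed offset, the wall time is preserved.
right = as_cst(naive.replace(tzinfo=CST))

if __name__ == "__main__":
    print(wrong.strftime("%H:%M"), right.strftime("%H:%M"))
```

If this is the cause, the commented-out `tz.localize(row.datetime)` line in the import script would be the usual fix on the writing side, instead of a 7:54 offset.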

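Hi all,
The CTA examples call self.load_bar(10) inside def on_init(self):, i.e. they load ten days of bar data.

Now, inside def on_bar(self, bar: BarData):, I want to fetch the previous 2 months of data (1-minute bar data). Which API function should I use for that?
I also want to convert those two months of 1-minute bar data into a DataFrame. Any guidance is appreciated!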
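Querying two months of history inside every on_bar call would repeat a slow database read on each bar. One alternative, assuming the history is loaded once during initialization, is to keep a rolling buffer and append each new bar to it. The sketch below uses only the standard library: `Bar` is a stand-in for vnpy's `BarData` with a few fields, and `RollingBars` is a hypothetical helper. Since `pandas.DataFrame` accepts a list of dicts, `pd.DataFrame(buffer.to_records())` would produce the DataFrame.

```python
from collections import deque
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from typing import Any, Deque, Dict, List


@dataclass
class Bar:
    """Stand-in for vnpy's BarData (only a few fields, for illustration)."""
    datetime: datetime
    open_price: float
    high_price: float
    low_price: float
    close_price: float
    volume: float


class RollingBars:
    """Keep only the bars that fall inside a fixed lookback window."""

    def __init__(self, lookback: timedelta) -> None:
        self.lookback = lookback
        self.bars: Deque[Bar] = deque()

    def update(self, bar: Bar) -> None:
        self.bars.append(bar)
        cutoff = bar.datetime - self.lookback
        # Drop bars older than the window, oldest first.
        while self.bars and self.bars[0].datetime < cutoff:
            self.bars.popleft()

    def to_records(self) -> List[Dict[str, Any]]:
        """Return plain dicts, one per bar, ready for a DataFrame constructor."""
        return [asdict(bar) for bar in self.bars]


if __name__ == "__main__":
    buffer = RollingBars(lookback=timedelta(days=60))
    start = datetime(2020, 11, 24, 9, 31)
    for i in range(3):
        price = 3465.0 + i
        buffer.update(Bar(start + timedelta(minutes=i), price, price, price, price, 100.0))
    print(len(buffer.to_records()))
```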

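BarGenerator is driven by tick data, so if a contract receives no ticks for a long time, minute bars go missing. How should this be handled? It has to work in real time (at minute granularity), not as an after-hours fix.
How can BarGenerator be modified, or is there another way to solve this?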

def run(self):
    """"""
    while self.active:
        try:
            task = self.queue.get(timeout=1)
            task_type, data = task

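Source: vnpy/app/data_recorder/engine.py

I have a question about the multithreading here.

1. After getting a task, the code that follows runs to completion before the next task is taken from the queue.
2. Whether or not the current task has finished, the next task is taken out for processing after timeout=1.

Which of these is correct? Any other pointers are welcome, thanks!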
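Reading 1 matches how `queue.Queue.get` behaves. The `timeout` only limits how long `get` waits when the queue is empty (after which it raises `queue.Empty`); it does not interrupt a task that is being processed, and because the loop runs in a single thread, the next `get` is not reached until the loop body finishes. Below is a small demonstration; `worker` and `run_demo` are hypothetical and not the recorder engine's code.

```python
import queue
import threading
import time
from typing import List


def worker(q: "queue.Queue[str]", done: List[str], stop: threading.Event) -> None:
    """Take one task at a time; the next get() happens only after this one finishes."""
    while not stop.is_set():
        try:
            task = q.get(timeout=0.1)  # waits up to 0.1 s only while the queue is empty
        except queue.Empty:
            continue
        time.sleep(0.05)               # simulate work on the current task
        done.append(task)


def run_demo(tasks: List[str]) -> List[str]:
    q: "queue.Queue[str]" = queue.Queue()
    done: List[str] = []
    stop = threading.Event()
    thread = threading.Thread(target=worker, args=(q, done, stop))
    thread.start()
    for task in tasks:
        q.put(task)
    # Wait (with an upper bound) until every task has been processed.
    deadline = time.time() + 5
    while len(done) < len(tasks) and time.time() < deadline:
        time.sleep(0.01)
    stop.set()
    thread.join()
    return done


if __name__ == "__main__":
    print(run_demo(["a", "b", "c"]))
```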

def register_event(self):
    """"""
    self.event_engine.register(EVENT_TICK, self.process_tick_event)
    self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)
    self.event_engine.register(EVENT_SPREAD_DATA, self.process_spread_event)

Source: vnpy/app/data_recorder/engine.py

What data triggers EVENT_SPREAD_DATA, and what kind of event is it?

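vnpy's 1-minute recorder uses the time of the current tick as the bar's start time. For example, when I record a futures contract from 9:00 to 11:30, the bars written to the database are stamped 9:00, 9:01, 9:02 ... 11:29, whereas RiceQuant data is stamped 9:01, 9:02, 9:03 ... 11:30.

How can I modify the BarGenerator source so that its bar timestamps match RiceQuant's? Thanks!

Below is the BarGenerator source: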

class BarGenerator:
    """
    For:

    1. generating 1 minute bar data from tick data
    2. generating x minute bar/x hour bar data from 1 minute data

    Notice:
    1. for x minute bar, x must be able to divide 60: 2, 3, 5, 6, 10, 15, 20, 30
    2. for x hour bar, x can be any number
    """

    def __init__(
        self,
        on_bar: Callable,
        window: int = 0,
        on_window_bar: Callable = None,
        interval: Interval = Interval.MINUTE
    ):
        """Constructor"""
        self.bar: BarData = None
        self.on_bar: Callable = on_bar

        self.interval: Interval = interval
        self.interval_count: int = 0

        self.window: int = window
        self.window_bar: BarData = None
        self.on_window_bar: Callable = on_window_bar

        self.last_tick: TickData = None
        self.last_bar: BarData = None

    def update_tick(self, tick: TickData) -> None:
        """
        Update new tick data into generator.
        """
        new_minute = False

        # Filter tick data with 0 last price
        if not tick.last_price:
            return

        # Filter tick data with less intraday trading volume (i.e. older timestamp)
        if self.last_tick and tick.volume and tick.volume < self.last_tick.volume:
            return

        if not self.bar:
            new_minute = True
        elif(self.bar.datetime.minute != tick.datetime.minute) or (self.bar.datetime.hour != tick.datetime.hour):
            self.bar.datetime = self.bar.datetime.replace(
                second=0, microsecond=0
            )
            self.on_bar(self.bar)

            new_minute = True

        if new_minute:
            self.bar = BarData(
                symbol=tick.symbol,
                exchange=tick.exchange,
                interval=Interval.MINUTE,
                datetime=tick.datetime,
                gateway_name=tick.gateway_name,
                open_price=tick.last_price,
                high_price=tick.last_price,
                low_price=tick.last_price,
                close_price=tick.last_price,
                open_interest=tick.open_interest
            )
        else:
            self.bar.high_price = max(self.bar.high_price, tick.last_price)
            self.bar.low_price = min(self.bar.low_price, tick.last_price)
            self.bar.close_price = tick.last_price
            self.bar.open_interest = tick.open_interest
            self.bar.datetime = tick.datetime

        if self.last_tick:
            volume_change = tick.volume - self.last_tick.volume
            self.bar.volume += max(volume_change, 0)

        self.last_tick = tick

    def update_bar(self, bar: BarData) -> None:
        """
        Update 1 minute bar into generator
        """
        # If not inited, create window bar object
        if not self.window_bar:
            # Generate timestamp for bar data
            if self.interval == Interval.MINUTE:
                dt = bar.datetime.replace(second=0, microsecond=0)
            else:
                dt = bar.datetime.replace(minute=0, second=0, microsecond=0)

            self.window_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                datetime=dt,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price
            )
        # Otherwise, update high/low price into window bar
        else:
            self.window_bar.high_price = max(
                self.window_bar.high_price, bar.high_price)
            self.window_bar.low_price = min(
                self.window_bar.low_price, bar.low_price)

        # Update close price/volume into window bar
        self.window_bar.close_price = bar.close_price
        self.window_bar.volume += int(bar.volume)
        self.window_bar.open_interest = bar.open_interest

        # Check if window bar completed
        finished = False

        if self.interval == Interval.MINUTE:
            # x-minute bar
            if not (bar.datetime.minute + 1) % self.window:
                finished = True
        elif self.interval == Interval.HOUR:
            if self.last_bar and bar.datetime.hour != self.last_bar.datetime.hour:
                # 1-hour bar
                if self.window == 1:
                    finished = True
                # x-hour bar
                else:
                    self.interval_count += 1

                    if not self.interval_count % self.window:
                        finished = True
                        self.interval_count = 0

        if finished:
            self.on_window_bar(self.window_bar)
            self.window_bar = None

        # Cache last bar object
        self.last_bar = bar
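One possible approach, rather than rewriting the generator, is to shift the timestamp at the point where a finished bar is handed over: the generator stamps a bar with the start of its minute, so adding one minute yields the end-of-minute convention. The sketch below covers only that arithmetic; `to_bar_end` is a hypothetical helper. Note that the x-minute check `(bar.datetime.minute + 1) % self.window` in `update_bar` assumes start-of-minute stamps, so shifting inside the generator would require adjusting that check as well, while shifting just before saving to the database would not.

```python
from datetime import datetime, timedelta


def to_bar_end(dt: datetime) -> datetime:
    """Map any time inside a minute to the end of that minute (start + 1 minute)."""
    return dt.replace(second=0, microsecond=0) + timedelta(minutes=1)


if __name__ == "__main__":
    # A bar covering 09:00:00-09:00:59 is stamped 09:01 under this convention.
    print(to_bar_end(datetime(2020, 12, 1, 9, 0, 37)))
```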

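As the title says: in other video bloggers' tutorials, vnpy's global configuration contains database-related settings, but the global configuration I see is different from theirs.

Mine:

[screenshot]

The video blogger's:

[screenshot]

Hi all, I can't find demo code in the vnpy repository for recording market data in a separate process (someone on the forum seems to have mentioned one...). Could anyone share it? Thanks!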

My PC runs 64-bit Windows 10. I have read the earlier troubleshooting posts, but nobody there ran into this error.

Running python -m vnstation
returns:
(VN Studio) C:\vnstudio>python -m vnstation
Traceback (most recent call last):
  File "C:\vnstudio\lib\runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "C:\vnstudio\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "C:\vnstudio\lib\site-packages\vnstation\__main__.py", line 4, in <module>
    cli()
  File "C:\vnstudio\lib\site-packages\click\core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "C:\vnstudio\lib\site-packages\click\core.py", line 782, in main
    rv = self.invoke(ctx)
  File "C:\vnstudio\lib\site-packages\click\core.py", line 1236, in invoke
    return Command.invoke(self, ctx)
  File "C:\vnstudio\lib\site-packages\click\core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "C:\vnstudio\lib\site-packages\click\core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "C:\vnstudio\lib\site-packages\click\decorators.py", line 21, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "C:\vnstudio\lib\site-packages\vnstation\cli.py", line 15, in cli
    run()
  File "C:\vnstudio\lib\site-packages\click\core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "C:\vnstudio\lib\site-packages\click\core.py", line 782, in main
    rv = self.invoke(ctx)
  File "C:\vnstudio\lib\site-packages\click\core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "C:\vnstudio\lib\site-packages\click\core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "C:\vnstudio\lib\site-packages\vnstation\cli.py", line 20, in run
    import vnstation.run
  File "C:\vnstudio\lib\site-packages\vnstation\run.py", line 4, in <module>
    from vnpy.trader.ui import QtGui, create_qapp
  File "C:\vnstudio\lib\site-packages\vnpy\trader\ui\__init__.py", line 11, in <module>
    from .mainwindow import MainWindow
  File "C:\vnstudio\lib\site-packages\vnpy\trader\ui\mainwindow.py", line 14, in <module>
    from .widget import (
  File "C:\vnstudio\lib\site-packages\vnpy\trader\ui\widget.py", line 19, in <module>
    from ..engine import MainEngine
  File "C:\vnstudio\lib\site-packages\vnpy\trader\engine.py", line 42, in <module>
    from .setting import SETTINGS
  File "C:\vnstudio\lib\site-packages\vnpy\trader\setting.py", line 30, in <module>
    "database.timezone": get_localzone().zone,
  File "C:\vnstudio\lib\site-packages\tzlocal\win32.py", line 93, in get_localzone
    _cache_tz = pytz.timezone(get_localzone_name())
  File "C:\vnstudio\lib\site-packages\tzlocal\win32.py", line 84, in get_localzone_name
    raise pytz.UnknownTimeZoneError('Can not find timezone ' + tzkeyname)
pytz.exceptions.UnknownTimeZoneError: 'Can not find timezone '
