VeighNa量化社区
你的开源社区量化交易平台
shunyuzhiqian's Avatar
Member
离线
70 帖子
声望: 0

description

相当于整体往前移动了6分钟,很费解,读取出来的每天都是往前移6分钟

gateway_name symbol        exchange                  datetime         interval  volume  open_interest       open       high        low      close

0 DB IH99 Exchange.CFFEX 2019-05-31 09:25:00+08:00 Interval.MINUTE 767.0 54422.0 2713.5703 2716.4436 2709.8341 2712.8189
1 DB IH99 Exchange.CFFEX 2019-05-31 09:26:00+08:00 Interval.MINUTE 359.0 54205.0 2712.7911 2716.2056 2712.6688 2713.1206
2 DB IH99 Exchange.CFFEX 2019-05-31 09:27:00+08:00 Interval.MINUTE 323.0 53981.0 2713.1068 2713.5549 2709.9648 2709.9710
3 DB IH99 Exchange.CFFEX 2019-05-31 09:28:00+08:00 Interval.MINUTE 422.0 53717.0 2710.1046 2712.3135 2708.4665 2710.7099
4 DB IH99 Exchange.CFFEX 2019-05-31 09:29:00+08:00 Interval.MINUTE 291.0 53536.0 2711.1412 2714.8780 2710.6063 2713.2508
5 DB IH99 Exchange.CFFEX 2019-05-31 09:30:00+08:00 Interval.MINUTE 626.0 53167.0 2713.3754 2714.0023 2709.8716 2711.3979
6 DB IH99 Exchange.CFFEX 2019-05-31 09:31:00+08:00 Interval.MINUTE 354.0 52976.0 2711.2408 2713.0316 2709.2843 2712.6573
7 DB IH99 Exchange.CFFEX 2019-05-31 09:32:00+08:00 Interval.MINUTE 416.0 52736.0 2712.7938 2717.9527 2711.2315 2717.3653
8 DB IH99 Exchange.CFFEX 2019-05-31 09:33:00+08:00 Interval.MINUTE 506.0 52480.0 2717.7916 2721.5836 2715.4050 2720.7167
9 DB IH99 Exchange.CFFEX 2019-05-31 09:34:00+08:00 Interval.MINUTE 519.0 52240.0 2720.8500 2725.9267 2719.0185 2724.5785

description

不知道图能否看清

就是我数据库里没有IH99 9:25的数据,但是通过datamanager方法读取来的数据时间戳有9:25,应该是9:31才对

2020-11-29 23:26:59.647871 开始加载历史数据
2020-11-29 23:27:00.047876 加载进度:########## [100%]
2020-11-29 23:27:00.047876 历史数据加载完成,数据量:4560
2020-11-29 23:27:00.047876 触发异常,回测终止
2020-11-29 23:27:00.051934 Traceback (most recent call last):
File "D:\vnpy-2.1.7\vnpy\app\cta_strategy\backtesting.py", line 344, in run_backtesting
self.callback(data)
File "D:\vnpy-2.1.7\vnpy\app\cta_strategy\strategies\atr_rsi_strategy.py", line 126, in on_bar
self.load_select_bar_data_from_mongodb(self.vt_symbol,self.exchange,self.interval,(bar.datetime-timedelta(days=3)),bar.datetime,self.collection_name)
AttributeError: 'AtrRsiStrategy' object has no attribute 'exchange'

这个代码写的还不够严禁 部分属性有问题

最近问题比较多,主要就是怕自己实现的方法走弯路,低效,

所以才想跟大佬们请教一下是否是简洁有效的写法,不浪费性能,感谢

关于在策略里读取历史数据并生成dataframe的方法我写了一个初稿,请各位大佬斧正

    def load_select_bar_data_from_mongodb(self,symbol: str,
                                          exchange: Exchange,
                                          interval: Interval,
                                          start: datetime,
                                          end: datetime,
                                          collection_name: str = None):


        df = pd.DataFrame(database_manager.load_bar_data(symbol, exchange, interval, start, end, collection_name))
        df = df.rename(index=str, columns={"open_price": "open",
                                           "high_price": "high",
                                           "low_price": "low",
                                           "close_price": "close",
                                           })
        data_select_columns = ["open", "high", "low", "close", "open_interest", "volume"]
        df = df.set_index('datetime')[data_select_columns]
        print(df)

        return df

我是准备在on_bar触发的时候调用这个函数取得想要的起止日期的dataframe

有如下两方面具体问题请请教大佬
1这个函数方法是否有必要写到比如vnpy-2.1.7\vnpy\app\cta_strategy\strategies\atr_rsi_strategy.py策略文件中 ,并设置为一个策略类的属性方法?还是说我放在类外面独立为一个单独的函数就够了?
2回测中反复从MongoDB中读取数据生成dataframe是否是一种低效的方式?有办法改进么,在同实盘保持接轨的思路下(策略是1分钟级别)?

大佬们
我看cta的例子里面是在def on_init(self):里面self.load_bar(10) ,即获取十天的bar数据

我现想在def on_bar(self, bar: BarData):函数中,向前取2个月的数据(1分钟bardata),这时候我应该用哪个函数api来获取呢?
并同时将这两个月的1分钟bardata转为dataframe形式使用,请教一下各位大佬!

用Python的交易员 wrote:

BarGenerator本身是tick驱动的,如果没有tick就合成不了了。如果硬要加上定时检查,需要额外增加定时检查的逻辑(监听EVENT_TIMER)事件,然后每秒检查当前时间和缓存K线时间戳的偏离度,超过一个水平就全部合成K线。

好的好的 谢谢大佬指导!

CTA回测引擎改造部分,我看楼主只改造了backtesting.py文件,该目录里面的engine.py里面也有loadbar,如下

    def load_bar(
        self,
        vt_symbol: str,
        days: int,
        interval: Interval,
        callback: Callable[[BarData], None],
        use_database: bool
    ):
        """"""
        symbol, exchange = extract_vt_symbol(vt_symbol)
        end = datetime.now(get_localzone())
        start = end - timedelta(days)
        bars = []

        # Pass gateway and RQData if use_database set to True
        if not use_database:
            # Query bars from gateway if available
            contract = self.main_engine.get_contract(vt_symbol)

            if contract and contract.history_data:
                req = HistoryRequest(
                    symbol=symbol,
                    exchange=exchange,
                    interval=interval,
                    start=start,
                    end=end
                )
                bars = self.main_engine.query_history(req, contract.gateway_name)

            # Try to query bars from RQData, if not found, load from database.
            else:
                bars = self.query_bar_from_rq(symbol, exchange, interval, start, end)

        if not bars:
            bars = database_manager.load_bar_data(
                symbol=symbol,
                exchange=exchange,
                interval=interval,
                start=start,
                end=end,
            )

请教一下,这个引擎engine.py要改么?

callingpulse wrote:

2020/09/09更新
原来代码

bar = BarData(

              symbol=row.symbol,
              exchange=row.exchange,
              datetime=row.datetime,#报错的地方,需要增加时区信息
              interval=row.interval,
              volume=row.volume,
              open_price=row.open,
              high_price=row.high,
              low_price=row.low,
              close_price=row.close,
              open_interest=row.open_interest,
              gateway_name="DB",

        )

运行会报错:

AttributeError: 'str' object has no attribute 'astimezone'

原因是新版本vnpy支持了时区数据,所以datetime=row.datetime需要加上时区信息

from datetime import datetime, timedelta, timezone

# 中国时区是+8,对应参数hours=8
# 日本时区是+9,hours=9
utc_8 = timezone(timedelta(hours=8))
datetime=row.datetime.replace(tzinfo=utc_8)

一个小技巧,把bar里面的数据打印出来,看看都是什么格式。

sql_manager.get_oldest_bar_data()

大佬牛!!!出错后发现真是这个问题导致的,必须加时区

BarGenerator模块是以tick数据驱动,如果一个合约长时间没有tick数据过来将导致分钟数据缺失,应该如何解决这个问题呢,要求是实时性(分钟级别),不是盘后解决哈。
请教各位大佬如何修改BarGenerator或者通过其他途径解决呢!

wrote:

log内的时间没解释清楚,以第一根bar为例:
2020-11-25 19:46:00: API_STABILITY_MONITOR: 379.240000, actual time 20:59:02.225194, num_bar = 556
19:46:00 --- 这个时bar内的datatime; 20:59:02.225194 这个是当前时间
从时间看这一根bar可以看作是非交易时间推送的fake_data,这个需要策略自己做过滤么?

而2020-11-25 20:59:00 和2020-11-25 20:59:00这两根bar实际获取时间分别是 21:00:02.015919和09:00:02.269108,都在交易时间之内

2020-11-25 20:59:00: API_STABILITY_MONITOR: 380.540000, actual time 21:00:02.015919, num_bar = 2
2020-11-25 21:00:00: API_STABILITY_MONITOR: 380.240000, actual time 21:01:00.911799, num_bar = 3

这两个是tick对么?这两个不用过滤吧 要保留

BELLCOUSIN wrote:

应该就是
在vnpy/gateway/ctp/ 增加以上文件: ctp_gateway_double.py
在vnpy/gateway/ctp/init.py 导入包

from .ctp_gateway_double import CtpGatewayDouble

在run.py里面:

from vnpy.gateway.ctp import CtpGateway
from vnpy.gateway.ctp import CtpGatewayDouble
main_engine.add_gateway(CtpGateway)
main_engine.add_gateway(CtpGatewayDouble)

在ctp_setting配置文件里面. 配置下ctp_setting增加下

 "次行情服务器":  "***.***.***.***:****",

为啥我会重复推tick...
不是用hash过滤了吗
请大佬解答下我哪里步骤错了.

我觉得有可能是self.tick_buffer的问题

def run(self):
    """"""
    while self.active:
        try:
            task = self.queue.get(timeout=1)
            task_type, data = task

代码来源:vnpy/app/data_recorder/engine.py

我想请教下大佬这里关于的多线程的问题

1、在get一个task后,完成一个task后面的代码,才会取出queue中的下一个task
2、无论当前取走的task是否完成,都会在timeout=1后继续取task出来进行处理

想问下我哪个理解是正确的?或者有其他指点请赐教,感谢大佬!

def register_event(self):
    """"""
    self.event_engine.register(EVENT_TICK, self.process_tick_event)
    self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)
    self.event_engine.register(EVENT_SPREAD_DATA, self.process_spread_event)

代码来源vnpy/app/data_recorder/engine.py

请教下EVENT_SPREAD_DATA这是什么数据引发的什么事件?

vnpy的1分钟行情记录是把当前tick的时间当做K线的起始时间的, 比如我要记录期货合约9:00到11:30的行情, 实际写入到数据库的行情的时间是9:00, 9:01, 9:02......11:29, 米筐数据是9.01,9.02,9.03,...11:30

请问如何修改BarGenerator源码能使其生成bar数据的时间戳同米筐数据保持一致呢,感谢!

下面是BarGenerator源码:

class BarGenerator:
"""
For:

1. generating 1 minute bar data from tick data
2. generateing x minute bar/x hour bar data from 1 minute data

Notice:
1. for x minute bar, x must be able to divide 60: 2, 3, 5, 6, 10, 15, 20, 30
2. for x hour bar, x can be any number
"""

def __init__(
    self,
    on_bar: Callable,
    window: int = 0,
    on_window_bar: Callable = None,
    interval: Interval = Interval.MINUTE
):
    """Constructor"""
    self.bar: BarData = None
    self.on_bar: Callable = on_bar

    self.interval: Interval = interval
    self.interval_count: int = 0

    self.window: int = window
    self.window_bar: BarData = None
    self.on_window_bar: Callable = on_window_bar

    self.last_tick: TickData = None
    self.last_bar: BarData = None

def update_tick(self, tick: TickData) -> None:
    """
    Update new tick data into generator.
    """
    new_minute = False

    # Filter tick data with 0 last price
    if not tick.last_price:
        return

    # Filter tick data with less intraday trading volume (i.e. older timestamp)
    if self.last_tick and tick.volume and tick.volume < self.last_tick.volume:
        return

    if not self.bar:
        new_minute = True
    elif(self.bar.datetime.minute != tick.datetime.minute) or (self.bar.datetime.hour != tick.datetime.hour):
        self.bar.datetime = self.bar.datetime.replace(
            second=0, microsecond=0
        )
        self.on_bar(self.bar)

        new_minute = True

    if new_minute:
        self.bar = BarData(
            symbol=tick.symbol,
            exchange=tick.exchange,
            interval=Interval.MINUTE,
            datetime=tick.datetime,
            gateway_name=tick.gateway_name,
            open_price=tick.last_price,
            high_price=tick.last_price,
            low_price=tick.last_price,
            close_price=tick.last_price,
            open_interest=tick.open_interest
        )
    else:
        self.bar.high_price = max(self.bar.high_price, tick.last_price)
        self.bar.low_price = min(self.bar.low_price, tick.last_price)
        self.bar.close_price = tick.last_price
        self.bar.open_interest = tick.open_interest
        self.bar.datetime = tick.datetime

    if self.last_tick:
        volume_change = tick.volume - self.last_tick.volume
        self.bar.volume += max(volume_change, 0)

    self.last_tick = tick

def update_bar(self, bar: BarData) -> None:
    """
    Update 1 minute bar into generator
    """
    # If not inited, creaate window bar object
    if not self.window_bar:
        # Generate timestamp for bar data
        if self.interval == Interval.MINUTE:
            dt = bar.datetime.replace(second=0, microsecond=0)
        else:
            dt = bar.datetime.replace(minute=0, second=0, microsecond=0)

        self.window_bar = BarData(
            symbol=bar.symbol,
            exchange=bar.exchange,
            datetime=dt,
            gateway_name=bar.gateway_name,
            open_price=bar.open_price,
            high_price=bar.high_price,
            low_price=bar.low_price
        )
    # Otherwise, update high/low price into window bar
    else:
        self.window_bar.high_price = max(
            self.window_bar.high_price, bar.high_price)
        self.window_bar.low_price = min(
            self.window_bar.low_price, bar.low_price)

    # Update close price/volume into window bar
    self.window_bar.close_price = bar.close_price
    self.window_bar.volume += int(bar.volume)
    self.window_bar.open_interest = bar.open_interest

    # Check if window bar completed
    finished = False

    if self.interval == Interval.MINUTE:
        # x-minute bar
        if not (bar.datetime.minute + 1) % self.window:
            finished = True
    elif self.interval == Interval.HOUR:
        if self.last_bar and bar.datetime.hour != self.last_bar.datetime.hour:
            # 1-hour bar
            if self.window == 1:
                finished = True
            # x-hour bar
            else:
                self.interval_count += 1

                if not self.interval_count % self.window:
                    finished = True
                    self.interval_count = 0

    if finished:
        self.on_window_bar(self.window_bar)
        self.window_bar = None

    # Cache last bar object
    self.last_bar = bar

守望长城2020-6-11-艾瑞巴蒂 wrote:

个人觉得应该在执行完main_engine.connect(CTP_SETTING, "CTP")之后,在主线程中添加阻塞,以等待connect方法完全执行, 如果不阻塞一下主线程的话, 有一定的概率有部分eContract事件会先"跑掉",做不到在执行完来自oms的process_contract_event之后紧接着执行来自RecorderEngine的process_contract_event方法, 这是有风险的,最保险的做法是在主线程中增加sleep, 阻塞主线程一些时间.

问题初步解决 不能加sleep 加了就运行不了 还没搞清楚原因

用Python的交易员 wrote:

请向下滚动对话框,右边有个滚动条的

收到!自己没看好细节,谢谢大佬!

如题我看其他视频博主 vnpy中全局配置 有数据库相关的配置,目前我在使用中的全局配置和视频博主里的不一样

我的:

description

视频博主的:

description

我理解,这个行情录制代码里,新建了一个数据记录类WholeMarketRecorder,在子进程中实例了一次(但是实例的那些方法何时调用的 不清楚。。)

运行这段代码后输出如上,然后后续一直没反应,数据库中也没有数据

同时我在WholeMarketRecorder的process_contract_event方法中 写了打印合约名称,也没运行出来(说明没运行这段代码)

大佬们有空的话 方便指导下么

© 2015-2022 微信 18391752892
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】