VeighNa量化社区
你的开源社区量化交易平台
xzhangef's Avatar
Member
离线
8 帖子
声望: 4

jingsiaosing wrote:

单纯把集合竞价tick合并到后边的第一个bar,有个更简单的办法,在barGenerator的update_tick中,根据self.last_tick是否存在来初始化bar,若不存在的话,把bar的datetime直接加一分钟

        if new_minute:
            self.bar = BarData(
                symbol=tick.symbol,
                exchange=tick.exchange,
                interval=Interval.MINUTE,
                datetime=tick.datetime,
                gateway_name=tick.gateway_name,
                open_price=tick.last_price,
                high_price=tick.last_price,
                low_price=tick.last_price,
                close_price=tick.last_price,
                open_interest=tick.open_interest
            )
        if not self.last_tick:   # first tick defers to the next minute bar
            self.bar.datetime += timedelta(minutes=1)   
`

赞,简单有效。

jingsiaosing wrote:

这是因为早盘的时候会重复推夜盘的行情,可能是ctp自己的bug,最后一个tick的时间戳会发生改变,比如郑商所的品种MA,最后一个tick本来是2022-08-17 22:59:59, 早盘推送的时候会把这个时间戳错误的修改为2022-08-18 22:59:59,整整推后了一天,导致白天所有的tick都变成了无效tick。建议在底层接口处,或者策略的on_tick处做一个过滤,tick.updatetime和datetime.now()做一个比较,相差超过10分钟的tick直接过滤掉即可。

赞,这是个简洁的好办法。郑商所的推送总是很奇怪。

通达信导出的csv中添加一行表头:date,time,open,high,low,close,volume,open_interest,turnover
通达信的数据和vnpy的数据录制规则不同,所以中间对通达信数据做了两个处理:
1、时间减1分钟;
2、夜盘的日期减1天。

注:代码基于vnpy2.7版本

import csv
from datetime import datetime, timedelta
from pytz import timezone

from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.database import BaseDatabase, get_database
from vnpy.trader.object import BarData

# 封装函数
def load_tdx_csv_to_mongodb(file_path: str, exchange: str, symbol:str, interval:str):

    with open(file_path, "rt") as f:
        buf = [line.replace("\0", "") for line in f]

    reader = csv.DictReader(buf, delimiter=",")

    bars = []
    start = None
    count = 0
    tz = timezone("Asia/Shanghai")

    for item in reader:

        dt = datetime.strptime(item["date"] + item["time"], '%Y/%m/%d%H%M')
        # minute - 1;
        dt = dt + timedelta(minutes=-1)
        # time > 15:00, day - 1
        if dt.hour > 15:
            dt = dt + timedelta(days=-1)
        dt = tz.localize(dt)

        bar = BarData(
            symbol=symbol,
            exchange=exchange,
            interval=interval,
            datetime=dt,

            open_price=float(item["open"]),
            high_price=float(item["high"]),
            low_price=float(item["low"]),
            close_price=float(item["close"]),
            volume=float(item["volume"]),
            turnover=float(item["turnover"]),
            open_interest=float(item["open_interest"]),

            gateway_name="DB",
        )
        bars.append(bar)

        # do some statistics
        count += 1
        if not start:
            start = bar.datetime
    end = bar.datetime
    print(f'{datetime.now()} Insert Bar start: {count} from {start} - {end}')

    # insert into database
    database: BaseDatabase = get_database()
    database.save_bar_data(bars)
    print(f'{datetime.now()} Insert Bar complete: {count} from {start} - {end}')

if __name__ == "__main__":

    # tdx csv表头:date,time,open,high,low,close,volume,open_interest,turnover
    load_tdx_csv_to_mongodb("D:\\new_tdx\\T0002\\export\\29#PL9.txt", Exchange.DCE, 'P99', Interval.MINUTE)
    load_tdx_csv_to_mongodb("D:\\new_tdx\\T0002\\export\\29#PL8.txt", Exchange.DCE, 'P88', Interval.MINUTE)
    load_tdx_csv_to_mongodb("D:\\new_tdx\\T0002\\export\\29#YL8.txt", Exchange.DCE, 'Y88', Interval.MINUTE)

我本地装的是2.7版本,修改了部分代码可以跑起来了,代码在下面。
可以录制data_recorder_setting.json里配置的品种,如果想录制全市场,就把注释的那一大段代码打开就可以了。

import sys
import multiprocessing
import re
from copy import copy
from vnpy.trader.constant import Exchange
from vnpy.trader.object import BarData, TickData
from enum import Enum
from time import sleep
from datetime import datetime, time
from logging import INFO

from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine
from vnpy.trader.utility import load_json, extract_vt_symbol

from vnpy_ctp import CtpGateway
from vnpy_ctastrategy.base import EVENT_CTA_LOG
from vnpy_datarecorder.engine import RecorderEngine

EXCHANGE_LIST = [
    Exchange.SHFE,
    Exchange.DCE,
    Exchange.CZCE,
    Exchange.CFFEX,
    Exchange.INE,
]

SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True
CTP_SETTING = load_json("connect_ctp.json")

def is_futures(vt_symbol: str) -> bool:
    """
    是否是期货
    """
    return bool(re.match(r"^[a-zA-Z]{1,3}\d{2,4}.[A-Z]+$", vt_symbol))

class RecordMode(Enum):
    BAR = "bar"
    TICK = "tick"

class WholeMarketRecorder(RecorderEngine):
    def __init__(self, main_engine, event_engine, record_modes=[RecordMode.BAR]):
        super().__init__(main_engine, event_engine)
        self.record_modes = record_modes
        # 非交易时间
        self.drop_start = time(3, 15)
        self.drop_end = time(8, 45)

        # 大连、上海、郑州交易所,小节休息
        self.rest_start = time(10, 15)
        self.rest_end = time(10, 30)

    def is_trading(self, vt_symbol, current_time) -> bool:
        """
        交易时间,过滤校验Tick
        """
        symbol, exchange = extract_vt_symbol(vt_symbol)

        if current_time >= self.drop_start and current_time < self.drop_end:
            return False
        if exchange in [Exchange.DCE, Exchange.SHFE, Exchange.CZCE]:
            if current_time >= self.rest_start and current_time < self.rest_end:
                return False
        return True

    def record_tick(self, tick: TickData):
        """
        抛弃非交易时间校验数据
        """
        tick_time = tick.datetime.time()
        if not self.is_trading(tick.vt_symbol, tick_time):
            return
        task = ("tick", [copy(tick)])
        self.queue.put(task)

    def record_bar(self, bar: BarData):
        """
        抛弃非交易时间校验数据
        """
        bar_time = bar.datetime.time()
        if not self.is_trading(bar.vt_symbol, bar_time):
            return
        print("push")
        print(bar)
        task = ("bar", [copy(bar)])
        self.queue.put(task)

    # def load_setting(self):
    #     # 不读取原数据记录设置
    #     pass
    # def process_contract_event(self, event):
    #     """
    #     设置记录所有期货合约
    #     """
    #     contract = event.data
    #     vt_symbol = contract.vt_symbol
    #     
    #     # 不录制期权
    #     if is_futures(vt_symbol):
    #         if RecordMode.BAR in self.record_modes:
    #             self.add_bar_recording(vt_symbol)
    #         if RecordMode.TICK in self.record_modes:
    #             self.add_tick_recording(vt_symbol)
    #         self.subscribe(contract)

def run_child():
    """
    Running in the child process.
    """
    SETTINGS["log.file"] = True

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    main_engine.add_gateway(CtpGateway)
    main_engine.write_log("主引擎创建成功")

    # 记录引擎
    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    main_engine.write_log("注册日志事件监听")

    main_engine.connect(CTP_SETTING, "CTP")
    main_engine.write_log("连接CTP接口")

    whole_market_recorder = WholeMarketRecorder(main_engine, event_engine)

    main_engine.write_log("开始录制数据")
    # oms_engine = main_engine.get_engine("oms")
    while True:
        sleep(1)

def run_parent():
    """
    Running in the parent process.
    """
    print("启动CTA策略守护父进程")

    # Chinese futures market trading period (day/night)
    MORNING_START = time(8, 45)
    MORNING_END = time(11, 45)

    AFTERNOON_START = time(13, 15)
    AFTERNOON_END = time(15, 15)

    NIGHT_START = time(20, 45)
    NIGHT_END = time(2, 45)

    child_process = None

    while True:
        current_time = datetime.now().time()
        trading = False

        # Check whether in trading period
        if (
            (current_time >= MORNING_START and current_time <= MORNING_END)
            or (current_time >= AFTERNOON_START and current_time <= AFTERNOON_END)
            or (current_time >= NIGHT_START)
            or (current_time <= NIGHT_END)
        ):
            trading = True

        # Start child process in trading period
        if trading and child_process is None:
            print("启动数据录制子进程")
            child_process = multiprocessing.Process(target=run_child)
            child_process.start()
            print("数据录制子进程启动成功")

        # 非记录时间则退出数据录制子进程
        if not trading and child_process is not None:
            print("关闭数据录制子进程")
            child_process.terminate()
            child_process.join()
            child_process = None
            print("数据录制子进程关闭成功")
        sys.stdout.flush()
        sleep(5)

if __name__ == "__main__":
    run_parent()

xiaohe wrote:

郑商所的数据推送,没有3点后的最后一个TICK用于标识收盘完成,所以要调用BarGenerator.generate函数来做最终的强制K线生成(不等了)

这个会在后续版本升级吗?还是需要自己修改?

我用的2.7版本,如果不重启,上期所的bar入库了,郑商所大商所的都没有数据入库。

xzhangef wrote:

我在网上查到的是,vs code默认只能调试进入自己的代码,如果需要进入第三方库,需要配置justMyCode: false
但是我加了这个配置还是不行,你解决了吗?

已解决。原来是启动调试的时候要选择这个调试配置文件。

我在网上查到的是,vs code默认只能调试进入自己的代码,如果需要进入第三方库,需要配置justMyCode: false
但是我加了这个配置还是不行,你解决了吗?

© 2015-2022 微信 18391752892
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】