VeighNa量化社区
你的开源社区量化交易平台
ranjianlin's Avatar
Member
离线
244 帖子
声望: 0

xiaohe wrote:

不要在bg里调用吧

需要全市场行情数据入库,generate()强制合成放在哪里合适一点呢?望指点一下,万分感激

xiaohe wrote:

可以收盘后自己调用bg.generate()强制合成或者修改你过滤的逻辑

咨询一下,,是在update_bar函数中调用调用bg.generate()强制合成合适呢?还是在update_tick函数中调用更合适?或者是其他地方调用更合适呢?

StopOrder(vt_symbol='pp2309.DCE', direction=<Direction.LONG: '多'>, offset=<Offset.CLOSE: '平'>, price=7429.0, volume=1.0, stop_orderid='STOP.1', strategy_name='5', datetime=datetime.datetime(2023, 8, 7, 11, 8, 2, 204781, tzinfo=zoneinfo.ZoneInfo(key='Asia/Shanghai')), lock=False, net=False, vt_orderids=[], status=<StopOrderStatus.WAITING: '等待中'>)

停止单格式如上所示,保存到json时报错如下所示,咨询一下有什么好的方法把停止单保存到字典,开盘后再发出来吗?

Traceback (most recent call last):
  File "C:\veighna_studio\lib\site-packages\vnpy_ctastrategy\engine.py", line 887, in stop_all_strategies
    self.stop_strategy(strategy_name)
  File "C:\veighna_studio\lib\site-packages\vnpy_ctastrategy\engine.py", line 738, in stop_strategy
    self.sync_strategy_data(strategy)
  File "C:\veighna_studio\lib\site-packages\vnpy_ctastrategy\engine.py", line 842, in sync_strategy_data
    save_json(self.data_filename, self.strategy_data)
  File "C:\Users\784050800_1623449582\Desktop\vnpy-3.7.0\vnpy\trader\utility.py", line 118, in save_json
    json.dump(
  File "C:\veighna_studio\lib\json\__init__.py", line 179, in dump
    for chunk in iterable:
  File "C:\veighna_studio\lib\json\encoder.py", line 431, in _iterencode
    yield from _iterencode_dict(o, _current_indent_level)
  File "C:\veighna_studio\lib\json\encoder.py", line 405, in _iterencode_dict
    yield from chunks
  File "C:\veighna_studio\lib\json\encoder.py", line 405, in _iterencode_dict
    yield from chunks
  File "C:\veighna_studio\lib\json\encoder.py", line 325, in _iterencode_list
    yield from chunks
  File "C:\veighna_studio\lib\json\encoder.py", line 438, in _iterencode
    o = _default(o)
  File "C:\veighna_studio\lib\json\encoder.py", line 179, in default
   raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type StopOrder is not JSON serializable
        if not (bar.datetime.minute + 1) % self.window:
            self.on_window_bar(self.window_bar)
            self.window_bar = None

1.官方BarGenerator合成30分钟分钟k线代码如上所示,,,观察发现郑州商品交易所夜盘不推送最后59分钟k线,而是次日9点钟开盘前推送进来,但是次日9点钟开盘前的k线数据自己已经全部过滤掉了,那么BarGenerator合成郑州交易所夜盘最后30分钟的k线是否就不存在了呢 ?

2. 是否一旦缺失任意一个第59分钟的k线,自定义的30分钟k线就无法合成了吗? 这种情况如何处理好?

咨询一下,1个策略10个品种,是在CTA策略里不同的实例化效率高?还是直接把py策略复制10份,然后直接加载策略效率高呢? 推荐那种方式?

xiaohe wrote:

策略交易状态self.trading为True的时候,策略调用send_order函数才会发往CTA策略引擎。策略交易状态self.trading为False的时候,策略调用send_order函数只会返回一个空列表。CTA策略引擎的start_strategy函数是先给策略推送on_start,再把策略交易状态变为True的

https://www.vnpy.com/forum/topic/2934-qiu-zhu-shou-pan-shi-ke-xia-dan-he-on-trade-on-stop-order-de-wen-ti

description

上面面是群主关于“求助,收盘时刻下单和on_trade & on_stop_order 的问题”帖子的回复,,说可以写在on_start里,也可以写在on_tick里。是否可以认为,群主说的on_start里提交缓存的停止单存在逻辑错误,是一个逻辑bug呢?另外如果写在on_tick里面会存在反复发出的情况,,有什么高效率的写法在on_tick里面只发出一次缓存的停止单吗?望指点一下,谢谢

非交易时间的数据数据过滤,是在引擎里面的process_tick_even过滤好呢? 还是在Bargenerator里面过滤效率更高?更建议在哪里过滤?

如下图所示,策略on_start里面发出的停止单没在CTA策略本地停止单界面显示,在on_start里面也写了self.put_event()刷新的,怎么可以实现在CTA策略本地停止单界面显示呢?print显示,实际订单已发出,只是界面未显示。实测只有在on_tick , on_bar里面发出的订单才会显示在CTA策略本地停止单界面。

description

description

如图所示,对比vnpy3.7与vnpy2.0.8版本utility中的BarGenerator-update_tick函数,根本没有这句过滤,应该与CTP版本关系不大。

vnpy3.7遇到同样问题,也不知道该如何解决。

xiaohe wrote:

自己在策略引擎里打印排查看看就知道了

反复测试,策略停止后编辑6个策略 fix_size手数,然后重启,均是修改后点击启动立即生效。为什么修改后的自定义x分钟数不是立即生效呢?搞不懂。

郑州CZCE交易所使用CTP接口接收的推送数据,print发现没有14:59与22:59数据推送的,上海SHFE,大连DCE交易所均能收到14:59与22:59数据推送,大家有遇到类似的情况吗?

xiaohe wrote:

可以打印排查一下是否是收到非交易时段tick导致的

经反复测试排查发现,当日可以正常录制全市场行情数据,但是次日发现,后面几天仅部分合约有录制进数据库,是因为这个全市场录制行情数据代码没有收到DCE与CZCE交易所的数据推送。

xiaohe wrote:

edit_strategy的时候update_setting了呀

我是自定义变量合成x分钟k线,例如之前设置x=2分钟,修改为x=4分钟,编辑后重启没生效,不知道这个是否有影响?

xiaohe wrote:

无需重启

反复测试发现,例如之前x是1,暂停后修改x为2,虽然界面显示x已更改为2,但是计算的数据依旧是1,并未更改过来,查阅ctastrategy源码发现加载json文件也只是在初始化时候才会加载,暂停后重启并未重新加载json文件。

juchao1023 wrote:

xzhangef wrote:

我本地装的是2.7版本,修改了部分代码可以跑起来了,代码在下面。
可以录制data_recorder_setting.json里配置的品种,如果想录制全市场,就把注释的那一大段代码打开就可以了。

import sys
import multiprocessing
import re
from copy import copy
from vnpy.trader.constant import Exchange
from vnpy.trader.object import BarData, TickData
from enum import Enum
from time import sleep
from datetime import datetime, time
from logging import INFO

from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine
from vnpy.trader.utility import load_json, extract_vt_symbol

from vnpy_ctp import CtpGateway
from vnpy_ctastrategy.base import EVENT_CTA_LOG
from vnpy_datarecorder.engine import RecorderEngine

EXCHANGE_LIST = [
    Exchange.SHFE,
    Exchange.DCE,
    Exchange.CZCE,
    Exchange.CFFEX,
    Exchange.INE,
]

SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True
CTP_SETTING = load_json("connect_ctp.json")

def is_futures(vt_symbol: str) -> bool:
    """
    是否是期货
    """
    return bool(re.match(r"^[a-zA-Z]{1,3}\d{2,4}.[A-Z]+$", vt_symbol))

class RecordMode(Enum):
    BAR = "bar"
    TICK = "tick"

class WholeMarketRecorder(RecorderEngine):
    def __init__(self, main_engine, event_engine, record_modes=[RecordMode.BAR]):
        super().__init__(main_engine, event_engine)
        self.record_modes = record_modes
        # 非交易时间
        self.drop_start = time(3, 15)
        self.drop_end = time(8, 45)

        # 大连、上海、郑州交易所,小节休息
        self.rest_start = time(10, 15)
        self.rest_end = time(10, 30)

    def is_trading(self, vt_symbol, current_time) -> bool:
        """
        交易时间,过滤校验Tick
        """
        symbol, exchange = extract_vt_symbol(vt_symbol)

        if current_time >= self.drop_start and current_time < self.drop_end:
            return False
        if exchange in [Exchange.DCE, Exchange.SHFE, Exchange.CZCE]:
            if current_time >= self.rest_start and current_time < self.rest_end:
                return False
        return True

    def record_tick(self, tick: TickData):
        """
        抛弃非交易时间校验数据
        """
        tick_time = tick.datetime.time()
        if not self.is_trading(tick.vt_symbol, tick_time):
            return
        task = ("tick", [copy(tick)])
        self.queue.put(task)

    def record_bar(self, bar: BarData):
        """
        抛弃非交易时间校验数据
        """
        bar_time = bar.datetime.time()
        if not self.is_trading(bar.vt_symbol, bar_time):
            return
        print("push")
        print(bar)
        task = ("bar", [copy(bar)])
        self.queue.put(task)

    # def load_setting(self):
    #     # 不读取原数据记录设置
    #     pass
    # def process_contract_event(self, event):
    #     """
    #     设置记录所有期货合约
    #     """
    #     contract = event.data
    #     vt_symbol = contract.vt_symbol
    #     
    #     # 不录制期权
    #     if is_futures(vt_symbol):
    #         if RecordMode.BAR in self.record_modes:
    #             self.add_bar_recording(vt_symbol)
    #         if RecordMode.TICK in self.record_modes:
    #             self.add_tick_recording(vt_symbol)
    #         self.subscribe(contract)

def run_child():
    """
    Running in the child process.
    """
    SETTINGS["log.file"] = True

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    main_engine.add_gateway(CtpGateway)
    main_engine.write_log("主引擎创建成功")

    # 记录引擎
    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    main_engine.write_log("注册日志事件监听")

    main_engine.connect(CTP_SETTING, "CTP")
    main_engine.write_log("连接CTP接口")

    whole_market_recorder = WholeMarketRecorder(main_engine, event_engine)

    main_engine.write_log("开始录制数据")
    # oms_engine = main_engine.get_engine("oms")
    while True:
        sleep(1)

def run_parent():
    """
    Running in the parent process.
    """
    print("启动CTA策略守护父进程")

    # Chinese futures market trading period (day/night)
    MORNING_START = time(8, 45)
    MORNING_END = time(11, 45)

    AFTERNOON_START = time(13, 15)
    AFTERNOON_END = time(15, 15)

    NIGHT_START = time(20, 45)
    NIGHT_END = time(2, 45)

    child_process = None

    while True:
        current_time = datetime.now().time()
        trading = False

        # Check whether in trading period
        if (
            (current_time >= MORNING_START and current_time <= MORNING_END)
            or (current_time >= AFTERNOON_START and current_time <= AFTERNOON_END)
            or (current_time >= NIGHT_START)
            or (current_time <= NIGHT_END)
        ):
            trading = True

        # Start child process in trading period
        if trading and child_process is None:
            print("启动数据录制子进程")
            child_process = multiprocessing.Process(target=run_child)
            child_process.start()
            print("数据录制子进程启动成功")

        # 非记录时间则退出数据录制子进程
        if not trading and child_process is not None:
            print("关闭数据录制子进程")
            child_process.terminate()
            child_process.join()
            child_process = None
            print("数据录制子进程关闭成功")
        sys.stdout.flush()
        sleep(5)

if __name__ == "__main__":
    run_parent()

感谢大佬,这个可以用,2023.6.5

经反复测试,当日可以正常录制全市场行情数据,但是次日发现,后面几天仅部分合约有录制进数据库,原因具体是?

ranjianlin wrote:

xiaohe wrote:

vnpy_dolphindb版本用pip list就能看到

官方是否有考虑修复vnpy-dolphindb1.0.8中数据录制问题?每次使用vnpy-dolphindb录制1分钟k线数据,然后打开“数据管理”刷新一下就会报错。

xiaohe wrote:

vnpy_dolphindb版本用pip list就能看到

vnpy-dolphindb 1.0.8

xiaohe wrote:

你录制的合约名称是?
vnpy_dolphondb版本是?

合约是任意合约录制均报错

版本是DolphinDB_Win64_V2.00.9.5或者DolphinDB_Win64_V2.00.9.7

vnpy3.7版本是否可行?

© 2015-2022 微信 18391752892
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】