VeighNa量化社区
你的开源社区量化交易平台
dingfei's Avatar
Member
离线
16 帖子
声望: 0

然后应该利用写时复制,多个进程共用内存中的数据,大大减少多进程的内存开销

我的电脑64核心, 使用cta回测的参数优化功能的 时候,会开64个进程,导致内存耗尽,然后进程被杀掉。

不应该简单的根据核心数量来决定开少个进程,应该综合考量可用内存数来决定

全市场录制会卡死把。我实测全录制的,只能收到很少的数据,服务器不发送了。只有录制一小部分才能收到

郭易燔 wrote:

你的8分钟是指+08:00么,这个是时区。
毫秒没有显示是数据库的问题吧,不知道是数据库没有显示还是自动保存成到了秒。

不是时区。是8分钟,时间2022-03-04 10:54:35.5 ,存到mysql就变成了 2022-03-04 10:46:39

通过websocket接收到vnpy推过来的数据的时间是**_

2022-03-04 10:54:35.500000+08:00

_** :

quote here{"gateway_name":"CTP","symbol":"IO2205-P-4500","exchange":"CFFEX","datetime":"2022-03-04 10:54:35.500000+08:00","name":"IO2205-P-4500","volume":122,"turnover":1665580,"open_interest":129,"last_price":122,"last_volume":0,"limit_up":573.2,"limit_down":0.2,"open_price":141.8,"high_price":143,"low_price":121.8,"pre_close":114.2,"bid_price_1":123.4,"bid_price_2":0,"bid_price_3":0,"bid_price_4":0,"bid_price_5":0,"ask_price_1":125.8,"ask_price_2":0,"ask_price_3":0,"ask_price_4":0,"ask_price_5":0,"bid_volume_1":2,"bid_volume_2":0,"bid_volume_3":0,"bid_volume_4":0,"bid_volume_5":0,"ask_volume_1":2,"ask_volume_2":0,"ask_volume_3":0,"ask_volume_4":0,"ask_volume_5":0,"localtime":null,"vt_symbol":"IO2205-P-4500.CFFEX"}

但是:数据库中的时间 **_

2022-03-04 10:46:39

_**,少了8分钟左右。而且那0.5秒没保存到

description

全新安装的时候,才会报错。用过几次后又不会了

windows下打开2.9版本开启web后,访问http://127.0.0.1:8000/docs

会报错:提示文件 \vnstudio\lib**** 不存在。
实际上硬盘中没有 \vnstudio\lib , 只有 \vnstudio\Lib

libiya2000 wrote:

青青子荆 wrote:

libiya2000 wrote:

shunyuzhiqian wrote:

description

hi 我也遇到相同的问题了,请问你是如何处理这块,而解决的?
请参考一下13楼

也是不好用,我在windows下完全可以用,可是换到Ubuntu20.04 就不好用了。
在def record_tick(self, tick: TickData):里面加打印函数也没反应。
后来通过main_engine.add_engine(WholeMarketRecorder)的方式让def record_tick(self, tick: TickData):里面有了打印。
但是实际上仍然没有在sql里写入数据。
而且全程连个报错也没有出现。

大佬,你的 datarecoder.py 能不能分享一下?

kingmo888 wrote:

RT,
订阅全市场所有合约的Tick,无论用什么数据库存储,都会因为存储造成事件拥挤,结果就是当前时刻处理的数据可能还是几分钟甚至几十分钟前的行情。

请问,目前用vnpy的话,有没有可能做到期货全市场合约无延迟(低延迟)录制?

谢谢。

大佬,能不能分享一下你的行情记录的代码,我找了很久没找到能用的

wrote:

全市场行情录制,1核512内存应该也足够了,建议2核1G以上服务器

description

直接上代码

import sys
import multiprocessing
import re
from contextlib import closing
from copy import copy
from copy import deepcopy
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.object import BarData, HistoryRequest, Product, TickData
from vnpy.trader.database import init
from vnpy.trader.setting import get_settings
from enum import Enum
from time import sleep
from datetime import datetime, time, timedelta
from logging import INFO

from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine
from vnpy.trader.utility import load_json, extract_vt_symbol

from vnpy.gateway.ctp import CtpGateway
from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.cta_strategy.base import EVENT_CTA_LOG
from vnpy.trader.event import EVENT_CONTRACT, EVENT_TICK

from vnpy.app.data_recorder.engine import RecorderEngine

EXCHANGE_LIST = [
    Exchange.SHFE,
    Exchange.DCE,
    Exchange.CZCE,
    Exchange.CFFEX,
    Exchange.INE,
]

SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True
CTP_SETTING = load_json("connect_ctp.json")


def is_futures(vt_symbol: str) -> bool:
    """
    是否是期货
    """
    return bool(re.match(r"^[a-zA-Z]{1,3}\d{2,4}.[A-Z]+$", vt_symbol))


class RecordMode(Enum):
    BAR = "bar"
    TICK = "tick"


class WholeMarketRecorder(RecorderEngine):
    def __init__(self, main_engine, event_engine, record_modes=[RecordMode.BAR]):
        super().__init__(main_engine, event_engine)
        self.record_modes = record_modes
        # 非交易时间
        self.drop_start = time(3, 15)
        self.drop_end = time(8, 45)

        # 大连、上海、郑州交易所,小节休息
        self.rest_start = time(10, 15)
        self.rest_end = time(10, 30)

    def is_trading(self, vt_symbol, current_time) -> bool:
        """
        交易时间,过滤校验Tick
        """
        symbol, exchange = extract_vt_symbol(vt_symbol)

        if current_time >= self.drop_start and current_time < self.drop_end:
            return False
        if exchange in [Exchange.DCE, Exchange.SHFE, Exchange.CZCE]:
            if current_time >= self.rest_start and current_time < self.rest_end:
                return False
        return True

    def load_setting(self):
        pass

    def record_tick(self, tick: TickData):
        """
        抛弃非交易时间校验数据
        """
        tick_time = tick.datetime.time()
        if not is_trading(tick.vt_symbol, tick_time):
            return
        task = ("tick", copy(tick))
        self.queue.put(task)

    def record_bar(self, bar: BarData):
        """
        抛弃非交易时间校验数据
        """
        bar_time = bar.datetime.time()
        if not is_trading(bar.vt_symbol, bar_time):
            return
        task = ("bar", copy(bar))
        self.queue.put(task)

    def process_contract_event(self, event):
        """"""
        contract = event.data
        vt_symbol = contract.vt_symbol
        # 不录制期权
        if is_futures(vt_symbol):
            if RecordMode.BAR in self.record_modes:
                self.add_bar_recording(vt_symbol)
            if RecordMode.TICK in self.record_modes:
                self.add_tick_recording(vt_symbol)
            self.subscribe(contract)


def run_child():
    """
    Running in the child process.
    """
    SETTINGS["log.file"] = True

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    main_engine.add_gateway(CtpGateway)
    main_engine.write_log("主引擎创建成功")

    # 记录引擎

    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    main_engine.write_log("注册日志事件监听")

    main_engine.connect(CTP_SETTING, "CTP")
    main_engine.write_log("连接CTP接口")

    whole_market_recorder = WholeMarketRecorder(main_engine, event_engine)

    main_engine.write_log("开始录制数据")
    oms_engine = main_engine.get_engine("oms")
    while True:
        sleep(1)


def run_parent():
    """
    Running in the parent process.
    """
    print("启动CTA策略守护父进程")

    # Chinese futures market trading period (day/night)
    MORNING_START = time(8, 45)
    MORNING_END = time(12, 0)

    AFTERNOON_START = time(12, 45)
    AFTERNOON_END = time(15, 35)

    NIGHT_START = time(20, 45)
    NIGHT_END = time(3, 5)

    child_process = None

    while True:
        current_time = datetime.now().time()
        trading = False

        # Check whether in trading period
        if (
            (current_time >= MORNING_START and current_time <= MORNING_END)
            or (current_time >= AFTERNOON_START and current_time <= AFTERNOON_END)
            or (current_time >= NIGHT_START)
            or (current_time <= NIGHT_END)
        ):
            trading = True

        # Start child process in trading period
        if trading and child_process is None:
            print("启动数据录制子进程")
            child_process = multiprocessing.Process(target=run_child)
            child_process.start()
            print("数据录制子进程启动成功")

        # 非记录时间则退出数据录制子进程
        if not trading and child_process is not None:
            print("关闭数据录制子进程")
            child_process.terminate()
            child_process.join()
            child_process = None
            print("数据录制子进程关闭成功")
        sys.stdout.flush()
        sleep(5)


if __name__ == "__main__":
    run_parent()

大佬,这个太老了,最新2.9版本用不了,不知道怎么改,能不能贴一下最新的代码???

xiaohe wrote:

不好意思,贴错了,是这个https://www.vnpy.com/forum/topic/3046-quan-shi-chang-lu-zhi-xing-qing-shu-ju
谢谢大佬,这个太老了,报错,我不知道怎么改。能不能贴一下你用的最新版本可用的代码?

description

xiaohe wrote:

可参考https://www.vnpy.com/docs/cn/cta_strategy.html#id19
这个是操作UI界面的教程。我问的是 用代码记录实时行情的教程

随便下载什么合约的代码都可以,求大佬分享一下

图像界面需要一个个添加代码,非常不方便。
哪位大佬能分享一下代码启动行情录制的代码,求代码

鹤唳幽云 wrote:

yyguoliang wrote:

国内期货实盘已经开通,采用软件默认方式,通过DataManager 和 DataRecorder 模块进行行情录制,(版本vnpy-master 2.18)
问题是经常在开盘后发现软件没有正常录制行情(通过刷新发现没有变化),导致一直没有足够的bardata 进行策略计算,
一般发现后再重启就正常了,请问这个问题具体在哪里?如何解决?
再有就是官方推荐什么保险的录制行情的方法?最好能24小时无人值守。
谢谢。

请问楼主是否使用了RQdata?还是使用ctp的行情服务器录制数据?请问如何实现录制实时合约形成主力连续数据?

请问你搞定了吗?使用ctp的行情服务器录制数据,能不能分享一下代码

Traceback (most recent call last):
File "e:\vnstudio\lib\site-packages\peewee.py", line 3144, in execute_sql
cursor.execute(sql, params or ())
sqlite3.OperationalError: table dbtickdata has no column named turnover

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "e:\vnstudio\lib\site-packages\vnpy_datarecorder\ui\widget.py", line 157, in process_exception_event
raise exc_info[1].with_traceback(exc_info[2])
File "e:\vnstudio\lib\site-packages\vnpy_datarecorder\engine.py", line 84, in run
self.database.save_tick_data(data)
File "e:\vnstudio\lib\site-packages\vnpy_sqlite\sqlite_database.py", line 208, in save_tick_data
DbTickData.insert_many(c).on_conflict_replace().execute()
File "e:\vnstudio\lib\site-packages\peewee.py", line 1907, in inner
return method(self, database, args, **kwargs)
File "e:\vnstudio\lib\site-packages\peewee.py", line 1978, in execute
return self._execute(database)
File "e:\vnstudio\lib\site-packages\peewee.py", line 2745, in _execute
return super(Insert, self)._execute(database)
File "e:\vnstudio\lib\site-packages\peewee.py", line 2474, in _execute
cursor = database.execute(self)
File "e:\vnstudio\lib\site-packages\peewee.py", line 3157, in execute
return self.execute_sql(sql, params, commit=commit)
File "e:\vnstudio\lib\site-packages\peewee.py", line 3151, in execute_sql
self.commit()
File "e:\vnstudio\lib\site-packages\peewee.py", line 2917, in exit
reraise(new_type, new_type(exc_value,
exc_args), traceback)
File "e:\vnstudio\lib\site-packages\peewee.py", line 190, in reraise
raise value.with_traceback(tb)
File "e:\vnstudio\lib\site-packages\peewee.py", line 3144, in execute_sql
cursor.execute(sql, params or ())
peewee.OperationalError: table dbtickdata has no column named turnover

description

mysql能正常连接, 手动创建了新的空的数据库

© 2015-2022 微信 18391752892
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】