VeighNa量化社区
你的开源社区量化交易平台

置顶主题

2023年第7次社区活动 - 基于Scikit-Learn的机器学习CTA策略信号挖掘实践 - 12月2日(成都)

发布于veighna社区公众号【vnpy-community】
 
原文作者:用Python的交易员 | 发布时间:2023-11-10
 
2023年第7场VeighNa社区活动开始报名,本场活动将在成都举办,分享主题是【基于Scikit-Learn的机器学习CTA策略信号挖掘实践】

之前北京场的活动现场座无虚席,大家的热情程度远超我们预期,附上一张北京场的活动照片:

description

机器学习(Machine Learning)各种算法在量化交易领域中的应用越发广泛,但由于目前互联网上的资料质量参差不齐,许多VeighNa社区的同学想要学习尝试但却不知道从何入手。

本次活动中,我们将会由浅入深介绍机器学习技术在CTA量化策略开发中的应用场景,并基于Scikit-Learn这款广受好评的机器学习算法库,给出一套具体的CTA策略信号挖掘实践案例:

 
KBins聚类特征分析

description

 
特征相关性热力图

description

 
向量化回测绩效图

description

本场活动仅提供线下参会名额(预估40人位置),还是优先对金融机构量化从业人员开放,请在填写报名表单时提供公司和职位信息,后续报名成功的同学小助手会添加微信联系确认。

 
内容:

  • 机器学习在量化中的应用场景

    • 当今CTA策略开发的核心难点
    • 常见机器学习算法介绍
    • 自动化因子挖掘和智能化信号生成
  • 基于Scikit-Learn的实践案例

    • 向量化时序特征计算准备
    • 应用KBeans聚类学习算法
    • 对于无监督学习结果的有监督分析
    • 构建完整策略进行事件驱动回测
  • 其他近期社区感兴趣的主题

    • 迅投研和vnpy_xt数据服务
    • 如何选择适合机器学习工作的硬件
    • Python 3.12发布内容解析
  • QA问答和交流环节

 
时间:12月2日 14:00-17:00

地点:成都(具体地址后续在微信群中通知)

报名费:99元(Elite会员免费参加)

报名方式:扫描下方二维码报名(请在付款前填写报名信息表)

description

 



全市场录制行情数据

全市场行情录制,1核512内存应该也足够了,建议2核1G以上服务器

description

直接上代码

import sys
import multiprocessing
import re
from contextlib import closing
from copy import copy
from copy import deepcopy
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.object import BarData, HistoryRequest, Product, TickData
from vnpy.trader.database import init
from vnpy.trader.setting import get_settings
from enum import Enum
from time import sleep
from datetime import datetime, time, timedelta
from logging import INFO

from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine
from vnpy.trader.utility import load_json, extract_vt_symbol

from vnpy.gateway.ctp import CtpGateway
from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.cta_strategy.base import EVENT_CTA_LOG
from vnpy.trader.event import EVENT_CONTRACT, EVENT_TICK

from vnpy.app.data_recorder.engine import RecorderEngine

EXCHANGE_LIST = [
    Exchange.SHFE,
    Exchange.DCE,
    Exchange.CZCE,
    Exchange.CFFEX,
    Exchange.INE,
]

SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True
CTP_SETTING = load_json("connect_ctp.json")


def is_futures(vt_symbol: str) -> bool:
    """
    是否是期货
    """
    return bool(re.match(r"^[a-zA-Z]{1,3}\d{2,4}.[A-Z]+$", vt_symbol))


class RecordMode(Enum):
    BAR = "bar"
    TICK = "tick"


class WholeMarketRecorder(RecorderEngine):
    def __init__(self, main_engine, event_engine, record_modes=[RecordMode.BAR]):
        super().__init__(main_engine, event_engine)
        self.record_modes = record_modes
        # 非交易时间
        self.drop_start = time(3, 15)
        self.drop_end = time(8, 45)

        # 大连、上海、郑州交易所,小节休息
        self.rest_start = time(10, 15)
        self.rest_end = time(10, 30)

    def is_trading(self, vt_symbol, current_time) -> bool:
        """
        交易时间,过滤校验Tick
        """
        symbol, exchange = extract_vt_symbol(vt_symbol)

        if current_time >= self.drop_start and current_time < self.drop_end:
            return False
        if exchange in [Exchange.DCE, Exchange.SHFE, Exchange.CZCE]:
            if current_time >= self.rest_start and current_time < self.rest_end:
                return False
        return True

    def load_setting(self):
        pass

    def record_tick(self, tick: TickData):
        """
        抛弃非交易时间校验数据
        """
        tick_time = tick.datetime.time()
        if not is_trading(tick.vt_symbol, tick_time):
            return
        task = ("tick", copy(tick))
        self.queue.put(task)

    def record_bar(self, bar: BarData):
        """
        抛弃非交易时间校验数据
        """
        bar_time = bar.datetime.time()
        if not is_trading(bar.vt_symbol, bar_time):
            return
        task = ("bar", copy(bar))
        self.queue.put(task)

    def process_contract_event(self, event):
        """"""
        contract = event.data
        vt_symbol = contract.vt_symbol
        # 不录制期权
        if is_futures(vt_symbol):
            if RecordMode.BAR in self.record_modes:
                self.add_bar_recording(vt_symbol)
            if RecordMode.TICK in self.record_modes:
                self.add_tick_recording(vt_symbol)
            self.subscribe(contract)


def run_child():
    """
    Running in the child process.
    """
    SETTINGS["log.file"] = True

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    main_engine.add_gateway(CtpGateway)
    main_engine.write_log("主引擎创建成功")

    # 记录引擎

    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    main_engine.write_log("注册日志事件监听")

    main_engine.connect(CTP_SETTING, "CTP")
    main_engine.write_log("连接CTP接口")

    whole_market_recorder = WholeMarketRecorder(main_engine, event_engine)

    main_engine.write_log("开始录制数据")
    oms_engine = main_engine.get_engine("oms")
    while True:
        sleep(1)


def run_parent():
    """
    Running in the parent process.
    """
    print("启动CTA策略守护父进程")

    # Chinese futures market trading period (day/night)
    MORNING_START = time(8, 45)
    MORNING_END = time(12, 0)

    AFTERNOON_START = time(12, 45)
    AFTERNOON_END = time(15, 35)

    NIGHT_START = time(20, 45)
    NIGHT_END = time(3, 5)

    child_process = None

    while True:
        current_time = datetime.now().time()
        trading = False

        # Check whether in trading period
        if (
            (current_time >= MORNING_START and current_time <= MORNING_END)
            or (current_time >= AFTERNOON_START and current_time <= AFTERNOON_END)
            or (current_time >= NIGHT_START)
            or (current_time <= NIGHT_END)
        ):
            trading = True

        # Start child process in trading period
        if trading and child_process is None:
            print("启动数据录制子进程")
            child_process = multiprocessing.Process(target=run_child)
            child_process.start()
            print("数据录制子进程启动成功")

        # 非记录时间则退出数据录制子进程
        if not trading and child_process is not None:
            print("关闭数据录制子进程")
            child_process.terminate()
            child_process.join()
            child_process = None
            print("数据录制子进程关闭成功")
        sys.stdout.flush()
        sleep(5)


if __name__ == "__main__":
    run_parent()


解密并强化日内经典策略R-Breaker

R-Breaker是一种中高频的日内交易策略,这个策略也长期被Future Truth杂志评为最赚钱的策略之一。R-Breaker策略结合了趋势和反转两种交易方式,所以交易机会相对较多,比较适合日内1分钟K线或者5分钟K线级别的数据。

 
 

R-Breaker策略逻辑

 

R-Breaker的策略逻辑由以下4部分构成:

1)计算6个目标价位

根据昨日的开高低收价位计算出今日的6个目标价位,按照价格高低依次是:

  • 突破买入价(Bbreak)
  • 观察卖出价(Ssetup)
  • 反转卖出价(Senter)
  • 反转买入价(Benter)
  • 观察买入价(Bsetup)
  • 突破卖出价(Sbreak)

 

他们的计算方法如下:(其中a、b、c、d为策略参数)

  • 观察卖出价(Ssetup)= High + a * (Close – Low)
  • 观察买入(Bsetup)= Low – a * (High – Close)
  • 反转卖出价(Senter)= b / 2 * (High + Low) – c * Low
  • 反转买入价(Benter)= b / 2 * (High + Low) – c * High
  • 突破卖出价(Sbreak)= Ssetup - d * (Ssetup – Bsetup)
  • 突破买入价(Bbreak)= Bsetup + d * (Ssetup – Bsetup)

 

description

2)设计委托逻辑

趋势策略情况:

  • 若价格>突破买入价,开仓做多;
  • 若价格<突破卖出价,开仓做空;

 

反转策略情况:

  • 若日最高价>观察卖出价,然后下跌导致价格<反转卖出价,开仓做空或者反手(先平仓再反向开仓)做空;
  • 若日最低价<观察买入价,然后上涨导致价格>反转买入价,开仓做多或者反手(先平仓再反向开仓)做多;

 

3)设定相应的止盈止损。

 

4)日内策略要求收盘前平仓。

 

上面是原版R-Breaker策略逻辑,但是使用RQData从2010年至今(即2019年10月)的1分钟沪深300股指期货主力连续合约(IF88)测试,效果并不理想。

 
 

策略逻辑优化

 

实际上R-Breaker策略可以拆分成趋势策略和反转策略。下面分别对这对2种策略逻辑进行优化:

1)趋势策略:

  • 若当前x分钟的最高价>观察卖出价,认为它具有上升趋势,在突破买入价挂上买入开仓的停止单;
  • 若当前x分钟的最低价<观察买入价,认为它具有下跌趋势,在突破卖出价挂上买入开仓的停止单;
  • 开仓后,使用固定百分比移动止损离场;
  • 增加过滤条件:为防止横盘行情导致不断的开平仓,日内每次开仓买入开仓(卖出开仓)委托的价位都比上一次更高(更低);
  • 收盘前,必须平调所持有的仓位。

 

2)反转策略:

  • 若当前x分钟的最高价>观察卖出价,认为它已经到了当日阻力位,可能发生行情反转,在反转卖出价挂上卖出开仓的停止单;
  • 若当前x分钟的最低价>观察买入价,认为它已经到了当日支撑位,可能发生行情反转,在反转买入价挂上买入开仓的停止单;
  • 开仓后,使用固定百分比移动止损离场;
  • 收盘前,必须平调所持有的仓位。

 

其代码实现逻辑如下:

self.tend_high, self.tend_low = am.donchian(self.donchian_window)

if bar.datetime.time() < self.exit_time:

    if self.pos == 0:
        self.intra_trade_low = bar.low_price
        self.intra_trade_high = bar.high_price

        # Trend Condition
        if self.tend_high > self.sell_setup:
            long_entry = max(self.buy_break, self.day_high)
            self.buy(long_entry, self.fixed_size, stop=True)

            self.short(self.sell_enter, self.multiplier * self.fixed_size, stop=True)

        elif self.tend_low < self.buy_setup:
            short_entry = min(self.sell_break, self.day_low)
            self.short(short_entry, self.fixed_size, stop=True)

            self.buy(self.buy_enter, self.multiplier * self.fixed_size, stop=True)

    elif self.pos > 0:
        self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
        long_stop = self.intra_trade_high * (1 - self.trailing_long / 100)
        self.sell(long_stop, abs(self.pos), stop=True)

    elif self.pos < 0:
        self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
        short_stop = self.intra_trade_low * (1 + self.trailing_short / 100)
        self.cover(short_stop, abs(self.pos), stop=True)

# Close existing position
else:
    if self.pos > 0:
        self.sell(bar.close_price * 0.99, abs(self.pos))
    elif self.pos < 0:
        self.cover(bar.close_price * 1.01, abs(self.pos))

 
 

策略效果

 

同样使用10年的1分钟IF88数据进行回测。不过,在展示强化版R-Breaker策略效果前,先分别展示一下拆分后的趋势策略和反转策略。

1)趋势策略:

  • 趋势策略夏普比率1.96,日均成交2.6笔,资金曲线是整体上扬的;
  • 但是在2017~2018年的盘整阶段,具有较大并且持续时间较长的回撤;
  • 这凸显出趋势类策略自身无法规避的缺点:在趋势行情中盈利,意味着震荡行情必然亏损。

description

 

2)反转策略

  • 反转策略夏普比率0.75,日均成交0.4笔,资金曲线缓慢上扬;
  • 但是在2017~2018年的盘整阶段,资金曲线上扬最快,而且这个阶段是最平滑的;
  • 这凸显出反转类策略优点:尽管在趋势行情亏损,在震荡行情必然能盈利。

description

 

综合对比2种策略的日均成交笔数和资金曲线,我们可以知道:

  • 由于趋势策略日均交易笔数较多(2.6笔),它主要负责贡献R-Breaker策略的alpha;
  • 趋势策略的亏损也是主要导致R-Breaker策略亏损的原因,但这时候的亏损由反转策略的盈利来填补。

由于趋势策略和反转策略是互斥的,在某些方面呈现出此消彼长的特点。那么,根据投资组合理论,可以把反转策略看作是看跌期权,买入一定规模的看跌期权来对消非系统性风险,那么组合的收益会更加稳健,即夏普比率更高。

由于趋势策略和反转策略日均成交手数比是2.6:0.4,若它们都只委托1手的话,反转策略的对冲效果微乎其微。

为了方便演示,我们设置趋势策略每次交易1手;反转策略则是3手。然后合成R-Breaker策略。发现夏普比率提高到2,资金曲线整体上扬,而且没有较大且持续时间较长的回撤。

description

 
 

结论

 

R-Breaker策略成功之处在于它并不是纯粹的趋势类策略,它属于复合型策略,它的alpha由2部分构成:趋势策略alpha;反转策略alpha。

这类复合型策略可以看作是轻量级的投资组合,因为它的交易标的只有一个:沪深300股指期货的主力合约。

更复杂的话,可以交易多个标的,如在商品期货做虚拟钢厂套利(同时交易螺纹钢、铁矿石、焦炭),在IF股指期货上做日内CTA策略。考虑到市场容量不同,价差套利能分配更多的资金。这样在价差套利提供稳定收益率基础上,CTA策略能在行情好的时候贡献更多alpha(高盈亏比特征导致的)。

从上面例子可以看出,一个合理的投资组合,往往比单个策略具有更高的夏普比率。因为夏普比率=超额收益/风险。夏普比率高意味着资金曲线非常平滑;这也意味着我们可以有效控制使用杠杆的风险。

当某个投资组合策略夏普足够高,而且策略资金容量允许,交易成本能有效控制等情况下,就可以通过杠杆来提升组合收益了。例如向银行贷款或者发放债券,这时候交易团队是债务人角色,即在承担更大风险同时,追求更到收益。债权人享受利息收益(类无风险收益)。

向公众发产品是另一种增加杠杆的方式,但此时投资组合风险已经转移到了客户这方,交易团队可以享受着管理费收益(类无风险收益)。根据目标客户的不同:

  • 私募基金面向高净值客户,这些客户群体风险承受能力较高;并且私募监管比较放松,能使用的衍生品较多,有提升业绩的自由度。故私募基金除了管理费,更追求业绩提成。
  • 公募基金面向普通群众,他们风险承受能力较低;并且公募监管非常严格,投资约束非常多,提升业绩难度较大。但是公募牌照的稀缺性必然导致该行业是盈利的。如万亿级别的管理规模,其管理费的收益,也不是一般的自营交易公司或者私募基金比得上的。

 
 

附录

 

最后附上策略源代码:

from datetime import time
from vnpy.app.cta_strategy import (
    CtaTemplate,
    StopOrder,
    TickData,
    BarData,
    TradeData,
    OrderData,
    BarGenerator,
    ArrayManager
)
​
​
class RBreakStrategy(CtaTemplate):
    """"""
    author = "KeKe"
​
    setup_coef = 0.25
    break_coef = 0.2
    enter_coef_1 = 1.07
    enter_coef_2 = 0.07
    fixed_size = 1
    donchian_window = 30
​
    trailing_long = 0.4
    trailing_short = 0.4
    multiplier = 3
​
    buy_break = 0   # 突破买入价
    sell_setup = 0  # 观察卖出价
    sell_enter = 0  # 反转卖出价
    buy_enter = 0   # 反转买入价
    buy_setup = 0   # 观察买入价
    sell_break = 0  # 突破卖出价
​
    intra_trade_high = 0
    intra_trade_low = 0
​
    day_high = 0
    day_open = 0
    day_close = 0
    day_low = 0
    tend_high = 0
    tend_low = 0
​
    exit_time = time(hour=14, minute=55)
​
    parameters = ["setup_coef", "break_coef", "enter_coef_1", "enter_coef_2", "fixed_size", "donchian_window"]
    variables = ["buy_break", "sell_setup", "sell_enter", "buy_enter", "buy_setup", "sell_break"]
​
    def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
        """"""
        super(RBreakStrategy, self).__init__(
            cta_engine, strategy_name, vt_symbol, setting
        )
​
        self.bg = BarGenerator(self.on_bar)
        self.am = ArrayManager()
        self.bars = []
​
    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("策略初始化")
        self.load_bar(10)
​
    def on_start(self):
        """
        Callback when strategy is started.
        """
        self.write_log("策略启动")
​
    def on_stop(self):
        """
        Callback when strategy is stopped.
        """
        self.write_log("策略停止")
​
    def on_tick(self, tick: TickData):
        """
        Callback of new tick data update.
        """
        self.bg.update_tick(tick)
​
    def on_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        self.cancel_all()
​
        am = self.am
        am.update_bar(bar)
        if not am.inited:
            return
​
        self.bars.append(bar)
        if len(self.bars) <= 2:
            return
        else:
            self.bars.pop(0)
        last_bar = self.bars[-2]
​
        # New Day
        if last_bar.datetime.date() != bar.datetime.date():
            if self.day_open:
​
                self.buy_setup = self.day_low - self.setup_coef * (self.day_high - self.day_close)  # 观察买入价
                self.sell_setup = self.day_high + self.setup_coef * (self.day_close - self.day_low)  # 观察卖出价
​
                self.buy_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_high  # 反转买入价
                self.sell_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_low  # 反转卖出价
​
                self.buy_break = self.buy_setup + self.break_coef * (self.sell_setup - self.buy_setup)  # 突破买入价
                self.sell_break = self.sell_setup - self.break_coef * (self.sell_setup - self.buy_setup)  # 突破卖出价
​
            self.day_open = bar.open_price
            self.day_high = bar.high_price
            self.day_close = bar.close_price
            self.day_low = bar.low_price
​
        # Today
        else:
            self.day_high = max(self.day_high, bar.high_price)
            self.day_low = min(self.day_low, bar.low_price)
            self.day_close = bar.close_price
​
        if not self.sell_setup:
            return
​
        self.tend_high, self.tend_low = am.donchian(self.donchian_window)
​
        if bar.datetime.time() < self.exit_time:
​
            if self.pos == 0:
                self.intra_trade_low = bar.low_price
                self.intra_trade_high = bar.high_price
​
                if self.tend_high > self.sell_setup:
                    long_entry = max(self.buy_break, self.day_high)
                    self.buy(long_entry, self.fixed_size, stop=True)
​
                    self.short(self.sell_enter, self.multiplier * self.fixed_size, stop=True)
​
                elif self.tend_low < self.buy_setup:
                    short_entry = min(self.sell_break, self.day_low)
                    self.short(short_entry, self.fixed_size, stop=True)
​
                    self.buy(self.buy_enter, self.multiplier * self.fixed_size, stop=True)
​
            elif self.pos > 0:
                self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
                long_stop = self.intra_trade_high * (1 - self.trailing_long / 100)
                self.sell(long_stop, abs(self.pos), stop=True)
​
            elif self.pos < 0:
                self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
                short_stop = self.intra_trade_low * (1 + self.trailing_short / 100)
                self.cover(short_stop, abs(self.pos), stop=True)
​
        # Close existing position
        else:
            if self.pos > 0:
                self.sell(bar.close_price * 0.99, abs(self.pos))
            elif self.pos < 0:
                self.cover(bar.close_price * 1.01, abs(self.pos))
​
       self.put_event()
​
    def on_order(self, order: OrderData):
        """
        Callback of new order data update.
        """
        pass
​
    def on_trade(self, trade: TradeData):
        """
        Callback of new trade data update.
        """
        self.put_event()
​
    def on_stop_order(self, stop_order: StopOrder):
        """
        Callback of stop order update.
        """
        pass


2023年第6次社区活动 - 基于Scikit-Learn的机器学习CTA策略信号挖掘实践 - 11月25日(上海)

发布于veighna社区公众号【vnpy-community】
 
原文作者:用Python的交易员 | 发布时间:2023-10-30
 
2023年第6场VeighNa社区活动开始报名,本场活动将在上海举办,分享主题是【基于Scikit-Learn的机器学习CTA策略信号挖掘实践】

上周北京场的活动现场座无虚席,大家的热情程度远超我们预期,所以上海场就早点开始报名了!附上一张北京场的活动照片:

description

机器学习(Machine Learning)各种算法在量化交易领域中的应用越发广泛,但由于目前互联网上的资料质量参差不齐,许多VeighNa社区的同学想要学习尝试但却不知道从何入手。

本次活动中,我们将会由浅入深介绍机器学习技术在CTA量化策略开发中的应用场景,并基于Scikit-Learn这款广受好评的机器学习算法库,给出一套具体的CTA策略信号挖掘实践案例:

 
KBins聚类特征分析

description

 
特征相关性热力图

description

 
向量化回测绩效图

description

本场活动仅提供线下参会名额(预估40人位置),还是优先对金融机构量化从业人员开放,请在填写报名表单时提供公司和职位信息,后续报名成功的同学小助手会添加微信联系确认。

 
内容:

  • 机器学习在量化中的应用场景

    • 当今CTA策略开发的核心难点
    • 常见机器学习算法介绍
    • 自动化因子挖掘和智能化信号生成
  • 基于Scikit-Learn的实践案例

    • 向量化时序特征计算准备
    • 应用KBeans聚类学习算法
    • 对于无监督学习结果的有监督分析
    • 构建完整策略进行事件驱动回测
  • 其他近期社区感兴趣的主题

    • 迅投研和vnpy_xt数据服务
    • 如何选择适合机器学习工作的硬件
    • Python 3.12发布内容解析
  • QA问答和交流环节

 
时间:11月25日 14:00-17:00

地点:上海(具体地址微信确认后通知)

报名费:99元(Elite会员免费参加)

报名方式:扫描下方二维码填写表单报名(报名成功小助手会添加微信联系确认)

description
 



获得属于自己的保证金率和手续费(率)

1. 合约信息中包含保证金率

1.1 合约信息查询命令:

ReqQryInstrument : 请求查询合约,填空可以查询到所有合约。
响应:OnRspQryInstrument
◇ 1.函数原型
virtual int ReqQryInstrument(CThostFtdcQryInstrumentField *pQryInstrument, int nRequestID) = 0;
◇ 2.参数
pQryInstrument:查询合约
struct CThostFtdcQryInstrumentField
{
    TThostFtdcInstrumentIDType InstrumentID; ///合约代码
    TThostFtdcExchangeIDType ExchangeID; ///交易所代码
    TThostFtdcExchangeInstIDType ExchangeInstID; ///合约在交易所的代码
    TThostFtdcInstrumentIDType ProductID;///产品代码
};
nRequestID:请求ID,对应响应里的nRequestID,无递增规则,由用户自行维护。
◇ 3.返回
0,代表成功。
-1,表示网络连接失败;
-2,表示未处理请求超过许可数;
-3,表示每秒发送请求数超过许可数。

1.2 合约信息查询结果:

请求查询合约响应,当执行ReqQryInstrument后,该方法被调用。
◇ 1.函数原型
virtual void OnRspQryInstrument(CThostFtdcInstrumentField *pInstrument, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast) {};
◇ 2.参数pInstrument:
合约
struct CThostFtdcInstrumentField
{
    TThostFtdcInstrumentIDType InstrumentID;///合约代码
    TThostFtdcExchangeIDType ExchangeID; ///交易所代码
    TThostFtdcInstrumentNameType InstrumentName; ///合约名称
    TThostFtdcExchangeInstIDType ExchangeInstID;///合约在交易所的代码
    TThostFtdcInstrumentIDType ProductID; ///产品代码
    TThostFtdcProductClassType ProductClass; ///产品类型
    TThostFtdcYearType DeliveryYear; ///交割年份
    TThostFtdcMonthType DeliveryMonth;///交割月
    TThostFtdcVolumeType MaxMarketOrderVolume; ///市价单最大下单量
    TThostFtdcVolumeType MinMarketOrderVolume;///市价单最小下单量
    TThostFtdcVolumeType MaxLimitOrderVolume; ///限价单最大下单量
    TThostFtdcVolumeType MinLimitOrderVolume; ///限价单最小下单量
    TThostFtdcVolumeMultipleType VolumeMultiple; ///合约数量乘数
    TThostFtdcPriceType PriceTick; ///最小变动价位
    TThostFtdcDateType CreateDate; ///创建日
    TThostFtdcDateType OpenDate; ///上市日
    TThostFtdcDateType ExpireDate;///到期日
    TThostFtdcDateType StartDelivDate; ///开始交割日
    TThostFtdcDateType EndDelivDate; ///结束交割日
    TThostFtdcInstLifePhaseType InstLifePhase; ///合约生命周期状态
    TThostFtdcBoolType IsTrading;///当前是否交易
    TThostFtdcPositionTypeType PositionType; ///持仓类型
    TThostFtdcPositionDateTypeType PositionDateType;///持仓日期类型
    TThostFtdcRatioType LongMarginRatio;///多头保证金率
    TThostFtdcRatioType ShortMarginRatio; ///空头保证金率
    TThostFtdcMaxMarginSideAlgorithmType MaxMarginSideAlgorithm;///是否使用大额单边保证金算法
    TThostFtdcInstrumentIDType UnderlyingInstrID;///基础商品代码
    TThostFtdcPriceType StrikePrice;///执行价
    TThostFtdcOptionsTypeType OptionsType;///期权类型
    TThostFtdcUnderlyingMultipleType UnderlyingMultiple; ///合约基础商品乘数
    TThostFtdcCombinationTypeType CombinationType;///组合类型
};
VolumeMultiple:合约乘数(同交易所)
PriceTick:最小变动价位(同交易所)
IsTrading:是否活跃(同交易所)
DeliveryYear:交割年份(同交易所)
DeliveryMonth:交割月(同交易所)
OpenDate:上市日(同交易所)
CreateDate:创建日(同交易所)
ExpireDate:到期日(同交易所)
StartDeliveDate:开始交割日(同交易所)
EndDelivDate:结束交割日(同交易所)

同交易所表示这些字段每天更新自交易所,其余字段为柜台设置值。如果发现有些字段值有误,则以此来判断是交易所问题还是CTP柜台设置问题。
pRspInfo:响应信息
struct CThostFtdcRspInfoField
{
    TThostFtdcErrorIDType ErrorID; ///错误代码
    TThostFtdcErrorMsgType ErrorMsg;///错误信息
};
nRequestID:返回用户操作请求的ID,该ID 由用户在操作请求时指定。
bIsLast:指示该次返回是否为针对nRequestID的最后一次返回。

2. 保证金率查询结果中包含保证金

2.1 保证金率查询命令

ReqQryInstrumentMarginRate
请求查询合约保证金率,对应响应OnRspQryInstrumentMarginRate。如果InstrumentID填空,则返回持仓对应的合约保证金率,否则返回相应InstrumentID的保证金率。
目前无法通过一次查询得到所有合约保证金率,如果要查询所有,则需要通过多次查询得到。

◇ 1.函数原型
virtual int ReqQryInstrumentMarginRate(CThostFtdcQryInstrumentMarginRateField *pQryInstrumentMarginRate, int nRequestID) = 0;
◇ 2.参数pQryInstrumentMarginRate:
查询合约保证金率
struct CThostFtdcQryInstrumentMarginRateField
{
    ///经纪公司代码
    TThostFtdcBrokerIDType BrokerID;
    ///投资者代码
    TThostFtdcInvestorIDType InvestorID;
    ///合约代码
    TThostFtdcInstrumentIDType InstrumentID;
    ///投机套保标志
    TThostFtdcHedgeFlagType HedgeFlag;
    ///交易所代码
    TThostFtdcExchangeIDType ExchangeID;
    ///投资单元代码
    TThostFtdcInvestUnitIDType InvestUnitID;
};
nRequestID:请求ID,对应响应里的nRequestID,无递增规则,由用户自行维护。
◇ 3.返回
0,代表成功。
-1,表示网络连接失败;
-2,表示未处理请求超过许可数;
-3,表示每秒发送请求数超过许可数。

2.2 保证金率查询结果

OnRspQryInstrumentMarginRate
请求查询合约保证金率响应,当执行ReqQryInstrumentMarginRate后,该方法被调用。

◇ 1.函数原型
virtual void OnRspQryInstrumentMarginRate(CThostFtdcInstrumentMarginRateField *pInstrumentMarginRate, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast) {};

◇ 2.参数    ///:
合约保证金率
struct CThostFtdcInstrumentMarginRateField
{
    TThostFtdcInstrumentIDType InstrumentID;///合约代码
    TThostFtdcInvestorRangeType InvestorRange;///投资者范围
    TThostFtdcBrokerIDType BrokerID; ///经纪公司代码
    TThostFtdcInvestorIDType InvestorID;///投资者代码
    TThostFtdcHedgeFlagType HedgeFlag; ///投机套保标志
    TThostFtdcRatioType LongMarginRatioByMoney;///多头保证金率
    TThostFtdcMoneyType LongMarginRatioByVolume;///多头保证金费
    TThostFtdcRatioType ShortMarginRatioByMoney; ///空头保证金率
    TThostFtdcMoneyType ShortMarginRatioByVolume; ///空头保证金费
    TThostFtdcBoolType IsRelative;///是否相对交易所收取
    TThostFtdcExchangeIDType ExchangeID;///交易所代码
    TThostFtdcInvestUnitIDType InvestUnitID; ///投资单元代码
};
pRspInfo:响应信息

struct CThostFtdcRspInfoField
{
    TThostFtdcErrorIDType ErrorID;///错误代码
    TThostFtdcErrorMsgType ErrorMsg;///错误信息
};
nRequestID:返回用户操作请求的ID,该ID 由用户在操作请求时指定。

bIsLast:指示该次返回是否为针对nRequestID的最后一次返回。

3. 手续费(率)查询结果中包含手续费

3.1 手续费(率)查询命令

ReqQryInstrumentCommissionRate
请求查询合约手续费率,对应响应OnRspQryInstrumentCommissionRate。如果InstrumentID填空,则返回持仓对应的合约手续费率。
目前无法通过一次查询得到所有合约手续费率,如果要查询所有,则需要通过多次查询得到。
◇ 1.函数原型
virtual int ReqQryInstrumentCommissionRate(CThostFtdcQryInstrumentCommissionRateField *pQryInstrumentCommissionRate, int nRequestID) = 0;
◇ 2.参数pQryInstrumentCommissionRate:
查询手续费率
struct CThostFtdcQryInstrumentCommissionRateField
{
    TThostFtdcBrokerIDType BrokerID; ///经纪公司代码
    TThostFtdcInvestorIDType InvestorID;///投资者代码
    TThostFtdcInstrumentIDType InstrumentID;///合约代码
    TThostFtdcExchangeIDType ExchangeID;///交易所代码
    TThostFtdcInvestUnitIDType InvestUnitID;///投资单元代码
};

InstrumentID:返回手续费率对应的合约。
但是如果在柜台没有设置具体合约的手续费率,则默认会返回产品的手续费率,InstrumentID就为对应产品ID。
nRequestID:请求ID,对应响应里的nRequestID,无递增规则,由用户自行维护。
◇ 3.返回
0,代表成功。
-1,表示网络连接失败;
-2,表示未处理请求超过许可数;
-3,表示每秒发送请求数超过许可数。

3.3 手续费(率)查询结果

OnRspQryInstrumentCommissionRate
请求查询合约手续费率响应,当执行ReqQryInstrumentCommissionRate后,该方法被调用。

◇ 1.函数原型
virtual void OnRspQryInstrumentCommissionRate(CThostFtdcInstrumentCommissionRateField *pInstrumentCommissionRate, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast) {};
◇ 2.参数pInstrumentCommissionRate:合约手续费率
struct CThostFtdcInstrumentCommissionRateField
{
    TThostFtdcInstrumentIDType InstrumentID; ///合约代码
    TThostFtdcInvestorRangeType InvestorRange; ///投资者范围
    TThostFtdcBrokerIDType BrokerID;///经纪公司代码
    TThostFtdcInvestorIDType InvestorID; ///投资者代码
    TThostFtdcRatioType OpenRatioByMoney; ///开仓手续费率
    TThostFtdcRatioType OpenRatioByVolume; ///开仓手续费
    TThostFtdcRatioType CloseRatioByMoney;///平仓手续费率
    TThostFtdcRatioType CloseRatioByVolume;///平仓手续费
    TThostFtdcRatioType CloseTodayRatioByMoney;///平今手续费率
    TThostFtdcRatioType CloseTodayRatioByVolume;///平今手续费
    TThostFtdcExchangeIDType ExchangeID; ///交易所代码
    TThostFtdcBizTypeType BizType;///业务类型    
    TThostFtdcInvestUnitIDType InvestUnitID;///投资单元代码
};

pRspInfo:
响应信息
struct CThostFtdcRspInfoField
{
    TThostFtdcErrorIDType ErrorID; ///错误代码
    TThostFtdcErrorMsgType ErrorMsg; ///错误信息
};
nRequestID:返回用户操作请求的ID,该ID 由用户在操作请求时指定。

bIsLast:指示该次返回是否为针对nRequestID的最后一次返回。

4. 合约+保证金率+手续费(率)= 完整的合约参数

令:
合约查询结果 = C
保证金率查询结果 = M
手续费查询结果 = S
则:

合约乘数:

C["VolumeMultiple"]

保证金率:

if M["Is_Relative"] == 1:
    多头保证金率 = C["LongMarginRatio"] + M["LongMarginRatioByMoney"] 
    空头保证金率 = C["ShortMarginRatio"] + M["ShortMarginRatioByMoney"] 

else:
多头保证金率 = M["LongMarginRatioByMoney"]
空头保证金率 = M["ShortMarginRatioByMoney"]

手续费(率):

        if S.open_ratio_bymoney == 0.0:
            开仓手续费= [FeeType.LOT,S["OpenRatioByVolume"] ]
            平仓手续费= [FeeType.LOT,S["CloseRatioByVolume"] ]
            平今手续费= [FeeType.LOT,S["CloseTodayRatioByVolume"] ]
        else:
            开仓手续费 = [FeeType.RATE,S["OpenRatioByMoney"] ]
            平仓手续费 = [FeeType.RATE,S["CloseRatioByMoney"] ]
            平今手续费 = [FeeType.RATE,S["CloseTodayRatioByMoney"] ]      


使用Jupyter NoteBook进行IB查询和交易,以及使用算法交易示例

在搞好IB盈透接口后,试了下客户端交易,但是最终目的还是使用程序化交易。发现vnpy已经提供的Script_engine来支持Jupyter NoteBook 交易的,而且非常方便调用。
这里就用写了用代码实现IB盈透下的查询和交易,和一个TWVP算法交易。

Script_engine的大多操作都是针对main_engine的封装,类似的逻辑,其他交易相关App,也可以用类似方法调用,真的很方便,比起之前调试来说。其实算法交易调用也很直接,直接传入algo setting 的dict就可以。

应为Jupyter NoteBook代码不好贴,我这里又改写会直接python code。在启动tws登录后,可以直接运行。
另外IB接口的返回信息采用一个中wrapper机制,有点类似Spring的反转调用,可以理解为本地返回方法是被IBapi调用的写入。

from vnpy.app.script_trader import init_cli_trading
from vnpy.gateway.ib import IbGateway
from time import sleep
# 连接到服务器
setting = {
    "TWS地址": "127.0.0.1",
    "TWS端口": 7497,
    "客户号":5 #每个链接用一个独立的链接号,一个IBAPI支持32个来同时链接
}
engine = init_cli_trading([IbGateway]) #返回Script_engine 示例,并且给main_engine注册了gateway
engine.connect_gateway(setting, "IB") #链接

# 查询资金 - 自动
sleep(10)
print(engine.get_all_accounts(use_df = True))
# 查询持仓
print(engine.get_all_positions(use_df = True))

# 订阅行情
from vnpy.trader.constant import Exchange
from vnpy.trader.object import SubscribeRequest
# 从我测试直接用Script_engine有问题,IB的品种太多,get_all_contracts命令不行,需要指定具体后才可以,这里使用main_engine订阅
req1 = SubscribeRequest("152791428",Exchange.SEHK) #创建行情订阅,腾讯
req2 = SubscribeRequest("332623976",Exchange.SEHK) #创建行情订阅,美团
req3 = SubscribeRequest("12087792",Exchange.IDEALPRO) #创建行情订阅,美团
engine.main_engine.subscribe(req1,"IB")
engine.main_engine.subscribe(req2,"IB")
engine.main_engine.subscribe(req3,"IB")

# 返回行情
sleep(10)
print(engine.get_all_contracts(use_df = True)) #返回所有已经订阅的contact
print(engine.get_contract("152791428.SEHK",use_df = True)) #返回单个订阅的contact
print(engine.get_ticks(["152791428.SEHK","332623976.SEHK"],use_df = True)) #返回订阅的tick

# 委托下单,返回订单号
from vnpy.trader.constant import OrderType
vt_orderid = engine.buy(vt_symbol = "12087792.IDEALPRO",price = 1.20, volume = 50000, order_type = OrderType.LIMIT)
print(vt_orderid)


# 按照订单号查询委托状态,这里也可以用get_orders, 查询订单号队列
sleep(10)
print(engine.get_order(vt_orderid)) #
print(engine.get_trades(vt_orderid, use_df= True))
# 再次查询持仓
print(engine.get_all_positions(use_df = True))

# 使用算法交易引擎
from vnpy.app.algo_trading import AlgoTradingApp
engine.main_engine.add_app(AlgoTradingApp) #加入app
AlgoInstance = engine.main_engine.get_engine("AlgoTrading") #为了方便,这里直接用返回的AlgoInstance
# 创建算法交易的要执行交易内容, 这个可以复制 algo_trading_setting.json的内容,这里这里策略是,100秒内每隔10秒下单一次,每次购买10000
AlgotradingDict1 = {
        "template_name": "TwapAlgo",
        "vt_symbol": "12087792.IDEALPRO",
        "direction": "多",
        "price": 1.0985,
        "volume": 10000.0,
        "time": 100,
        "interval": 10,
        "offset": ""
    }
AlgoInstance.start_algo(setting = AlgotradingDict1)

# 再次查询持仓
print(engine.get_all_positions(use_df = True))


查询仓位,持仓均价,未成交委托单一个函数搞定

1.首先完善converter.py

class PositionHolding:
    """"""

    def __init__(self, contract: ContractData = None):
        """"""
        if contract:
            self.vt_symbol = contract.vt_symbol
            self.exchange = contract.exchange

        self.active_orders = {}
        self.order_id = ""
        self.long_pos = 0
        self.long_pnl = 0
        self.long_price = 0
        self.long_yd = 0
        self.long_td = 0
        self.short_pos = 0
        self.short_pnl = 0
        self.short_price = 0        
        self.short_yd = 0
        self.short_td = 0
        self.long_pos_frozen = 0
        self.long_yd_frozen = 0
        self.long_td_frozen = 0

        self.short_pos_frozen = 0
        self.short_yd_frozen = 0
        self.short_td_frozen = 0

    def update_position(self, position: PositionData):
        """"""
        if position.direction == Direction.LONG:
            self.long_pos = position.volume
            self.long_pnl = position.pnl
            self.long_yd = position.yd_volume
            self.long_td = self.long_pos - self.long_yd
            self.long_price = position.price
            self.long_pos_frozen = position.frozen
        else:
            self.short_pos = position.volume
            self.short_pnl = position.pnl            
            self.short_yd = position.yd_volume
            self.short_td = self.short_pos - self.short_yd
            self.short_price = position.price
            self.short_pos_frozen = position.frozen
    def update_order(self, order: OrderData):
        """"""
        #active_orders只记录未成交和部分成交委托单
        if order.status in [Status.NOTTRADED, Status.PARTTRADED]:
            self.active_orders[order.vt_orderid] = order
        else:
            if order.vt_orderid in self.active_orders:
                self.active_orders.pop(order.vt_orderid)

        self.calculate_frozen()

    def update_order_request(self, req: OrderRequest, vt_orderid: str):
        """"""
        #分离gateway_name和orderid
        gateway_name,*split_orderid = vt_orderid.split("_")
        if len(split_orderid) == 1:
            self.order_id = split_orderid[0]
        elif len(split_orderid) == 2:
            self.order_id = "_".join([split_orderid[0],split_orderid[1]])
        elif len(split_orderid) == 3:
            self.order_id = "_".join([split_orderid[0],split_orderid[1],split_orderid[2]])
        elif len(split_orderid) == 4:
            self.order_id = "_".join([split_orderid[0],split_orderid[1],split_orderid[2],split_orderid[3]])
        if self.order_id:
            order = req.create_order_data(self.order_id, gateway_name)
            self.update_order(order)

    def update_trade(self, trade: TradeData):
        """"""
        if trade.direction == Direction.LONG:
            if trade.offset == Offset.OPEN:
                self.long_td += trade.volume
            elif trade.offset == Offset.CLOSETODAY:
                self.short_td -= trade.volume
            elif trade.offset == Offset.CLOSEYESTERDAY:
                self.short_yd -= trade.volume
            elif trade.offset == Offset.CLOSE:
                if trade.exchange in [Exchange.SHFE, Exchange.INE]:
                    self.short_yd -= trade.volume
                else:
                    self.short_td -= trade.volume

                    if self.short_td < 0:
                        self.short_yd += self.short_td
                        self.short_td = 0
        else:
            if trade.offset == Offset.OPEN:
                self.short_td += trade.volume
            elif trade.offset == Offset.CLOSETODAY:
                self.long_td -= trade.volume
            elif trade.offset == Offset.CLOSEYESTERDAY:
                self.long_yd -= trade.volume
            elif trade.offset == Offset.CLOSE:
                if trade.exchange in [Exchange.SHFE, Exchange.INE]:
                    self.long_yd -= trade.volume
                else:
                    self.long_td -= trade.volume

                    if self.long_td < 0:
                        self.long_yd += self.long_td
                        self.long_td = 0

        self.long_pos = self.long_td + self.long_yd
        self.short_pos = self.short_td + self.short_yd

    def calculate_frozen(self):
        """"""
        self.long_pos_frozen = 0
        self.long_yd_frozen = 0
        self.long_td_frozen = 0

        self.short_pos_frozen = 0
        self.short_yd_frozen = 0
        self.short_td_frozen = 0

        for order in self.active_orders.values():
            # Ignore position open orders
            if order.offset == Offset.OPEN:
                continue

            frozen = order.volume - order.traded

            if order.direction == Direction.LONG:
                if order.offset == Offset.CLOSETODAY:
                    self.short_td_frozen += frozen
                elif order.offset == Offset.CLOSEYESTERDAY:
                    self.short_yd_frozen += frozen
                elif order.offset == Offset.CLOSE:
                    self.short_td_frozen += frozen

                    if self.short_td_frozen > self.short_td:
                        self.short_yd_frozen += (
                            self.short_td_frozen - self.short_td)
                        self.short_td_frozen = self.short_td
            elif order.direction == Direction.SHORT:
                if order.offset == Offset.CLOSETODAY:
                    self.long_td_frozen += frozen
                elif order.offset == Offset.CLOSEYESTERDAY:
                    self.long_yd_frozen += frozen
                elif order.offset == Offset.CLOSE:
                    self.long_td_frozen += frozen

                    if self.long_td_frozen > self.long_td:
                        self.long_yd_frozen += (
                            self.long_td_frozen - self.long_td)
                        self.long_td_frozen = self.long_td

            self.long_pos_frozen = self.long_td_frozen + self.long_yd_frozen
            self.short_pos_frozen = self.short_td_frozen + self.short_yd_frozen

    def convert_order_request_shfe(self, req: OrderRequest):
        """"""
        if req.offset == Offset.OPEN:
            return [req]

        if req.direction == Direction.LONG:
            pos_available = self.short_pos - self.short_pos_frozen
            td_available = self.short_td - self.short_td_frozen
        else:
            pos_available = self.long_pos - self.long_pos_frozen
            td_available = self.long_td - self.long_td_frozen

        if req.volume > pos_available:
            return []
        elif req.volume <= td_available:
            req_td = copy(req)
            req_td.offset = Offset.CLOSETODAY
            return [req_td]
        else:
            req_list = []

            if td_available > 0:
                req_td = copy(req)
                req_td.offset = Offset.CLOSETODAY
                req_td.volume = td_available
                req_list.append(req_td)

            req_yd = copy(req)
            req_yd.offset = Offset.CLOSEYESTERDAY
            req_yd.volume = req.volume - td_available
            req_list.append(req_yd)

            return req_list

    def convert_order_request_lock(self, req: OrderRequest):
        """"""
        if req.direction == Direction.LONG:
            td_volume = self.short_td
            yd_available = self.short_yd - self.short_yd_frozen
        else:
            td_volume = self.long_td
            yd_available = self.long_yd - self.long_yd_frozen

        # If there is td_volume, we can only lock position
        if td_volume:
            req_open = copy(req)
            req_open.offset = Offset.OPEN
            return [req_open]
        # If no td_volume, we close opposite yd position first
        # then open new position
        else:
            open_volume = max(0, req.volume - yd_available)
            req_list = []

            if yd_available:
                req_yd = copy(req)
                if self.exchange in [Exchange.SHFE, Exchange.INE]:
                    req_yd.offset = Offset.CLOSEYESTERDAY
                else:
                    req_yd.offset = Offset.CLOSE
                req_list.append(req_yd)

            if open_volume:
                req_open = copy(req)
                req_open.offset = Offset.OPEN
                req_open.volume = open_volume
                req_list.append(req_open)

            return req_list


彻底解决K线生成器的问题——一个日内对齐等交易时长的K线生成器

先厘清大思路,后面逐步完成。

1.搞量化交易,一个好用的K线生成器是最基本的要求!

vnpy系统自带了一个BarGenerator,它可以帮助我们生成1分钟,n分钟,n小时,日周期的K线,也叫bar。可是除了1分钟比较完美之外,有很多问题。它在读取历史数据、回测的时候多K线的处理和实盘却有不一样的效果。具体的问题我已经在解决vnpy 2.9.0版本的BarGenerator产生30分钟Bar的错误!这个帖子中做过尝试,但也不是很成功。因为系统的BarGenerator靠时间窗口与1分钟bar的时间分钟关系来决定是否该新建和结束一个bar,这个有问题。于是我改用对1分钟bar进行计数来决定是否该新建和结束一个bar,这也是有不可靠的问题,遇到行情比较清淡的时候,可能有的分钟就没有1分钟bar产生,这是完全有可能的!
K线几乎是绝大部分交易策略分析的基础,除非你从事的是极高频交易,否则你就得用它。可是如果你连生成一个稳健可靠的K线都不能够保证,那么运行在K线基础上的指标及由此产生的交易信号就无从谈起,K线错了,它们就是错误的,以此为依据做出点交易指令有可能是南辕北辙,所以必须解决它!

2.日内对齐等交易时长K线需要什么参数

2.1 日内对齐等交易时长K线是最常用的

K线不是交易所发布的,它有很多种产生机制。其对齐方式、表现形式多种多样。关于K线的分类本人在以往的帖子中做出过比较详细的说明,有兴趣的读者可以去我以往的帖子中查看,这里就不再赘述。
市面上的绝大部分软件如通达信、大智慧、文华财经等软件,除非用户特别设定,他们最常提供给用户的K线多是日内对齐等交易时长K线。常用是一定是有道理的,因为它们已经为广大用户和投资者所接受。

2.2 日内对齐等交易时长K线需要什么参数

1)什么是日内对齐等交易时长K线?
它具有这些特点:以每日开盘为起点,每根K线都包含相同交易时间的数据,跳过中间的休市时间,直至当前交易日的收盘,收盘不足n分钟也就是K线。实盘中,每日的第一个n分钟K线含集合竞价的那个tick数据。
2)为什么这种K线能够被普遍接受?
为它尽可能地保证一个交易日内的所有K线所表达的意义内容上是一致的,它们包含相等的交易时长。这非常重要,因为你把一个5分钟时长的K线与一个30分钟时长的K线放在一起谈论是没有意义的。但是如果为了保证K线在交易时长上的一致性,让n分钟K线跨日的话也是不太合理,因为这跨日,跨周末时间太长,这中间会发生什么意外事情,可能会产生出非常巨大的幅度大K线,掩盖了隔日跳空的行情变化,这对解读行情是不利的。当然n日的K线日跨日的,但是它是n个交易日的K线融合而成的,不过其融合的每个日K线也是对齐各自的日开盘的。
另外日内对齐等交易时长K线还有一个好处,那就是你以任何之前的时间为起点,在读取历史数据重新生成该日的n分钟K线的时候,得到的改日的K线是一致的。举个例子,如果我们的CTA策略在init()中常常是这么一句:

self.load_bar(20)  # 先加载20日历史1分钟数据

这么简单的一句,包含着很多你意识不到的变化——你今天运行策略和明天运行你的策略,其中的历史数据的范围发生了变化,也就是说加载数据的起点变了。如果我们合成的K线的对齐方式不采用日内对齐的话,而采用对齐加载时间起点的话,你今天、明天加载出来之前的某日的K线就可能完全是不同的。而采用日内对齐等交易时长的K线则不存在这个问题。

3)需要知道合约的交易时间段
既然要对齐每日开盘,还有跳过各个休市时间,还要知道收市时间,那么我们就知道生成这种K线必须知道其所表达合约或对象的交易时间段,交易时间段中包含了这些信息,不知道这些信息,BarGenerator就不知道如何生成这种bar。这是必须的!

3. 如何获取合约的交易时间段

3.1 vnpy系统和CTP接口找不到交易时间段信息

目前vnpy系统中的是没有合约的交易时间段的。到哪里获取合约的交易时间段的呢?
1) 它与合约相关,应该到保存合约的数据类ContractData中去找,没有找到。
2) 是否可以提供接口,从交易所获得,这个也是比较基础的数据。于是到CTP接口中(我使用的是CTP接口,您也许不一样) ,在最新版本的CTP接口文档中也没有找到任何与交易时间段相关的信息,绝望!

3.2 有两种方法可以得到交易时间段信息

  1. 米筐数据接口中有,其中有个得到市场中所有合约的函数all_instruments(),它的返回值中就包含交易时间段信息trading_hours,还有另外一些如get_trading_hours()也可以直接获得这些交易时间段信息,好!
  2. 实在没有的话,咱们手工创建一个文件,按照一定格式,把我们需要创建K线的合约准备好交易时间段信息,这也是可行的。

3.3 直接从米筐数据接口获取交易时间段信息的问题

  1. 你必须购买过米筐数,否则无从获得
  2. 直接使用从米筐数据接口获取交易时间段信息,会有效率问题。本人试过,运行get_trading_hours()常会用卡顿,其目前是在65000多个合约中为你寻找一个合约的交易时间段信息,在K线合成这种使用非常频繁的地方,效率是必须的!况且米筐对每个用户的流量也是有限制的,如果超过了也是会限流的!
  3. 对于有些米筐没有的品种难道我们就不能使用K线了吗?

解决方法:

  1. 基于以上这些原因,采用解耦具体数据提供商的方法会更好!把这些信息保存到一个文件或者数据库中,只要您能够办法获得这些信息,按照规定的格式存储,哪怕是手工输入也是可以的。
  2. 新的K线生成器只需要对规定格式的交易时间段信息进行处理,按照一定的规则就可以进行K线生成了!

3.4 从米筐获取交易时间段信息

3.4.1 扩展DataFeed

打开vnpy.trader.datafeed.py文件为Datafeed的基类BaseDatafeed扩展下面的接口

class BaseDatafeed(ABC):
    """
    Abstract datafeed class for connecting to different datafeed.
    """

    def init(self) -> bool:
        """
        Initialize datafeed service connection.
        """
        pass

    def update_all_trading_hours(self) -> bool:     # hxxjava add 
        """ 更新所有合约的交易时间段到trading_hours.json文件中 """
        pass

    def load_all_trading_hours(self) -> dict:       # hxxjava add 
        """ 从trading_hours.json文件中读取所有合约的交易时间段 """
        pass

    def query_bar_history(self, req: HistoryRequest) -> Optional[List[BarData]]:
        """
        Query history bar data.
        """
        pass

    def query_tick_history(self, req: HistoryRequest) -> Optional[List[TickData]]:
        """
        Query history tick data.
        """
        pass

其中的trading_hours.json文件我会在后面的文章中做详细的介绍。有了它我们才能展开其他的设计。

3.4.2 扩展RqdataDataFeed

在vnpy_rqdata\rqdata_datafeed.py中增加下面的代码

  • 引用部分增加:
from datetime import timedelta,date # hxxjava add
  • 在class RqdataDatafeed(BaseDatafeed)中增加下面的代码 :

    def update_all_trading_hours(self) -> bool:     # hxxjava add 
        """ 更新所有合约的交易时间段到trading_hours.json文件中 """

        if not self.inited:
            self.init()

        if not self.inited:
            return False

        ths_dict = load_json(self.trading_hours_file)

        # instruments = all_instruments(type=['Future','Stock','Index','Spot'])

        trade_hours = {}

        for stype in ['Future','Stock','Index','Fund','Spot']:
            instruments = all_instruments(type=[stype])
            # print(f"{stype} instruments count={len(instruments)}")

            for idx,inst in instruments.iterrows():
                # 获取每个最新发布的合约的建议时间段
                if ('trading_hours' not in inst) or not(isinstance(inst.trading_hours,str)):
                    # 跳过没有交易时间段或者交易时间段无效的合约
                    continue

                inst_name = inst.trading_code if stype == 'Future' else inst.order_book_id 
                inst_name = inst_name.upper() 
                if inst_name.find('.') < 0:
                    inst_name += '.' + inst.exchange

                if inst_name not in ths_dict:
                    str_trading_hours = inst.trading_hours

                    # 把'01-'或'31-'者替换成'00-'或'30-'
                    suffix_pair = [('1-','0-'),('6-','5-')]
                    for s1,s2 in suffix_pair:
                        str_trading_hours = str_trading_hours.replace(s1,s2)

                    # 如果原来没有,提取出来
                    trade_hours[inst_name] = {"name": inst.symbol,"trading_hours": str_trading_hours}

        # print(f"trade_hours old count {len(ths_dict)},append count={len(trade_hours)}")
        if trade_hours:
            ths_dict.update(trade_hours)
            save_json(self.trading_hours_file,ths_dict)

        return True

    def load_all_trading_hours(self) -> dict:       # hxxjava add 
        """ 从trading_hours.json文件中读取所有合约的交易时间段 """
        json_file = get_file_path(self.trading_hours_file)
        if not json_file.exists():
            return {}
        else:
            return load_json(self.trading_hours_file)

3.4.3 为main_engine安装一个可以获取交易时间段信息的接口

在vnpy\trader\engine.py中:

  • 该文件的引用部分:
from .datafeed import get_datafeed                  # hxxjava add
  • 为MainEngine类增加下面函数

    def get_trading_hours(self,vt_symbol:str) -> str:   # hxxjava add
        """ get vt_symbol's trading hours """
        ths = self.all_trading_hours.get(vt_symbol.upper(),"")       
        return ths["trading_hours"] if ths else ""

为什么在MainEngine类增加可以获取交易时间段信息的接口?

因为无论你运行vnpy中的哪个app,你都会启动main_engine,无需绕弯子就可以得到这些信息,而我们的用户策略中都包含各自策略的引擎,这样就方便获取交易时间段信息。

如CTA策略中包含cta_engine,而cta_engine它的成员就包含main_engine。那么在策略中执行类似下面的语句就可以获取您交易品种的交易时间段信息:

       trading_hours = self.cta_engine.main_engine.get_trading_hours(selt.vt_symbol)

如PortFolioStrategy策略中包含strategy_engine,而strategy_engine它的成员就包含main_engine。那么在策略中执行类似下面的语句就可以获取多个交易品种的交易时间段信息:

       trading_hours_list = [self.cta_engine.main_engine.get_trading_hours(vt_symbol) for vt_symbol in self.vt_symbols]

是不是很方便呢?

3.4.4 在系统的投研中执行更新所有品种(含期货、股票、指数和基金)的交易时间段

vnpy 3.0的启动界面中已经集成了一个叫“投研”的功能,其实它是jupyter lab,启动之后输入下面的代码:

# 测试update_all_trading_hours()函数和load_all_trading_hours()
from vnpy.trader.datafeed import get_datafeed

df = get_datafeed()
df.init()

df.update_all_trading_hours()   # 更新所有合约的交易时间段到本地文件中

ths = df.load_all_trading_hours() # 从本地文件中读取所有合约的交易时间段

当然您可以在vnpy的trader中主界面的菜单中增加一项,方便您在需要的时候执行下面语句。不过这个更新交易时间段的功能并不需要频繁执行,手动也就够了,记得就好。

3.4.5 你还可以手工打开trading_hours.json,直接输入

经过上面步骤3.4.4,您就在本地得到了一个trading_hours.json文件,该文件在您的用户目录下的.vntrader\中,其内容如下:

{
    "A0303.DCE": {
        "name": "豆一0303",
        "trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
    },
    "A0305.DCE": {
        "name": "豆一0305",
        "trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
    },
    "A0307.DCE": {
        "name": "豆一0307",
        "trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
    },
    "A0309.DCE": {
        "name": "豆一0309",
        "trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
    },
    "A0311.DCE": {
        "name": "豆一0311",
        "trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
    },
    "A0401.DCE": {
        "name": "豆一0401",
        "trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
    },
   ... ...
}

观察其格式,在你没有米筐数据接口或者这里没有的合约,您也可以手动输入合约交易时间段信息。

按照程序中算法,这个文件文件中一共包含约16500多个合约的交易时间段信息。可以覆盖国内金融市场几乎全部都产品,但是不包括金融二次衍生品期权。
为什么没有期权交易时间段信息,因为不需要。期权合约有其对应的标的物,从其名称和编号就可以解析出来。期权合约的交易时间段其和标的物的交易时间段是完全相同的,因此不需要保存到该文件中。



【Elite量化策略实验室】RSI多头短线择时策略

发布于vn.py社区公众号【vnpy-community】
 
原文作者: 丛子龙 | 发布时间:2023-10-13
 
RSI相对强度指数是技术分析中常用的指标之一,由J. Welles Wilder Jr.在其1978年的著作《技术交易系统的新概念》中开发并引入技术分析中使用。RSI衡量价格变动的速度和幅度,该指标绘制在0到100的范围内。

许多技术分析类的书籍中常常见到将RSI用于均值回归交易,为人熟知的用法之一是在RSI超过70时卖出资产,当RSI跌破30时买入资产。然而,RSI也可以用作动量趋势指标,比如在上升趋势中,RSI通常在40到80之间波动,在下降趋势中在20到60之间波动。

本篇文章将会介绍三套围绕RSI构建的多头短线择时策略(策略思路来源于【QuantifiedStrategies】网站),分别是:

1. RSI经典策略

2. RSI区域动量策略

3. RSI-IBS策略

在文章的结尾,我们将会组合上述三个策略中的核心信号,构建一个新的RSI多信号集成策略,并在SPY(标普500ETF基金)和IF(沪深300股指期货)上进行回测。

 

RSI经典策略

 

回测绩效

由于【QuantifiedStrategies】网站文章中主要使用了SPY的日线数据进行回测,以下回测结果也基于同样数据:

description

 

基本信息

description

 

核心逻辑

策略的核心思想是在市场情绪低迷时,买入以寻求市场反弹,然后在价格触及昨日高点时卖出:

  1. 当市场上的RSI指标低于设定阈值时,即被认为处于超卖状态;
  2. 计算每次交易的头寸大小;
  3. 如果当前无持仓且市场满足买入条件,则执行超价买入操作;
  4. 如果当前持有多头仓位,则挂出卖单,价格为昨日高点,这里不使用超价卖出。

 

初始化策略

class RsiStrategy(CtaTemplate):
    """经典RSI策略"""
    # 计算RSI指标的窗口
    rsi_window: int = 2

    # RSI低阈值
    rsi_lower: int = 15

    # 持仓周期限制
    max_holding: int = 9

    # 风险资金
    risk_capital: int = 1_000_000

    parameters = [
        "rsi_window",
        "rsi_lower",
        "max_holding",
        "risk_capital",
    ]
    variables = []

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("策略初始化")

        self.bg = BarGenerator(self.on_bar)
        self.am = ArrayManager()

        # 加载足够的历史数据来计算指标
        self.load_bar(60)

 

指标计算&交易执行

def on_bar(self, bar: BarData):
        """
        K线数据推送
        """

        # 撤销之前发出的委托
        self.cancel_all()

        am = self.am
        am.update_bar(bar)
        if not am.inited:
            return

        # 计算RSI指标
        rsi_value = am.rsi(self.rsi_window)

        # 保存昨日高点
        prev_high = am.high[-1]

        # 判断是否要进行交易
        long_signal = rsi_value <= self.rsi_lower

        # 计算每次交易的头寸
        self.trading_size = int(self.risk_capital / am.close[-1] / 100) * 100

        # 如果无持仓,且满足条件,则直接开仓
        if self.pos == 0 and long_signal:
            self.buy(bar.close_price * 1.05, self.trading_size)

        # 如果持仓,且满足条件,则直接平仓
        if self.pos > 0:
            self.sell(prev_high, self.pos)

        # 推送UI更新
        self.put_event()

 

RSI区域动量策略

 

回测绩效

description

本策略的独特之处在于其较低的回撤率(-7.99%),如此低的回撤率也造就了漂亮的收益回撤比:17.62。

本策略一旦开仓就通常保持在上升轨道上,这意味着它在大部分时间内不会暴露于市场风险之下。这种特性使得这个策略在不稳定市场中具有较强的抗跌能力,有助于保护利润。

由于在大部分时间内没有仓位,该策略可以与其他策略相互配合提供额外收益。低回撤和稳定的增长趋势为其提供了与其他更高风险策略相互协同的机会,以实现更好的综合投资表现。

 

基本信息

description

 

核心逻辑

该策略包括两个指标:

  1. RSI多头范围:RSI过去N天内在40到100之间波动。

  2. RSI多头动量:RSI的极值高点在N天内大于70。

本次回测将使用100天的回望窗口和14天的RSI,这意味着为了触发RSI多头范围的信号,RSI必须在过去的100天内在40到100之间波动。

有了这个理念,交易逻辑非常简单:

  1. 当RSI多头范围和多头动量条件都成立时,开仓。

  2. 当RSI多头范围和多头动量条件都不再成立时,平仓。

 

初始化策略

class RsiRangeMomStrategy(CtaTemplate):
    """RSI区域动量策略"""
    # 计算RSI指标的窗口
    rsi_window: int = 14

    # RSI低阈值
    rsi_lower: int = 40

    # RSI高阈值
    rsi_upper: int = 100

    # RSI极值阈值
    rsi_highest: int = 70

    # 风险资金
    risk_capital: int = 1_000_000

    parameters = [
        "rsi_window",
        "rsi_lower",
        "rsi_upper",
        "rsi_highest",
        "risk_capital",
    ]
    variables = []

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("策略初始化")
        self.bg = BarGenerator(self.on_bar)
        self.am = ArrayManager()

        # 加载足够的历史数据来计算指标
        self.load_bar(150)

 

信号指标计算

def on_bar(self, bar: BarData):
        # 撤销所有挂单
        self.cancel_all()

        am = self.am
        am.update_bar(bar)
        if not am.inited:
            return

        # 计算rsi的值,并返回一个【rsi_window】长度的数组
        rsi_array: np.ndarray = am.rsi(self.rsi_window, array=True)

        # 计算该rsi数组的平均值,使用【np.nanmean】的原因是因为返回的数组中包含NaN值
        mean_value: float = np.nanmean(rsi_array)

        # 将NaN值使用【mean_value】填充
        rsi_array: np.ndarray = np.nan_to_num(rsi_array, nan=mean_value)

 

目标交易执行

# 使用历史rsi值计算趋势是否处于牛市区间
        # 判断标准为本段历史中所有rsi的值是否都在【rsi_lower】与【rsi_upper】之间
        bull_range_signal: bool = (np.all(rsi_array > self.rsi_lower) and
                                   np.all(rsi_array < self.rsi_upper))

        # 判断本段历史中是否存在较强的rsi值,只要有一个超过设定的【rsi_highest】即成立
        bull_mom_signal: bool = np.any(rsi_array > self.rsi_highest)

        # 计算开仓数量
        trading_size: int = (int(self.risk_capital / bar.close_price / 100)
                             * 100)

        # 判断开仓信号
        if self.pos == 0:
            # 如果牛市区间以及极值信号都出现,满仓入场
            if bull_range_signal and bull_mom_signal:
                self.buy(bar.close_price*1.05, trading_size)

        # 判断平仓信号
        if self.pos > 0:
            # 如果信号不再成立,清仓
            if not (bull_range_signal or bull_mom_signal):
                self.sell(bar.close_price*0.95, self.pos)

        # 推送UI更新
        self.put_event()

 

RSI-IBS 策略

 

回测绩效

description

 

基本信息

description

 

核心逻辑

策略中用到的IBS(内部K线强度),指标计算公式如下:

(Close - Low) / (High - Low)

IBS指标的波动范围从0到1,测量收盘价相对于日内高低点的位置,较低的值被认为是看涨的,而较高的值则是短期看跌的,IBS的基本假设是市场具有均值回归的特性。

策略交易逻辑为:

  1. 计算RSI和IBS指标;
  2. 根据计算的RSI和IBS指标,判断是否满足开仓条件;
  3. 如果没有持仓且满足开仓条件,执行买入。如果已有多头仓位且当日收盘价高于昨日收盘价,则执行卖出。

 

初始化策略

class RsiIbsStrategyOG(CtaTemplate):
    """RSI-IBS策略"""

    # rsi指标窗口
    rsi_window: int = 21

    # 入场rsi指标阈值
    rsi_entry: int = 45

    # 入场ibs指标阈值
    ibs_entry: float = 0.25

    # 风险资金
    risk_capital: int = 1_000_000

    parameters = [
        "rsi_window",
        "rsi_entry",
        "ibs_entry",
    ]
    variables = []

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("策略初始化")
        self.bg = BarGenerator(self.on_bar)
        self.am = ArrayManager()
        self.prev_close = 0
        self.load_bar(60)

 

指标交易计算&交易执行

def on_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        # 撤销之前发出的委托
        self.cancel_all()

        # 对am更新bar数据
        am = self.am
        am.update_bar(bar)
        if not am.inited:
            return

        # 计算开仓手数
        trading_size = int(self.risk_capital / bar.close_price / 100) * 100

        # 计算rsi指标数值
        rsi_value = am.rsi(self.rsi_window)

        # 计算ibs指标数值
        ibs_value = ((bar.close_price - bar.low_price) /
                     (bar.high_price - bar.low_price))

        # 分别计算开仓信号
        cond_1 = rsi_value < self.rsi_entry
        cond_2 = ibs_value < self.ibs_entry

        # 汇总合成信号
        long_signal = cond_1 and cond_2

        # 执行开仓操作
        if self.pos == 0 and long_signal:
            self.buy(bar.close_price * 1.05, trading_size)

        # 计算平仓信号,并执行平仓操作
        if self.pos > 0:
            if bar.close_price > self.prev_close:
                self.sell(bar.close_price * 0.95, self.pos)

        # 记录当今bar的收盘价
        self.prev_close = bar.close_price

        # 推送UI更新
        self.put_event()

 

RSI多信号集成策略

 

前文中的三套策略虽然都围绕RSI指标开发,但由于核心思路的区别,其回测绩效曲线还是体现出了较低的相关性。那么下一步的研究方向,就是把三套策略中的交易信号提取出来后,集成组合成为一个新的策略,看看能否达到更优秀的整体绩效。

为了实现信号的集成组合,需要对之前的策略代码进行调整,拆分成为策略信号和交易执行两块部分,具体逻辑流程看了下图应该会有一个更加清晰直观的理解:

description

当任一策略信号给出True值即入场做多,当所有策略信号都返回False值或达到止损目标时平仓离场。

 

回测绩效

description

 

代码实现

信号生成部分被封装在独立的信号类中,分别是:

1. RsiSignal

2. RsiRangeMomSignal

3. RsiIbsSignal

实现这些信号的方式并没有什么特别之处,单单是将前述三个策略的信号生成部分截取出来封装成一个可以返回布尔值(信号)的函数即可。

RsiSignal

class RsiSignal:
    def __init__(
        self,
        rsi_window: int = 2,
        rsi_lower: int = 15
    ):
        # 计算RSI指标的窗口
        self.rsi_window: int = rsi_window

        # RSI低阈值
        self.rsi_lower: int = rsi_lower

    def calculate_signal(self, am: ArrayManager) -> bool:
        # 计算RSI指标
        rsi_value = am.rsi(self.rsi_window)

        # 判断是否要进行交易
        return rsi_value <= self.rsi_lower

RsiRangeMomSignal

class RsiRangeMomSignal:
    def __init__(
        self,
        rsi_window: int = 14,
        rsi_lower: int = 40,
        rsi_upper: int = 100,
        rsi_highest: int = 70
    ) -> None:
        # 计算RSI指标的窗口
        self.rsi_window: int = rsi_window

        # RSI低阈值
        self.rsi_lower: int = rsi_lower

        # RSI高阈值
        self.rsi_upper: int = rsi_upper

        # RSI极值阈值
        self.rsi_highest: int = rsi_highest

    def calculate_signal(self, am: ArrayManager) -> bool:
        # 计算rsi的值,并返回一个【rsi_window】长度的数组
        rsi_array: np.ndarray = am.rsi(self.rsi_window, array=True)

        # 计算该rsi数组的平均值,使用【np.nanmean】的原因是因为返回的数组中包含NaN值
        mean_value: float = np.nanmean(rsi_array)

        # 将NaN值使用【mean_value】填充
        rsi_array: np.ndarray = np.nan_to_num(rsi_array, nan=mean_value)

        # 使用历史rsi值计算趋势是否处于牛市区间
        # 判断标准为本段历史中所有rsi的值是否都在【rsi_lower】与【rsi_upper】之间
        bull_range_signal: bool = (np.all(rsi_array > self.rsi_lower) and
                                   np.all(rsi_array < self.rsi_upper))

        # 判断本段历史中是否存在较强的rsi值,只要有一个超过设定的【rsi_highest】即成立
        bull_mom_signal: bool = np.any(rsi_array > self.rsi_highest)

        return bull_range_signal and bull_mom_signal

RsiIbsSignal

class RsiIbsSignal:
    def __init__(
        self,
        rsi_window: int = 21,
        rsi_entry: int = 45,
        ibs_entry: float = 0.25
    ):
        # rsi指标窗口
        self.rsi_window: int = rsi_window

        # 入场rsi指标阈值
        self.rsi_entry: int = rsi_entry

        # 入场ibs指标阈值
        self.ibs_entry: float = ibs_entry

    def calculate_signal(self, am: ArrayManager) -> bool:
        # 计算rsi指标数值
        rsi_value = am.rsi(self.rsi_window)

        # 计算ibs指标数值
        ibs_value = ((am.close[-1] - am.low[-1]) /
                     (am.high[-1] - am.low[-1]))

        # 计算开仓信号
        cond_1 = rsi_value < self.rsi_entry
        cond_2 = ibs_value < self.ibs_entry

        return cond_1 and cond_2

前文已经详细讲解过各个信号的生成逻辑,这里便不再赘述。

 

信号合成&交易执行

在主策略的【on_init】函数下,将上述三个信号类实例化为成员对象,并分别传入量价数据缓存容器(通过ArrayManager类的封装):

class RsiEnsembleStrategy(CtaTemplate):
    """"""
    author = "Tony"

    rrms_rsi_window: int = 14
    rrms_rsi_lower: int = 40
    rrms_rsi_upper: int = 100
    rrms_rsi_highest: int = 70

    ris_rsi_window: int = 21
    ris_rsi_entry: int = 45
    ris_ibs_entry: float = 0.25

    rs_rsi_window: int = 2
    rs_rsi_lower: int = 15

    # 风险资金
    risk_capital: int = 1_000_000

    parameters = [
        "rrms_rsi_window",
        "rrms_rsi_lower",
        "rrms_rsi_upper",
        "rrms_rsi_highest",
        "ris_rsi_window",
        "ris_rsi_entry",
        "ris_ibs_entry",
        "rs_rsi_window",
        "rs_rsi_lower",
    ]
    variables = []

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("策略初始化")
        self.bg = BarGenerator(self.on_bar)
        self.am = ArrayManager()

        # 初始化信号生成器实例
        self.rrms = RsiRangeMomSignal(
            self.rrms_rsi_window,
            self.rrms_rsi_lower,
            self.rrms_rsi_upper,
            self.rrms_rsi_highest
            )
        self.ris = RsiIbsSignal(
            self.ris_rsi_window,
            self.ris_rsi_entry,
            self.ris_ibs_entry
        )
        self.rs = RsiSignal(
            self.rs_rsi_window,
            self.rs_rsi_lower,
        )

        self.load_bar(150)

    def on_start(self):
        """
        Callback when strategy is started.
        """
        self.write_log("策略启动")

    def on_stop(self):
        """
        Callback when strategy is stopped.
        """
        self.write_log("策略停止")

    def on_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        # 撤销之前发出的委托
        self.cancel_all()

        # 对am更新bar数据
        am = self.am
        am.update_bar(bar)
        if not am.inited:
            return

        # 计算开仓手数
        trading_size = int(self.risk_capital / bar.close_price / 100) * 100

        # 传入am,计算三个信号的值
        rrms_signal = self.rrms.calculate_signal(am)
        ris_signal = self.ris.calculate_signal(am)
        rs_signal = self.rs.calculate_signal(am)

        # 如果任一信号成立则进行开仓
        if self.pos == 0:
            if rrms_signal or ris_signal or rs_signal:
                self.buy(bar.close_price*1.05, trading_size)

        # 如果三个信号都不成立则进行平仓
        if self.pos != 0:
            if not rrms_signal and not ris_signal and not rs_signal:
                self.sell(bar.close_price*0.95, abs(self.pos))

            # 移动止损逻辑
            elif self.pos > 0:
                self.sell(bar.close_price*0.95, abs(self.pos), stop=True)

        # 推送UI更新
        self.put_event()

 

回测数据和参数

 

该策略历史回测需要用到的SPY历史数据,可以下载zip文件后解压,找到其中load_bar_data.py脚本文件,然后用Python运行即可自动导入数据库。

具体的回测参数配置如下:

description

 

股指IF回测绩效

 

本文选择的回测数据时间段中SPY整体处于长周期大牛市,因此多头逻辑的交易策略可能天然具有明显优势(毕竟简单买入做多就能赚钱),所以这里选择使用IF股指期货来作为策略有效性的交叉验证:

description

description

可以看出,RSI多信号集成策略在IF上的绩效也是相当可观的,虽然自2021年起策略的有效性变差了许多,但是仍然将回撤保持在了一个可控的范围,这也体现了CTA策略中多信号组合的优势。

 

关于向量化计算

 

本文代码中多次运用了numpy库中提供的向量化计算函数,例如【np.nan】、【np.mean】、【np.any】、【np.all】等。向量化计算是一种使用数组或矢量操作来处理数据的方法,它具有性能提升、代码简洁、可读性高、跨平台性,适用于大规模数据等优势。

完整策略代码和回测数据文件,可以通过【VeighNa进阶用户交流群】获取:

description

 

免责声明

文章中的信息或观点仅供参考,作者不对其准确性或完整性做出任何保证。读者应以其独立判断做出投资决策,作者不对因使用本报告的内容而引致的损失承担任何责任。

 



新课上线:《精研期权价差策略》

发布于veighna社区公众号【vnpy-community】
 
原文作者:用Python的交易员 | 发布时间:2023-10-11

 

VeighNa全实战进阶期权系列的第三阶段《精研期权价差策略》正式上线!

这套课程差不多筹划了两年时间,核心原因在于期权策略回测真的很复杂(对比CTA策略来说要复杂得多),列举几个关键点:

  • 期权在每个到期月份上存在许多不同行权价的合约(且交易所还会动态加挂),无法使用类似期货连续合约的方式直接进行回测,如何解决长周期回测中每天可交易期权合约范围变化的问题;
  • 期权策略中很大比例的买卖决策需要基于截面信号(而不只是时序信号),经常每个交易日需要加载上百个期权合约的历史分钟数据进行对齐回测,如何保证性能让回测速度不至于慢得无法接受(比如跑1年回测要5个小时);
  • 期权策略在长周期回测中,需要根据当前标的合约价格的位置,以及可选期权链的剩余到期时间来确定具体的交易合约,如何构建一种相对坐标查询体系来满足策略中的期权价差合约定位需求。

为了解决这些问题我们开发了OptionStrategy期权策略模块,但底层需要依赖于【VeighNa 机构版】的服务端架构,对于个人交易员或者小型团队来说运维太过复杂。

截止今年三季度终于基本完成了OptionStrategy在Elite版上的移植工作,所以《精研期权价差策略》课程将会使用【VeighNa Elite版 仿真模拟】来讲解,带着大家由浅入深研究期权价差策略的开发、回测、优化的全流程,同时基于策略历史回测绩效来精研期权价差交易中的各种细节:

description

目前【VeighNa Elite版 仿真模拟】已经可以直接在官网下载,安装完成后使用社区论坛的账号密码登录即可(和VeighNa Station一样),仿真交易目前支持上期技术的SimNow环境,后续也计划接入更多其他仿真环境。

课程目前一共计划40节,内容大纲如下(黑体加粗课时为代码实践内容):

description

这门课程适合的人群:

  • 完成了《实战进阶课程-深入期权定价模型》的学习,想要继续深入进阶期权量化策略;
  • 了解过期权价差组合(Option Spread)交易,希望结合量化开发和数据回测来打造自己的策略实战体系;
  • 对金融和量化感兴趣,希望未来在量化领域获得工作机会的在校学生;
  • 其他对课程内容感兴趣的人士。

课程当前已经上线,价格499元,前100名购买享受9折优惠(449元)。直接在【VeighNa开源量化】公众号(vnpy-community)里就能购买和观看(点击底部菜单栏的【进阶资料】进入)。推荐使用PC微信打开,视频分辨率更加清晰。

 

【精研期权价差策略 - 快速传送门】
 

本线上课程包含在【Elite会员】免费学习权益内。

 


统计

主题
8803
帖子
34193
已注册用户
43888
最新用户
在线用户
84
在线来宾用户
4264
© 2015-2022 微信 18391752892
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】