VeighNa量化社区
你的开源社区量化交易平台
KeKe's Avatar
Member
离线
187 帖子
声望: 51

恰巧我有做价差交易,就补充一下: 先对齐时间戳,再合成价差

楼主的做法,是有tick数据就合成价差,这会引入脏数据。价差交易的标准做法,是通过回望过去n个spread data,得到均线和标准差,构建上下轨(可以理解为置信区间),但它突破的时候做价差回归交易。当缓存的spread data数组不准时候,交易信号自然有错。

假设t1时刻同时来2个tick,时间上先A,后B。那么只需要收到B数据时候,在合成价差就可以了。因为收到a时刻,合成是spread是B(t+0) - A(t + 1),正确的价差永远是对齐时间戳后,用最新数据相减得到的。

如何对齐时间戳,这里到可以讲一下:
1) 先判断行情数据的顺序,或者是否有序:

  • 可以用机器收一篮子行情数据,在共享内存里面打tsc(cpu原子钟计数)。先设置tsc计数是恒定的,不会因为超频到5GHz或者基频3GHz不同而有所区别。我这里是3个tsc=1纳秒。

  • 以上期所为例子,行情的顺序就按照字母 A -> Z。例如先收到AL,AU,BC,RB,,,ZN。在相同品种里面,再按照数字从小到大,即先收到AL2209,再AL2210,AL2211。上期为了保证时间的统一性,切片会同赋予同样的时间,但这个切片时间不等于真实行情时间,而且会有2毫秒的延迟,并且是累计的。这里猜测是交易所虽然每隔500毫秒切,但是切片到发送,里面一些数据转换或者io消耗达2毫秒。所以观察到开盘机器对时后,行情是延迟累计增加,到后面越来越大。

2)确定价差对哪个品种是最后收到(跨期一般是远月),以那品种为基准合成价差:

  • 这仅仅是理论做法,在实践中会遇到蛮多坑的。

  • 以当前2202年8月为例子,跨期的话是AL2209(主力)&AL2210(次主力), AG2209(次)&AG2212(主)。我们知道用远月对齐,并且主力的tick数量,要远多于次主力的tick。

  • 这样,由于AL2210远月而且是次主力,那么合成价差后,AL的价差数据 < AG。

  • 这更好从硬件上面去处理,如使用FPGA。因为FPGA行情网卡收的是交易所行情镜像,它第一条数据是全息盘口数据,之后的都是增量数据。由于数据量比较小,相同的数据包会有大量的信息。这样,只要硬件解完码后,共享内存一刷新,都是最新的数据。就不从这么麻烦,例如AL2209解包,共享内存刷新一次,策略订阅的on_tick触发一次,然后AL2210解包,共享内存刷新一次,策略on_tick再触发。

  • 而正常的行情api的话,以凌云为例(类似ctp的md api),它是在柜台层面解包后,按照品种一个小包一个小包的发出去。相同数据,数据包拆得越小,处理起来越麻烦。

  • 如果没有条件使用fpga的话,只能在别的逻辑上面修正。

不管是研究套利策略,还是多因子策略,都需要多品种的历史数据,所以下面介绍一下,如何调用vnpy的数据下载模块,来下载全市场的期货数据。

 

批量下载

 

1)设置合约品种

 

首先,我们要先生成一个字典,来指定需要下载的数据,关键字段有3个:

 

  • 交易所代号:上期所-> SHFE
  • 合约代号: 螺纹钢-> RB
  • 合约品种类型: 指数合约 -> 99

 

这样,在RQData中,我们要下载螺纹钢指数合约的历史数据,需要转成的代号为RB99.SHFE。
 

然后,由于是全市场行情的数据,所以字典的数据结构如下:key是交易所,value是列表,里面包含各种期货品种,这样,只要在遍历一下这个字典,就可以得到所有,如RB99.SHFE这样结构的字符串。

 

symbols = {
    "SHFE": ["CU", "AL", "ZN", "PB", "NI", "SN", "AU", "AG", "RB", "WR", "HC", "SS", "BU", "RU", "NR", "SP", "SC", "LU", "FU"],
    "DCE": ["C", "CS", "A", "B", "M", "Y", "P", "FB","BB", "JD", "RR", "L", "V", "PP", "J", "JM", "I", "EG", "EB", "PG"],
    "CZCE": ["SR", "CF", "CY", "PM","WH", "RI", "LR", "AP","JR","OI", "RS", "RM", "TA", "MA", "FG", "SF", "ZC", "SM", "UR", "SA", "CL"],
    "CFFEX": ["IH","IC","IF", "TF","T", "TS"]
}
​
symbol_type = "99"

 
2) 设置下载时间
 

我们只需要设置下载的开始和结束时间即可,需要注意的是,vnpy数据下载模块的入参是datetime.datetime格式,所以,要做到格式的一致,代码如下:
 

from datetime import datetime
start_date = datetime(2005,1,1)
end_date = datetime(2020,9,10)

 
3)批量下载全市场数据

 
批量下载数据,并不难,其运作步骤如下:

 

  1. 遍历symbols字典,
  2. 生成不同的HistoryRequest,
  3. 调用数据下载模块rqdata_client.query_history,得到数据data
  4. 调用数据保存模块database_manager.save_bar_data,把下载好的数据data写入数据库
     
    from vnpy.trader.rqdata import rqdata_client
    from vnpy.trader.database import database_manager
    from vnpy.trader.constant import Exchange, Interval
    from vnpy.trader.object import HistoryRequest
    ​
    def load_data(req):
     data = rqdata_client.query_history(req)
     database_manager.save_bar_data(data)
     print(f"{req.symbol}历史数据下载完成")
    ​
    for exchange, symbols_list in symbols.items():
     for s in symbols_list:
         req = HistoryRequest(
         symbol=s+symbol_type,
         exchange=Exchange(exchange),
         start=start_date,
         interval=Interval.DAILY,
         end=end_date,
         )
         load_data(req=req)
     

写好脚本后,我们运行一下代码,可以看到很快就下完全市场期货的日线数据啦。

description

 

若要下载小时或者分钟级别数据,只要把日线周期(Interval.DAILY)改成对应的小时,或者分钟即可。

 

定时批量更新数据

 

有了历史数据后,自然产生每天定时更新数据的需求

 

1)设置定时器

 
我们希望在收盘后,某个时间点如下午5点启动脚本,来自动下载数据。这本质上是包含了一个父进程和一个子进程。
 

父进程可以是一个永远在运行的python程序,如while循环,然后设置触发条件,如当时间刚好到下午5点就启动子进程下载更新数据,其他时间则睡觉等待。

 
代码如下:

from datetime import datetime, time
from time import sleep
​
current_time = datetime.now().time()
start_time = time(17,0)
​
while True:
  sleep(10)

  if current_time == start_time:
    download_data()

 
2)获取数据库数据

 
更新数据时候,我们要以数据库里面最新的数据的时间点,作为开始时间,而结束时间就是当天。比如,昨天刚好下载完所有市场的数据,那么今天我们只需要下载从昨天到今天的所有数据即可。
 

这样实现起来也不难,步骤如下:
 

1)调用database_manager.get_bar_data_statistics来得到字典格式的数据数据库所有信息

 

data = database_manager.get_bar_data_statistics()

 

2)获取各品种最新数据的时间信息,并且插入到data字典中

 

for d in data:
    newest_bar = database_manager.get_newest_bar_data(
        d["symbol"], Exchange(d["exchange"]), Interval(d["interval"])
    )
    d["end"] = newest_bar.datetime

 

然后我们看看data字典,发现真的包含所有行情的数据,但我们是基于RQData来定期更新信息的,所以要进行筛选,得到国内期货品种(通过交易所来判断)并且是日线级别的数据。
 
description
 

3)基于交易所和K线周期筛选品种,得到新的字典symbols,其中key包含合约代码,交易所,value就是数据库的结束时间,如下图:

 

symbols = {}
for i in data:
    if i["interval"] == "d" and  i["exchange"] in Exchanges:
        vt_symbol = f"{i['symbol']}.{i['exchange']}"
        end = i["end"].date()
        symbols[vt_symbol] = end

description
 
4)设置下载结束时间为当天,基于symbols字典的信息,遍历组合得到HistoryRequest,然后再调用上面定义好的load_data函数下载数据并写入数据库中。

 

end_date = datetime.now().date()
​
for vt_symbol, start_date in symbols.items():
    symbol = vt_symbol.split(".")[0]
    exchange = vt_symbol.split(".")[1]
    req = HistoryRequest(
    symbol=symbol,
    exchange=Exchange(exchange),
    start=start_date,
    interval=Interval.DAILY,
    end=end_date,
    )
    load_data(req=req)

 

下载好之后,我们再获取数据库里面最新的K线时间,发现成功更新到今天了。

 
description

2019年已经进入最后倒计时,vn.py总算是赶上末班车发布了v2.0.9版本:期权交易。

 

本周一,国内三大沪深300指数相关的期权已经同时上线,分别是:

 

  • 中金所股指期权
  • 上交所300ETF期权
  • 深交所300ETF期权

 

2.0.9版本主要更新了围绕期权交易方面的接口和应用,和之前一样,对于使用VN Studio的用户,启动VN Station后,直接点击界面右下角的【更新】按钮就能完成自动更新升级。

 

对于没有安装的用户,请下载VNStudio-2.0.9,体验一键安装的量化交易Python发行版,下载链接:

 

https://download.vnpy.com/vnstudio-2.0.9.exe

 
 

OptionMaster模块

 

OptionMaster是vn.py框架内针对【期权波动率交易】专门设计的上层应用模块。

 

description

 

初始化配置
 

description

 

打开OptionMaster后,会看到上图中的长条形组件。【期权产品】下拉框中会显示当前已连接的交易接口上可选的期权产品组合。

 

注意底层接口必须支持期权合约的交易,这里才会有对应期权产品的显示,否则是不会有任何信息的(比如连接SimNow的CTP测试环境就没有)。

 

description

 

点击【配置】按钮后,弹出上图所示的组合配置对话框,在这里选择要用的期权定价模型,设置期权定价中用到的无风险利率,以及每个期权链定价所用的标的物合约。

 

注意期权链的标的物可以选择留空,此时该期权链在后续的交易中将不会被添加到期权组合中,可以降低一部分定价相关的计算延时。

 
 

期权定价
 

做期权交易,第一步总是离不开正确的定价,针对国内的期权类型,OptionMaster模块中内置了三大定价模型:
 

  • Black-76模型:针对欧式期货期权(股指期权)
  • Black-Scholes模型:针对欧式股票期权(ETF期权)
  • Binomial-Tree模型:针对美式期货期权(商品期权)

 
每个定价模型中,从计算方向来区分,又可以分为:

 

  1. 从定价波动率来计算期权价格:calculate_price相关函数,包括希腊值相关的计算函数calculate_greeks;
  2. 从期权价格来返推隐含波动率:calculate_impv函数,使用Newton-Raphson method(牛顿迭代法)来计算。

 

所有模型中都包含了输入数值的边界检查功能,避免计算出某些异常数值。

 
 
数据模型

 
期权相关的量化交易,和CTA策略等单标的量化交易相比,最大的区别之一就在于会同时交易大量的合约,包括不同行权价的期权、不同行权月份的期权以及标的物期货和股票(线性资产)。

 

同时以上合约之间的价格、成交、持仓等情况变化还会互相影响。在任意时间点结合当前最新行情数据的情况下,期权交易员需要能够实时跟踪整个期权交易组合的波动率曲面和希腊值风险情况。

 

OptionMaster中专门构建了多层嵌套式的立体数据结构,来解决以上多合约数据计算中的复杂性问题:

 

  • UnderlyingData:标的物合约(线性资产)
  • OptionData:期权合约(非线性资产)
  • ChainData:期权链(某一行权月份的所有期权合约)
  • PortfolioData:期权组合(某一品种的所有期权合约,以及标的物合约)
     

当以上数据结构中的任意一个数据发生变化时,会同时触发与之相关的所有计算,保证整体数据结构的一致性。

 
 
T型报价

 

T型报价是期权交易中最常用的行情显示方式,中间白色的一列为行权价,左侧为看涨期权,右侧为看跌期权。

 

description

 
上图中,除了由交易接口层推送过来的原始行情数据外(买卖价格、挂单数量、成交量、持仓量等),还包含了实时计算的买卖价隐含波动率和每个期权的现金希腊值。

 

传统意义上的理论希腊值,直接基于期权定价模型计算,衡量的是当某一变量发生变化时期权价格的变化情况。这种方法从数学的角度容易理解,但是从交易员的实盘使用来说却十分麻烦。
 

假设某个50ETF期权合约的Delta数值,使用Black-Scholes期权定价公式计算出来的结果为0.5482,意味着每当ETF价格上涨1元时,该期权的价格应该上涨0.5482元。

 

而50ETF当前的价格大约是3元,上涨1元是足足超过30%的涨幅,对交易员来说,知道【标的物价格上涨30%期权能赚0.5482元】不能说完全没有参考价值,但效果可能也就跟【每天喝10瓶可乐一定会胖】差不多。

 

所以在实践中,专业期权交易员更多使用的是现金希腊值,衡量的是当某一变量发生1%变化时该期权对应的现金盈亏情况。还是用上面的这个50ETF期权合约,其现金Delta为:

 

0.5484(理论Delta)x 3(标的价格)x 10000 (合约乘数)x 1% = 165
 

这里的165,意味着每当ETF价格上涨1%时,持有1手该期权合约的盈利金额是165元,实盘交易中用来判断某个合约当前的风险水平无疑要方便得多。

 

除了Delta数据外,理论Gamma/Theta/Vega等希腊值也可以同样转化为更加直观的现金希腊值。

 
 
希腊值风险
 

description

 
有了现金希腊值,可以在交易前方便直观的了解某一期权合约的风险水平。但在交易完成后,手头持有一堆期权和标的物持仓时,我们更需要持仓希腊值来跟踪当前整个账户的期权风险暴露:

 

持仓希腊值 = 现金希腊值 x 合约持仓

 

上图中的持仓希腊值的风险,分为单合约、期权链、期权组合三个层次统计,方便交易员结合各种不同类型的波动率交易策略使用(如做多近月波动率、做空远月波动率)。

 
 
快速交易
 

description

 
和VN Trader主界面的交易下单组件使用类似,在上图中的【代码】编辑框中输入合约代码后回车,即可显示该合约的行情盘口数据。或者在T型报价组件上,寻找到好的交易机会后,双击单元格即可完成合约代码的自动填充和行情显示。
 

选好方向、开平,输入价格、数量后,点击【委托】按钮即可立即发出交易委托,点击【全撤】按钮则可一次性将当前处于活动状态的委托(未成交、部分成交)全部撤单。
 

 

期权相关接口

 

2.0.9版本中同样更新完善了和期权相关的交易接口,目前针对期权合约可用的包括:

 

  • CTP期货接口(ctp):可交易股指期权、商品期权
  • 飞马四所接口(femas):可交易股指期权、商品期权
  • CTP期权接口(sopt):可交易ETF期权

 

目前所有的ETF期权程序化交易(包括sopt),都需要曾经向上交所报备过、有程序化交易权限的老账户,有小道消息传闻新账户的报备将在最近放开。

 

 

OptionMaster Pro

 

最后,附上开发中的OptionMaster Pro期权交易系统,采用vn.py框架以及OptionMaster模块组件开发。

 

description

 
除了在核心的定价模型方面进行了Cython低延时优化,也加入了波动率曲面实时跟踪、持仓风险情景3D分析、期权组合Delta自动对冲算法等功能。

 

最后,还有针对期权高频套利设计的电子眼算法引擎(开发中尚未完成):
 

description

 
考虑到几大股指期权刚上线,期权程序化交易方面的监管尚未明朗,OptionMaster Pro目前仅对机构用户提供试用。

 

需要下载软件的用户,请加vn.py机构用户群(QQ群号676499931),本群只对机构用户开放,加群申请中请注明:姓名/机构/部门。

 
 
了解更多知识,请关注vn.py社区公众号。
 

description

TradeBlazer交易开拓者(简称TB),可能是许多投资者开始接触量化时的第一根拐杖,也是国内用户量最大的量化平台之一。

 

但随着时间过去,国内量化投资者编程水平的逐渐提高,越来越多的人开始转向Python这样的开源生态体系。
 

在转换平台的过程中,由于编程语法、数据结构、驱动机制等方面的区别,不少人遇到了各种困难,掉在某些坑里可能几周都爬不出来。

 

本篇文章中我们就来通过一个的经典趋势跟踪策略AtrRsiStrategy,来详细讲解如何一步步将TB策略代码移植到vn.py上的过程。

 

 

ATR-RSI策略

 

完整的ATR-RSI策略逻辑如下:

 

  1. 开仓过滤:当前波动率(ATR)大于历史平均波动率(ATR均值)时,我们认为后续走出趋势的机会变大,只有此时才考虑开仓交易;
  2. 多头开仓:当RSI指标进入超买区域(比如RSI > 66),说明多头力量已取得上风,此时选择立即做多开仓,为了保证能够立刻成交,使用超价的限价委托来下单。
  3. 多头平仓:采用固定百分比的移动止损,在持有多头仓位情况下,跟踪价格曾经到达的最高点,当价格从最高点回落到固定百分比(比如0.8%)的一瞬间立刻平掉多头仓位。
  4. 空头开仓:当RSI指标进入超卖区域(比如RSI < 34),说明空头已取得上风,此时选择立即做空开仓,同样使用超价限价单保证立刻成交。
  5. 空头平仓:同样采用固定百分比的移动止损,当价格当价格从最低点反弹超过固定百分比(比如0.8%)的一瞬间立刻平掉空头仓位。

 

注意点:我们总是假设在当前K线走完计算信号并且发出委托,成交永远发生在下一根K线。即T时刻计算信号,发出委托;最快也要T+1时刻该委托才能成交。这也是下面停止单和限价单撮合的充分条件。

 
 

TB中的策略实现

 

创建RSI指标函数

 

  1. 打开TB,在【TB公式】->【公式管理器】->【公式应用】里面找到RSI指标

description

  1. 打开RSI指标公式应用,复制里面的代码。
  2. 同样在【TB公式】->【新建用户函数】里面创建新的RSI指标函数,这类命名为rsirsi,把代码粘贴到新的函数里面。
  3. 修改函数代码:变量输出类型修改成NumericSeries,删除最后4行的画图函数。
  4. 编译保存后,退出。

 
 
创建ATR-RSI策略

 

  • 在【TB公式】->【新建公式应用】打开新的策略模板。
  • 定义变量输出类型统一为NumericSeries。由于很多信号是基于T计算,在T+1时间成交的,故我们需要取前一个时刻的数据,比如前一个时刻的RSI指标,即rsi_value[1]。
  • 计算当前ris指标,rsi_value = rsi_array[1]

 

Params
  Numeric rsi_length(5);
  Numeric rsi_entry(16);
Vars
  NumericSeries rsi_array(0);
  NumericSeries rsi_value(0);
  NumericSeries rsi_buy(0);
  NumericSeries rsi_sell(0);

Begin
  // Calculate Rsi Value
  rsi_buy = 50 + rsi_entry;
  rsi_sell = 50 - rsi_entry;
  rsi_array = rsirsi(rsi_length); 
  rsi_value = rsi_array[1];

 

计算当前ATR指标,atr_value = atr_array[1];以及当前ATR均值,atr_ma= atr_ma_array[1]

 

Params
    Numeric atr_length(22);
  Numeric atr_ma_length(10);

Vars
  NumericSeries atr_value(0);
  NumericSeries atr_ma(0);
  NumericSeries atr_arry(0);
  NumericSeries atr_ma_array(0);

Begin
  // Calculate Atr Value and Atr Ma
  atr_arry = AvgTrueRange(atr_length);
  atr_ma_array = Average(atr_arry[atr_ma_length], atr_ma_length);  

  atr_value = atr_arry[1]; // last bar for atr_value  
  atr_ma = atr_ma_array[1];  // last bar for atr_ma_value

 
空仓情况下,发出限价单委托开仓:

  • 当波动率上涨并且RSI指标>66时,使用当前收盘价+5的限价单,超价买入保证成交;
  • 当波动率上涨并且RSI指标<34时候,使用当前收盘价-5的限价单,超价卖出保证成交。

 

  If(MarketPosition == 0)
  {
    intra_trade_low = Low[1];
    intra_trade_high = High[1];

    // 【Long condition】
    If(rsi_value > rsi_buy AND atr_value > atr_ma)
    {  
      long_limit = Close[1] + 5;
      If(long_limit>=Low)
      {
        Buy(fixed_size, Min(Open, long_limit));
      }  
    }

    // 【Short condition】
    Else If(rsi_value < rsi_sell AND atr_value > atr_ma)
    {
      short_limit = Close[1] - 5;
      If(short_limit <=High)
      {
        SellShort(fixed_size, Max(Open, short_limit));
      }   
    } 
  }

 

百分比移动止盈止损离场:

  • 多仓情况下,当价格从最高点回落0.8%的一瞬间触发条件单离场;
  • 空仓情况下,当价格从最低点回调0.8%的一瞬间触发条件单离场;

 

  // postition >0  
  Else If(MarketPosition >0)
  {
    intra_trade_high = Max(intra_trade_high, High[1]);
    intra_trade_low = Low[1];

    long_stop = intra_trade_high * (1 - trailing_percent / 100);

        If(Low <= long_stop)
        {
            Sell(MarketPosition, Min(Open, long_stop));
        }

  }

  // postiton < 0 
  Else If(MarketPosition <0)
  {  
    intra_trade_low = Min(intra_trade_low, Low[1]);
    intra_trade_high = High[1];

    short_stop = intra_trade_low *(1+ trailing_percent /100);

    If(High >= short_stop)
    {
            BuyToCover(-MarketPosition, Max(Open, short_stop));
    }
  }

 
 
策略回测结果

 

  • 数据:沪深300股指连续(IF888)
  • 时间区间:2019年1月~12月
  • K线周期:1分钟
  • 策略效果:资金曲线整体向上,平均盈亏比为2.08。

 
description

 
 

TB完整代码

 

Params
  Numeric atr_length(22);
  Numeric atr_ma_length(10);
  Numeric rsi_length(5);
  Numeric rsi_entry(16);
  Numeric trailing_percent(0.8);
  Numeric fixed_size(1);

Vars
  NumericSeries rsi_array(0);
  NumericSeries atr_value(0);
  NumericSeries atr_ma(0);
  NumericSeries rsi_value(0);
  NumericSeries rsi_buy(0);
  NumericSeries rsi_sell(0);
  NumericSeries intra_trade_high(0);
  NumericSeries intra_trade_low(0);
  NumericSeries atr_arry(0);
  NumericSeries atr_ma_array(0);
  NumericSeries long_stop(0);
  NumericSeries short_stop(0);
  NumericSeries long_limit(0);
  NumericSeries short_limit(0);
Begin

  // Calculate Rsi Value
  rsi_buy = 50 + rsi_entry;
  rsi_sell = 50 - rsi_entry;
  rsi_array = rsirsi(rsi_length); 
  rsi_value = rsi_array[1];

  // Calculate Atr Value and Atr Ma
  atr_arry = AvgTrueRange(atr_length);
  atr_ma_array = Average(atr_arry[atr_ma_length], atr_ma_length);


  atr_value = atr_arry[1]; // last bar for atr_value  
  atr_ma = atr_ma_array[1];  // last bar for atr_ma_value

  If(MarketPosition == 0)
  {
    intra_trade_low = Low[1];
    intra_trade_high = High[1];

    // 【Long condition】
    If(rsi_value > rsi_buy AND atr_value > atr_ma)
    {  
      long_limit = Close[1] + 5;
      If(long_limit>=Low)
      {
        Buy(fixed_size, Min(Open, long_limit));
      }  
    }

    // 【Short condition】
    Else If(rsi_value < rsi_sell AND atr_value > atr_ma)
    {
      short_limit = Close[1] - 5;
      If(short_limit <=High)
      {
        SellShort(fixed_size, Max(Open, short_limit));
      }      
    }
  }


  // postition >0  
  Else If(MarketPosition >0)
  {
    intra_trade_high = Max(intra_trade_high, High[1]);
    intra_trade_low = Low[1];

    long_stop = intra_trade_high * (1 - trailing_percent / 100);

        If(Low <= long_stop)
        {
            Sell(MarketPosition, Min(Open, long_stop));
        }

  }

  // postiton < 0 
  Else If(MarketPosition <0)
  {  
    intra_trade_low = Min(intra_trade_low, Low[1]);
    intra_trade_high = High[1];

    short_stop = intra_trade_low *(1+ trailing_percent /100);

    If(High >= short_stop)
    {
            BuyToCover(-MarketPosition, Max(Open, short_stop));
    }
  }
End

 

 

vn.py中的策略实现

 

TB策略的逻辑完全由行情驱动,即每次有行情变化(Tick更新、K线走完)时会完整执行代码中的所有逻辑。与之不同的是,vn.py内置的CTA策略模板,提供了诸多的事件驱动回调函数,如:Tick更新驱动(on_tick函数)、K线驱动(on_bar函数)、成交驱动(on_trade)、委托驱动(on_order)等。

 

要移植TB上的策略,只需在vn.py策略代码的on_bar回调函数中实现对应的策略逻辑即可:

 

  1. 调用cancel_all()函数撤销未成交委托,保证当前委托状态的干净和唯一性;
  2. 基于K线时间序列容器ArrayManager,来维护K线历史数据,计算需要的技术指标数据;
  3. 委托方式同样分为4种,下单时的可选参数中,stop=True意味着停止单,stop=False意味着限价单:
    • buy:买入开仓
    • sell:卖出平仓
    • short:卖出开仓
    • cover:买入平仓

 

def on_bar(self, bar: BarData):
    """
    Callback of new bar data update.
    """
    self.cancel_all()

    am = self.am
    am.update_bar(bar)
    if not am.inited:
        return

    atr_array = am.atr(self.atr_length, array=True)
    self.atr_value = atr_array[-1]
    self.atr_ma = atr_array[-self.atr_ma_length:].mean()
    self.rsi_value = am.rsi(self.rsi_length)

    if self.pos == 0:
        self.intra_trade_high = bar.high_price
        self.intra_trade_low = bar.low_price

        if self.atr_value > self.atr_ma:
            if self.rsi_value > self.rsi_buy:
                self.buy(bar.close_price + 5, self.fixed_size)
            elif self.rsi_value < self.rsi_sell:
                self.short(bar.close_price - 5, self.fixed_size)

    elif self.pos > 0:
        self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
        self.intra_trade_low = bar.low_price

        long_stop = self.intra_trade_high * \
            (1 - self.trailing_percent / 100)
        self.sell(long_stop, abs(self.pos), stop=True)

    elif self.pos < 0:
        self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
        self.intra_trade_high = bar.high_price

        short_stop = self.intra_trade_low * \
            (1 + self.trailing_percent / 100)
        self.cover(short_stop, abs(self.pos), stop=True)

    self.put_event()

 

完整的代码实现请参考Github仓库中的策略源代码。

 
 

策略回测结果
 

  • 数据:RQData的沪深300主力拼接合约(IF888)
  • 时间区间:2019年1月~12月
  • K线周期:1分钟
  • 策略效果:资金曲线整体向上,与TB的资金曲线几乎一致。

 

description

 

 

两个平台的对比总结

 
K线数据访问区别

 

TB

  • 默认访问的是当前最新时间点的数据,如使用Close函数访问的是当前最新一根尚未走完的K线数据;
  • 如果需要访问最近一根已经走完的K线收盘价,则必须使用Close[1];
  • 同样,对于最近一根已经走完K线的技术指标,以RSI指标举例,则必须使用rsi_array[1]。
     

vn.py

  • 默认访问的是最近一根已经走完的K线数据;
  • 当前最新一根尚未走完的K线数据,在策略内禁止访问,杜绝TB上的信号闪烁问题(未来函数)。

 
 

委托撮合逻辑区别

 
TB

  • 需要策略开发者在策略内,自行编写相对复杂的委托撮合逻辑,来尽量逼近真实交易情况;

vn.py

  • 内置了详尽的停止单、限价单撮合逻辑,在调用委托函数时,只需调整可选参数stop即可实现委托的转变。
     

 
策略回测结果区别

 

即使在策略逻辑层面已经做到一致,TB和vn.py的回测资金曲线图依旧可能存在某些细节方面的区别。主要原因是数据源方面的不同,TB使用的是自身提供的历史数据源,而vn.py默认推荐使用的是RQData数据服务。

 
 
了解更多知识,请关注vn.py社区公众号。
 

description

通用型逐笔成交统计

 

逐笔成交统计想用通用化,难点在于去限定一次完整开平交易的开始点和结束点,抽象来说就是寻找特殊的断点对所有成交记录进行划分。

 

断点的选择

 
而在算法状态机控制中,我们可以知道数字0是一个非常有用的评判标准,即我们构建一列数据,让它数值在完全平仓后变成0,就知道真正的平仓时间。

 

在实践中,累计净持仓恰恰好符合这个标准,我们把多头仓位设为”+”,空头仓位设为“-”,得到如下表的【方向持仓】,对【方向持仓】进行累计得到【净持仓】。

 

这样,我们基于【净持仓】为0可以得到每次开平交易的结束点。而该结束点为成交记录的断点。

 

description

 
使用断点划分成交记录
 

为了简单演示,下面我们只显示【净持仓】(列)为0的成交信息(行),如下表所示,一共发生了5开完整的开平仓交易。每笔交易的结束点对应的交易序号分别为3、5、8、12、20。这5个结束点即为对所有成交信息的断点。

 

description

 
之后,我们要引入2个新的概念:
 

  • 存量:某一时间点的累计统计量
  • 增量:某一时间段内,累计统计量的增加量
     

存量是静态的,可以理解为对累计统计量的信息进行时间切片;而增量是动态的,代表时间切片信息的变化量,所以他们二者的关系如下:

 

T0时刻存量 + T0->T1增量 = T1时刻存量
 

换句话说,

 

T0->T1增量 = T1时刻存量 - T0时刻存量

 

回到逐笔回测统计主题上,增量这个概念,就能代表最新的完整开平仓交易,例如其每笔盈亏,对累计盈亏的影响。

 

如下图所示,在完成第一笔开平仓交易后,累计盈亏是1000;完成了第二笔完整的开平仓交易,累计盈亏是2000,那么二者的差别,即2000-1000=1000。这增加1000的盈利,就是属于第二笔开平仓交易的。

 

description

 
所以,通过对每个断点存量信息的对比,我们就可以得到每笔开平仓成交后的统计量:

 

description

 
这些开平仓的统计量可以如下表所示的开平成交量、开平盈亏,也可以是开平仓交易的持仓时间、手续费、滑点以及净盈亏:

 

description

 
 

从算法的原理到代码

 

计算开平交易结果
 

  1. 生成基础DataFrame信息,包括每笔交易的方向,开平,价格,时间;
  2. 计算方向持仓,以及有方向持仓累加的净持仓,计算累计持仓存量(成交量的简单相加);
  3. 计算盈亏存量,当净持仓为0时候,显示每笔开平交易对于存量盈亏的增量;
  4. 当净持仓为0时候,显示每笔开平交易的持仓时间,成交量,成交额的增量;
  5. 对DataFrame的行进行处理,剔除出那些净持仓不为0的行数,即剩下的行数都是每笔开平交易的最后一次平仓交易,通过平仓的方向可以判断该完整开平流程,如方向为空,开平为平,那么完整开平交易为多开->空平。
  6. 计算手续费,滑点以及净盈亏
  7. 返回新的DataFrame。

 

import pandas as pd
from datetime import datetime
import matplotlib.pyplot as plt
import numpy as np
pd.set_option('mode.chained_assignment', None)


def calculate_trades_result(trades):
    """
    Deal with trade data
    """
    dt, direction, offset, price, volume = [], [], [], [], []
    for i in trades.values():
        dt.append(i.datetime)
        direction.append(i.direction.value)
        offset.append(i.offset.value)
        price.append(i.price)
        volume.append(i.volume)

    # Generate DataFrame with datetime, direction, offset, price, volume
    df = pd.DataFrame()
    df["direction"] = direction
    df["offset"] = offset
    df["price"] = price
    df["volume"] = volume

    df["current_time"] = dt
    df["last_time"] = df["current_time"].shift(1)

    # Calculate trade amount
    df["amount"] = df["price"] * df["volume"]
    df["acum_amount"] = df["amount"].cumsum()

    # Calculate pos, net pos(with direction), acumluation pos(with direction)
    def calculate_pos(df):
        if df["direction"] == "多":
            result = df["volume"]
        else:
            result = - df["volume"]

        return result
    df["pos"] = df.apply(calculate_pos, axis=1)

    df["net_pos"] = df["pos"].cumsum()
    df["acum_pos"] = df["volume"].cumsum()

    # Calculate trade result, acumulation result
    # ej: trade result(buy->sell) means (new price - old price) * volume
    df["result"] = -1 * df["pos"] * df["price"]
    df["acum_result"] = df["result"].cumsum()

    # Filter column data when net pos comes to zero
    def get_acum_trade_result(df):
        if df["net_pos"] == 0:
            return df["acum_result"]
    df["acum_trade_result"] = df.apply(get_acum_trade_result, axis=1)

    def get_acum_trade_volume(df):
        if df["net_pos"] == 0:
            return df["acum_pos"]
    df["acum_trade_volume"] = df.apply(get_acum_trade_volume, axis=1)   

    def get_acum_trade_duration(df):
        if df["net_pos"] == 0:
            return df["current_time"] - df["last_time"]
    df["acum_trade_duration"] = df.apply(get_acum_trade_duration, axis=1)  

    def get_acum_trade_amount(df):
        if df["net_pos"] == 0:
            return df["acum_amount"]
    df["acum_trade_amount"] = df.apply(get_acum_trade_amount, axis=1) 

    # Select row data with net pos equil to zero     
    df = df.dropna()

    return df

def generate_trade_df(trades, size, rate, slippage, capital):
    """
    Calculate trade result from increment
    """
    df = calculate_trades_result(trades)

    trade_df = pd.DataFrame()
    trade_df["close_direction"] = df["direction"]
    trade_df["close_time"] = df["current_time"]
    trade_df["close_price"] = df["price"]
    trade_df["pnl"] = df["acum_trade_result"] - \
        df["acum_trade_result"].shift(1).fillna(0)

    trade_df["volume"] = df["acum_trade_volume"] - \
        df["acum_trade_volume"].shift(1).fillna(0)
    trade_df["duration"] = df["current_time"] - \
        df["last_time"]
    trade_df["turnover"] = df["acum_trade_amount"] - \
        df["acum_trade_amount"].shift(1).fillna(0)

    trade_df["commission"] = trade_df["turnover"] * rate
    trade_df["slipping"] = trade_df["volume"] * size * slippage

    trade_df["net_pnl"] = trade_df["pnl"] - \
        trade_df["commission"] - trade_df["slipping"]

    result = calculate_base_net_pnl(trade_df, capital)
    return result

 
 

汇总生成资金曲线
 

  1. 基于每笔开平交易的净盈亏,计算累计盈亏;
  2. 累计盈亏加上用户输入的起始资金即为资金曲线;
  3. 基于资金曲线计算每笔的每笔开平交易的盈利率,回撤和百分比回撤。

 

def calculate_base_net_pnl(df, capital):
    """
    Calculate statistic base on net pnl
    """
    df["acum_pnl"] = df["net_pnl"].cumsum()
    df["balance"] = df["acum_pnl"] + capital
    df["return"] = np.log(
        df["balance"] / df["balance"].shift(1)
        ).fillna(0)
    df["highlevel"] = (
        df["balance"].rolling(
            min_periods=1, window=len(df), center=False).max()
    )
    df["drawdown"] = df["balance"] - df["highlevel"]
    df["ddpercent"] = df["drawdown"] / df["highlevel"] * 100

    df.reset_index(drop=True, inplace=True)

    return df

 
 

统计整体策略效果
 

  1. 主要是一些统计指标的计算,如平均滑点,平均手续费,总成交次数,胜率,盈亏比,收益回撤比等等。
  2. 然后是画图,画出资金曲线图,每笔净盈亏图和净盈亏分布图

 

def statistics_trade_result(df, capital, show_chart=True):
    """"""
    end_balance = df["balance"].iloc[-1]
    max_drawdown = df["drawdown"].min()
    max_ddpercent = df["ddpercent"].min()

    pnl_medio = df["net_pnl"].mean()
    trade_count = len(df)
    duration_medio = df["duration"].mean().total_seconds()/3600
    commission_medio = df["commission"].mean()
    slipping_medio = df["slipping"].mean()

    win = df[df["net_pnl"] > 0]
    win_amount = win["net_pnl"].sum()
    win_pnl_medio = win["net_pnl"].mean()
    win_duration_medio = win["duration"].mean().total_seconds()/3600
    win_count = len(win)

    loss = df[df["net_pnl"] < 0]
    loss_amount = loss["net_pnl"].sum()
    loss_pnl_medio = loss["net_pnl"].mean()
    loss_duration_medio = loss["duration"].mean().total_seconds()/3600
    loss_count = len(loss)

    winning_rate = win_count / trade_count
    win_loss_pnl_ratio = - win_pnl_medio / loss_pnl_medio

    total_return = (end_balance / capital - 1) * 100
    return_drawdown_ratio = -total_return / max_ddpercent

    output(f"起始资金:\t{capital:,.2f}")
    output(f"结束资金:\t{end_balance:,.2f}")
    output(f"总收益率:\t{total_return:,.2f}%")
    output(f"最大回撤: \t{max_drawdown:,.2f}")
    output(f"百分比最大回撤: {max_ddpercent:,.2f}%")
    output(f"收益回撤比:\t{return_drawdown_ratio:,.2f}")

    output(f"总成交次数:\t{trade_count}")
    output(f"盈利成交次数:\t{win_count}")
    output(f"亏损成交次数:\t{loss_count}")
    output(f"胜率:\t\t{winning_rate:,.2f}")
    output(f"盈亏比:\t\t{win_loss_pnl_ratio:,.2f}")

    output(f"平均每笔盈亏:\t{pnl_medio:,.2f}")
    output(f"平均持仓小时:\t{duration_medio:,.2f}")
    output(f"平均每笔手续费:\t{commission_medio:,.2f}")
    output(f"平均每笔滑点:\t{slipping_medio:,.2f}")

    output(f"总盈利金额:\t{win_amount:,.2f}")
    output(f"盈利交易均值:\t{win_pnl_medio:,.2f}")
    output(f"盈利持仓小时:\t{win_duration_medio:,.2f}")

    output(f"总亏损金额:\t{loss_amount:,.2f}")
    output(f"亏损交易均值:\t{loss_pnl_medio:,.2f}")
    output(f"亏损持仓小时:\t{loss_duration_medio:,.2f}")

    if not show_chart:
        return

    plt.figure(figsize=(10, 12))

    acum_pnl_plot = plt.subplot(3, 1, 1)
    acum_pnl_plot.set_title("Balance Plot")
    df["balance"].plot(legend=True)

    pnl_plot = plt.subplot(3, 1, 2)
    pnl_plot.set_title("Pnl Per Trade")
    df["net_pnl"].plot(legend=True)

    distribution_plot = plt.subplot(3, 1, 3)
    distribution_plot.set_title("Trade Pnl Distribution")
    df["net_pnl"].hist(bins=100)

    plt.show()


def output(msg):
    """
    Output message with datetime.
    """
    print(f"{datetime.now()}\t{msg}")

 
 
统计纯多头和纯空头交易

 

纯多头交易就是只有多开->空平的交易,而纯空头交易就是反过来。

 

为了筛选出纯多开交易,只要在DataFrame中判断其平仓方向的空的即可;纯空头交易则反过来,平仓方向为多。

 

def buy2sell(df, capital):
    """
    Generate DataFrame with only trade from buy to sell
    """
    buy2sell = df[df["close_direction"] == "空"]
    result = calculate_base_net_pnl(buy2sell, capital)
    return result


def short2cover(df, capital):
    """
    Generate DataFrame with only trade from short to cover
    """
    short2cover = df[df["close_direction"] == "多"]
    result = calculate_base_net_pnl(short2cover, capital)
    return result

 
 

整合所有计算步骤

 

最后,我们将上文中所有的函数进行整合,封装到单个函数中,用于实现策略回测效果的一键生成:
 

def exhaust_trade_result(
    trades, 
    size: int = 10, 
    rate: float = 0.0, 
    slippage: float = 0.0, 
    capital: int = 1000000,
    show_long_short_condition=True
    ):
    """
    Exhaust all trade result.
    """

    total_trades = generate_trade_df(trades, size, rate, slippage, capital)
    statistics_trade_result(total_trades, capital)

    if not show_long_short_condition:
        return
    long_trades = buy2sell(total_trades, capital)
    short_trades = short2cover(total_trades, capital)

    output("-----------------------")
    output("纯多头交易")
    statistics_trade_result(long_trades, capital)

    output("-----------------------")
    output("纯空头交易")
    statistics_trade_result(short_trades, capital)

description

 
 

最后附上完整的源代码
 

import pandas as pd
from datetime import datetime
import matplotlib.pyplot as plt
import numpy as np
pd.set_option('mode.chained_assignment', None)


def calculate_trades_result(trades):
    """
    Deal with trade data
    """
    dt, direction, offset, price, volume = [], [], [], [], []
    for i in trades.values():
        dt.append(i.datetime)
        direction.append(i.direction.value)
        offset.append(i.offset.value)
        price.append(i.price)
        volume.append(i.volume)

    # Generate DataFrame with datetime, direction, offset, price, volume
    df = pd.DataFrame()
    df["direction"] = direction
    df["offset"] = offset
    df["price"] = price
    df["volume"] = volume

    df["current_time"] = dt
    df["last_time"] = df["current_time"].shift(1)

    # Calculate trade amount
    df["amount"] = df["price"] * df["volume"]
    df["acum_amount"] = df["amount"].cumsum()

    # Calculate pos, net pos(with direction), acumluation pos(with direction)
    def calculate_pos(df):
        if df["direction"] == "多":
            result = df["volume"]
        else:
            result = - df["volume"]

        return result
    df["pos"] = df.apply(calculate_pos, axis=1)

    df["net_pos"] = df["pos"].cumsum()
    df["acum_pos"] = df["volume"].cumsum()

    # Calculate trade result, acumulation result
    # ej: trade result(buy->sell) means (new price - old price) * volume
    df["result"] = -1 * df["pos"] * df["price"]
    df["acum_result"] = df["result"].cumsum()

    # Filter column data when net pos comes to zero
    def get_acum_trade_result(df):
        if df["net_pos"] == 0:
            return df["acum_result"]
    df["acum_trade_result"] = df.apply(get_acum_trade_result, axis=1)

    def get_acum_trade_volume(df):
        if df["net_pos"] == 0:
            return df["acum_pos"]
    df["acum_trade_volume"] = df.apply(get_acum_trade_volume, axis=1)   

    def get_acum_trade_duration(df):
        if df["net_pos"] == 0:
            return df["current_time"] - df["last_time"]
    df["acum_trade_duration"] = df.apply(get_acum_trade_duration, axis=1)  

    def get_acum_trade_amount(df):
        if df["net_pos"] == 0:
            return df["acum_amount"]
    df["acum_trade_amount"] = df.apply(get_acum_trade_amount, axis=1) 

    # Select row data with net pos equil to zero     
    df = df.dropna()

    return df


def generate_trade_df(trades, size, rate, slippage, capital):
    """
    Calculate trade result from increment
    """
    df = calculate_trades_result(trades)

    trade_df = pd.DataFrame()
    trade_df["close_direction"] = df["direction"]
    trade_df["close_time"] = df["current_time"]
    trade_df["close_price"] = df["price"]
    trade_df["pnl"] = df["acum_trade_result"] - \
        df["acum_trade_result"].shift(1).fillna(0)

    trade_df["volume"] = df["acum_trade_volume"] - \
        df["acum_trade_volume"].shift(1).fillna(0)
    trade_df["duration"] = df["current_time"] - \
        df["last_time"]
    trade_df["turnover"] = df["acum_trade_amount"] - \
        df["acum_trade_amount"].shift(1).fillna(0)

    trade_df["commission"] = trade_df["turnover"] * rate
    trade_df["slipping"] = trade_df["volume"] * size * slippage

    trade_df["net_pnl"] = trade_df["pnl"] - \
        trade_df["commission"] - trade_df["slipping"]

    result = calculate_base_net_pnl(trade_df, capital)
    return result


def calculate_base_net_pnl(df, capital):
    """
    Calculate statistic base on net pnl
    """
    df["acum_pnl"] = df["net_pnl"].cumsum()
    df["balance"] = df["acum_pnl"] + capital
    df["return"] = np.log(
        df["balance"] / df["balance"].shift(1)
        ).fillna(0)
    df["highlevel"] = (
        df["balance"].rolling(
            min_periods=1, window=len(df), center=False).max()
    )
    df["drawdown"] = df["balance"] - df["highlevel"]
    df["ddpercent"] = df["drawdown"] / df["highlevel"] * 100

    df.reset_index(drop=True, inplace=True)

    return df


def buy2sell(df, capital):
    """
    Generate DataFrame with only trade from buy to sell
    """
    buy2sell = df[df["close_direction"] == "空"]
    result = calculate_base_net_pnl(buy2sell, capital)
    return result


def short2cover(df, capital):
    """
    Generate DataFrame with only trade from short to cover
    """
    short2cover = df[df["close_direction"] == "多"]
    result = calculate_base_net_pnl(short2cover, capital)
    return result


def statistics_trade_result(df, capital, show_chart=True):
    """"""
    end_balance = df["balance"].iloc[-1]
    max_drawdown = df["drawdown"].min()
    max_ddpercent = df["ddpercent"].min()

    pnl_medio = df["net_pnl"].mean()
    trade_count = len(df)
    duration_medio = df["duration"].mean().total_seconds()/3600
    commission_medio = df["commission"].mean()
    slipping_medio = df["slipping"].mean()

    win = df[df["net_pnl"] > 0]
    win_amount = win["net_pnl"].sum()
    win_pnl_medio = win["net_pnl"].mean()
    win_duration_medio = win["duration"].mean().total_seconds()/3600
    win_count = len(win)

    loss = df[df["net_pnl"] < 0]
    loss_amount = loss["net_pnl"].sum()
    loss_pnl_medio = loss["net_pnl"].mean()
    loss_duration_medio = loss["duration"].mean().total_seconds()/3600
    loss_count = len(loss)

    winning_rate = win_count / trade_count
    win_loss_pnl_ratio = - win_pnl_medio / loss_pnl_medio

    total_return = (end_balance / capital - 1) * 100
    return_drawdown_ratio = -total_return / max_ddpercent

    output(f"起始资金:\t{capital:,.2f}")
    output(f"结束资金:\t{end_balance:,.2f}")
    output(f"总收益率:\t{total_return:,.2f}%")
    output(f"最大回撤: \t{max_drawdown:,.2f}")
    output(f"百分比最大回撤: {max_ddpercent:,.2f}%")
    output(f"收益回撤比:\t{return_drawdown_ratio:,.2f}")

    output(f"总成交次数:\t{trade_count}")
    output(f"盈利成交次数:\t{win_count}")
    output(f"亏损成交次数:\t{loss_count}")
    output(f"胜率:\t\t{winning_rate:,.2f}")
    output(f"盈亏比:\t\t{win_loss_pnl_ratio:,.2f}")

    output(f"平均每笔盈亏:\t{pnl_medio:,.2f}")
    output(f"平均持仓小时:\t{duration_medio:,.2f}")
    output(f"平均每笔手续费:\t{commission_medio:,.2f}")
    output(f"平均每笔滑点:\t{slipping_medio:,.2f}")

    output(f"总盈利金额:\t{win_amount:,.2f}")
    output(f"盈利交易均值:\t{win_pnl_medio:,.2f}")
    output(f"盈利持仓小时:\t{win_duration_medio:,.2f}")

    output(f"总亏损金额:\t{loss_amount:,.2f}")
    output(f"亏损交易均值:\t{loss_pnl_medio:,.2f}")
    output(f"亏损持仓小时:\t{loss_duration_medio:,.2f}")

    if not show_chart:
        return

    plt.figure(figsize=(10, 12))

    acum_pnl_plot = plt.subplot(3, 1, 1)
    acum_pnl_plot.set_title("Balance Plot")
    df["balance"].plot(legend=True)

    pnl_plot = plt.subplot(3, 1, 2)
    pnl_plot.set_title("Pnl Per Trade")
    df["net_pnl"].plot(legend=True)

    distribution_plot = plt.subplot(3, 1, 3)
    distribution_plot.set_title("Trade Pnl Distribution")
    df["net_pnl"].hist(bins=100)

    plt.show()


def output(msg):
    """
    Output message with datetime.
    """
    print(f"{datetime.now()}\t{msg}")


def exhaust_trade_result(
    trades, 
    size: int = 10, 
    rate: float = 0.0, 
    slippage: float = 0.0, 
    capital: int = 1000000,
    show_long_short_condition=True
    ):
    """
    Exhaust all trade result.
    """

    total_trades = generate_trade_df(trades, size, rate, slippage, capital)
    statistics_trade_result(total_trades, capital)

    if not show_long_short_condition:
        return
    long_trades = buy2sell(total_trades, capital)
    short_trades = short2cover(total_trades, capital)

    output("-----------------------")
    output("纯多头交易")
    statistics_trade_result(long_trades, capital)

    output("-----------------------")
    output("纯空头交易")
    statistics_trade_result(short_trades, capital)

 
 
了解更多知识,请关注vn.py社区公众号。
 
description

description
【快速交易】组件有固定宽度的空白列

description

description

移动【T型报价版】小窗口列宽,只显示固定列宽内容:即随着鼠标向右拖动, 组件窗口内的左边空白处增多

description

【情景分析组件】,点击【执行分析】后,x轴和y轴的名称,如隐含波动率,价格 不显示

description

【希腊值风险】首列的【代码】,最初显示是缩在一起的,建设最初显示时后,展开首列的列宽

建议对坐标进行限制,优化曲线移动时候体验。(有时候,移动过大导致找不到隐含波动率曲线)

description

  • 【y轴坐标】(波动率), 从0 到 隐含波动率最大值 + 50,
  • 【x轴坐标】(行权价),从深度虚值行权价 -100 , 到 深度实值行权价 + 100

在【隐含波动率曲线】,鼠标右键点击【export】出现报错, 显示无法import QtSvg

description

逐日盯市统计的痛点

 

在实盘交易中,逐日盯市(Marking-to-Market)是基于当日的收盘价、仓位数据、成交数据等来统计每日的盈亏,用于交易所对于客户盈亏情况的每日清算以及保证金管理。

 

在策略回测中,逐日盯市统计的算法也可以用于对策略盈亏曲线的计算和绘制。一个好的策略,资金曲线总是整体向上,并且相对平滑无太大回撤的,换句话说,就是夏普比率和收益回撤比都比较高。2.0版本的vn.py框架的CTA回测引擎,为了更直观的评估策略的整体效果,内置的盈亏统计采用了逐日盯市的模式。

 

但因为将每日所有的成交数据都映射到了最终收盘时的结果,逐日盯市统计的方式,无法在每笔开平仓交易的层面来分析盈亏情况,例如:手续费和滑点相对平均盈亏的占比、策略交易胜率和盈亏比等统计指标。

 
考虑到以上信息对于策略开发和研究的重要性,在本文中我们设计了一种新的逐笔开平对冲算法,来解决相关回测统计指标计算的问题。

 
 

逐笔对冲统计的概念

 

在讲解代码前,先通过例子来简单介绍一下逐笔对冲统计这个概念:

 

  1. 在交易中,若判断某个标的物上涨,可以买入开仓,如以100元的价格买入1手;
  2. 若价格继续上涨到110,我们“赚”了10元,但这属于浮动盈亏,只有真正平仓的时候盈利才会真正落在口袋里面,也就是从浮动盈亏变成实际盈亏;
  3. 价格继续上涨到112时候,我们发出卖出平仓指令,此时开平仓盈亏为(112-100)*1 = 12。

 

上面的例子中可以知道该笔开平仓赚了12元,在实际成交中我们还需要考虑手续费和滑点:

 

  • 手续费:每次交易都需要给交易所和经纪商支付的费用,对于股票等证券产品还需要交付印花税和过户费;
  • 滑点:交易中策略的目标成交价和最终的实际成交价可能会有一定的偏差,这个偏差就是滑点,背后的原因是盘口上的买卖价差以及实际交易中的延时。

 

计算完每笔交易的手续费和滑点后,我们就可以最终得到该笔开平仓交易的净盈亏情况。除了最简单的一开一平外,现实中许多策略的开平交易情况可能复杂得多,总体上可以分为:

 

  • 一笔开仓,接着是一笔平仓;
  • 一笔开仓或者平仓包含多次成交;
  • 逐步开仓后,有一定仓位再逐步平仓

 

计算完逐笔开平仓盈亏后,我们就可以统计每次交易的胜率和盈亏比了,更进一步还可以对交易方向进行筛选,来看看纯多头交易和纯空头交易的盈亏情况。

 
 

回测的原始成交结果

 

以下是vn.py中CTA回测引擎缓存回测成交信息的代码:

# Push trade update
self.trade_count += 1

if long_cross:
    trade_price = min(order.price, long_best_price)
    pos_change = order.volume
else:
    trade_price = max(order.price, short_best_price)
    pos_change = -order.volume

trade = TradeData(
    symbol=order.symbol,
    exchange=order.exchange,
    orderid=order.orderid,
    tradeid=str(self.trade_count),
    direction=order.direction,
    offset=order.offset,
    price=trade_price,
    volume=order.volume,
    time=self.datetime.strftime("%H:%M:%S"),
    gateway_name=self.gateway_name,
)
trade.datetime = self.datetime

self.strategy.pos += pos_change
self.strategy.on_trade(trade)

self.trades[trade.vt_tradeid] = trade

 
每一笔成交信息都以TradeData的数据格式缓存在trades字典中,我们可以通过打印输出该字典来直观地看看TradeData的数据结构。

 

打印输出成交记录

 

对成交缓存数据的结构有个大概的了解后,接下来遍历engine.trades的值,依次打印:

 

  • 成交时间:value.datetime;
  • 成交方向:value.direction.value;
  • 成交开平:value.offset.value;
  • 成交价格:value.price;
  • 成交数量:value.volume。

 

在遍历过程中,若检测到是平仓操作,即value.offset.value == "平”,则另外打印分隔线,便于肉眼观察每笔开平仓所对应的时间、价格等:

 

trade = engine.trades
for value in trade.values():
    print("时间:",value.datetime,value.direction.value,value.offset.value, "价格:",value.price, "数量:",value.volume)
    if value.offset.value == "平":
        print("---------------------------------------------------------")

 

description

 
尽管只是在Jupyter Notebook中简单的进行打印输出,已经能够一目了然的看到每次开平仓交易的基本信息。

 
其中的回测价格信息可以对照实盘成交回报信息进行对比,去计算真实成交和回测成交的价差,统计真实滑点,每隔一段时间(如一个月后)对回测中用到的滑点参数进行调整,力求回测尽量与实盘交易一致。

 
 

一开一平的统计逻辑

 

这里我们从简单的情况开始着手,首先做出假设条件:

 

  • 每次委托都能完全成交,即一次委托对应一次成交;
  • 开仓成交后接下来的就是平仓委托,即每2笔成交是开平仓关系;

 

由于不需要考虑较为复杂的一次委托多次成交情况,每次开仓成交后的下一笔必定是成交量相等的平仓成交,那么可以设计出如下的计算逻辑:

 

1.构建原始成交数据DataFrame,其中包括:日期时间、成交方向、开平仓、价格、数量;

2.把【价格】列表向后平移一个单位,得到上一笔成交记录;

  1. 对DataFrame进行条件判断:

a)若【成交方向】、【开平仓】为“多平”,意味着本次交易为空开->多平,那么盈亏=(开仓价格-平仓价格)* 成交数量;持仓时间=平仓时间-开仓时间;

b)若【成交方向】、【开平仓】为“空平”,意味着本次交易为多开->空平,那么盈亏=(平仓价格-开仓价格)* 成交数量;持仓时间=平仓时间-开仓时间;

4.对最后一行进行额外处理:若【开平】为“开”,【平仓价】和【持仓时间】设置为"待定",【盈亏】设为0;

5.使用dropna把【盈亏】为空值的行去掉;

6.对DataFrame的索引重新排序;

7.计算【累计盈亏】并画出图。

 

同时在实盘交易中做每日收盘后的统计时,我们可以设置DataFrame只显示当月的成交记录,便于重点观察最近一段时间的盈亏情况、持仓时间等:

 

description

 

上图中,我们可以一目了然地看出每一组完整的开平仓交易的最终盈亏以及持仓时间。

 

搞定了简单的情况,接下来我们可以将算法变得更加通用化,满足更多场景:一次开仓多次平仓、多次开仓一次平仓以及更加复杂的多次开仓多次平仓。
 

了解更多知识,请关注vn.py社区公众号。

description

仍出现报错

登陆Option Master后,
1) 选择中金所的IO期权 并且与当月IF股指期货作为对冲。
 
2) 其他组件均正常显示,除了【场景分析】不显示3D图。点击【执行分析】后,报错:Zero Division Error:Float DIvision
 
3) 报错出现后,有时候window后台无法关闭进程,显示的是【无响应】
 
4)但是Option Master能正常订阅行情并且波动率曲线微笑能跟着变化

 
description

 

description

作为量化开源框架,vn.py的一大好处在于自由度高,用户可以基于开放的源代码来专门实现自己特定的功能。对于常用的量化系统配置,VN Trader提供了全局配置工具方便用户直接在图形界面上进行修改。

 

进入VN Trader后,点击菜单栏上的【配置】按钮弹出【全局配置】窗口:

 

description
 

尽管参数看着挺多,但总体上可以分成以下5大类:

 

  • GUI界面字体
  • 日志输出
  • 邮件通知
  • 数据服务(RQData)
  • 数据库

 

下面我们来对这5大类的配置进行详细说明。

 

 

GUI界面字体

 

  • font.family:设置VN Trader整体的字体类型,除了默认的Arial,Courier New和System都是不错的选择。
  • font.size:设置VN Trader整体字体大小,需要根据自己显示器的实际分辨率效果进行修改,如下图中的15号字体。

     

description

 

 

日志输出

 

  • log.active:控制是否要启动LogEngine,默认为True。如果修改为False则后续几项设置都会失效,VN Trader在运行时无日志输出或是日志文件生成(可以降低部分系统延时)。
  • log.level:控制日志输出的级别,日志从轻到严重可以分成DEBUG、INFO、WARNING、ERROR、CRITICAL五个级别,分别对应10、20、30、40、50的整数值。数值低于设置值,它的日志信息将会被忽略。比如设置log.level=50,类似登录日志这种INFO级别不会输出到日志文件;若修改log.level=10,就会保存到日志文件中。如果需要全面的了解运行情况,可以选择设置为10,即除了DEBUG级别的日志信息都会输出。
  • log.console:console指的是终端,如Windows系统上的cmd和Powershell,以及Linux上的Terminal。当设置为True时,通过终端运行脚本来启动VN Trader,日志信息会输出在终端中;如果通过VN Station来直接启动VN Trader,则无console输出。
  • log.file:该参数用于控制是否要将日志输出到文件中,建议设置为True,否则无法记录生成的日志。

 

VN Trader的日志文件,默认位于运行时目录的.vntrader\log目录下,完整路径为:

 

C:\users\administrator.vntrader\log

 

其中administrator为当前Windows系统的登录用户名。

 

综合来说,如果想要记录更多的VN Trader系统运行细节信息,建议将level等级调低获取更多的信息,下面是开启Log.console和Log.File后的效果:

 
description

 

日志文件则会根据启动VN Trader时的日期自动创建:

 
description

 
 

邮件通知

 

每天盘中自动化交易时,我们可能希望每有委托成交,能够收到实时的通知;或者若出现异常情况,如数据错误、连接断开等,也要通知一下。

 

vn.py内置了邮件引擎EmailEngine,只要进配置好邮箱的账号、密码、服务器等信息,后续即可调用MainEngine.send_email函数来非常方便的发送邮件通知。

 

在这里我们通过QQ邮箱进行演示:

 

  1. 在web上登录qq邮箱;
  2. 在登陆页面点击【设置】-> 【账户】;
  3. 下滑账户界面,找到【POP3/IMAP/SMTP服务】选项;
  4. 开启红色方框里面的2项服务,点击下方【生成授权码】;
  5. 记录下刚生成的授权码。

 

description
 

得到授权码后,回到VN Trader的邮件相关的配置:
 

  • email.username:发送邮箱名,格式为xxxx@qq.com。
  • email.password:即上面提到的授权码。
  • email.sender:发送邮箱名,与email.username一致。
  • email.receiver:接收邮箱的地址,如xxxx@outlook.com,或者也可以直接使用email.sender(自己发送邮件给自己)。

 

配置完成后,重启VN Trader,点击菜单栏【帮助】->【测试邮件】发送测试邮件,若能顺利收到,则说明邮件设置成功。

 
 

绑定微信

 

比起在手机上装邮箱客户端,使用微信来接受实时的消息通知,无疑要方便得多,而且只需要准备一个QQ邮箱:
 

  1. 打开手机微信,在【设置】->【账号与安全】->【更多安全设置】中绑定接受邮件的QQ号;
  2. 然后在【设置】->【通用】->【辅助功能】中开启QQ邮箱提醒服务;
  3. 最后将VN Trader的全局配置中email.receiver改成该QQ号对应的QQ邮箱即可实施在手机端接收vn.py监控消息了。

     

description

 
 

数据服务(RQData)

 

RQData是目前国内期货和股票数据方面,性价较高的三方数据提供商之一。购买RQData后(或者申请试用账号),会获得了其license文件,只需将其中的内容填入以下字段即可:

 

  • rqdata.username:米筐登录账号;
  • rqdata.password:RQData的license。

     

(注意这里的username和password不是米筐官网登录用的账号和密码!)

 
 

数据库

 

vn.py目前支持4个常用数据库:

 

  • SQLite(默认)
  • MySQL
  • PostgreSQL
  • MongoDB

 

其中SQLite为vn.py的默认数据库,它的优势主要表现在2点:
 

  • 非常的轻量并且对外部依赖非常小;
  • 相比于mongodb或者mysql这种关系型数据库,不需要复杂的配置就能达到不错的性能要求。

 

当然,其他数据库在特定的场景下也都有着自己的优势,如:更快的加载速度、支持多用户同时访问等。改为使用其他数据库,首先需要准备完该数据库的服务器以及图形客户端,然后在VN Trader全局配置进行相关的全局配置。

 
 

SQLite

 
vn.py默认数据库,不需要修改任何配置。在第一次启动VN Trader时,程序会自动在用户目录下的.vntrader文件夹中生成database.db文件,后续所有相关的历史行情数据都会放在该文件中。

 

MongoDB

 

基础的配置只需要配置连接的数据库名称、主机名和端口号,至于用户登录信息和授权信息,可以留空。

 

  • database.driver:修改为mongodb。
  • database.host:使用数据库一般连接的都是localhost(即本机)。
  • database.port:默认使用的端口号是27017。
  • database.database:可以自行填入数据库的名称,即“mongodb”;也可以不填,MongoDB会自动创建。
  • 其他字段留空即可。

 

 
description

 

MySQL

 

  • database.driver:修改为mysql。
  • database.database:需要先通过MySQL的客户端工具,创建一个新的数据库,数据库名称填入到这里的字段中(如vnpy_database)。注意:与MongoDB不同,若留空会报错。
  • database.user和database.password可以在安装MySQL时候得到。
  • database.host:同样一般使用localhost。
  • database.port:MySQL的默认端口为3306;

 

description

 

PostgreSQL

 

和MySQL配置几乎一模一样,只需要将端口database.port改为5432:

 
description

 

2019年vn.py核心团队的最后一期小班课报名进行中!

两天共计10小时的密集提高课程

8套高端CTA策略源代码分享

DoubleChannel

KeltnerBandit

RsiMomentum

SuperTurtle

TrendThrust

Cinco

Cuatro

SuperCombo

动态风险仓位管理模型

策略内嵌复杂算法交易

详情请戳:第四期vn.py小班课上线:CTA策略开发!

 
了解更多知识,请关注vn.py社区公众号。
description

万恶的全局锁

 

基于物理上的限制,各CPU厂商在核心频率上的比赛已经被多核所取代。为了更有效的利用多核处理器的性能,多线程的编程方式被越来越多地应用到了各类程序中,而随之带来的则是线程间数据一致性和状态同步的困难。

 

作为已经30岁的Python,自然早已支持多线程的功能,但坊间却始终存在着一种误解:Python的多线程是假的(或者虚拟机模拟的)。

 

Python虚拟机(或者叫解释器)使用了引用计数法的垃圾回收方式(另一种是以Java虚拟机为例的根搜索算法),这种垃圾回收方式使得Python虚拟机在运行多线程程序时,需要使用一把名为GIL(Global Interpreter Lock,全局解释器锁)的超级大锁,来保证每个对象上的引用计数正确。

 

从操作系统来看,每个CPU在同一时间都能够且只能执行一个线程。而在Python虚拟机上,任何一个线程的运行,都需要包含以下三个步骤:

 

  1. 获取GIL;
  2. 执行代码,直到sleep,或者被Python虚拟机挂起;
  3. 释放GIL;

 

因此,某个线程想要执行,必须先拿到GIL,我们可以把GIL看作是“通行证”,并且在一个Python进程中GIL也只有一个。所以哪怕硬件上CPU有再多的核心,任意时刻都只能有一个线程能拿到GIL来执行,这也是之前提到的误解来源。

 

Python多线程的痛点在于每次释放GIL锁,线程进行锁竞争、切换线程,会消耗资源。这导致很多时候,尤其是计算密集型任务为主的程序,多核多线程比单核多线程更差:

 

  • 单核下多线程,每次释放GIL,唤醒的那个线程都能获取到GIL锁,所以能够无缝执行;
  • 多核下,CPU0释放GIL后,其他CPU上的线程都会进行竞争,但GIL可能会马上又被CPU0拿到,导致其他几个CPU上被唤醒后的线程会醒着等待到切换时间后又进入待调度状态,这样会造成线程颠簸(thrashing),导致效率更低。

 

因此,在Python中想要充分压榨多核CPU的性能,必须依赖多进程的模式。每个进程有各自独立的GIL,互不干扰,这样就可以真正意义上的并行执行。
 

 

方便的多进程

 

Python语言中内置了专门用于实现多进程的multiprocessing库,使用上相当傻瓜,通过multiprocessing.Process类来创建一个新的子进程对象,再启动这个对象,这样一个多进程任务就开始执行了。

 

等CPU分配一个独立核心去干活,func函数就在这个子进程中开始执行了,这里唯一要注意args是默认输入元组参数。

 

p = multiprocessing.Process(target=func, args=(a,))
p.start()

除了一个一个的启动子进程外,也可以使用multiprocessing.Pool来创建进程池对象,把需要干的工作任务打包好,放在这个池子里面,这样一个任务执行完CPU核心空闲下来后,就能自动从进程池中去获取一个新的任务继续干活。

 

基本的使用步骤如下:

 

  1. 设置进程池中的进程数量,通常将其设置为小于或者等于cpu核心数量,避免多余的进程无法同时执行还要占用额外的内存;
  2. 然后使用pool.apply_async方法,把打包好的任务插入池中;
  3. 调用pool.close把这个进程池关闭,不再接受新的任务;
  4. 若还有一些已有任务在跑,使用pool.join()函数,阻塞当前的主线程,直到进程池中的所有任务都执行完成才进入下一步。

 
 

多进程参数优化

 

学习多进程模块怎么用,最好的例子之一就是vn.py的CTA策略回测引擎中的参数优化功能,加载同样的历史数据基于不同的参数,执行历史数据回放和策略盈亏统计,属于典型的多进程应用场景。

 

多进程优化函数位于:

 

vnpy.app.cta_strategy.backtesting.BacktestingEngine.run_optimization

 

该函数中的执行步骤如下:

 

  1. 传入全局优化列表settings,传入的参数越多,所形成的全局组合越多;
  2. 传入优化目标,常见的有夏普比率,收益回撤比;
  3. 根据主机的CPU核数创建对应数量的进程池pool;
  4. 在for循环中从全局优化列表settings获取元素回测参数setting,和策略类,策略参数打包为任务内容,和任务方法optimize一起组合为一个工作任务,最后插入到进程池给CPU核心去跑;
  5. 每次优化结果为result字典,并且把回测结果放在results列表中;
  6. 基于优化目标,如夏普比率对results列表进行排序。

 

def run_optimization(self, optimization_setting: OptimizationSetting, output=True):
    """"""
    # Get optimization setting and target
    settings = optimization_setting.generate_setting()
    target_name = optimization_setting.target_name

    if not settings:
        self.output("优化参数组合为空,请检查")
        return

    if not target_name:
        self.output("优化目标未设置,请检查")
        return

    # Use multiprocessing pool for running backtesting with different setting
    pool = multiprocessing.Pool(multiprocessing.cpu_count())

    results = []
    for setting in settings:
        result = (pool.apply_async(optimize, (
            target_name,
            self.strategy_class,
            setting,
            self.vt_symbol,
            self.interval,
            self.start,
            self.rate,
            self.slippage,
            self.size,
            self.pricetick,
            self.capital,
            self.end,
            self.mode
        )))
        results.append(result)

    pool.close()
    pool.join()

    # Sort results and output
    result_values = [result.get() for result in results]
    result_values.sort(reverse=True, key=lambda result: result[1])

    if output:
        for value in result_values:
            msg = f"参数:{value[0]}, 目标:{value[1]}"
            self.output(msg)

    return result_values

 
启动多进程优化的任务后,打开Windows的任务管理器,可以看到此时CPU所有的8个核心都已经在满载运行了。

 

2019年vn.py核心团队的最后一期小班课开始报名:

两天共计10小时的密集提高课程

8套高端CTA策略源代码分享

DoubleChannel

KeltnerBandit

RsiMomentum

SuperTurtle

TrendThrust

Cinco

Cuatro

SuperCombo

动态风险仓位管理模型

策略内嵌复杂算法交易

详情请戳:第四期vn.py小班课上线:CTA策略开发!

 
了解更多知识,请关注vn.py社区公众号。
description

数据分析的必要性

 

策略开发离不开数据分析:

 

  1. 首先,数据质量的好坏决定策略实盘的可行性,例如下载仿真历史数据,却用来研究实盘策略,就会导致典型的“Garbage in, garbage out”。
  2. 其次,下载的历史数据不应该出现某段时间缺失的情况,这就要求建模分析的第一步是对行情数据进行画图,通过人眼初步判断数据的好坏质量:即无缺失的行情部分、也无明显的“异常”数据点。
  3. 确定数据质量可靠后,就可以分析数据特性,从而决定其适用于开发哪种类型的策略。若数据自相关性弱,属于平稳时间序列,符合正态分布,可以考虑用作统计套利;若数据自相关性强,且非平稳时间序列,有明显的肥尾特征,则可以用来开发趋势跟踪类的策略(CTA)。

 

从另一角度来说,CTA策略开发前的建模分析流程如下:

 

  1. 对行情数据画图,确定无断点;
  2. 进行随机性检验,确定数据具有非纯随机性;
  3. 进行平稳性检验,确定时间序列的不平稳性;
  4. 画出自相关图,确定时间序列自相关性强;
  5. 相对波动率分析,确定其平均波动率大于固定交易成本(手续费);
  6. 行情变化率分析,确定其符合肥尾分布;
  7. 计算相关计算指标(DataFrame格式),可以与行情图对比分析;
  8. 合成更高维度的K线,重复前面1~7步。

 
 

数据分析流程

 

第1步:对行情数据进行画图

 

首先,调用vn.py数据库模块的database_manager.load_bar_data函数从数据库载入数据到内存中。

 

load_bar_data函数的输入参数有5个:

 

  • symbol:合约代码,由用户定义,字符串格式;
  • exchange:交易所代码,Exchange枚举值格式;
  • interval:K线周期,Interval枚举值格式,如Interval.MINUTE;
  • start:数据开始时间,datetime格式,如datetime(2019, 4, 1);
  • end:数据结束时间,datetiem格式,如datetime(2019, 10, 30);

 

load_bar_data函数输出的是一个包含系列BarData格式行情数据的列表bars。
 

from vnpy.trader.database import database_manager

output("开始加载历史数据")
bars = database_manager.load_bar_data(    
    symbol=symbol, 
    exchange=exchange, 
    interval=interval, 
    start=start, 
    end=end,
)
output(f"历史数据加载完成,数据量:{len(bars)}")

def output(msg):
    """"""
    print(f"{datetime.now()}\t{msg}")

 
用for循环读取bars列表中的BarData数据,然后分别缓存时间、开盘价、最高价、最低价、收盘价到专门的列表中,最终合成DataFrame, 设置DataFrame索引为K线数据的时间。

 

# Generate history data in DataFrame
t = []
o = []
h = []
l = []
c = []

for bar in bars:
    time = bar.datetime
    open_price = bar.open_price
    high_price = bar.high_price
    low_price = bar.low_price
    close_price = bar.close_price

    t.append(time)
    o.append(open_price)
    h.append(high_price)
    l.append(low_price)
    c.append(close_price)

self.orignal = pd.DataFrame()
self.orignal["open"] = o
self.orignal["high"] = h
self.orignal["low"] = l
self.orignal["close"] = c
self.orignal.index = t

 

对收盘价进行画图,设置图的尺寸和标题。然后用肉眼初步确认时间序列图无数据缺失或者明显“异常”行情。

 

output("第一步:画出行情图,检查数据断点")
self.orignal["close"].plot(figsize=(20, 8), title="close_price")
plt.show()

 
description

 

第2步:随机性检验
 

调用statsmodels库的acorr_ljungbox函数,对收盘价进行白噪声检验,函数返回一个p值,p值越大表示原假设成立的可能性越大,即数据是随机的可能性越大。

 
一般p值与0.05进行对比:

 

  • 若p值>0.05,证明数据具有纯随机性:不具备某些数据特征,没有数据分析意义;
  • 若p值<0.05,证明数据不具有纯随机性:具备某些数据特性,进行数据分析有利于开发出适应其统计规律的交易策略。

 

from statsmodels.stats.diagnostic import acorr_ljungbox

def random_test(close_price):
    """
    白噪声检验
    """
    acorr_result = acorr_ljungbox(close_price, lags=1)
    p_value = acorr_result[1]
    if p_value < 0.05:
        output("第二步:随机性检验:非纯随机性")
    else:
        output("第二步:随机性检验:纯随机性")
    output(f"白噪声检验结果:{acorr_result}\n")

 

description

 

第3步:平稳性检验

 

同样,调用statsmodels库的adfuller函数对收盘价进行单位根检验,函数返回的是一个字典,我们对字典里面的字段进行判断:

 

  • 若ADF值>10% 统计量,说明原假设成立:存在单位根,时间序列不平稳;
  • 若ADF值<10% 统计量,说明原假设不成立:不存在单位根,时间序列平稳。

 
CTA策略研究的是非平稳性时间序列,平稳时间序列适用于期货价差的统计套利。

 

from statsmodels.tsa.stattools import adfuller as ADF

def stability_test(close_price):
    """
    平稳性检验
    """
    statitstic = ADF(close_price)
    t_s = statitstic[1]
    t_c = statitstic[4]["10%"]

    if t_s > t_c:
        output("第三步:平稳性检验:存在单位根,时间序列不平稳")
    else:
        output("第三步:平稳性检验:不存在单位根,时间序列平稳")

    output(f"ADF检验结果:{statitstic}\n")

 

description

 

 
第4步:画出自相关图

 
同样,调用statsmodels库的plot_acf函数和plot_pacf函数对收盘价画自相关和偏自相关图:

 

  • 自相关图:统计相关性总结了两个变量之间的关系强度,即描述了一个观测值与另一个观测值之间的自相关,包括直接和间接的相关性信息。这种关系的惯性将继续到之后的滞后值,随着效应被削弱而在某个点上缩小到没有。
  • 偏自相关图:偏自相关是剔除干扰后时间序列观察与先前时间步长时间序列观察之间关系的总结,即只描述观测值与其滞后之间的直接关系。可能超过k的滞后值不会再有相关性。

 

置信区间被画成圆锥形。默认情况下,置信区间被设置为95%,这表明,圆锥之外的值很可能是相关的,而不是统计上的意外。

 
也就是说,圆锥以外的值越多,时间序列自相关性越强,越适用于研究CTA策略。

 

from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

def autocorrelation_test(close_price):
    """
    自相关性检验
    """
    output("第四步:画出自相关性图,观察自相关特性")

    plot_acf(close_price, lags=60)
    plt.show()

    plot_pacf(close_price, lags=60).show()
    plt.show()

 

description

 

第5步:相对波动率分析

 

  1. 调用talib库的ATR函数计算1分钟K线的绝对波动率;
  2. 通过收盘价*手续费计算固定成本;
  3. 相对波动率=绝对波动率-固定成本;
  4. 对相对波动率进行画图:时间序列图和频率分布图;
  5. 对相对波动率进行描述统计分析,得到平均数、中位数、偏度、峰度

 

若相对波动率分布属于右偏(肥尾在右边),且分布陡峭,在统计学上具有尖峰肥尾的特色,适用于研究CTA策略。

 

def relative_volatility_analysis(self, df: DataFrame = None):
    """
    相对波动率
    """
    output("第五步:相对波动率分析")
    df["volatility"] = talib.ATR(
        np.array(df['high']),
        np.array(df['low']),
        np.array(df['close']),
        self.window_volatility
    )

    df["fixed_cost"] = df["close"] * self.rate
    df["relative_vol"] = df["volatility"] - df["fixed_cost"]

    df["relative_vol"].plot(figsize=(20, 6), title="relative volatility")
    plt.show()

    df["relative_vol"].hist(bins=200, figsize=(20, 6), grid=False)
    plt.show()

    statitstic_info(df["relative_vol"])

def statitstic_info(df):
    """
    描述统计信息
    """
    mean = round(df.mean(), 4)
    median = round(df.median(), 4)    
    output(f"样本平均数:{mean}, 中位数: {median}")

    skew = round(df.skew(), 4)
    kurt = round(df.kurt(), 4)

    if skew == 0:
        skew_attribute = "对称分布"
    elif skew > 0:
        skew_attribute = "分布偏左"
    else:
        skew_attribute = "分布偏右"

    if kurt == 0:
        kurt_attribute = "正态分布"
    elif kurt > 0:
        kurt_attribute = "分布陡峭"
    else:
        kurt_attribute = "分布平缓"

    output(f"偏度为:{skew},属于{skew_attribute};峰度为:{kurt},属于{kur
    t_attribute}
\n")

 

description
 

 

第6步:变化率分析
 

  1. 计算百分比变化率 = 100*(收盘价- 上一根收盘价)/上一根收盘价;
  2. 对变化率进行画图:时间序列图和频率分布图;
  3. 对变化率进行描述统计分析,得到平均数、中位数、偏度、峰度。

 

def growth_analysis(self, df: DataFrame = None):
    """
    百分比K线变化率
    """
    output("第六步:变化率分析")
    df["pre_close"] = df["close"].shift(1).fillna(0)
    df["g%"] = 100 * (df["close"] - df["pre_close"]) / df["close"]

    df["g%"].plot(figsize=(20, 6), title="growth", ylim=(-5, 5))
    plt.show()

    df["g%"].hist(bins=200, figsize=(20, 6), grid=False)
    plt.show()

    statitstic_info(df["g%"])

 

description

 

想了解更多关于CTA策略开发实战的各种细节?请戳课程上线:《vn.py全实战进阶》!目前课程已经更新过半,一共50节内容覆盖从策略设计开发、参数回测优化,到最终实盘自动交易的完整CTA量化业务流程。
 
了解更多知识,请关注vn.py社区公众号。
description

R-Breaker是一种中高频的日内交易策略,这个策略也长期被Future Truth杂志评为最赚钱的策略之一。

 
R-Breaker策略结合了趋势反转两种交易方式,所以交易机会相对较多,比较适合日内1分钟K线或者5分钟K线级别的数据。

 

 

策略逻辑

 

R-Breaker策略的核心逻辑由以下4部分构成:

 

1)计算6个目标价位

 
根据昨日的开高低收价位计算出今日的6个目标价位,按照价格高低依次是:

 

  • 突破买入价(Bbreak)
  • 观察卖出价(Ssetup)
  • 反转卖出价(Senter)
  • 反转买入价(Benter)
  • 观察买入价(Bsetup)
  • 突破卖出价(Sbreak)

 

具体计算方法如下:(其中a、b、c、d为策略参数)
 

 

  • 观察卖出价(Ssetup)= High + a * (Close – Low)
  • 观察买入(Bsetup)= Low – a * (High – Close)
  • 反转卖出价(Senter)= b / 2 (High + Low) – c Low
  • 反转买入价(Benter)= b / 2 (High + Low) – c High
  • 突破卖出价(Sbreak)= Ssetup - d * (Ssetup – Bsetup)
  • 突破买入价(Bbreak)= Bsetup + d * (Ssetup – Bsetup)

 

description
 
2)设计委托逻辑

 

趋势策略交易:

  • 若价格>突破买入价,开仓做多;
  • 若价格<突破卖出价,开仓做空;

 

反转策略交易:

  • 若日最高价>观察卖出价,然后下跌导致价格<反转卖出价,开仓做空或者反手(先平仓再反向开仓)做空;
  • 若日最低价<观察买入价,然后上涨导致价格>反转买入价,开仓做多或者反手(先平仓再反向开仓)做多;

 

3)设定相应的止盈止损

 

4)日内策略要求收盘前平仓
 

以上是原版R-Breaker策略逻辑,但使用RQData从2010年至今(2019年10月)的1分钟沪深300股指期货主力连续合约(IF88)测试,效果并不理想。

 
 

改进优化

 

从逻辑上看R-Breaker策略可以拆分成趋势策略和反转策略,那么不妨试试将这两种逻辑分开,并逐个进行优化。

 

1)趋势策略:

 

若当前x分钟的最高价>观察卖出价,认为它具有上升趋势,在突破买入价挂上买入开仓的停止单;

  • 若当前x分钟的最低价<观察买入价,认为它具有下跌趋势,在突破卖出价挂上买入开仓的停止单;
  • 开仓后,使用固定百分比移动止损离场;
  • 增加过滤条件:为防止横盘行情导致不断的开平仓,日内每次开仓买入开仓(卖出开仓)委托的价位都比上一次更高(更低);
  • 收盘前,必须平掉所持有的仓位。

 

2)反转策略:

 

  • 若当前x分钟的最高价>观察卖出价,认为它已经到了当日阻力位,可能发生行情反转,在反转卖出价挂上卖出开仓的停止单;
  • 若当前x分钟的最低价>观察买入价,认为它已经到了当日支撑位,可能发生行情反转,在反转买入价挂上买入开仓的停止单;
  • 开仓后,使用固定百分比移动止损离场;
  • 收盘前,必须平掉所持有的仓位。
     

其代码实现逻辑如下:

 

self.tend_high, self.tend_low = am.donchian(self.donchian_window)

if bar.datetime.time() < self.exit_time:

    if self.pos == 0:
        self.intra_trade_low = bar.low_price
        self.intra_trade_high = bar.high_price

        if self.tend_high > self.sell_setup:
            long_entry = max(self.buy_break, self.day_high)
            self.buy(long_entry, self.fixed_size, stop=True)

            self.short(self.sell_enter, self.multiplier * self.fixed_size, stop=True)

        elif self.tend_low < self.buy_setup:
            short_entry = min(self.sell_break, self.day_low)
            self.short(short_entry, self.fixed_size, stop=True)

            self.buy(self.buy_enter, self.multiplier * self.fixed_size, stop=True)

    elif self.pos > 0:
        self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
        long_stop = self.intra_trade_high * (1 - self.trailing_long / 100)
        self.sell(long_stop, abs(self.pos), stop=True)

    elif self.pos < 0:
        self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
        short_stop = self.intra_trade_low * (1 + self.trailing_short / 100)
        self.cover(short_stop, abs(self.pos), stop=True)

# Close existing position
else:
    if self.pos > 0:
        self.sell(bar.close_price * 0.99, abs(self.pos))
    elif self.pos < 0:
        self.cover(bar.close_price * 1.01, abs(self.pos))

 
 

回测效果

 

同样使用2010年至今的1分钟IF88数据进行回测。不过在展示强化版R-Breaker策略效果前,先分别展示一下拆分后的趋势策略和反转策略。

 

1)趋势策略:

 

  • 趋势策略夏普比率1.96,日均成交2.6笔,资金曲线是整体上扬的;
  • 但是在2017~2018年的盘整阶段,具有较大并且持续时间较长的回撤;
  • 这凸显出趋势类策略自身无法规避的缺点:在趋势行情中盈利,意味着震荡行情必然亏损。

 

description

 

2)反转策略

 

  • 反转策略夏普比率0.75,日均成交0.4笔,资金曲线缓慢上扬;
  • 但是在2017~2018年的盘整阶段,资金曲线上扬最快,而且这个阶段是最平滑的;
  • 这凸显出反转类策略优点:尽管在趋势行情亏损,在震荡行情必然能盈利。

 

description
 

综合对比2种策略的日均成交笔数和资金曲线,我们可以知道:

 

  • 由于趋势策略日均交易笔数较多(2.08笔),它主要负责贡献R-Breaker策略的alpha;
  • 趋势策略的亏损也是主要导致R-Breaker策略亏损的原因,但这时候的亏损可以由反转策略的盈利来填补。

 

由于趋势策略和反转策略是互斥的,在某些方面呈现出此消彼长的特点。那么,根据投资组合理论,可以把反转策略看作是看跌期权,买入一定规模的看跌期权来对消非系统性风险,那么组合的收益会更加稳健,即夏普比率更高。

 

由于趋势策略和反转策略日均成交手数比是2.6:0.4,若它们都只委托1手的话,反转策略的对冲效果微乎其微。

 

为了方便演示,我们设置趋势策略每次交易1手,反转策略则是3手,然后将两者合成为R-Breaker策略。

 
经过以上的仓位配置后,回测结果中的夏普比率提高到2,资金曲线整体上扬,而且没有较大且持续时间较长的回撤。

 
description

 

 

最终结论

 

R-Breaker策略的成功之处在于它并不是纯粹的趋势类策略,而是属于复合型策略,其alpha由两部分构成:趋势策略alpha和反转策略alpha。

 
这类复合型策略可以看作是轻量级的投资组合,因为它的交易标的只有一个:沪深300股指期货的主力合约。

 

更复杂的话,可以交易多个标的,如在商品期货做虚拟钢厂套利(同时交易螺纹钢、铁矿石、焦炭),在IF股指期货上做日内CTA策略。考虑到市场容量不同,价差套利能分配更多的资金,这样在价差套利提供稳定收益率基础上,CTA策略能在行情好的时候贡献更多alpha(高盈亏比特征导致的)。

 

从上面的例子可以看出,一个合理的投资组合,往往比单个策略具有更高的夏普比率。因为:夏普比率=超额收益/风险,所以夏普比率高意味着资金曲线非常平滑,也意味着我们可以有效的控制使用杠杆的风险。

 

当某个投资组合策略夏普足够高,而且策略资金容量允许,交易成本能有效控制等情况下,就可以通过杠杆来提升组合收益了。例如采用结构化产品的形式,经过银行等中介通过发行优先级份额,来为劣后级份额提供杠杆。这时劣后级投资者(有时100%是交易团队自身持有)同时也是债务人的角色,即在承担更大风险的同时,追求更高的最终收益,而优先级投资者则作为债权人享受利息收益。

 

 

策略源码

 

最后,秉承vn.py社区的一贯精神:

 

Talk is cheap, show me your pnl (or code) !

 

自然必须附上策略的源代码:

 

from datetime import time
from vnpy.app.cta_strategy import (
    CtaTemplate,
    StopOrder,
    TickData,
    BarData,
    TradeData,
    OrderData,
    BarGenerator,
    ArrayManager
)


class RBreakStrategy(CtaTemplate):
    """"""
    author = "KeKe"

    setup_coef = 0.25
    break_coef = 0.2
    enter_coef_1 = 1.07
    enter_coef_2 = 0.07
    fixed_size = 1
    donchian_window = 30

    trailing_long = 0.4
    trailing_short = 0.4
    multiplier = 3

    buy_break = 0   # 突破买入价
    sell_setup = 0  # 观察卖出价
    sell_enter = 0  # 反转卖出价
    buy_enter = 0   # 反转买入价
    buy_setup = 0   # 观察买入价
    sell_break = 0  # 突破卖出价

    intra_trade_high = 0
    intra_trade_low = 0

    day_high = 0
    day_open = 0
    day_close = 0
    day_low = 0
    tend_high = 0
    tend_low = 0

    exit_time = time(hour=14, minute=55)

    parameters = ["setup_coef", "break_coef", "enter_coef_1", "enter_coef_2", "fixed_size", "donchian_window"]
    variables = ["buy_break", "sell_setup", "sell_enter", "buy_enter", "buy_setup", "sell_break"]

    def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
        """"""
        super().__init__(cta_engine, strategy_name, vt_symbol, setting        )

        self.bg = BarGenerator(self.on_bar)
        self.am = ArrayManager()
        self.bars = []

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("策略初始化")
        self.load_bar(10)

    def on_start(self):
        """
        Callback when strategy is started.
        """
        self.write_log("策略启动")

    def on_stop(self):
        """
        Callback when strategy is stopped.
        """
        self.write_log("策略停止")

    def on_tick(self, tick: TickData):
        """
        Callback of new tick data update.
        """
        self.bg.update_tick(tick)

    def on_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        self.cancel_all()

        am = self.am
        am.update_bar(bar)
        if not am.inited:
            return

        self.bars.append(bar)
        if len(self.bars) <= 2:
            return
        else:
            self.bars.pop(0)
        last_bar = self.bars[-2]

        # New Day
        if last_bar.datetime.date() != bar.datetime.date():
            if self.day_open:

                self.buy_setup = self.day_low - self.setup_coef * (self.day_high - self.day_close)  # 观察买入价
                self.sell_setup = self.day_high + self.setup_coef * (self.day_close - self.day_low)  # 观察卖出价

                self.buy_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_high  # 反转买入价
                self.sell_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_low  # 反转卖出价

                self.buy_break = self.buy_setup + self.break_coef * (self.sell_setup - self.buy_setup)  # 突破买入价
                self.sell_break = self.sell_setup - self.break_coef * (self.sell_setup - self.buy_setup)  # 突破卖出价

            self.day_open = bar.open_price
            self.day_high = bar.high_price
            self.day_close = bar.close_price
            self.day_low = bar.low_price

        # Today
        else:
            self.day_high = max(self.day_high, bar.high_price)
            self.day_low = min(self.day_low, bar.low_price)
            self.day_close = bar.close_price

        if not self.sell_setup:
            return

        self.tend_high, self.tend_low = am.donchian(self.donchian_window)

        if bar.datetime.time() < self.exit_time:

            if self.pos == 0:
                self.intra_trade_low = bar.low_price
                self.intra_trade_high = bar.high_price

                if self.tend_high > self.sell_setup:
                    long_entry = max(self.buy_break, self.day_high)
                    self.buy(long_entry, self.fixed_size, stop=True)

                    self.short(self.sell_enter, self.multiplier * self.fixed_size, stop=True)

                elif self.tend_low < self.buy_setup:
                    short_entry = min(self.sell_break, self.day_low)
                    self.short(short_entry, self.fixed_size, stop=True)

                    self.buy(self.buy_enter, self.multiplier * self.fixed_size, stop=True)

            elif self.pos > 0:
                self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
                long_stop = self.intra_trade_high * (1 - self.trailing_long / 100)
                self.sell(long_stop, abs(self.pos), stop=True)

            elif self.pos < 0:
                self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
                short_stop = self.intra_trade_low * (1 + self.trailing_short / 100)
                self.cover(short_stop, abs(self.pos), stop=True)

        # Close existing position
        else:
            if self.pos > 0:
                self.sell(bar.close_price * 0.99, abs(self.pos))
            elif self.pos < 0:
                self.cover(bar.close_price * 1.01, abs(self.pos))

                self.put_event()

    def on_order(self, order: OrderData):
        """
        Callback of new order data update.
        """
        pass

    def on_trade(self, trade: TradeData):
        """
        Callback of new trade data update.
        """
        self.put_event()

    def on_stop_order(self, stop_order: StopOrder):
        """
        Callback of stop order update.
        """
        pass

 

想了解更多关于CTA策略开发实战的各种细节?请戳课程上线:《vn.py全实战进阶》!目前课程已经更新过半,一共50节内容覆盖从策略设计开发、参数回测优化,到最终实盘自动交易的完整CTA量化业务流程。

 
了解更多知识,请关注vn.py社区公众号。
description

了准备Tick数据

 
 
要获取Tick数据,并插入到vn.py数据库中,整体上有3种方法:

 

  1. 使用vn.py行情记录模块DataRecorder来自行录取:在保证网络稳定的情况下,启动一个独立的进程,负责在交易时段录制行情数据;并在收盘后,使用脚本工具完成数据的自动清洗工作,整体上实现起来比较耗费精力;
  2. 从RQData或者其他数据源下载:专业的数据服务商可以提供已经清洗好的Tick数据,但是价格往往比较贵;
  3. 通过CSV文件来载入:CSV格式的Tick数据相对来说比较容易获取(万能的淘宝~),因此对个人用户来说是个比较容易的选择。
     

那么本文我们就选择第3种方法,通过读取CSV文件,把数据载入到数据库中。

 

 

载入CSV文件

 

首先需要保证你已经在系统上安装配置好了数据库,这里演示用的是MongoDB数据库以及图形化客户端Robo 3T。
 

注意在MongoDB中需要创建新数据库“vnpy”,然后在全局配置对话框中,修改相关配置:

 

    "database.driver": "mongodb",
    "database.database": "vnpy",
    "database.host": "localhost",
    "database.port": 27017,
    "database.user": "",
    "database.password": "",
    "database.authentication_source": ""

 

注意输入上述内容到配置对话框中时,请忽略引号。修改完毕保存后,请重新启动VN Trader,检查相关配置是否已经修改成功。
 

然后我们把所有的CSV文件放在同一文件夹下,这样就可以使用一个脚本来读取该文件夹内的所有CSV格式文件,并批量载入到数据库中。

 

description

 

在开始处理数据之前,我们需要知道CSV文件中的表头信息和数据特征。用Excel打开其中任意一个CSV文件,查看其中的内容后,建立一个比较直观的印象,大概知道:

 

  • 哪些数据需要写入到数据库;
  • 哪些数据不需要写入;
  • 哪些数据需要强行赋值。

 

这里我们的CSV文件,表头以及第一行内容如下:

 

交易日,合约代码,交易所代码,合约在交易所的代码,最新价,上次结算价,昨收盘,昨持仓量,今开盘,最高价,最低价,数量,成交金额,持仓量,今收盘,本次结算价,涨停板价,跌停板价,昨虚实度,今虚实度,最后修改时间,最后修改毫秒,申买价一,申买量一,申卖价一,申卖量一,申买价二,申买量二,申卖价二,申卖量二,申买价三,申买量三,申卖价三,申卖量三,申买价四,申买量四,申卖价四,申卖量四,申买价五,申买量五,申卖价五,申卖量五,当日均价,业务日期
20190102,ru1905,,,11280.0000,11290.0000,11305.0000,322472,11280.0000,11280.0000,11280.0000,246,27748800.0000,322468,0.0000,0.0000,12080.0000,10495.0000,0,0,08:59:00,500,11280.0000,10,11290.0000,10,0.0000,0,0.0000,0,0.0000,0,0.0000,0,0.0000,0,0.0000,0,0.0000,0,0.0000,0,112800.0000,20190102

 

从以上内容中,我们发现下述特征:

 

  • 表头是中文;
  • 可以直接载入的数据:最新价,持仓量,买一价,买一量,卖一价,卖一量;
  • 需要合成的数据:datetime,它由3列数据合成,分别是交易日,最后修改时间,最后修改毫秒;
  • 强行赋值数据是主力连续合约(RU88)和交易所(SHFE);
  • 集合竞价阶段数据需要保留;
  • 非交易时段产生的垃圾数据需要剔除。

 
有了这样的需求后,我们在接下来开发脚本的过程中就有了方向:

 

1)使用for循环遍历同一文件夹内所有CSV格式的文件(即以“.csv"结尾的文件名),使用csv_load函数来载入数据:

 

import os 
import csv
from datetime import datetime, time

from vnpy.trader.constant import Exchange
from vnpy.trader.database import database_manager
from vnpy.trader.object import TickData


def run_load_csv():
    """
    遍历同一文件夹内所有csv文件,并且载入到数据库中
    """
    for file in os.listdir("."): 
        if not file.endswith(".csv"): 
            continue

        print("载入文件:", file)
        csv_load(file)

 

2)csv_load函数的具体设计

 

  1. 使用csv.DictReader类,来访问CSV文件中的数据;
  2. 通过字符串相加合成标准时间,在使用datetime.strptime函数转化成时间元组;
  3. 通过datetime.time函数的判断来剔除非交易时间段数据;
  4. 将符合标准数据,生成TickData数据对象;
  5. 在循环中把TickData插入到ticks列表中;
  6. 最终使用database_manager.save_tick_data函数把ticks列表中的数据写入到数据库中。

 

def csv_load(file):
    """
    读取csv文件内容,并写入到数据库中    
    """
    with open(file, "r") as f:
        reader = csv.DictReader(f)

        ticks = []
        start = None
        count = 0

        for item in reader:

            # generate datetime
            date = item["交易日"]
            second = item["最后修改时间"]
            millisecond = item["最后修改毫秒"]

            standard_time = date + " " + second + "." + millisecond
            dt = datetime.strptime(standard_time, "%Y%m%d %H:%M:%S.%f")

            # filter
            if dt.time() > time(15, 1) and dt.time() < time(20, 59):
                continue

            tick = TickData(
                symbol="RU88",
                datetime=dt,
                exchange=Exchange.SHFE,
                last_price=float(item["最新价"]),
                volume=float(item["持仓量"]),
                bid_price_1=float(item["申买价一"]),
                bid_volume_1=float(item["申买量一"]),
                ask_price_1=float(item["申卖价一"]),
                ask_volume_1=float(item["申卖量一"]), 
                gateway_name="DB",       
            )
            ticks.append(tick)

            # do some statistics
            count += 1
            if not start:
                start = tick.datetime

        end = tick.datetime
        database_manager.save_tick_data(ticks)

        print("插入数据", start, "-", end, "总数量:", count)  

 if __name__ == "__main__":
    run_load_csv()

 
创建好脚本后可以直接运行:进入cmd或者Powershell,运行命令python load_tickdata.py即可,效果如下图所示:

 

description
 

此时我们使用Robo 3T客户端来连接上MongoDB,在数据库【vnpy】->【db_tick_data】可以看到新载入的数据:

 
description

 

 

Tick模式回测

 

CTA策略模块(CtaStrategy)的回测引擎BacktestingEngine支持Tick数据的回测,以下代码推荐在Jupyter Notebook中运行。

 

第一步我们需要在策略文件中进行一些修改,这里以AtrRsiStrategy策略为例:找到on_init函数,把其中的load_bar(10)改为load_tick(10),即指定加载过去10天的Tick数据来执行策略初始化任务,而不是加载K线Bar数据进行初始化。

 
然后在加载回测相关的模块时,需要额外加载BacktestingMode枚举类型,其中包含有回测引擎所支持的Bar(K线)和Tick两种模式:
 

from vnpy.app.cta_strategy.backtesting import BacktestingEngine, OptimizationSetting
from vnpy.app.cta_strategy.base import BacktestingMode
from datetime import datetime
from atr_rsi_strategy import  AtrRsiStrategy

 
创建回测引擎对象的实例后,在调用set_parameters函数时,参数中需要新增“mode=BacktestingMode.TICK ”,来指定回测引擎使用Tick回测模式。
 

同时需要注意另外2点:

 

  • interval需要传入"1m",尽管在Tick回测中用不到K线,但不能设置为None或者不传,会导致报错;
  • 合约乘数、手续费、滑点三个参数,需要根据合约品种的具体情况做调整。

 

engine = BacktestingEngine()
engine.set_parameters(
    vt_symbol="RU88.SHFE",
    interval="1m",
    start=datetime(2019, 1, 1),
    end=datetime(2019, 4, 1),
    rate=0.5/10000,
    slippage=5,
    size=10,
    pricetick=5,
    capital=1_000_000,
    mode=BacktestingMode.TICK   
)
engine.add_strategy(AtrRsiStrategy, {})

 

后续的操作和K线模式回测就几乎完全相同了,加载历史数据并执行数据回放,然后基于逐笔成交计算每日盈亏数据,并生成最终的策略统计结果以及回测图表:

 

engine.load_data()
engine.run_backtesting()
df = engine.calculate_result()
engine.calculate_statistics()
engine.show_chart()

 

由于这3个月行情多为区间震荡,所以以去趋势跟踪为核心逻辑的AtrRsiStrategy的回测效果不太理想:
 
description

 

下面我们如果想再看看详细的逐笔成交记录,可以遍历回测引擎中保存所有成交数据的trades字典,并打印每笔成交相关的字段信息:

 

trades = engine.trades
for value in trades.values():
    print("时间:",value.datetime,value.direction.value,value.offset.value, "价格:",value.price, "数量:",value.volume)
    if value.offset.value == "平":
        print("---------------------------------------------------------")

 

这样就能看到每笔成交具体发生的时间点,并和Tick数据当时的盘口情况进行相应的比对检查了:
 

description

 

《vn.py全实战进阶》课程已经更新过半!一共50节内容覆盖从策略设计开发、参数回测优化,到最终实盘自动交易的完整CTA量化业务流程,详细内容请戳课程上线:《vn.py全实战进阶》!

 
了解更多知识,请关注vn.py社区公众号。

description

可以这类东西并没有统计显著。
你可以基于talib库来实现,如三只乌鸦等,返回的是0-100的数值

© 2015-2022 微信 18391752892
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】