恰巧我有做价差交易,就补充一下: 先对齐时间戳,再合成价差。
楼主的做法,是有tick数据就合成价差,这会引入脏数据。价差交易的标准做法,是通过回望过去n个spread data,得到均线和标准差,构建上下轨(可以理解为置信区间),但它突破的时候做价差回归交易。当缓存的spread data数组不准时候,交易信号自然有错。
假设t1时刻同时来2个tick,时间上先A,后B。那么只需要收到B数据时候,在合成价差就可以了。因为收到a时刻,合成是spread是B(t+0) - A(t + 1),正确的价差永远是对齐时间戳后,用最新数据相减得到的。
如何对齐时间戳,这里到可以讲一下:
1) 先判断行情数据的顺序,或者是否有序:
可以用机器收一篮子行情数据,在共享内存里面打tsc(cpu原子钟计数)。先设置tsc计数是恒定的,不会因为超频到5GHz或者基频3GHz不同而有所区别。我这里是3个tsc=1纳秒。
以上期所为例子,行情的顺序就按照字母 A -> Z。例如先收到AL,AU,BC,RB,,,ZN。在相同品种里面,再按照数字从小到大,即先收到AL2209,再AL2210,AL2211。上期为了保证时间的统一性,切片会同赋予同样的时间,但这个切片时间不等于真实行情时间,而且会有2毫秒的延迟,并且是累计的。这里猜测是交易所虽然每隔500毫秒切,但是切片到发送,里面一些数据转换或者io消耗达2毫秒。所以观察到开盘机器对时后,行情是延迟累计增加,到后面越来越大。
2)确定价差对哪个品种是最后收到(跨期一般是远月),以那品种为基准合成价差:
这仅仅是理论做法,在实践中会遇到蛮多坑的。
以当前2202年8月为例子,跨期的话是AL2209(主力)&AL2210(次主力), AG2209(次)&AG2212(主)。我们知道用远月对齐,并且主力的tick数量,要远多于次主力的tick。
这样,由于AL2210远月而且是次主力,那么合成价差后,AL的价差数据 < AG。
这更好从硬件上面去处理,如使用FPGA。因为FPGA行情网卡收的是交易所行情镜像,它第一条数据是全息盘口数据,之后的都是增量数据。由于数据量比较小,相同的数据包会有大量的信息。这样,只要硬件解完码后,共享内存一刷新,都是最新的数据。就不从这么麻烦,例如AL2209解包,共享内存刷新一次,策略订阅的on_tick触发一次,然后AL2210解包,共享内存刷新一次,策略on_tick再触发。
而正常的行情api的话,以凌云为例(类似ctp的md api),它是在柜台层面解包后,按照品种一个小包一个小包的发出去。相同数据,数据包拆得越小,处理起来越麻烦。
不管是研究套利策略,还是多因子策略,都需要多品种的历史数据,所以下面介绍一下,如何调用vnpy的数据下载模块,来下载全市场的期货数据。
1)设置合约品种
首先,我们要先生成一个字典,来指定需要下载的数据,关键字段有3个:
这样,在RQData中,我们要下载螺纹钢指数合约的历史数据,需要转成的代号为RB99.SHFE。
然后,由于是全市场行情的数据,所以字典的数据结构如下:key是交易所,value是列表,里面包含各种期货品种,这样,只要在遍历一下这个字典,就可以得到所有,如RB99.SHFE这样结构的字符串。
symbols = {
"SHFE": ["CU", "AL", "ZN", "PB", "NI", "SN", "AU", "AG", "RB", "WR", "HC", "SS", "BU", "RU", "NR", "SP", "SC", "LU", "FU"],
"DCE": ["C", "CS", "A", "B", "M", "Y", "P", "FB","BB", "JD", "RR", "L", "V", "PP", "J", "JM", "I", "EG", "EB", "PG"],
"CZCE": ["SR", "CF", "CY", "PM","WH", "RI", "LR", "AP","JR","OI", "RS", "RM", "TA", "MA", "FG", "SF", "ZC", "SM", "UR", "SA", "CL"],
"CFFEX": ["IH","IC","IF", "TF","T", "TS"]
}
symbol_type = "99"
2) 设置下载时间
我们只需要设置下载的开始和结束时间即可,需要注意的是,vnpy数据下载模块的入参是datetime.datetime格式,所以,要做到格式的一致,代码如下:
from datetime import datetime
start_date = datetime(2005,1,1)
end_date = datetime(2020,9,10)
3)批量下载全市场数据
批量下载数据,并不难,其运作步骤如下:
from vnpy.trader.rqdata import rqdata_client
from vnpy.trader.database import database_manager
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.object import HistoryRequest
def load_data(req):
data = rqdata_client.query_history(req)
database_manager.save_bar_data(data)
print(f"{req.symbol}历史数据下载完成")
for exchange, symbols_list in symbols.items():
for s in symbols_list:
req = HistoryRequest(
symbol=s+symbol_type,
exchange=Exchange(exchange),
start=start_date,
interval=Interval.DAILY,
end=end_date,
)
load_data(req=req)
写好脚本后,我们运行一下代码,可以看到很快就下完全市场期货的日线数据啦。
若要下载小时或者分钟级别数据,只要把日线周期(Interval.DAILY)改成对应的小时,或者分钟即可。
有了历史数据后,自然产生每天定时更新数据的需求
1)设置定时器
我们希望在收盘后,某个时间点如下午5点启动脚本,来自动下载数据。这本质上是包含了一个父进程和一个子进程。
父进程可以是一个永远在运行的python程序,如while循环,然后设置触发条件,如当时间刚好到下午5点就启动子进程下载更新数据,其他时间则睡觉等待。
代码如下:
from datetime import datetime, time
from time import sleep
current_time = datetime.now().time()
start_time = time(17,0)
while True:
sleep(10)
if current_time == start_time:
download_data()
2)获取数据库数据
更新数据时候,我们要以数据库里面最新的数据的时间点,作为开始时间,而结束时间就是当天。比如,昨天刚好下载完所有市场的数据,那么今天我们只需要下载从昨天到今天的所有数据即可。
这样实现起来也不难,步骤如下:
1)调用database_manager.get_bar_data_statistics来得到字典格式的数据数据库所有信息
data = database_manager.get_bar_data_statistics()
2)获取各品种最新数据的时间信息,并且插入到data字典中
for d in data:
newest_bar = database_manager.get_newest_bar_data(
d["symbol"], Exchange(d["exchange"]), Interval(d["interval"])
)
d["end"] = newest_bar.datetime
然后我们看看data字典,发现真的包含所有行情的数据,但我们是基于RQData来定期更新信息的,所以要进行筛选,得到国内期货品种(通过交易所来判断)并且是日线级别的数据。
3)基于交易所和K线周期筛选品种,得到新的字典symbols,其中key包含合约代码,交易所,value就是数据库的结束时间,如下图:
symbols = {}
for i in data:
if i["interval"] == "d" and i["exchange"] in Exchanges:
vt_symbol = f"{i['symbol']}.{i['exchange']}"
end = i["end"].date()
symbols[vt_symbol] = end
4)设置下载结束时间为当天,基于symbols字典的信息,遍历组合得到HistoryRequest,然后再调用上面定义好的load_data函数下载数据并写入数据库中。
end_date = datetime.now().date()
for vt_symbol, start_date in symbols.items():
symbol = vt_symbol.split(".")[0]
exchange = vt_symbol.split(".")[1]
req = HistoryRequest(
symbol=symbol,
exchange=Exchange(exchange),
start=start_date,
interval=Interval.DAILY,
end=end_date,
)
load_data(req=req)
下载好之后,我们再获取数据库里面最新的K线时间,发现成功更新到今天了。
2019年已经进入最后倒计时,vn.py总算是赶上末班车发布了v2.0.9版本:期权交易。
本周一,国内三大沪深300指数相关的期权已经同时上线,分别是:
2.0.9版本主要更新了围绕期权交易方面的接口和应用,和之前一样,对于使用VN Studio的用户,启动VN Station后,直接点击界面右下角的【更新】按钮就能完成自动更新升级。
对于没有安装的用户,请下载VNStudio-2.0.9,体验一键安装的量化交易Python发行版,下载链接:
https://download.vnpy.com/vnstudio-2.0.9.exe
OptionMaster是vn.py框架内针对【期权波动率交易】专门设计的上层应用模块。
初始化配置
打开OptionMaster后,会看到上图中的长条形组件。【期权产品】下拉框中会显示当前已连接的交易接口上可选的期权产品组合。
注意底层接口必须支持期权合约的交易,这里才会有对应期权产品的显示,否则是不会有任何信息的(比如连接SimNow的CTP测试环境就没有)。
点击【配置】按钮后,弹出上图所示的组合配置对话框,在这里选择要用的期权定价模型,设置期权定价中用到的无风险利率,以及每个期权链定价所用的标的物合约。
注意期权链的标的物可以选择留空,此时该期权链在后续的交易中将不会被添加到期权组合中,可以降低一部分定价相关的计算延时。
期权定价
做期权交易,第一步总是离不开正确的定价,针对国内的期权类型,OptionMaster模块中内置了三大定价模型:
每个定价模型中,从计算方向来区分,又可以分为:
所有模型中都包含了输入数值的边界检查功能,避免计算出某些异常数值。
数据模型
期权相关的量化交易,和CTA策略等单标的量化交易相比,最大的区别之一就在于会同时交易大量的合约,包括不同行权价的期权、不同行权月份的期权以及标的物期货和股票(线性资产)。
同时以上合约之间的价格、成交、持仓等情况变化还会互相影响。在任意时间点结合当前最新行情数据的情况下,期权交易员需要能够实时跟踪整个期权交易组合的波动率曲面和希腊值风险情况。
OptionMaster中专门构建了多层嵌套式的立体数据结构,来解决以上多合约数据计算中的复杂性问题:
当以上数据结构中的任意一个数据发生变化时,会同时触发与之相关的所有计算,保证整体数据结构的一致性。
T型报价
T型报价是期权交易中最常用的行情显示方式,中间白色的一列为行权价,左侧为看涨期权,右侧为看跌期权。
上图中,除了由交易接口层推送过来的原始行情数据外(买卖价格、挂单数量、成交量、持仓量等),还包含了实时计算的买卖价隐含波动率和每个期权的现金希腊值。
传统意义上的理论希腊值,直接基于期权定价模型计算,衡量的是当某一变量发生变化时期权价格的变化情况。这种方法从数学的角度容易理解,但是从交易员的实盘使用来说却十分麻烦。
假设某个50ETF期权合约的Delta数值,使用Black-Scholes期权定价公式计算出来的结果为0.5482,意味着每当ETF价格上涨1元时,该期权的价格应该上涨0.5482元。
而50ETF当前的价格大约是3元,上涨1元是足足超过30%的涨幅,对交易员来说,知道【标的物价格上涨30%期权能赚0.5482元】不能说完全没有参考价值,但效果可能也就跟【每天喝10瓶可乐一定会胖】差不多。
所以在实践中,专业期权交易员更多使用的是现金希腊值,衡量的是当某一变量发生1%变化时该期权对应的现金盈亏情况。还是用上面的这个50ETF期权合约,其现金Delta为:
0.5484(理论Delta)x 3(标的价格)x 10000 (合约乘数)x 1% = 165
这里的165,意味着每当ETF价格上涨1%时,持有1手该期权合约的盈利金额是165元,实盘交易中用来判断某个合约当前的风险水平无疑要方便得多。
除了Delta数据外,理论Gamma/Theta/Vega等希腊值也可以同样转化为更加直观的现金希腊值。
希腊值风险
有了现金希腊值,可以在交易前方便直观的了解某一期权合约的风险水平。但在交易完成后,手头持有一堆期权和标的物持仓时,我们更需要持仓希腊值来跟踪当前整个账户的期权风险暴露:
持仓希腊值 = 现金希腊值 x 合约持仓
上图中的持仓希腊值的风险,分为单合约、期权链、期权组合三个层次统计,方便交易员结合各种不同类型的波动率交易策略使用(如做多近月波动率、做空远月波动率)。
快速交易
和VN Trader主界面的交易下单组件使用类似,在上图中的【代码】编辑框中输入合约代码后回车,即可显示该合约的行情盘口数据。或者在T型报价组件上,寻找到好的交易机会后,双击单元格即可完成合约代码的自动填充和行情显示。
选好方向、开平,输入价格、数量后,点击【委托】按钮即可立即发出交易委托,点击【全撤】按钮则可一次性将当前处于活动状态的委托(未成交、部分成交)全部撤单。
2.0.9版本中同样更新完善了和期权相关的交易接口,目前针对期权合约可用的包括:
目前所有的ETF期权程序化交易(包括sopt),都需要曾经向上交所报备过、有程序化交易权限的老账户,有小道消息传闻新账户的报备将在最近放开。
最后,附上开发中的OptionMaster Pro期权交易系统,采用vn.py框架以及OptionMaster模块组件开发。
除了在核心的定价模型方面进行了Cython低延时优化,也加入了波动率曲面实时跟踪、持仓风险情景3D分析、期权组合Delta自动对冲算法等功能。
最后,还有针对期权高频套利设计的电子眼算法引擎(开发中尚未完成):
考虑到几大股指期权刚上线,期权程序化交易方面的监管尚未明朗,OptionMaster Pro目前仅对机构用户提供试用。
需要下载软件的用户,请加vn.py机构用户群(QQ群号676499931),本群只对机构用户开放,加群申请中请注明:姓名/机构/部门。
了解更多知识,请关注vn.py社区公众号。
TradeBlazer交易开拓者(简称TB),可能是许多投资者开始接触量化时的第一根拐杖,也是国内用户量最大的量化平台之一。
但随着时间过去,国内量化投资者编程水平的逐渐提高,越来越多的人开始转向Python这样的开源生态体系。
在转换平台的过程中,由于编程语法、数据结构、驱动机制等方面的区别,不少人遇到了各种困难,掉在某些坑里可能几周都爬不出来。
本篇文章中我们就来通过一个的经典趋势跟踪策略AtrRsiStrategy,来详细讲解如何一步步将TB策略代码移植到vn.py上的过程。
完整的ATR-RSI策略逻辑如下:
注意点:我们总是假设在当前K线走完计算信号并且发出委托,成交永远发生在下一根K线。即T时刻计算信号,发出委托;最快也要T+1时刻该委托才能成交。这也是下面停止单和限价单撮合的充分条件。
创建RSI指标函数
创建ATR-RSI策略
Params
Numeric rsi_length(5);
Numeric rsi_entry(16);
Vars
NumericSeries rsi_array(0);
NumericSeries rsi_value(0);
NumericSeries rsi_buy(0);
NumericSeries rsi_sell(0);
Begin
// Calculate Rsi Value
rsi_buy = 50 + rsi_entry;
rsi_sell = 50 - rsi_entry;
rsi_array = rsirsi(rsi_length);
rsi_value = rsi_array[1];
计算当前ATR指标,atr_value = atr_array[1];以及当前ATR均值,atr_ma= atr_ma_array[1]
Params
Numeric atr_length(22);
Numeric atr_ma_length(10);
Vars
NumericSeries atr_value(0);
NumericSeries atr_ma(0);
NumericSeries atr_arry(0);
NumericSeries atr_ma_array(0);
Begin
// Calculate Atr Value and Atr Ma
atr_arry = AvgTrueRange(atr_length);
atr_ma_array = Average(atr_arry[atr_ma_length], atr_ma_length);
atr_value = atr_arry[1]; // last bar for atr_value
atr_ma = atr_ma_array[1]; // last bar for atr_ma_value
空仓情况下,发出限价单委托开仓:
If(MarketPosition == 0)
{
intra_trade_low = Low[1];
intra_trade_high = High[1];
// 【Long condition】
If(rsi_value > rsi_buy AND atr_value > atr_ma)
{
long_limit = Close[1] + 5;
If(long_limit>=Low)
{
Buy(fixed_size, Min(Open, long_limit));
}
}
// 【Short condition】
Else If(rsi_value < rsi_sell AND atr_value > atr_ma)
{
short_limit = Close[1] - 5;
If(short_limit <=High)
{
SellShort(fixed_size, Max(Open, short_limit));
}
}
}
百分比移动止盈止损离场:
// postition >0
Else If(MarketPosition >0)
{
intra_trade_high = Max(intra_trade_high, High[1]);
intra_trade_low = Low[1];
long_stop = intra_trade_high * (1 - trailing_percent / 100);
If(Low <= long_stop)
{
Sell(MarketPosition, Min(Open, long_stop));
}
}
// postiton < 0
Else If(MarketPosition <0)
{
intra_trade_low = Min(intra_trade_low, Low[1]);
intra_trade_high = High[1];
short_stop = intra_trade_low *(1+ trailing_percent /100);
If(High >= short_stop)
{
BuyToCover(-MarketPosition, Max(Open, short_stop));
}
}
策略回测结果
TB完整代码
Params
Numeric atr_length(22);
Numeric atr_ma_length(10);
Numeric rsi_length(5);
Numeric rsi_entry(16);
Numeric trailing_percent(0.8);
Numeric fixed_size(1);
Vars
NumericSeries rsi_array(0);
NumericSeries atr_value(0);
NumericSeries atr_ma(0);
NumericSeries rsi_value(0);
NumericSeries rsi_buy(0);
NumericSeries rsi_sell(0);
NumericSeries intra_trade_high(0);
NumericSeries intra_trade_low(0);
NumericSeries atr_arry(0);
NumericSeries atr_ma_array(0);
NumericSeries long_stop(0);
NumericSeries short_stop(0);
NumericSeries long_limit(0);
NumericSeries short_limit(0);
Begin
// Calculate Rsi Value
rsi_buy = 50 + rsi_entry;
rsi_sell = 50 - rsi_entry;
rsi_array = rsirsi(rsi_length);
rsi_value = rsi_array[1];
// Calculate Atr Value and Atr Ma
atr_arry = AvgTrueRange(atr_length);
atr_ma_array = Average(atr_arry[atr_ma_length], atr_ma_length);
atr_value = atr_arry[1]; // last bar for atr_value
atr_ma = atr_ma_array[1]; // last bar for atr_ma_value
If(MarketPosition == 0)
{
intra_trade_low = Low[1];
intra_trade_high = High[1];
// 【Long condition】
If(rsi_value > rsi_buy AND atr_value > atr_ma)
{
long_limit = Close[1] + 5;
If(long_limit>=Low)
{
Buy(fixed_size, Min(Open, long_limit));
}
}
// 【Short condition】
Else If(rsi_value < rsi_sell AND atr_value > atr_ma)
{
short_limit = Close[1] - 5;
If(short_limit <=High)
{
SellShort(fixed_size, Max(Open, short_limit));
}
}
}
// postition >0
Else If(MarketPosition >0)
{
intra_trade_high = Max(intra_trade_high, High[1]);
intra_trade_low = Low[1];
long_stop = intra_trade_high * (1 - trailing_percent / 100);
If(Low <= long_stop)
{
Sell(MarketPosition, Min(Open, long_stop));
}
}
// postiton < 0
Else If(MarketPosition <0)
{
intra_trade_low = Min(intra_trade_low, Low[1]);
intra_trade_high = High[1];
short_stop = intra_trade_low *(1+ trailing_percent /100);
If(High >= short_stop)
{
BuyToCover(-MarketPosition, Max(Open, short_stop));
}
}
End
TB策略的逻辑完全由行情驱动,即每次有行情变化(Tick更新、K线走完)时会完整执行代码中的所有逻辑。与之不同的是,vn.py内置的CTA策略模板,提供了诸多的事件驱动回调函数,如:Tick更新驱动(on_tick函数)、K线驱动(on_bar函数)、成交驱动(on_trade)、委托驱动(on_order)等。
要移植TB上的策略,只需在vn.py策略代码的on_bar回调函数中实现对应的策略逻辑即可:
def on_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
self.cancel_all()
am = self.am
am.update_bar(bar)
if not am.inited:
return
atr_array = am.atr(self.atr_length, array=True)
self.atr_value = atr_array[-1]
self.atr_ma = atr_array[-self.atr_ma_length:].mean()
self.rsi_value = am.rsi(self.rsi_length)
if self.pos == 0:
self.intra_trade_high = bar.high_price
self.intra_trade_low = bar.low_price
if self.atr_value > self.atr_ma:
if self.rsi_value > self.rsi_buy:
self.buy(bar.close_price + 5, self.fixed_size)
elif self.rsi_value < self.rsi_sell:
self.short(bar.close_price - 5, self.fixed_size)
elif self.pos > 0:
self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
self.intra_trade_low = bar.low_price
long_stop = self.intra_trade_high * \
(1 - self.trailing_percent / 100)
self.sell(long_stop, abs(self.pos), stop=True)
elif self.pos < 0:
self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
self.intra_trade_high = bar.high_price
short_stop = self.intra_trade_low * \
(1 + self.trailing_percent / 100)
self.cover(short_stop, abs(self.pos), stop=True)
self.put_event()
完整的代码实现请参考Github仓库中的策略源代码。
策略回测结果
K线数据访问区别
TB
vn.py
委托撮合逻辑区别
TB
vn.py
策略回测结果区别
即使在策略逻辑层面已经做到一致,TB和vn.py的回测资金曲线图依旧可能存在某些细节方面的区别。主要原因是数据源方面的不同,TB使用的是自身提供的历史数据源,而vn.py默认推荐使用的是RQData数据服务。
了解更多知识,请关注vn.py社区公众号。
逐笔成交统计想用通用化,难点在于去限定一次完整开平交易的开始点和结束点,抽象来说就是寻找特殊的断点对所有成交记录进行划分。
断点的选择
而在算法状态机控制中,我们可以知道数字0是一个非常有用的评判标准,即我们构建一列数据,让它数值在完全平仓后变成0,就知道真正的平仓时间。
在实践中,累计净持仓恰恰好符合这个标准,我们把多头仓位设为”+”,空头仓位设为“-”,得到如下表的【方向持仓】,对【方向持仓】进行累计得到【净持仓】。
这样,我们基于【净持仓】为0可以得到每次开平交易的结束点。而该结束点为成交记录的断点。
使用断点划分成交记录
为了简单演示,下面我们只显示【净持仓】(列)为0的成交信息(行),如下表所示,一共发生了5开完整的开平仓交易。每笔交易的结束点对应的交易序号分别为3、5、8、12、20。这5个结束点即为对所有成交信息的断点。
之后,我们要引入2个新的概念:
存量是静态的,可以理解为对累计统计量的信息进行时间切片;而增量是动态的,代表时间切片信息的变化量,所以他们二者的关系如下:
T0时刻存量 + T0->T1增量 = T1时刻存量
换句话说,
T0->T1增量 = T1时刻存量 - T0时刻存量
回到逐笔回测统计主题上,增量这个概念,就能代表最新的完整开平仓交易,例如其每笔盈亏,对累计盈亏的影响。
如下图所示,在完成第一笔开平仓交易后,累计盈亏是1000;完成了第二笔完整的开平仓交易,累计盈亏是2000,那么二者的差别,即2000-1000=1000。这增加1000的盈利,就是属于第二笔开平仓交易的。
所以,通过对每个断点存量信息的对比,我们就可以得到每笔开平仓成交后的统计量:
这些开平仓的统计量可以如下表所示的开平成交量、开平盈亏,也可以是开平仓交易的持仓时间、手续费、滑点以及净盈亏:
计算开平交易结果
import pandas as pd
from datetime import datetime
import matplotlib.pyplot as plt
import numpy as np
pd.set_option('mode.chained_assignment', None)
def calculate_trades_result(trades):
"""
Deal with trade data
"""
dt, direction, offset, price, volume = [], [], [], [], []
for i in trades.values():
dt.append(i.datetime)
direction.append(i.direction.value)
offset.append(i.offset.value)
price.append(i.price)
volume.append(i.volume)
# Generate DataFrame with datetime, direction, offset, price, volume
df = pd.DataFrame()
df["direction"] = direction
df["offset"] = offset
df["price"] = price
df["volume"] = volume
df["current_time"] = dt
df["last_time"] = df["current_time"].shift(1)
# Calculate trade amount
df["amount"] = df["price"] * df["volume"]
df["acum_amount"] = df["amount"].cumsum()
# Calculate pos, net pos(with direction), acumluation pos(with direction)
def calculate_pos(df):
if df["direction"] == "多":
result = df["volume"]
else:
result = - df["volume"]
return result
df["pos"] = df.apply(calculate_pos, axis=1)
df["net_pos"] = df["pos"].cumsum()
df["acum_pos"] = df["volume"].cumsum()
# Calculate trade result, acumulation result
# ej: trade result(buy->sell) means (new price - old price) * volume
df["result"] = -1 * df["pos"] * df["price"]
df["acum_result"] = df["result"].cumsum()
# Filter column data when net pos comes to zero
def get_acum_trade_result(df):
if df["net_pos"] == 0:
return df["acum_result"]
df["acum_trade_result"] = df.apply(get_acum_trade_result, axis=1)
def get_acum_trade_volume(df):
if df["net_pos"] == 0:
return df["acum_pos"]
df["acum_trade_volume"] = df.apply(get_acum_trade_volume, axis=1)
def get_acum_trade_duration(df):
if df["net_pos"] == 0:
return df["current_time"] - df["last_time"]
df["acum_trade_duration"] = df.apply(get_acum_trade_duration, axis=1)
def get_acum_trade_amount(df):
if df["net_pos"] == 0:
return df["acum_amount"]
df["acum_trade_amount"] = df.apply(get_acum_trade_amount, axis=1)
# Select row data with net pos equil to zero
df = df.dropna()
return df
def generate_trade_df(trades, size, rate, slippage, capital):
"""
Calculate trade result from increment
"""
df = calculate_trades_result(trades)
trade_df = pd.DataFrame()
trade_df["close_direction"] = df["direction"]
trade_df["close_time"] = df["current_time"]
trade_df["close_price"] = df["price"]
trade_df["pnl"] = df["acum_trade_result"] - \
df["acum_trade_result"].shift(1).fillna(0)
trade_df["volume"] = df["acum_trade_volume"] - \
df["acum_trade_volume"].shift(1).fillna(0)
trade_df["duration"] = df["current_time"] - \
df["last_time"]
trade_df["turnover"] = df["acum_trade_amount"] - \
df["acum_trade_amount"].shift(1).fillna(0)
trade_df["commission"] = trade_df["turnover"] * rate
trade_df["slipping"] = trade_df["volume"] * size * slippage
trade_df["net_pnl"] = trade_df["pnl"] - \
trade_df["commission"] - trade_df["slipping"]
result = calculate_base_net_pnl(trade_df, capital)
return result
汇总生成资金曲线
def calculate_base_net_pnl(df, capital):
"""
Calculate statistic base on net pnl
"""
df["acum_pnl"] = df["net_pnl"].cumsum()
df["balance"] = df["acum_pnl"] + capital
df["return"] = np.log(
df["balance"] / df["balance"].shift(1)
).fillna(0)
df["highlevel"] = (
df["balance"].rolling(
min_periods=1, window=len(df), center=False).max()
)
df["drawdown"] = df["balance"] - df["highlevel"]
df["ddpercent"] = df["drawdown"] / df["highlevel"] * 100
df.reset_index(drop=True, inplace=True)
return df
统计整体策略效果
def statistics_trade_result(df, capital, show_chart=True):
""""""
end_balance = df["balance"].iloc[-1]
max_drawdown = df["drawdown"].min()
max_ddpercent = df["ddpercent"].min()
pnl_medio = df["net_pnl"].mean()
trade_count = len(df)
duration_medio = df["duration"].mean().total_seconds()/3600
commission_medio = df["commission"].mean()
slipping_medio = df["slipping"].mean()
win = df[df["net_pnl"] > 0]
win_amount = win["net_pnl"].sum()
win_pnl_medio = win["net_pnl"].mean()
win_duration_medio = win["duration"].mean().total_seconds()/3600
win_count = len(win)
loss = df[df["net_pnl"] < 0]
loss_amount = loss["net_pnl"].sum()
loss_pnl_medio = loss["net_pnl"].mean()
loss_duration_medio = loss["duration"].mean().total_seconds()/3600
loss_count = len(loss)
winning_rate = win_count / trade_count
win_loss_pnl_ratio = - win_pnl_medio / loss_pnl_medio
total_return = (end_balance / capital - 1) * 100
return_drawdown_ratio = -total_return / max_ddpercent
output(f"起始资金:\t{capital:,.2f}")
output(f"结束资金:\t{end_balance:,.2f}")
output(f"总收益率:\t{total_return:,.2f}%")
output(f"最大回撤: \t{max_drawdown:,.2f}")
output(f"百分比最大回撤: {max_ddpercent:,.2f}%")
output(f"收益回撤比:\t{return_drawdown_ratio:,.2f}")
output(f"总成交次数:\t{trade_count}")
output(f"盈利成交次数:\t{win_count}")
output(f"亏损成交次数:\t{loss_count}")
output(f"胜率:\t\t{winning_rate:,.2f}")
output(f"盈亏比:\t\t{win_loss_pnl_ratio:,.2f}")
output(f"平均每笔盈亏:\t{pnl_medio:,.2f}")
output(f"平均持仓小时:\t{duration_medio:,.2f}")
output(f"平均每笔手续费:\t{commission_medio:,.2f}")
output(f"平均每笔滑点:\t{slipping_medio:,.2f}")
output(f"总盈利金额:\t{win_amount:,.2f}")
output(f"盈利交易均值:\t{win_pnl_medio:,.2f}")
output(f"盈利持仓小时:\t{win_duration_medio:,.2f}")
output(f"总亏损金额:\t{loss_amount:,.2f}")
output(f"亏损交易均值:\t{loss_pnl_medio:,.2f}")
output(f"亏损持仓小时:\t{loss_duration_medio:,.2f}")
if not show_chart:
return
plt.figure(figsize=(10, 12))
acum_pnl_plot = plt.subplot(3, 1, 1)
acum_pnl_plot.set_title("Balance Plot")
df["balance"].plot(legend=True)
pnl_plot = plt.subplot(3, 1, 2)
pnl_plot.set_title("Pnl Per Trade")
df["net_pnl"].plot(legend=True)
distribution_plot = plt.subplot(3, 1, 3)
distribution_plot.set_title("Trade Pnl Distribution")
df["net_pnl"].hist(bins=100)
plt.show()
def output(msg):
"""
Output message with datetime.
"""
print(f"{datetime.now()}\t{msg}")
统计纯多头和纯空头交易
纯多头交易就是只有多开->空平的交易,而纯空头交易就是反过来。
为了筛选出纯多开交易,只要在DataFrame中判断其平仓方向的空的即可;纯空头交易则反过来,平仓方向为多。
def buy2sell(df, capital):
"""
Generate DataFrame with only trade from buy to sell
"""
buy2sell = df[df["close_direction"] == "空"]
result = calculate_base_net_pnl(buy2sell, capital)
return result
def short2cover(df, capital):
"""
Generate DataFrame with only trade from short to cover
"""
short2cover = df[df["close_direction"] == "多"]
result = calculate_base_net_pnl(short2cover, capital)
return result
整合所有计算步骤
最后,我们将上文中所有的函数进行整合,封装到单个函数中,用于实现策略回测效果的一键生成:
def exhaust_trade_result(
trades,
size: int = 10,
rate: float = 0.0,
slippage: float = 0.0,
capital: int = 1000000,
show_long_short_condition=True
):
"""
Exhaust all trade result.
"""
total_trades = generate_trade_df(trades, size, rate, slippage, capital)
statistics_trade_result(total_trades, capital)
if not show_long_short_condition:
return
long_trades = buy2sell(total_trades, capital)
short_trades = short2cover(total_trades, capital)
output("-----------------------")
output("纯多头交易")
statistics_trade_result(long_trades, capital)
output("-----------------------")
output("纯空头交易")
statistics_trade_result(short_trades, capital)
最后附上完整的源代码
import pandas as pd
from datetime import datetime
import matplotlib.pyplot as plt
import numpy as np
pd.set_option('mode.chained_assignment', None)
def calculate_trades_result(trades):
"""
Deal with trade data
"""
dt, direction, offset, price, volume = [], [], [], [], []
for i in trades.values():
dt.append(i.datetime)
direction.append(i.direction.value)
offset.append(i.offset.value)
price.append(i.price)
volume.append(i.volume)
# Generate DataFrame with datetime, direction, offset, price, volume
df = pd.DataFrame()
df["direction"] = direction
df["offset"] = offset
df["price"] = price
df["volume"] = volume
df["current_time"] = dt
df["last_time"] = df["current_time"].shift(1)
# Calculate trade amount
df["amount"] = df["price"] * df["volume"]
df["acum_amount"] = df["amount"].cumsum()
# Calculate pos, net pos(with direction), acumluation pos(with direction)
def calculate_pos(df):
if df["direction"] == "多":
result = df["volume"]
else:
result = - df["volume"]
return result
df["pos"] = df.apply(calculate_pos, axis=1)
df["net_pos"] = df["pos"].cumsum()
df["acum_pos"] = df["volume"].cumsum()
# Calculate trade result, acumulation result
# ej: trade result(buy->sell) means (new price - old price) * volume
df["result"] = -1 * df["pos"] * df["price"]
df["acum_result"] = df["result"].cumsum()
# Filter column data when net pos comes to zero
def get_acum_trade_result(df):
if df["net_pos"] == 0:
return df["acum_result"]
df["acum_trade_result"] = df.apply(get_acum_trade_result, axis=1)
def get_acum_trade_volume(df):
if df["net_pos"] == 0:
return df["acum_pos"]
df["acum_trade_volume"] = df.apply(get_acum_trade_volume, axis=1)
def get_acum_trade_duration(df):
if df["net_pos"] == 0:
return df["current_time"] - df["last_time"]
df["acum_trade_duration"] = df.apply(get_acum_trade_duration, axis=1)
def get_acum_trade_amount(df):
if df["net_pos"] == 0:
return df["acum_amount"]
df["acum_trade_amount"] = df.apply(get_acum_trade_amount, axis=1)
# Select row data with net pos equil to zero
df = df.dropna()
return df
def generate_trade_df(trades, size, rate, slippage, capital):
"""
Calculate trade result from increment
"""
df = calculate_trades_result(trades)
trade_df = pd.DataFrame()
trade_df["close_direction"] = df["direction"]
trade_df["close_time"] = df["current_time"]
trade_df["close_price"] = df["price"]
trade_df["pnl"] = df["acum_trade_result"] - \
df["acum_trade_result"].shift(1).fillna(0)
trade_df["volume"] = df["acum_trade_volume"] - \
df["acum_trade_volume"].shift(1).fillna(0)
trade_df["duration"] = df["current_time"] - \
df["last_time"]
trade_df["turnover"] = df["acum_trade_amount"] - \
df["acum_trade_amount"].shift(1).fillna(0)
trade_df["commission"] = trade_df["turnover"] * rate
trade_df["slipping"] = trade_df["volume"] * size * slippage
trade_df["net_pnl"] = trade_df["pnl"] - \
trade_df["commission"] - trade_df["slipping"]
result = calculate_base_net_pnl(trade_df, capital)
return result
def calculate_base_net_pnl(df, capital):
"""
Calculate statistic base on net pnl
"""
df["acum_pnl"] = df["net_pnl"].cumsum()
df["balance"] = df["acum_pnl"] + capital
df["return"] = np.log(
df["balance"] / df["balance"].shift(1)
).fillna(0)
df["highlevel"] = (
df["balance"].rolling(
min_periods=1, window=len(df), center=False).max()
)
df["drawdown"] = df["balance"] - df["highlevel"]
df["ddpercent"] = df["drawdown"] / df["highlevel"] * 100
df.reset_index(drop=True, inplace=True)
return df
def buy2sell(df, capital):
"""
Generate DataFrame with only trade from buy to sell
"""
buy2sell = df[df["close_direction"] == "空"]
result = calculate_base_net_pnl(buy2sell, capital)
return result
def short2cover(df, capital):
"""
Generate DataFrame with only trade from short to cover
"""
short2cover = df[df["close_direction"] == "多"]
result = calculate_base_net_pnl(short2cover, capital)
return result
def statistics_trade_result(df, capital, show_chart=True):
""""""
end_balance = df["balance"].iloc[-1]
max_drawdown = df["drawdown"].min()
max_ddpercent = df["ddpercent"].min()
pnl_medio = df["net_pnl"].mean()
trade_count = len(df)
duration_medio = df["duration"].mean().total_seconds()/3600
commission_medio = df["commission"].mean()
slipping_medio = df["slipping"].mean()
win = df[df["net_pnl"] > 0]
win_amount = win["net_pnl"].sum()
win_pnl_medio = win["net_pnl"].mean()
win_duration_medio = win["duration"].mean().total_seconds()/3600
win_count = len(win)
loss = df[df["net_pnl"] < 0]
loss_amount = loss["net_pnl"].sum()
loss_pnl_medio = loss["net_pnl"].mean()
loss_duration_medio = loss["duration"].mean().total_seconds()/3600
loss_count = len(loss)
winning_rate = win_count / trade_count
win_loss_pnl_ratio = - win_pnl_medio / loss_pnl_medio
total_return = (end_balance / capital - 1) * 100
return_drawdown_ratio = -total_return / max_ddpercent
output(f"起始资金:\t{capital:,.2f}")
output(f"结束资金:\t{end_balance:,.2f}")
output(f"总收益率:\t{total_return:,.2f}%")
output(f"最大回撤: \t{max_drawdown:,.2f}")
output(f"百分比最大回撤: {max_ddpercent:,.2f}%")
output(f"收益回撤比:\t{return_drawdown_ratio:,.2f}")
output(f"总成交次数:\t{trade_count}")
output(f"盈利成交次数:\t{win_count}")
output(f"亏损成交次数:\t{loss_count}")
output(f"胜率:\t\t{winning_rate:,.2f}")
output(f"盈亏比:\t\t{win_loss_pnl_ratio:,.2f}")
output(f"平均每笔盈亏:\t{pnl_medio:,.2f}")
output(f"平均持仓小时:\t{duration_medio:,.2f}")
output(f"平均每笔手续费:\t{commission_medio:,.2f}")
output(f"平均每笔滑点:\t{slipping_medio:,.2f}")
output(f"总盈利金额:\t{win_amount:,.2f}")
output(f"盈利交易均值:\t{win_pnl_medio:,.2f}")
output(f"盈利持仓小时:\t{win_duration_medio:,.2f}")
output(f"总亏损金额:\t{loss_amount:,.2f}")
output(f"亏损交易均值:\t{loss_pnl_medio:,.2f}")
output(f"亏损持仓小时:\t{loss_duration_medio:,.2f}")
if not show_chart:
return
plt.figure(figsize=(10, 12))
acum_pnl_plot = plt.subplot(3, 1, 1)
acum_pnl_plot.set_title("Balance Plot")
df["balance"].plot(legend=True)
pnl_plot = plt.subplot(3, 1, 2)
pnl_plot.set_title("Pnl Per Trade")
df["net_pnl"].plot(legend=True)
distribution_plot = plt.subplot(3, 1, 3)
distribution_plot.set_title("Trade Pnl Distribution")
df["net_pnl"].hist(bins=100)
plt.show()
def output(msg):
"""
Output message with datetime.
"""
print(f"{datetime.now()}\t{msg}")
def exhaust_trade_result(
trades,
size: int = 10,
rate: float = 0.0,
slippage: float = 0.0,
capital: int = 1000000,
show_long_short_condition=True
):
"""
Exhaust all trade result.
"""
total_trades = generate_trade_df(trades, size, rate, slippage, capital)
statistics_trade_result(total_trades, capital)
if not show_long_short_condition:
return
long_trades = buy2sell(total_trades, capital)
short_trades = short2cover(total_trades, capital)
output("-----------------------")
output("纯多头交易")
statistics_trade_result(long_trades, capital)
output("-----------------------")
output("纯空头交易")
statistics_trade_result(short_trades, capital)
了解更多知识,请关注vn.py社区公众号。
【快速交易】组件有固定宽度的空白列
移动【T型报价版】小窗口列宽,只显示固定列宽内容:即随着鼠标向右拖动, 组件窗口内的左边空白处增多
【情景分析组件】,点击【执行分析】后,x轴和y轴的名称,如隐含波动率,价格 不显示
【希腊值风险】首列的【代码】,最初显示是缩在一起的,建设最初显示时后,展开首列的列宽
建议对坐标进行限制,优化曲线移动时候体验。(有时候,移动过大导致找不到隐含波动率曲线)
如
在【隐含波动率曲线】,鼠标右键点击【export】出现报错, 显示无法import QtSvg
在实盘交易中,逐日盯市(Marking-to-Market)是基于当日的收盘价、仓位数据、成交数据等来统计每日的盈亏,用于交易所对于客户盈亏情况的每日清算以及保证金管理。
在策略回测中,逐日盯市统计的算法也可以用于对策略盈亏曲线的计算和绘制。一个好的策略,资金曲线总是整体向上,并且相对平滑无太大回撤的,换句话说,就是夏普比率和收益回撤比都比较高。2.0版本的vn.py框架的CTA回测引擎,为了更直观的评估策略的整体效果,内置的盈亏统计采用了逐日盯市的模式。
但因为将每日所有的成交数据都映射到了最终收盘时的结果,逐日盯市统计的方式,无法在每笔开平仓交易的层面来分析盈亏情况,例如:手续费和滑点相对平均盈亏的占比、策略交易胜率和盈亏比等统计指标。
考虑到以上信息对于策略开发和研究的重要性,在本文中我们设计了一种新的逐笔开平对冲算法,来解决相关回测统计指标计算的问题。
在讲解代码前,先通过例子来简单介绍一下逐笔对冲统计这个概念:
上面的例子中可以知道该笔开平仓赚了12元,在实际成交中我们还需要考虑手续费和滑点:
计算完每笔交易的手续费和滑点后,我们就可以最终得到该笔开平仓交易的净盈亏情况。除了最简单的一开一平外,现实中许多策略的开平交易情况可能复杂得多,总体上可以分为:
计算完逐笔开平仓盈亏后,我们就可以统计每次交易的胜率和盈亏比了,更进一步还可以对交易方向进行筛选,来看看纯多头交易和纯空头交易的盈亏情况。
以下是vn.py中CTA回测引擎缓存回测成交信息的代码:
# Push trade update
self.trade_count += 1
if long_cross:
trade_price = min(order.price, long_best_price)
pos_change = order.volume
else:
trade_price = max(order.price, short_best_price)
pos_change = -order.volume
trade = TradeData(
symbol=order.symbol,
exchange=order.exchange,
orderid=order.orderid,
tradeid=str(self.trade_count),
direction=order.direction,
offset=order.offset,
price=trade_price,
volume=order.volume,
time=self.datetime.strftime("%H:%M:%S"),
gateway_name=self.gateway_name,
)
trade.datetime = self.datetime
self.strategy.pos += pos_change
self.strategy.on_trade(trade)
self.trades[trade.vt_tradeid] = trade
每一笔成交信息都以TradeData的数据格式缓存在trades字典中,我们可以通过打印输出该字典来直观地看看TradeData的数据结构。
对成交缓存数据的结构有个大概的了解后,接下来遍历engine.trades的值,依次打印:
在遍历过程中,若检测到是平仓操作,即value.offset.value == "平”,则另外打印分隔线,便于肉眼观察每笔开平仓所对应的时间、价格等:
trade = engine.trades
for value in trade.values():
print("时间:",value.datetime,value.direction.value,value.offset.value, "价格:",value.price, "数量:",value.volume)
if value.offset.value == "平":
print("---------------------------------------------------------")
尽管只是在Jupyter Notebook中简单的进行打印输出,已经能够一目了然的看到每次开平仓交易的基本信息。
其中的回测价格信息可以对照实盘成交回报信息进行对比,去计算真实成交和回测成交的价差,统计真实滑点,每隔一段时间(如一个月后)对回测中用到的滑点参数进行调整,力求回测尽量与实盘交易一致。
这里我们从简单的情况开始着手,首先做出假设条件:
由于不需要考虑较为复杂的一次委托多次成交情况,每次开仓成交后的下一笔必定是成交量相等的平仓成交,那么可以设计出如下的计算逻辑:
1.构建原始成交数据DataFrame,其中包括:日期时间、成交方向、开平仓、价格、数量;
2.把【价格】列表向后平移一个单位,得到上一笔成交记录;
a)若【成交方向】、【开平仓】为“多平”,意味着本次交易为空开->多平,那么盈亏=(开仓价格-平仓价格)* 成交数量;持仓时间=平仓时间-开仓时间;
b)若【成交方向】、【开平仓】为“空平”,意味着本次交易为多开->空平,那么盈亏=(平仓价格-开仓价格)* 成交数量;持仓时间=平仓时间-开仓时间;
4.对最后一行进行额外处理:若【开平】为“开”,【平仓价】和【持仓时间】设置为"待定",【盈亏】设为0;
5.使用dropna把【盈亏】为空值的行去掉;
6.对DataFrame的索引重新排序;
7.计算【累计盈亏】并画出图。
同时在实盘交易中做每日收盘后的统计时,我们可以设置DataFrame只显示当月的成交记录,便于重点观察最近一段时间的盈亏情况、持仓时间等:
上图中,我们可以一目了然地看出每一组完整的开平仓交易的最终盈亏以及持仓时间。
搞定了简单的情况,接下来我们可以将算法变得更加通用化,满足更多场景:一次开仓多次平仓、多次开仓一次平仓以及更加复杂的多次开仓多次平仓。
了解更多知识,请关注vn.py社区公众号。
登陆Option Master后,
1) 选择中金所的IO期权 并且与当月IF股指期货作为对冲。
2) 其他组件均正常显示,除了【场景分析】不显示3D图。点击【执行分析】后,报错:Zero Division Error:Float DIvision
3) 报错出现后,有时候window后台无法关闭进程,显示的是【无响应】
4)但是Option Master能正常订阅行情并且波动率曲线微笑能跟着变化
作为量化开源框架,vn.py的一大好处在于自由度高,用户可以基于开放的源代码来专门实现自己特定的功能。对于常用的量化系统配置,VN Trader提供了全局配置工具方便用户直接在图形界面上进行修改。
进入VN Trader后,点击菜单栏上的【配置】按钮弹出【全局配置】窗口:
尽管参数看着挺多,但总体上可以分成以下5大类:
下面我们来对这5大类的配置进行详细说明。
font.size:设置VN Trader整体字体大小,需要根据自己显示器的实际分辨率效果进行修改,如下图中的15号字体。
日志输出
VN Trader的日志文件,默认位于运行时目录的.vntrader\log目录下,完整路径为:
C:\users\administrator.vntrader\log
其中administrator为当前Windows系统的登录用户名。
综合来说,如果想要记录更多的VN Trader系统运行细节信息,建议将level等级调低获取更多的信息,下面是开启Log.console和Log.File后的效果:
日志文件则会根据启动VN Trader时的日期自动创建:
每天盘中自动化交易时,我们可能希望每有委托成交,能够收到实时的通知;或者若出现异常情况,如数据错误、连接断开等,也要通知一下。
vn.py内置了邮件引擎EmailEngine,只要进配置好邮箱的账号、密码、服务器等信息,后续即可调用MainEngine.send_email函数来非常方便的发送邮件通知。
在这里我们通过QQ邮箱进行演示:
得到授权码后,回到VN Trader的邮件相关的配置:
配置完成后,重启VN Trader,点击菜单栏【帮助】->【测试邮件】发送测试邮件,若能顺利收到,则说明邮件设置成功。
比起在手机上装邮箱客户端,使用微信来接受实时的消息通知,无疑要方便得多,而且只需要准备一个QQ邮箱:
最后将VN Trader的全局配置中email.receiver改成该QQ号对应的QQ邮箱即可实施在手机端接收vn.py监控消息了。
RQData是目前国内期货和股票数据方面,性价较高的三方数据提供商之一。购买RQData后(或者申请试用账号),会获得了其license文件,只需将其中的内容填入以下字段即可:
rqdata.password:RQData的license。
(注意这里的username和password不是米筐官网登录用的账号和密码!)
vn.py目前支持4个常用数据库:
其中SQLite为vn.py的默认数据库,它的优势主要表现在2点:
当然,其他数据库在特定的场景下也都有着自己的优势,如:更快的加载速度、支持多用户同时访问等。改为使用其他数据库,首先需要准备完该数据库的服务器以及图形客户端,然后在VN Trader全局配置进行相关的全局配置。
vn.py默认数据库,不需要修改任何配置。在第一次启动VN Trader时,程序会自动在用户目录下的.vntrader文件夹中生成database.db文件,后续所有相关的历史行情数据都会放在该文件中。
基础的配置只需要配置连接的数据库名称、主机名和端口号,至于用户登录信息和授权信息,可以留空。
和MySQL配置几乎一模一样,只需要将端口database.port改为5432:
2019年vn.py核心团队的最后一期小班课报名进行中!
两天共计10小时的密集提高课程
8套高端CTA策略源代码分享
DoubleChannel
KeltnerBandit
RsiMomentum
SuperTurtle
TrendThrust
Cinco
Cuatro
SuperCombo
动态风险仓位管理模型
策略内嵌复杂算法交易
详情请戳:第四期vn.py小班课上线:CTA策略开发!
了解更多知识,请关注vn.py社区公众号。
基于物理上的限制,各CPU厂商在核心频率上的比赛已经被多核所取代。为了更有效的利用多核处理器的性能,多线程的编程方式被越来越多地应用到了各类程序中,而随之带来的则是线程间数据一致性和状态同步的困难。
作为已经30岁的Python,自然早已支持多线程的功能,但坊间却始终存在着一种误解:Python的多线程是假的(或者虚拟机模拟的)。
Python虚拟机(或者叫解释器)使用了引用计数法的垃圾回收方式(另一种是以Java虚拟机为例的根搜索算法),这种垃圾回收方式使得Python虚拟机在运行多线程程序时,需要使用一把名为GIL(Global Interpreter Lock,全局解释器锁)的超级大锁,来保证每个对象上的引用计数正确。
从操作系统来看,每个CPU在同一时间都能够且只能执行一个线程。而在Python虚拟机上,任何一个线程的运行,都需要包含以下三个步骤:
因此,某个线程想要执行,必须先拿到GIL,我们可以把GIL看作是“通行证”,并且在一个Python进程中GIL也只有一个。所以哪怕硬件上CPU有再多的核心,任意时刻都只能有一个线程能拿到GIL来执行,这也是之前提到的误解来源。
Python多线程的痛点在于每次释放GIL锁,线程进行锁竞争、切换线程,会消耗资源。这导致很多时候,尤其是计算密集型任务为主的程序,多核多线程比单核多线程更差:
因此,在Python中想要充分压榨多核CPU的性能,必须依赖多进程的模式。每个进程有各自独立的GIL,互不干扰,这样就可以真正意义上的并行执行。
Python语言中内置了专门用于实现多进程的multiprocessing库,使用上相当傻瓜,通过multiprocessing.Process类来创建一个新的子进程对象,再启动这个对象,这样一个多进程任务就开始执行了。
等CPU分配一个独立核心去干活,func函数就在这个子进程中开始执行了,这里唯一要注意args是默认输入元组参数。
p = multiprocessing.Process(target=func, args=(a,))
p.start()
除了一个一个的启动子进程外,也可以使用multiprocessing.Pool来创建进程池对象,把需要干的工作任务打包好,放在这个池子里面,这样一个任务执行完CPU核心空闲下来后,就能自动从进程池中去获取一个新的任务继续干活。
基本的使用步骤如下:
多进程参数优化
学习多进程模块怎么用,最好的例子之一就是vn.py的CTA策略回测引擎中的参数优化功能,加载同样的历史数据基于不同的参数,执行历史数据回放和策略盈亏统计,属于典型的多进程应用场景。
多进程优化函数位于:
vnpy.app.cta_strategy.backtesting.BacktestingEngine.run_optimization
该函数中的执行步骤如下:
def run_optimization(self, optimization_setting: OptimizationSetting, output=True):
""""""
# Get optimization setting and target
settings = optimization_setting.generate_setting()
target_name = optimization_setting.target_name
if not settings:
self.output("优化参数组合为空,请检查")
return
if not target_name:
self.output("优化目标未设置,请检查")
return
# Use multiprocessing pool for running backtesting with different setting
pool = multiprocessing.Pool(multiprocessing.cpu_count())
results = []
for setting in settings:
result = (pool.apply_async(optimize, (
target_name,
self.strategy_class,
setting,
self.vt_symbol,
self.interval,
self.start,
self.rate,
self.slippage,
self.size,
self.pricetick,
self.capital,
self.end,
self.mode
)))
results.append(result)
pool.close()
pool.join()
# Sort results and output
result_values = [result.get() for result in results]
result_values.sort(reverse=True, key=lambda result: result[1])
if output:
for value in result_values:
msg = f"参数:{value[0]}, 目标:{value[1]}"
self.output(msg)
return result_values
启动多进程优化的任务后,打开Windows的任务管理器,可以看到此时CPU所有的8个核心都已经在满载运行了。
2019年vn.py核心团队的最后一期小班课开始报名:
两天共计10小时的密集提高课程
8套高端CTA策略源代码分享
DoubleChannel
KeltnerBandit
RsiMomentum
SuperTurtle
TrendThrust
Cinco
Cuatro
SuperCombo
动态风险仓位管理模型
策略内嵌复杂算法交易
详情请戳:第四期vn.py小班课上线:CTA策略开发!
了解更多知识,请关注vn.py社区公众号。
策略开发离不开数据分析:
从另一角度来说,CTA策略开发前的建模分析流程如下:
第1步:对行情数据进行画图
首先,调用vn.py数据库模块的database_manager.load_bar_data函数从数据库载入数据到内存中。
load_bar_data函数的输入参数有5个:
load_bar_data函数输出的是一个包含系列BarData格式行情数据的列表bars。
from vnpy.trader.database import database_manager
output("开始加载历史数据")
bars = database_manager.load_bar_data(
symbol=symbol,
exchange=exchange,
interval=interval,
start=start,
end=end,
)
output(f"历史数据加载完成,数据量:{len(bars)}")
def output(msg):
""""""
print(f"{datetime.now()}\t{msg}")
用for循环读取bars列表中的BarData数据,然后分别缓存时间、开盘价、最高价、最低价、收盘价到专门的列表中,最终合成DataFrame, 设置DataFrame索引为K线数据的时间。
# Generate history data in DataFrame
t = []
o = []
h = []
l = []
c = []
for bar in bars:
time = bar.datetime
open_price = bar.open_price
high_price = bar.high_price
low_price = bar.low_price
close_price = bar.close_price
t.append(time)
o.append(open_price)
h.append(high_price)
l.append(low_price)
c.append(close_price)
self.orignal = pd.DataFrame()
self.orignal["open"] = o
self.orignal["high"] = h
self.orignal["low"] = l
self.orignal["close"] = c
self.orignal.index = t
对收盘价进行画图,设置图的尺寸和标题。然后用肉眼初步确认时间序列图无数据缺失或者明显“异常”行情。
output("第一步:画出行情图,检查数据断点")
self.orignal["close"].plot(figsize=(20, 8), title="close_price")
plt.show()
第2步:随机性检验
调用statsmodels库的acorr_ljungbox函数,对收盘价进行白噪声检验,函数返回一个p值,p值越大表示原假设成立的可能性越大,即数据是随机的可能性越大。
一般p值与0.05进行对比:
from statsmodels.stats.diagnostic import acorr_ljungbox
def random_test(close_price):
"""
白噪声检验
"""
acorr_result = acorr_ljungbox(close_price, lags=1)
p_value = acorr_result[1]
if p_value < 0.05:
output("第二步:随机性检验:非纯随机性")
else:
output("第二步:随机性检验:纯随机性")
output(f"白噪声检验结果:{acorr_result}\n")
第3步:平稳性检验
同样,调用statsmodels库的adfuller函数对收盘价进行单位根检验,函数返回的是一个字典,我们对字典里面的字段进行判断:
CTA策略研究的是非平稳性时间序列,平稳时间序列适用于期货价差的统计套利。
from statsmodels.tsa.stattools import adfuller as ADF
def stability_test(close_price):
"""
平稳性检验
"""
statitstic = ADF(close_price)
t_s = statitstic[1]
t_c = statitstic[4]["10%"]
if t_s > t_c:
output("第三步:平稳性检验:存在单位根,时间序列不平稳")
else:
output("第三步:平稳性检验:不存在单位根,时间序列平稳")
output(f"ADF检验结果:{statitstic}\n")
第4步:画出自相关图
同样,调用statsmodels库的plot_acf函数和plot_pacf函数对收盘价画自相关和偏自相关图:
置信区间被画成圆锥形。默认情况下,置信区间被设置为95%,这表明,圆锥之外的值很可能是相关的,而不是统计上的意外。
也就是说,圆锥以外的值越多,时间序列自相关性越强,越适用于研究CTA策略。
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
def autocorrelation_test(close_price):
"""
自相关性检验
"""
output("第四步:画出自相关性图,观察自相关特性")
plot_acf(close_price, lags=60)
plt.show()
plot_pacf(close_price, lags=60).show()
plt.show()
第5步:相对波动率分析
若相对波动率分布属于右偏(肥尾在右边),且分布陡峭,在统计学上具有尖峰肥尾的特色,适用于研究CTA策略。
def relative_volatility_analysis(self, df: DataFrame = None):
"""
相对波动率
"""
output("第五步:相对波动率分析")
df["volatility"] = talib.ATR(
np.array(df['high']),
np.array(df['low']),
np.array(df['close']),
self.window_volatility
)
df["fixed_cost"] = df["close"] * self.rate
df["relative_vol"] = df["volatility"] - df["fixed_cost"]
df["relative_vol"].plot(figsize=(20, 6), title="relative volatility")
plt.show()
df["relative_vol"].hist(bins=200, figsize=(20, 6), grid=False)
plt.show()
statitstic_info(df["relative_vol"])
def statitstic_info(df):
"""
描述统计信息
"""
mean = round(df.mean(), 4)
median = round(df.median(), 4)
output(f"样本平均数:{mean}, 中位数: {median}")
skew = round(df.skew(), 4)
kurt = round(df.kurt(), 4)
if skew == 0:
skew_attribute = "对称分布"
elif skew > 0:
skew_attribute = "分布偏左"
else:
skew_attribute = "分布偏右"
if kurt == 0:
kurt_attribute = "正态分布"
elif kurt > 0:
kurt_attribute = "分布陡峭"
else:
kurt_attribute = "分布平缓"
output(f"偏度为:{skew},属于{skew_attribute};峰度为:{kurt},属于{kur
t_attribute}
\n")
第6步:变化率分析
def growth_analysis(self, df: DataFrame = None):
"""
百分比K线变化率
"""
output("第六步:变化率分析")
df["pre_close"] = df["close"].shift(1).fillna(0)
df["g%"] = 100 * (df["close"] - df["pre_close"]) / df["close"]
df["g%"].plot(figsize=(20, 6), title="growth", ylim=(-5, 5))
plt.show()
df["g%"].hist(bins=200, figsize=(20, 6), grid=False)
plt.show()
statitstic_info(df["g%"])
想了解更多关于CTA策略开发实战的各种细节?请戳课程上线:《vn.py全实战进阶》!目前课程已经更新过半,一共50节内容覆盖从策略设计开发、参数回测优化,到最终实盘自动交易的完整CTA量化业务流程。
了解更多知识,请关注vn.py社区公众号。
R-Breaker是一种中高频的日内交易策略,这个策略也长期被Future Truth杂志评为最赚钱的策略之一。
R-Breaker策略结合了趋势和反转两种交易方式,所以交易机会相对较多,比较适合日内1分钟K线或者5分钟K线级别的数据。
R-Breaker策略的核心逻辑由以下4部分构成:
1)计算6个目标价位
根据昨日的开高低收价位计算出今日的6个目标价位,按照价格高低依次是:
具体计算方法如下:(其中a、b、c、d为策略参数)
2)设计委托逻辑
趋势策略交易:
反转策略交易:
3)设定相应的止盈止损
4)日内策略要求收盘前平仓
以上是原版R-Breaker策略逻辑,但使用RQData从2010年至今(2019年10月)的1分钟沪深300股指期货主力连续合约(IF88)测试,效果并不理想。
从逻辑上看R-Breaker策略可以拆分成趋势策略和反转策略,那么不妨试试将这两种逻辑分开,并逐个进行优化。
1)趋势策略:
若当前x分钟的最高价>观察卖出价,认为它具有上升趋势,在突破买入价挂上买入开仓的停止单;
其代码实现逻辑如下:
self.tend_high, self.tend_low = am.donchian(self.donchian_window)
if bar.datetime.time() < self.exit_time:
if self.pos == 0:
self.intra_trade_low = bar.low_price
self.intra_trade_high = bar.high_price
if self.tend_high > self.sell_setup:
long_entry = max(self.buy_break, self.day_high)
self.buy(long_entry, self.fixed_size, stop=True)
self.short(self.sell_enter, self.multiplier * self.fixed_size, stop=True)
elif self.tend_low < self.buy_setup:
short_entry = min(self.sell_break, self.day_low)
self.short(short_entry, self.fixed_size, stop=True)
self.buy(self.buy_enter, self.multiplier * self.fixed_size, stop=True)
elif self.pos > 0:
self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
long_stop = self.intra_trade_high * (1 - self.trailing_long / 100)
self.sell(long_stop, abs(self.pos), stop=True)
elif self.pos < 0:
self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
short_stop = self.intra_trade_low * (1 + self.trailing_short / 100)
self.cover(short_stop, abs(self.pos), stop=True)
# Close existing position
else:
if self.pos > 0:
self.sell(bar.close_price * 0.99, abs(self.pos))
elif self.pos < 0:
self.cover(bar.close_price * 1.01, abs(self.pos))
同样使用2010年至今的1分钟IF88数据进行回测。不过在展示强化版R-Breaker策略效果前,先分别展示一下拆分后的趋势策略和反转策略。
1)趋势策略:
2)反转策略
综合对比2种策略的日均成交笔数和资金曲线,我们可以知道:
由于趋势策略和反转策略是互斥的,在某些方面呈现出此消彼长的特点。那么,根据投资组合理论,可以把反转策略看作是看跌期权,买入一定规模的看跌期权来对消非系统性风险,那么组合的收益会更加稳健,即夏普比率更高。
由于趋势策略和反转策略日均成交手数比是2.6:0.4,若它们都只委托1手的话,反转策略的对冲效果微乎其微。
为了方便演示,我们设置趋势策略每次交易1手,反转策略则是3手,然后将两者合成为R-Breaker策略。
经过以上的仓位配置后,回测结果中的夏普比率提高到2,资金曲线整体上扬,而且没有较大且持续时间较长的回撤。
R-Breaker策略的成功之处在于它并不是纯粹的趋势类策略,而是属于复合型策略,其alpha由两部分构成:趋势策略alpha和反转策略alpha。
这类复合型策略可以看作是轻量级的投资组合,因为它的交易标的只有一个:沪深300股指期货的主力合约。
更复杂的话,可以交易多个标的,如在商品期货做虚拟钢厂套利(同时交易螺纹钢、铁矿石、焦炭),在IF股指期货上做日内CTA策略。考虑到市场容量不同,价差套利能分配更多的资金,这样在价差套利提供稳定收益率基础上,CTA策略能在行情好的时候贡献更多alpha(高盈亏比特征导致的)。
从上面的例子可以看出,一个合理的投资组合,往往比单个策略具有更高的夏普比率。因为:夏普比率=超额收益/风险,所以夏普比率高意味着资金曲线非常平滑,也意味着我们可以有效的控制使用杠杆的风险。
当某个投资组合策略夏普足够高,而且策略资金容量允许,交易成本能有效控制等情况下,就可以通过杠杆来提升组合收益了。例如采用结构化产品的形式,经过银行等中介通过发行优先级份额,来为劣后级份额提供杠杆。这时劣后级投资者(有时100%是交易团队自身持有)同时也是债务人的角色,即在承担更大风险的同时,追求更高的最终收益,而优先级投资者则作为债权人享受利息收益。
最后,秉承vn.py社区的一贯精神:
Talk is cheap, show me your pnl (or code) !
自然必须附上策略的源代码:
from datetime import time
from vnpy.app.cta_strategy import (
CtaTemplate,
StopOrder,
TickData,
BarData,
TradeData,
OrderData,
BarGenerator,
ArrayManager
)
class RBreakStrategy(CtaTemplate):
""""""
author = "KeKe"
setup_coef = 0.25
break_coef = 0.2
enter_coef_1 = 1.07
enter_coef_2 = 0.07
fixed_size = 1
donchian_window = 30
trailing_long = 0.4
trailing_short = 0.4
multiplier = 3
buy_break = 0 # 突破买入价
sell_setup = 0 # 观察卖出价
sell_enter = 0 # 反转卖出价
buy_enter = 0 # 反转买入价
buy_setup = 0 # 观察买入价
sell_break = 0 # 突破卖出价
intra_trade_high = 0
intra_trade_low = 0
day_high = 0
day_open = 0
day_close = 0
day_low = 0
tend_high = 0
tend_low = 0
exit_time = time(hour=14, minute=55)
parameters = ["setup_coef", "break_coef", "enter_coef_1", "enter_coef_2", "fixed_size", "donchian_window"]
variables = ["buy_break", "sell_setup", "sell_enter", "buy_enter", "buy_setup", "sell_break"]
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
""""""
super().__init__(cta_engine, strategy_name, vt_symbol, setting )
self.bg = BarGenerator(self.on_bar)
self.am = ArrayManager()
self.bars = []
def on_init(self):
"""
Callback when strategy is inited.
"""
self.write_log("策略初始化")
self.load_bar(10)
def on_start(self):
"""
Callback when strategy is started.
"""
self.write_log("策略启动")
def on_stop(self):
"""
Callback when strategy is stopped.
"""
self.write_log("策略停止")
def on_tick(self, tick: TickData):
"""
Callback of new tick data update.
"""
self.bg.update_tick(tick)
def on_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
self.cancel_all()
am = self.am
am.update_bar(bar)
if not am.inited:
return
self.bars.append(bar)
if len(self.bars) <= 2:
return
else:
self.bars.pop(0)
last_bar = self.bars[-2]
# New Day
if last_bar.datetime.date() != bar.datetime.date():
if self.day_open:
self.buy_setup = self.day_low - self.setup_coef * (self.day_high - self.day_close) # 观察买入价
self.sell_setup = self.day_high + self.setup_coef * (self.day_close - self.day_low) # 观察卖出价
self.buy_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_high # 反转买入价
self.sell_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_low # 反转卖出价
self.buy_break = self.buy_setup + self.break_coef * (self.sell_setup - self.buy_setup) # 突破买入价
self.sell_break = self.sell_setup - self.break_coef * (self.sell_setup - self.buy_setup) # 突破卖出价
self.day_open = bar.open_price
self.day_high = bar.high_price
self.day_close = bar.close_price
self.day_low = bar.low_price
# Today
else:
self.day_high = max(self.day_high, bar.high_price)
self.day_low = min(self.day_low, bar.low_price)
self.day_close = bar.close_price
if not self.sell_setup:
return
self.tend_high, self.tend_low = am.donchian(self.donchian_window)
if bar.datetime.time() < self.exit_time:
if self.pos == 0:
self.intra_trade_low = bar.low_price
self.intra_trade_high = bar.high_price
if self.tend_high > self.sell_setup:
long_entry = max(self.buy_break, self.day_high)
self.buy(long_entry, self.fixed_size, stop=True)
self.short(self.sell_enter, self.multiplier * self.fixed_size, stop=True)
elif self.tend_low < self.buy_setup:
short_entry = min(self.sell_break, self.day_low)
self.short(short_entry, self.fixed_size, stop=True)
self.buy(self.buy_enter, self.multiplier * self.fixed_size, stop=True)
elif self.pos > 0:
self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
long_stop = self.intra_trade_high * (1 - self.trailing_long / 100)
self.sell(long_stop, abs(self.pos), stop=True)
elif self.pos < 0:
self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
short_stop = self.intra_trade_low * (1 + self.trailing_short / 100)
self.cover(short_stop, abs(self.pos), stop=True)
# Close existing position
else:
if self.pos > 0:
self.sell(bar.close_price * 0.99, abs(self.pos))
elif self.pos < 0:
self.cover(bar.close_price * 1.01, abs(self.pos))
self.put_event()
def on_order(self, order: OrderData):
"""
Callback of new order data update.
"""
pass
def on_trade(self, trade: TradeData):
"""
Callback of new trade data update.
"""
self.put_event()
def on_stop_order(self, stop_order: StopOrder):
"""
Callback of stop order update.
"""
pass
想了解更多关于CTA策略开发实战的各种细节?请戳课程上线:《vn.py全实战进阶》!目前课程已经更新过半,一共50节内容覆盖从策略设计开发、参数回测优化,到最终实盘自动交易的完整CTA量化业务流程。
了解更多知识,请关注vn.py社区公众号。
要获取Tick数据,并插入到vn.py数据库中,整体上有3种方法:
那么本文我们就选择第3种方法,通过读取CSV文件,把数据载入到数据库中。
首先需要保证你已经在系统上安装配置好了数据库,这里演示用的是MongoDB数据库以及图形化客户端Robo 3T。
注意在MongoDB中需要创建新数据库“vnpy”,然后在全局配置对话框中,修改相关配置:
"database.driver": "mongodb",
"database.database": "vnpy",
"database.host": "localhost",
"database.port": 27017,
"database.user": "",
"database.password": "",
"database.authentication_source": ""
注意输入上述内容到配置对话框中时,请忽略引号。修改完毕保存后,请重新启动VN Trader,检查相关配置是否已经修改成功。
然后我们把所有的CSV文件放在同一文件夹下,这样就可以使用一个脚本来读取该文件夹内的所有CSV格式文件,并批量载入到数据库中。
在开始处理数据之前,我们需要知道CSV文件中的表头信息和数据特征。用Excel打开其中任意一个CSV文件,查看其中的内容后,建立一个比较直观的印象,大概知道:
这里我们的CSV文件,表头以及第一行内容如下:
交易日,合约代码,交易所代码,合约在交易所的代码,最新价,上次结算价,昨收盘,昨持仓量,今开盘,最高价,最低价,数量,成交金额,持仓量,今收盘,本次结算价,涨停板价,跌停板价,昨虚实度,今虚实度,最后修改时间,最后修改毫秒,申买价一,申买量一,申卖价一,申卖量一,申买价二,申买量二,申卖价二,申卖量二,申买价三,申买量三,申卖价三,申卖量三,申买价四,申买量四,申卖价四,申卖量四,申买价五,申买量五,申卖价五,申卖量五,当日均价,业务日期
20190102,ru1905,,,11280.0000,11290.0000,11305.0000,322472,11280.0000,11280.0000,11280.0000,246,27748800.0000,322468,0.0000,0.0000,12080.0000,10495.0000,0,0,08:59:00,500,11280.0000,10,11290.0000,10,0.0000,0,0.0000,0,0.0000,0,0.0000,0,0.0000,0,0.0000,0,0.0000,0,0.0000,0,112800.0000,20190102
从以上内容中,我们发现下述特征:
有了这样的需求后,我们在接下来开发脚本的过程中就有了方向:
1)使用for循环遍历同一文件夹内所有CSV格式的文件(即以“.csv"结尾的文件名),使用csv_load函数来载入数据:
import os
import csv
from datetime import datetime, time
from vnpy.trader.constant import Exchange
from vnpy.trader.database import database_manager
from vnpy.trader.object import TickData
def run_load_csv():
"""
遍历同一文件夹内所有csv文件,并且载入到数据库中
"""
for file in os.listdir("."):
if not file.endswith(".csv"):
continue
print("载入文件:", file)
csv_load(file)
2)csv_load函数的具体设计
def csv_load(file):
"""
读取csv文件内容,并写入到数据库中
"""
with open(file, "r") as f:
reader = csv.DictReader(f)
ticks = []
start = None
count = 0
for item in reader:
# generate datetime
date = item["交易日"]
second = item["最后修改时间"]
millisecond = item["最后修改毫秒"]
standard_time = date + " " + second + "." + millisecond
dt = datetime.strptime(standard_time, "%Y%m%d %H:%M:%S.%f")
# filter
if dt.time() > time(15, 1) and dt.time() < time(20, 59):
continue
tick = TickData(
symbol="RU88",
datetime=dt,
exchange=Exchange.SHFE,
last_price=float(item["最新价"]),
volume=float(item["持仓量"]),
bid_price_1=float(item["申买价一"]),
bid_volume_1=float(item["申买量一"]),
ask_price_1=float(item["申卖价一"]),
ask_volume_1=float(item["申卖量一"]),
gateway_name="DB",
)
ticks.append(tick)
# do some statistics
count += 1
if not start:
start = tick.datetime
end = tick.datetime
database_manager.save_tick_data(ticks)
print("插入数据", start, "-", end, "总数量:", count)
if __name__ == "__main__":
run_load_csv()
创建好脚本后可以直接运行:进入cmd或者Powershell,运行命令python load_tickdata.py即可,效果如下图所示:
此时我们使用Robo 3T客户端来连接上MongoDB,在数据库【vnpy】->【db_tick_data】可以看到新载入的数据:
CTA策略模块(CtaStrategy)的回测引擎BacktestingEngine支持Tick数据的回测,以下代码推荐在Jupyter Notebook中运行。
第一步我们需要在策略文件中进行一些修改,这里以AtrRsiStrategy策略为例:找到on_init函数,把其中的load_bar(10)
改为load_tick(10)
,即指定加载过去10天的Tick数据来执行策略初始化任务,而不是加载K线Bar数据进行初始化。
然后在加载回测相关的模块时,需要额外加载BacktestingMode枚举类型,其中包含有回测引擎所支持的Bar(K线)和Tick两种模式:
from vnpy.app.cta_strategy.backtesting import BacktestingEngine, OptimizationSetting
from vnpy.app.cta_strategy.base import BacktestingMode
from datetime import datetime
from atr_rsi_strategy import AtrRsiStrategy
创建回测引擎对象的实例后,在调用set_parameters函数时,参数中需要新增“mode=BacktestingMode.TICK ”,来指定回测引擎使用Tick回测模式。
同时需要注意另外2点:
engine = BacktestingEngine()
engine.set_parameters(
vt_symbol="RU88.SHFE",
interval="1m",
start=datetime(2019, 1, 1),
end=datetime(2019, 4, 1),
rate=0.5/10000,
slippage=5,
size=10,
pricetick=5,
capital=1_000_000,
mode=BacktestingMode.TICK
)
engine.add_strategy(AtrRsiStrategy, {})
后续的操作和K线模式回测就几乎完全相同了,加载历史数据并执行数据回放,然后基于逐笔成交计算每日盈亏数据,并生成最终的策略统计结果以及回测图表:
engine.load_data()
engine.run_backtesting()
df = engine.calculate_result()
engine.calculate_statistics()
engine.show_chart()
由于这3个月行情多为区间震荡,所以以去趋势跟踪为核心逻辑的AtrRsiStrategy的回测效果不太理想:
下面我们如果想再看看详细的逐笔成交记录,可以遍历回测引擎中保存所有成交数据的trades字典,并打印每笔成交相关的字段信息:
trades = engine.trades
for value in trades.values():
print("时间:",value.datetime,value.direction.value,value.offset.value, "价格:",value.price, "数量:",value.volume)
if value.offset.value == "平":
print("---------------------------------------------------------")
这样就能看到每笔成交具体发生的时间点,并和Tick数据当时的盘口情况进行相应的比对检查了:
《vn.py全实战进阶》课程已经更新过半!一共50节内容覆盖从策略设计开发、参数回测优化,到最终实盘自动交易的完整CTA量化业务流程,详细内容请戳课程上线:《vn.py全实战进阶》!
了解更多知识,请关注vn.py社区公众号。
可以这类东西并没有统计显著。
你可以基于talib库来实现,如三只乌鸦等,返回的是0-100的数值