策略已经写好了,下一步就是历史回测:把历史上的价格数据(K线或者Tick),推送给策略去运行交易逻辑,并把策略产生的交易记录下来,最后分析这些回测的交易记录,从而来判断该策略的潜在盈利能力。
在开始之前,先来讲几个量化策略研究中(不管是否用vn.py),需要记住的几条重要原则:
要跑历史数据回测,第一步自然就是要先准备好历史数据。这里我们以国内期货数据为例,使用米筐的RQData来下载获取。
RQData目前提供30天的免费试用权限,网站申请非常方便。前往RQData主页
找到上图中的“免费试用”按钮,点击进去后:
根据自己的实际情况填写相应的注册信息,邀请码点击那个白色小问号图标,可以看到没有邀请码情况下的默认输入值(当前是ClOR,注意l是小写的L,而不是大写的i),点击“登录并申请”按钮后,会看到登录框:
选择“验证码登录”,输入手机号验证码后点击“确认登录”按钮,回到上一步的界面,但注意此时底部按钮显示的文字已变为“立即申请试用”:
点击上述按钮后完成试用申请,注意此时有可能出现验证码超时或者其他的错误信息,根据提示重新填写再点击按钮即可。申请成功后会自动弹出开始下载[make.bat]文件(该文件中即包含了申请的试用账号和密码),以及[RQDATA使用说明.pdf]。
使用VS Code打开make.bat文件:
记住其中的name(用户名)以及password(密码),注意密码是一串长达344个字符的密钥,上图中仅截取了很短一部分(别想偷懒,哈哈)。
然后运行VN Station,点击VN Trader Pro,在右侧的上层应用中加载CtaBacktester(CTA回测模块)后启动,在主界面顶部的菜单栏,找到“配置”按钮:
点击后打开VN Trader的全局配置对话框:
将之前已经准备好的RQData用户名和密码,分别填入到rqdata.username和rqdata.password两个字段中,然后点击“确定”按钮,弹出提示重启的对话框。
此时即可关闭VN Trader并重启,点击菜单栏“功能”->“CTA回测”,启动接下来我们要用到的CTA策略回测图形界面:
如果上一步的RQData账号密码配置正确,此时可以在中间底部的日志输出框中看到“RQData数据接口初始化成功”的信息。如果没有就说明配置有问题,回去重来吧。
窗口左上方的一系列编辑框和下拉框,用来控制和管理我们的回测功能。在本地代码编辑框中输入IF88.CFFEX,K线周期选择1m(即1分钟K线),然后选择要下载数据的开始日期和结束日期,点击“下载数据”按钮。
此时CtaBacktester模块就会自动从RQData服务器下载历史数据,并完成数据结构转化后插入到VN Trader的数据库中(默认使用SQLite,数据文件位于.vntrader目录下的database.db),下载完成后同样会在日志输出框中看到相应信息:
有了历史数据后,我们就可以开始跑历史回测。在左上角的交易策略下拉框里,应该已经能找到上一篇教程我们编写的DemoStrategy,选中后开始配置回测参数。
注意这里我们使用的是中金所股指期货的IF合约,回测时的参数要设置为股指期货所对应的属性。手续费率编辑框中输入0.000025(万0.25),交易滑点输入0.2(即单边成交1跳的滑点成本),合约乘数为300(300元每点),价格跳动也是0.2(股指期货最小价格变动),回测资金我们使用100万。
点击“开始回测”按钮,弹出参数配置对话框:
这里显示的fast_window和slow_window就是之前我们添加到parameters列表中的参数名称,这里我们直接使用默认数值,点击“确定”按钮后,我们的回测引擎就会自动开始执行策略回测的整个流程:加载数据、数据回放、模拟撮合、计算每日盈亏、统计指标、以及最后画出图表:
不出之前的意料,双均线策略的效果差的一塌糊涂,右侧图表的4个子图中:
然后在中间顶部的表格中,可以看到回测相关的一些统计数据:
这策略干了什么事情能亏这么多钱呢,总有很多人会抱着不信邪的态度,此时点击左侧的“成交记录”按钮,可以看到回测过程中的每一笔成交记录:
还不信邪,可以点击“委托记录”按钮查看这些成交具体是由哪些委托触发的:
可以通过“成交记录”中的每条成交对应的委托号,在“委托记录”中找到策略下出的委托。细心的人可能已经发现上面两张图中,某一笔委托的价格和其对应成交的价格并不一致,这是因为我们在策略下单时使用了超价5元(为了保证成交),而回测仿真撮合时则是取了T+1时刻的最优成交价(也是实盘中最可能拿到的价格)。
在“每日盈亏”窗口中,可以看到以逐日盯市规则(期货结算规则),将每日的持仓和成交映射到当日收盘价后的当日整体盈亏情况:
最后,如果还是觉得死活不相信双均线策略怎么可能这么差,点击“K线图表按钮”,可以看到整个回测数据对应的K线图表:
上图中的黄色向上箭头代表多头成交(buy/cover),蓝色向下箭头则代表了空头成交(sell/short),可以通过键盘和鼠标拖动和缩放图表,看到自己想要的部分。
到了这里,是不是已经有点相信了“双均线策略就是垃圾”的说法?实际上从公平角度讲,以上看到的回测信息,并不能充分证明双均线信号的无效性,如果我们:
出来的结果则变成了:
从这张图上看,在不考虑交易成本时,双均线的来回穿插作为一种信号,可能还是有一定的预测效果(尽管也好不到哪里去)。
实盘用可能是没希望了,但不妨碍我们想来折腾一下,看看在股指1分钟数据上到底怎样的均线组合能起到最好的效果,毕竟之前的3和80两个参数纯粹只是拍脑袋的结果。
假设我们想要看看fast_window,从2到20(步进2),slow_window,从20到100(步进10),参数分别两两组合出来的回测效果,看看能不能找到更好的均线组合。
比较傻的方法就是人工操作,每次将两个参数输入到回测的参数对话框里,然后运行等结果,再把结果记录在Excel表格里最后用来做排序比较。但对于这种机械重复的劳动,电脑比起人的效率要高得多得多,在本质上我们就是分别遍历两个参数的各种可能排列组合,然后针对每组组合,跑完回测并记录其中的关键结果,也就是所谓的“参数优化”。
CtaBacktester模块已经内置了策略参数优化的功能,点击左侧下方的“参数优化”按钮:
在弹出的对话框中,我们把之前的参数想法输入进去:
点击“多进程优化”,使用暴力穷举算法(Brute-Force Algorithm),同时运行多个并行的Python进程,充分利用CPU的核心数量来加快优化速度。
优化完成后,日志信息中会有相应的提示,同时左下角的“优化结果”按钮会亮起:
点击后看到每组参数组合,所对应的目标函数结果:
效果最好的是fast_window为18,slow_window为90,带入到策略回测中运行后:
参数优化大法好!!!
可能在前面铺垫了那么多的情况下(过度拟合风险、双均线信号普通、移除了滑点手续费),你不见得还会脑子里蹦出这么一句话,但不可否认参数优化后的效果提升非常明显。
在本篇教程的最后,希望提醒大家的是:尽管看起来一路点点鼠标就能搞出个漂亮的资金曲线了,但实际上量化策略回测和优化过程中充满了各种各样的地雷。
到目前为止我们所讲述的只是最最基础的操作方法,还远没有涉及到实践经验的内容,这块要么大家用自己的真金白银在交易中慢慢积累,另一个成本更低的选择当然就是关注我们后续的进阶教程了!
了解更多知识,请关注vn.py社区公众号。
快速入门系列已经到了第6篇,终于要开始接触编程实操的内容了。这篇教程中的内容,假设你对于Python语言的开发已经有了一定的基础掌握:
如果没有也没关系,推荐一本快速入门的书《笨方法学Python3》(Learn Python the Hard Way),也是我自己多年前从完全0基础的小白第一次学习Python的敲门砖,50多堂课从头到尾敲一遍,只要不偷懒包敲包会,不会来找我。
用电脑写文章通常需要Word,同样写程序代码也需要专用的工具,这种工具就叫做IDE,全称Integrated Development Environment,中文名叫做“集成开发环境”。
IDE提供了一整套包括代码编写、调试运行、版本控制、管理界面等等写程序时所必须的工具环境,接下来教程里我们选择使用的是Visual Studio Code(简称VS Code),由微软推出的编程专用开源编辑器(然后被热心的开源社区加上各种插件打造成了超级IDE)。
前往VSCode首页,点击Download绿色图标下载后,一路傻瓜安装,运行后看到下述界面:
在左侧导航栏顶部的5个按钮中,点击最下方的按钮进入安装扩展插件的页面,在搜索框中输入Python回车,找到由微软官方推出的Python插件:
点击上图红框中的绿色安装按钮,会自动在后台完成插件的下载安装启动,至此就已经准备好了我们的IDE。
尽管这里我们推荐使用VS Code,但如果你已经有了常用的IDE工具,不管是PyCharm、WingIDE、Vim还是Visual Studio,都可以用来完成后续的策略代码开发工作,所以直接用就好。假设过程中遇到了某些难以解决的问题,可以再换回到VS Code,本系列教程中的操作保证都完全可用。
首先要接触的是一个用户目录的概念,即任何操作系统默认用来存放当前登录的用户运行程序时缓存文件的目录,假设你的登录用户名为client,那么:
上述即为最常用的用户目录路径,注意只是常用情况,如果你的系统做了特殊配置修改可能不同。
VN Trader默认的运行时目录即为操作系统用户目录,启动后会在用户目录下创建.vntrader文件夹用于保存配置和临时文件(有时遇到奇怪的bug,删除该文件夹后重启就能解决)。
同时CTA策略模块(CtaStrategyApp)在启动后,也会扫描位于VN Trader运行时目录下的strategies文件夹来加载用户自定义的策略文件,以Windows为例:C:\Users\client\strategies。注意strategies文件夹默认是不存在的,需要用户自行创建。
进入strategies目录后,新建我们的第一个策略文件:demo_strategy.py,然后用VS Code打开。
新建的策略文件打开后,内部空空如也,此时我们开始往其中添加代码,遵循行业传统,这里也同样选择用傻瓜的双均线策略作为演示:
from vnpy.app.cta_strategy import (
CtaTemplate,
StopOrder,
TickData,
BarData,
TradeData,
OrderData,
BarGenerator,
ArrayManager,
)
class DemoStrategy(CtaTemplate):
"""演示用的简单双均线"""
# 策略作者
author = "Smart Trader"
# 定义参数
fast_window = 10
slow_window = 20
# 定义变量
fast_ma0 = 0.0
fast_ma1 = 0.0
slow_ma0 = 0.0
slow_ma1 = 0.0
# 添加参数和变量名到对应的列表
parameters = ["fast_window", "slow_window"]
variables = ["fast_ma0", "fast_ma1", "slow_ma0", "slow_ma1"]
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
""""""
super().__init__(cta_engine, strategy_name, vt_symbol, setting)
# K线合成器:从Tick合成分钟K线用
self.bg = BarGenerator(self.on_bar)
# 时间序列容器:计算技术指标用
self.am = ArrayManager()
def on_init(self):
"""
当策略被初始化时调用该函数。
"""
# 输出个日志信息,下同
self.write_log("策略初始化")
# 加载10天的历史数据用于初始化回放
self.load_bar(10)
def on_start(self):
"""
当策略被启动时调用该函数。
"""
self.write_log("策略启动")
# 通知图形界面更新(策略最新状态)
# 不调用该函数则界面不会变化
self.put_event()
def on_stop(self):
"""
当策略被停止时调用该函数。
"""
self.write_log("策略停止")
self.put_event()
def on_tick(self, tick: TickData):
"""
通过该函数收到Tick推送。
"""
self.bg.update_tick(tick)
def on_bar(self, bar: BarData):
"""
通过该函数收到新的1分钟K线推送。
"""
am = self.am
# 更新K线到时间序列容器中
am.update_bar(bar)
# 若缓存的K线数量尚不够计算技术指标,则直接返回
if not am.inited:
return
# 计算快速均线
fast_ma = am.sma(self.fast_window, array=True)
self.fast_ma0 = fast_ma[-1] # T时刻数值
self.fast_ma1 = fast_ma[-2] # T-1时刻数值
# 计算慢速均线
slow_ma = am.sma(self.slow_window, array=True)
self.slow_ma0 = slow_ma[-1]
self.slow_ma1 = slow_ma[-2]
# 判断是否金叉
cross_over = (self.fast_ma0 > self.slow_ma0 and
self.fast_ma1 < self.slow_ma1)
# 判断是否死叉
cross_below = (self.fast_ma0 < self.slow_ma0 and
self.fast_ma1 > self.slow_ma1)
# 如果发生了金叉
if cross_over:
# 为了保证成交,在K线收盘价上加5发出限价单
price = bar.close_price + 5
# 当前无仓位,则直接开多
if self.pos == 0:
self.buy(price, 1)
# 当前持有空头仓位,则先平空,再开多
elif self.pos < 0:
self.cover(price, 1)
self.buy(price, 1)
# 如果发生了死叉
elif cross_below:
price = bar.close_price - 5
# 当前无仓位,则直接开空
if self.pos == 0:
self.short(price, 1)
# 当前持有空头仓位,则先平多,再开空
elif self.pos > 0:
self.sell(price, 1)
self.short(price, 1)
self.put_event()
def on_order(self, order: OrderData):
"""
通过该函数收到委托状态更新推送。
"""
pass
def on_trade(self, trade: TradeData):
"""
通过该函数收到成交推送。
"""
# 成交后策略逻辑仓位发生变化,需要通知界面更新。
self.put_event()
def on_stop_order(self, stop_order: StopOrder):
"""
通过该函数收到本地停止单推送。
"""
pass
在文件头部我们的一系列import中,最重要的就是CtaTemplate,这是我们开发CTA策略所用的策略模板基类,策略模板提供了一系列以on_开头的回调函数,用于接受事件推送,以及其他主动函数用于执行操作(委托、撤单、记录日志等)。
所有开发的策略类,都必须继承CtaTemplate基类,然后在需要的回调函数中实现策略逻辑,即当某件事情发生时我们需要执行的对应操作:比如当收到1分钟K线推送时,我们需要计算均线指标,然后判断是否要执行交易。
所有的量化交易策略必然都会涉及到参数和变量这两个和数值相关的概念。
参数是策略内部的逻辑算法中用来控制结果输出的一些数值,在策略类中需要定义出这些参数的默认数值:
# 定义参数
fast_window = 10
slow_window = 20
定义完后,还需要参数的名称(字符串)添加到parameters列表中:
parameters = ["fast_window", "slow_window"]
这一步操作是为了让系统内的策略引擎,得以知道该策略有哪些参数,并在初始化策略时弹出相应的对话框让用户填写,或者在命令行模式下直接将配置字典中对应的key的value赋值到策略变量上。
变量则是策略内部的逻辑算法在执行过程中用来缓存中间状态的一些数值,在策略类中同样需要定义出这些变量的默认数值:
# 定义变量
fast_ma0 = 0.0
fast_ma1 = 0.0
slow_ma0 = 0.0
slow_ma1 = 0.0
定义完后,还需要变量的名称(字符串)添加到variables列表中:
variables = ["fast_ma0", "fast_ma1", "slow_ma0", "slow_ma1"]
和参数类似,这一步操作是为了让系统内的策略引擎,得以知道该策略有哪些变量,并在GUI图形界面上更新策略状态时将这些变量的最新数值显示出来,同时在保存策略运行状态到缓存文件中时将这些变量写入进去(实盘中每天关闭策略时会自动缓存)。
需要注意的是:
上面已经提到,在vn.py的CTA策略模块中,所有的策略逻辑都是由事件来驱动的。对于事件,举例来说:
对于最简单的双均线策略的DemoStrategy来说,我们不用关注委托状态变化和成交推送之类的细节,只需要在收到K线推送时(on_bar函数中)执行交易相关的逻辑判断即可。
每次新的一根K线走完时,策略会通过on_bar函数收到该根K线的数据推送。注意此时收到的数据只有该K线,但大部分技术指标计算时都需要过去N个周期的历史数据。
所以为了计算均线技术指标,我们需要使用一个叫做时间序列容器ArrayManager的对象,用于实现了K线历史的缓存和技术指标计算,该对象的创建,在策略的init函数中:
# 时间序列容器:计算技术指标用
self.am = ArrayManager()
在on_bar函数的逻辑中,第一步需要将K线对象推送到该时间序列容器中:
# 纯粹为了后续可以少写一些self.
am = self.am
# 更新K线到时间序列容器中
am.update_bar(bar)
# 若缓存的K线数量尚不够计算技术指标,则直接返回
if not am.inited:
return
为了满足技术指标计算的需求,我们通常需要最少N根K线的缓存(N默认为100),在推送进ArrayManager对象的数据不足N之前,是无法计算出需要的技术指标的,对于缓存数据是否已经足够的判断,通过am.inited变量可以很方便的判断,在inited变为True之前,都应该只是缓存数据而不进行任何其他操作。
当缓存的数据量满足需求后,我们可以很方便的通过am.sma函数来计算均线指标的数值:
# 计算快速均线
fast_ma = am.sma(self.fast_window, array=True)
self.fast_ma0 = fast_ma[-1] # T时刻数值
self.fast_ma1 = fast_ma[-2] # T-1时刻数值
# 计算慢速均线
slow_ma = am.sma(self.slow_window, array=True)
self.slow_ma0 = slow_ma[-1]
self.slow_ma1 = slow_ma[-2]
注意这里我们传入了可选参数array=True,因此返回的fast_ma为最新移动平均线的数组,其中最新一个周期(T时刻)的移动均线ma数值可以通过-1下标获取,上一个周期(T-1时刻)的ma数值可以通过-2下标获取。
有了快慢两根均线在T时刻和T-1时刻的数值后,我们就可以进行双均线策略的核心逻辑判断,即是否发生了均线金叉或者死叉:
# 判断是否金叉
cross_over = (self.fast_ma0 > self.slow_ma0 and
self.fast_ma1 < self.slow_ma1)
# 判断是否死叉
cross_below = (self.fast_ma0 < self.slow_ma0 and
self.fast_ma1 > self.slow_ma1)
所谓的均线金叉,是指T-1时刻的快速均线fast_ma1低于慢速均线slow_ma1,而T时刻时快速均线fast_ma0大于或等于慢速均线slow_ma10,实现了上穿的行为(即金叉)。均线死叉则是相反的情形。
当金叉或者死叉发生后,则需要执行相应的交易操作:
# 如果发生了金叉
if cross_over:
# 为了保证成交,在K线收盘价上加5发出限价单
price = bar.close_price + 5
# 当前无仓位,则直接开多
if self.pos == 0:
self.buy(price, 1)
# 当前持有空头仓位,则先平空,再开多
elif self.pos < 0:
self.cover(price, 1)
self.buy(price, 1)
# 如果发生了死叉
elif cross_below:
price = bar.close_price - 5
# 当前无仓位,则直接开空
if self.pos == 0:
self.short(price, 1)
# 当前持有空头仓位,则先平多,再开空
elif self.pos > 0:
self.sell(price, 1)
self.short(price, 1)
对于简单双均线策略来说,用于处于持仓的状态中,金叉后拿多仓,死叉后拿空仓。
所以当金叉发生时,我们需要检查当前持仓的情况。如果没有持仓(self.pos == 0),说明此时策略刚开始交易,则应该直接执行多头开仓操作(buy)。如果此时已经持有空头仓位(self.pos , 0),则应该先执行空头平仓操作(cover)然后同时立即执行多头开仓操作(buy)。为了保证成交(简化策略),我们在下单时选择了加价的方式来实现(多头+5,空头-5)。
注意尽管这里我们选择使用双均线策略来做演示,但在实践经验中简单均线类的策略效果往往非常差,千万不要拿来跑实盘,也不建议在此基础上进行扩展开发,毕竟在豆腐渣工程上建什么都还是豆腐渣......
DemoStrategy的双均线交易逻辑,可以通过超价买卖的方式来保证成交,从而忽略撤单、委托更新、成交推送之类更加细节的事件驱动逻辑。
但在实盘交易时,任何交易系统都只会推送最新的Tick更新数据,而不会有完整的K线推送,因此用户需要自行在本地完成Tick到K线的合成逻辑。
这块vn.py也提供了完善的K线合成工具BarGenerator,用户只需在策略的init函数中创建实例:
# K线合成器:从Tick合成分钟K线用
self.bg = BarGenerator(self.on_bar)
其中BarGenerator对象创建时,传入的参数(self.on_bar)是当1分钟K线走完时触发的回调函数。
在实盘策略收到最新的Tick推送时,我们只需要将TickData更新到BarGenerator中:
def on_tick(self, tick: TickData):
"""
通过该函数收到Tick推送。
"""
self.bg.update_tick(tick)
当BarGenerator发现某根K线走完时,会将过去1分钟内的Tick数据合成的1分钟K线推送给策略,自动调用策略的on_bar函数,执行在上一节中讲解的交易逻辑。
了解更多知识,请关注vn.py社区公众号。
由于一些历史原因,A股证券市场在几年前就关闭了类似期货这样的开放式API接入(几乎0门槛,只需要一定开发技术),现阶段可行的方案主要是“三方采购”的接入模式,其门槛大致为:
监管的核心目标(或者说红线)是杜绝配资的可能。
中泰证券自主开发的XTP量化极速证券柜台,无论速度性能还是每日交易量,都是目前国内领先的证券柜台之一,同时其“三方采购”流程比较标准化,本篇教程就主要针对XTP进行讲解。
XTP目前提供三类账户:
测试上我们就用简单的现货账户,前往中泰证券XTP的主页,点击右上角的“注册”按钮申请测试账号:
注册完成后,回到主页点击右上角的“登录”按钮进行登录,完成后再点击右上角的“测试账号”,进入申请页面:
这里请根据实际情况进行填写,提交申请后一个工作日,将会收到测试环境的账号、密码、服务器等信息的邮件。
接下来照着入门1中的方法,启动VN Trader Pro,只加载“XTP”接口。
进入主界面后,点击菜单栏的“系统”->“连接XTP”,看到对话框:
各个字段的填写如下:
XTP的测试环境部署在阿里云上,可以直接通过互联网连接,无需把机器放到托管环境内。
以上都填好后,点击“连接”按钮开始登录XTP服务器以及相关初始化操作,在右下角的日志监控组件中,可以看到初始化相关的日志信息输出:
登录初始化过程中有任何异常情况,日志信息中都会看到相应的文字输出,可以根据内容自行排查。看到两个交易所的合约信息查询成功这条日志后,说明已经成功完成了初始化操作。
点击菜单栏“帮助”->“查询合约”,或者左侧功能导航栏的倒数第二个放大镜按钮,打开合约查询对话框:
点击右上角的查询按钮,显示当前VN Trader内部已连接的交易接口(XTP)的上支持的所有可交易合约。
几个需要关注的字段:
在上一步中找到自己想要订阅行情的合约信息后,则可以在VN Trader界面左上角的交易组件框中,选择交易所、接口后,在代码框中输入合约代码后回车,即可订阅行情。
当收到最新行情Tick推送时,会显示在下方的深度报价中,国内证券市场Level 1行情的Tick推送的更新频率是每3秒1笔。
所有已订阅的行情信息,都会显示在右侧顶部的行情监控组件中,方便后续快速执行手动交易:
知道最新行情的价格在哪里后,就可以进行买卖下单:
委托请求提交后,则会返回相应的委托回报信息,显示在委托组件中,显示当前这笔委托请求的最新状态:
注意委托组件分为两个:
两个组件中,对于处于可撤状态的委托,均可双击该笔委托的单元格来实现撤单的功能(鼠标放置其上时会有文字提示)。或者也可以通过交易组件上的单击“全撤”按钮,来实现一键全撤VN Trader内当前所有可撤委托。
当委托发生成交后,VN Trader会收到成交推送的数据,并显示在成交监控组件中,用户可以通过每笔成交的委托号来实现和对应委托的映射。注意在实盘中,每笔委托可能和多笔反向来自其他投资者的委托发生成交,即一笔委托对应有多笔成交记录。
委托成交后,XTP账户的资金情况将会发生变化:
VN Trader底部中间的资金监控组件的数据,默认以每4秒一次的频率查询刷新,所以某一时间点你看到数据可能并非最新情况。
当你已经对XTP的仿真测试环境足够熟悉后,可能已经做好了使用XTP柜台进行实盘交易的准备,接下来的步骤是:
了解更多知识,请关注vn.py社区公众号
IB,全称Interactive Brokers,中文名盈透证券,是全球公认第一的全品种零售经纪商,提供的金融品种包括:
加上已有十多年历史早已成为行业标准的交易接口IB API,基本上说起外盘交易通道,IB就是当仁不让的第一选择了。
但需要额外强调的是,针对某些特定的交易市场或者需求,IB未必是最终的选择,但一定是比较方便的。
和其他接口不同的是,IB一共有三套账户体系:
其中Paper Trading和Live Trading高度类似,提供接近实盘交易行情以及撮合成交(像CTP的第一套环境),而Demo Trading则只提供历史行情的回放,满足交易接口测试的需求(像CTP的第二套7X24小时环境)。
接下来的内容,我们以Demo Trading为例来讲解,首先前往IB官网下载TWS(Trader Work Station),一般下载最新版本的TWS Latest。
安装后启动,看到以下界面:
点击底部的“Return to the demo”按钮,打开演示账户测试界面:
输入邮箱地址后,点击“Try Demo”按钮,即可登录进入演示账户的TWS界面:
点击右上角的齿轮图标(从右往左导数第五个),打开TWS配置对话框,在General页面,将“Current Language”设为English后,关闭并重启TWS。
然后进入到“API”->“Settings”来进行API接入相关的配置:
和其他大多数API接口直连经纪商服务器的模式不同,IB接口连接的是位于本地的交易客户端TWS,再由TWS负责连接到IB的服务器,因此在启动VN Trader前需要先启动TWS完成账户登录。
接下来照着入门1中的方法,启动VN Trader Pro,只加载“盈透证券”接口。
进入主界面后,点击菜单栏的“系统”->“连接IB”,看到对话框:
各个字段的填写如下:
需要强调的是,和CTP等其他API从本地交易程序直连远端服务器的模式不同,IB API的连接是从本地交易程序(如VN Trader),到本地的客户端TWS,背后由TWS负责维护从本地到IB服务器的连接。
以上都填好后,点击“连接”按钮开始登录CTP服务器以及相关初始化操作,在右下角的日志监控组件中,可以看到初始化相关的日志信息输出:
登录初始化过程中有任何异常情况,日志信息中都会看到相应的文字输出,可以根据内容自行排查。看到“服务器时间”这条日志后,说明已经成功完成了初始化操作。
后续依旧会有一系列的信息通知,这些主要和IB账户相关的数据权限配置有关(说白了就是告诉你那些实时行情你没花钱买,现在获取不到),Demo Trading没有特别好的解决办法,推荐还是入金后申请Paper Trading。
由于盈透证券支持的交易所以及合约资源实在太多,IB API无法提供和CTP接口类似的全合约查询功能。用户需要前往盈透证券的官方网站,在其交易产品列表页面,自行寻找相关的合约信息
注意在VN Trader中,针对IB接口所使用的交易合约代码,是该合约在IB系统内的唯一标识符Conid,全称Contract Identifier,注意Conid为一串纯数字,不要和Symbol或者Description Name搞混。
在上一步中找到自己想要订阅行情的合约信息后,则可以在VN Trader界面左上角的交易组件框中,选择交易所、接口后,在代码框中输入合约代码后回车,即可订阅行情。
当收到最新行情Tick推送时,会显示在下方的深度报价中,IB接口的Tick推送的最高更新频率是每秒4笔。
订阅行情后,IB接口也会自动查询该合约的相关信息,此时再打开“帮助”->“合约查询”组件,即可看到该合约:
所有已订阅的行情信息,都会显示在右侧顶部的行情监控组件中,方便后续快速执行手动交易:
需要注意IB上的行情数据,除少量免费提供外(外汇、贵金属),其他大部分都需要在IB官网的后台管理系统中付费购买后,才能在VN Trader中订阅使用。如果行情订阅失败(Conid填错、没有购买等),在左下角的日志组件中也会有相应的内容输出,方便排查。
知道最新行情的价格在哪里后,就可以进行买卖下单:
委托请求提交后,则会返回相应的委托回报信息,显示在委托组件中,显示当前这笔委托请求的最新状态:
注意委托组件分为两个:
两个组件中,对于处于可撤状态的委托,均可双击该笔委托的单元格来实现撤单的功能(鼠标放置其上时会有文字提示)。或者也可以通过交易组件上的单击“全撤”按钮,来实现一键全撤VN Trader内当前所有可撤委托。
当委托发生成交后,VN Trader会收到成交推送的数据,并显示在成交监控组件中,用户可以通过每笔成交的委托号来实现和对应委托的映射。注意在实盘中,每笔委托可能和多笔反向来自其他投资者的委托发生成交,即一笔委托对应有多笔成交记录。
委托成交后,账户的资金情况将会发生变化,IB针对每个币种的资金提供独立的账号,可以通过账号名后缀来判断,注意其中的BASE是你的IB账户的基础币种账号。
IB接口提供实时推送更新的资金和持仓数据,在持仓方向上也采取净仓模式来计算:
当你已经对IB的Demo Trading测试环境足够熟悉后,可以开始用Paper Trading来进行仿真交易,或者使用Live Trading进行实盘交易,只需要在TWS登录的时候选择对应的账号即可。
除了TWS外,IB针对API交易还提供了一套更加轻量级的客户端IB Gateway,只有简单的GUI界面用于显示日志,在资源占用和延时性能方面更有优势。
了解更多知识,请关注vn.py社区公众号
装好了运行环境,下一步就可以直接上手开始交易了。
目前vn.py已经支持了市面上几乎所有金融产品的交易:期货、股票、期权、外汇。针对每种产品,可能又有多个不同的交易接口可以选择,有的门槛低,有的速度快,有的功能强大。
考虑到这个系列是入门教程,接下来的三篇将会选择常见的接口来讲解如何使用vn.py:
上期技术官方运营了一套期货仿真交易环境SimNow,提供和实盘环境一致的行情以及交易撮合规则,现在已经是做各种CTP测试交易的首选了。
打开SimNow官网,点击右上角的“注册账号”,填写一些基础信息完成注册。整体流程十分傻瓜,但有隐藏的两个坑需要注意:
如果不幸遇到以上两点以外的坑,请尝试通过下述电话或者QQ联系客服解决:
注册完成后,回到SimNow首页点击点右上角的“投资者登录”,输入手机号和密码登录进去:
请牢记上图中的investorId,这才是你的SimNow环境的CTP用户名,而不是登录网站用的手机号!同时CTP密码则就是你登录网站用的密码。
接下来需要修改一次密码才能使用API进行交易(没错,刚注册就要改),修改有两种办法:
一种办法不行,就换另一种,两种都不行就联系客服小姐姐吧。
尽管写下来字数还挺多的,实际操作可能也就5分钟的事情,全部弄完后就可以准备开始交易了。
接下来照着入门1中的方法,启动VN Trader Pro,只加载CTP接口就行(注意不要加载CTP测试接口)。
进入主界面后,点击菜单栏的“系统”->“连接CTP”,看到对话框:
各个字段的填写如下:
其中交易和行情服务器,一共有三组选择,前两组只能在交易时段登录(周一到周五,日盘和夜盘时段),提供和实盘环境一致的行情和撮合:
选择1(对应SimNow第一套第二组)
选择2(对应SimNow第一套第三组)
第三组则是只能在非交易时段登录,提供最近交易时段行情的回放和撮合:
**选择3(对应SimNow第二套)
其他介绍信息可以查看:http://www.simnow.com.cn/product.action
以上都填好后,点击“连接”按钮开始登录CTP服务器以及相关初始化操作,在右下角的日志监控组件中,可以看到初始化相关的日志信息输出:
登录初始化过程中有任何异常情况,日志信息中都会看到相应的文字输出,可以根据内容自行排查。看到“合约信息查询成功”这条日志后,说明已经成功完成了初始化操作。
点击菜单栏“帮助”->“查询合约”,或者左侧功能导航栏的导数第二个放大镜按钮,打开合约查询对话框:
点击右上角的查询按钮,显示当前VN Trader内部已连接的交易接口(CTP)的上支持的所有可交易合约。
几个需要关注的字段:
在上一步中找到自己想要订阅行情的合约信息后(或者你本来就知道),则可以在VN Trader界面左上角的交易组件框中,选择交易所、接口后,在代码框中输入合约代码后回车,即可订阅行情。
当收到最新行情Tick推送时,会显示在下方的深度报价中,Tick推送的最高更新频率是每秒2笔,如果没有变化变则可能1笔推送都没有。
注意国内期货普遍只提供1档买卖价,部分期货公司的上期所和能源交易所品种可以获取到5档买卖价。
注意每个交易所的合约命名规则有所区别:
如果订阅行情时,日志监控输出说找不到合约信息,那么请先检查是否搞对名命名规则。
所有已订阅的行情信息,都会显示在右侧顶部的行情监控组件中,方便后续快速执行手动交易。
知道最新行情的价格在哪里后,就可以进行买卖下单:
委托请求提交后,则会返回相应的委托回报信息,显示在委托组件中,显示当前这笔委托请求的最新状态:
注意委托组件分为两个:
两个组件中,对于处于可撤状态的委托,均可双击该笔委托的单元格来实现撤单的功能(鼠标放置其上时会有文字提示)。或者也可以通过交易组件上的单击“全撤”按钮,来实现一键全撤VN Trader内当前所有可撤委托。
当委托发生成交后,VN Trader会收到成交推送的数据,并显示在成交监控组件中,用户可以通过每笔成交的委托号来实现和对应委托的映射。注意在实盘中,每笔委托可能和多笔反向来自其他投资者的委托发生成交,即一笔委托对应有多笔成交记录。
委托成交后,CTP账户的资金情况将会发生变化,可用资金将会减少,同时整体余额将基于“逐日盯市”的规则变动。
VN Trader中底部中间的资金监控组件的数据,默认以每6秒一次的频率查询刷新,所以某一时间点你看到数据可能并非最新情况。
持仓信息同样也采用6秒刷新的频率,注意对于国内的期货市场,多头和空头的持仓情况分开计算。因此在某一合约上,如上图的rb1910,可能既有多头持仓(上图8手),也有空头持仓(上图3手),双向持仓均会存在各自的保证金占用。
同时由于上期所(包括能源交易中心)今昨仓分离的规则,平仓时需要分别发出对应的委托指令,如想要平掉上图中rb1910的8手多头持仓,则需要分别发出平昨7手的指令,加上平今1手的指令。而其他三家交易所则不受此影响,直接选择平仓指令8手即可。
当你已经对SimNow的仿真测试环境足够熟悉后,可能已经做好了使用CTP柜台进行实盘交易的准备。
对于CTP实盘交易:
具体的穿透式认证方法请参考这篇:看完这篇,彻底搞定期货穿透式CTP API接入。
了解更多知识,请关注vn.py社区公众号。
尽管之前在知乎和华尔街见闻上都推出过一些教程,但随着vn.py进入2.0时代,过去的内容可能已经有点落后于项目发展了,接下来将会通过vn.py官方服务号(vnpy-community)逐渐更新一套2.0上的快速入门教程。
运行vn.py,第一步需要准备Python环境。再也不用像1.0时代需要折腾半天安装Anaconda、三方模块、MongoDB数据库等等,2.0只有一个步骤安装由vn.py核心团队针对量化交易开发的Python发行版,VN Studio。
打开官网首页,正中央左边的金色按钮就是最新版本VN Studio的下载链接,写本文的时候最新版本是2.0.6,后续随着版本更新可能会变为2.0.7、2.0.8等等,总之认准金色按钮就行。
下载完成后双击运行,会看到一个很常见的软件安装界面,安装目录推荐选择默认的C:\vnstudio,后续我们的教程都会以此目录作为VN Studio的路径,当然也可以根据自己的需求安装到其他目录,然后一路“下一步”完成傻瓜式安装,Done!
安装完成后,回到桌面上就能看到VN Station的快捷方式(就是这个帅气的黑马头像),注意如果桌面背景偏暗可能看不清,请睁大眼睛仔细查看。
社区有人提了说能不能换个图标颜色解决下,无奈换来换去都不如黑色帅,只能暂时作罢(可能我们垃圾的美术和P图水平才是主要原因)。
双击启动后,将会看到VN Station的登录框。对于首次使用的用户,请点击微信登录后,扫描二维码注册账号,请牢记用户名和密码(同样也用于登录社区论坛,后续使用可以直接输入用户名和密码登录,勾上“保存”勾选框更加方便~)
登录后看到的就是VN Station主界面了,上方区域显示的是目前社区论坛最新的置顶精华主题(目前注册人数刚破4500,每日精华做不到,每周两三篇还是有的),下方的五个按钮则是VN Station提供的量化相关功能按钮:
由于VN Trader Lite是一键式启动无需配置,我们这里就只讲VN Trader Pro。
点击按钮后弹出的第一个对话框,是选择VN Trader运行时目录,这里默认是当前操作系统的用户目录(User Path),比如我这里就是C:\Users\Administrator。
在2.0中对Python源代码和运行时文件进行了分离,VN Trader运行过程中所有产生的配置文件、临时文件、数据文件(使用SQLite数据库),都会放置在运行时目录下的.vntrader文件夹中。
当VN Trader启动时,会检查当前目录是否存在.vntrader文件夹,若有就直接使用当前目录作为运行时目录,找不到则会使用默认的用户目录(并在其中创建.vntrader文件夹)。
大多数情况下,使用操作系统默认用户目录就是最便捷的方案,直接在上述窗口中直接点击右下角的“选择文件夹”按钮,开始配置VN Trader:
在左侧选择需要的底层交易接口,“介绍”一栏中可以看到每个接口所支持的交易品种。注意部分接口存在冲突不能同时使用,下方的说明信息中有写。
在右侧选择需要的上层应用模块,同样在“介绍”一栏中可以看到该模块所提供的具体功能。各个上层应用之间并不存在冲突的情况,所以新手不妨全部加载了一个个看看,后续确定自己的交易策略后再按需加载。
点击“启动”按钮后,稍等几秒就会看到上图所示的VN Trader主界面,下面就可以连接登录交易接口,开始执行交易了!
vn.py/VN Studio/VN Station/VN Trader,都是干啥的?
VN Studio支持哪些操作系统?
VN Studio目前仅提供Windows版本,尽管vn.py是全平台通用的(Windows/Linux/Mac),但Linux/Mac下的安装可以通过脚本一键完成(后续教程将会提供),所以暂时没有提供VN Studio的计划。
了解更多知识,请关注vn.py社区公众号
R-Breaker是一种中高频的日内交易策略,这个策略也长期被Future Truth杂志评为最赚钱的策略之一。R-Breaker策略结合了趋势和反转两种交易方式,所以交易机会相对较多,比较适合日内1分钟K线或者5分钟K线级别的数据。
R-Breaker的策略逻辑由以下4部分构成:
1)计算6个目标价位
根据昨日的开高低收价位计算出今日的6个目标价位,按照价格高低依次是:
他们的计算方法如下:(其中a、b、c、d为策略参数)
2)设计委托逻辑
趋势策略情况:
反转策略情况:
3)设定相应的止盈止损。
4)日内策略要求收盘前平仓。
上面是原版R-Breaker策略逻辑,但是使用RQData从2010年至今(即2019年10月)的1分钟沪深300股指期货主力连续合约(IF88)测试,效果并不理想。
实际上R-Breaker策略可以拆分成趋势策略和反转策略。下面分别对这对2种策略逻辑进行优化:
1)趋势策略:
2)反转策略:
其代码实现逻辑如下:
self.tend_high, self.tend_low = am.donchian(self.donchian_window)
if bar.datetime.time() < self.exit_time:
if self.pos == 0:
self.intra_trade_low = bar.low_price
self.intra_trade_high = bar.high_price
# Trend Condition
if self.tend_high > self.sell_setup:
long_entry = max(self.buy_break, self.day_high)
self.buy(long_entry, self.fixed_size, stop=True)
self.short(self.sell_enter, self.multiplier * self.fixed_size, stop=True)
elif self.tend_low < self.buy_setup:
short_entry = min(self.sell_break, self.day_low)
self.short(short_entry, self.fixed_size, stop=True)
self.buy(self.buy_enter, self.multiplier * self.fixed_size, stop=True)
elif self.pos > 0:
self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
long_stop = self.intra_trade_high * (1 - self.trailing_long / 100)
self.sell(long_stop, abs(self.pos), stop=True)
elif self.pos < 0:
self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
short_stop = self.intra_trade_low * (1 + self.trailing_short / 100)
self.cover(short_stop, abs(self.pos), stop=True)
# Close existing position
else:
if self.pos > 0:
self.sell(bar.close_price * 0.99, abs(self.pos))
elif self.pos < 0:
self.cover(bar.close_price * 1.01, abs(self.pos))
同样使用10年的1分钟IF88数据进行回测。不过,在展示强化版R-Breaker策略效果前,先分别展示一下拆分后的趋势策略和反转策略。
1)趋势策略:
2)反转策略
综合对比2种策略的日均成交笔数和资金曲线,我们可以知道:
由于趋势策略和反转策略是互斥的,在某些方面呈现出此消彼长的特点。那么,根据投资组合理论,可以把反转策略看作是看跌期权,买入一定规模的看跌期权来对消非系统性风险,那么组合的收益会更加稳健,即夏普比率更高。
由于趋势策略和反转策略日均成交手数比是2.6:0.4,若它们都只委托1手的话,反转策略的对冲效果微乎其微。
为了方便演示,我们设置趋势策略每次交易1手;反转策略则是3手。然后合成R-Breaker策略。发现夏普比率提高到2,资金曲线整体上扬,而且没有较大且持续时间较长的回撤。
R-Breaker策略成功之处在于它并不是纯粹的趋势类策略,它属于复合型策略,它的alpha由2部分构成:趋势策略alpha;反转策略alpha。
这类复合型策略可以看作是轻量级的投资组合,因为它的交易标的只有一个:沪深300股指期货的主力合约。
更复杂的话,可以交易多个标的,如在商品期货做虚拟钢厂套利(同时交易螺纹钢、铁矿石、焦炭),在IF股指期货上做日内CTA策略。考虑到市场容量不同,价差套利能分配更多的资金。这样在价差套利提供稳定收益率基础上,CTA策略能在行情好的时候贡献更多alpha(高盈亏比特征导致的)。
从上面例子可以看出,一个合理的投资组合,往往比单个策略具有更高的夏普比率。因为夏普比率=超额收益/风险。夏普比率高意味着资金曲线非常平滑;这也意味着我们可以有效控制使用杠杆的风险。
当某个投资组合策略夏普足够高,而且策略资金容量允许,交易成本能有效控制等情况下,就可以通过杠杆来提升组合收益了。例如向银行贷款或者发放债券,这时候交易团队是债务人角色,即在承担更大风险同时,追求更到收益。债权人享受利息收益(类无风险收益)。
向公众发产品是另一种增加杠杆的方式,但此时投资组合风险已经转移到了客户这方,交易团队可以享受着管理费收益(类无风险收益)。根据目标客户的不同:
最后附上策略源代码:
from datetime import time
from vnpy.app.cta_strategy import (
CtaTemplate,
StopOrder,
TickData,
BarData,
TradeData,
OrderData,
BarGenerator,
ArrayManager
)
class RBreakStrategy(CtaTemplate):
""""""
author = "KeKe"
setup_coef = 0.25
break_coef = 0.2
enter_coef_1 = 1.07
enter_coef_2 = 0.07
fixed_size = 1
donchian_window = 30
trailing_long = 0.4
trailing_short = 0.4
multiplier = 3
buy_break = 0 # 突破买入价
sell_setup = 0 # 观察卖出价
sell_enter = 0 # 反转卖出价
buy_enter = 0 # 反转买入价
buy_setup = 0 # 观察买入价
sell_break = 0 # 突破卖出价
intra_trade_high = 0
intra_trade_low = 0
day_high = 0
day_open = 0
day_close = 0
day_low = 0
tend_high = 0
tend_low = 0
exit_time = time(hour=14, minute=55)
parameters = ["setup_coef", "break_coef", "enter_coef_1", "enter_coef_2", "fixed_size", "donchian_window"]
variables = ["buy_break", "sell_setup", "sell_enter", "buy_enter", "buy_setup", "sell_break"]
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
""""""
super(RBreakStrategy, self).__init__(
cta_engine, strategy_name, vt_symbol, setting
)
self.bg = BarGenerator(self.on_bar)
self.am = ArrayManager()
self.bars = []
def on_init(self):
"""
Callback when strategy is inited.
"""
self.write_log("策略初始化")
self.load_bar(10)
def on_start(self):
"""
Callback when strategy is started.
"""
self.write_log("策略启动")
def on_stop(self):
"""
Callback when strategy is stopped.
"""
self.write_log("策略停止")
def on_tick(self, tick: TickData):
"""
Callback of new tick data update.
"""
self.bg.update_tick(tick)
def on_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
self.cancel_all()
am = self.am
am.update_bar(bar)
if not am.inited:
return
self.bars.append(bar)
if len(self.bars) <= 2:
return
else:
self.bars.pop(0)
last_bar = self.bars[-2]
# New Day
if last_bar.datetime.date() != bar.datetime.date():
if self.day_open:
self.buy_setup = self.day_low - self.setup_coef * (self.day_high - self.day_close) # 观察买入价
self.sell_setup = self.day_high + self.setup_coef * (self.day_close - self.day_low) # 观察卖出价
self.buy_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_high # 反转买入价
self.sell_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_low # 反转卖出价
self.buy_break = self.buy_setup + self.break_coef * (self.sell_setup - self.buy_setup) # 突破买入价
self.sell_break = self.sell_setup - self.break_coef * (self.sell_setup - self.buy_setup) # 突破卖出价
self.day_open = bar.open_price
self.day_high = bar.high_price
self.day_close = bar.close_price
self.day_low = bar.low_price
# Today
else:
self.day_high = max(self.day_high, bar.high_price)
self.day_low = min(self.day_low, bar.low_price)
self.day_close = bar.close_price
if not self.sell_setup:
return
self.tend_high, self.tend_low = am.donchian(self.donchian_window)
if bar.datetime.time() < self.exit_time:
if self.pos == 0:
self.intra_trade_low = bar.low_price
self.intra_trade_high = bar.high_price
if self.tend_high > self.sell_setup:
long_entry = max(self.buy_break, self.day_high)
self.buy(long_entry, self.fixed_size, stop=True)
self.short(self.sell_enter, self.multiplier * self.fixed_size, stop=True)
elif self.tend_low < self.buy_setup:
short_entry = min(self.sell_break, self.day_low)
self.short(short_entry, self.fixed_size, stop=True)
self.buy(self.buy_enter, self.multiplier * self.fixed_size, stop=True)
elif self.pos > 0:
self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
long_stop = self.intra_trade_high * (1 - self.trailing_long / 100)
self.sell(long_stop, abs(self.pos), stop=True)
elif self.pos < 0:
self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
short_stop = self.intra_trade_low * (1 + self.trailing_short / 100)
self.cover(short_stop, abs(self.pos), stop=True)
# Close existing position
else:
if self.pos > 0:
self.sell(bar.close_price * 0.99, abs(self.pos))
elif self.pos < 0:
self.cover(bar.close_price * 1.01, abs(self.pos))
self.put_event()
def on_order(self, order: OrderData):
"""
Callback of new order data update.
"""
pass
def on_trade(self, trade: TradeData):
"""
Callback of new trade data update.
"""
self.put_event()
def on_stop_order(self, stop_order: StopOrder):
"""
Callback of stop order update.
"""
pass
CSV格式数据示例如下
表头及第一行数据示例
交易日,合约代码,交易所代码,合约在交易所的代码,最新价,上次结算价,昨收盘,昨持仓量,今开盘,最高价,最低价,数量,成交金额,持仓量,今收盘,本次结算价,涨停板价,跌停板价,昨虚实度,今虚实度,最后修改时间,最后修改毫秒,申买价一,申买量一,申卖价一,申卖量一,申买价二,申买量二,申卖价二,申卖量二,申买价三,申买量三,申卖价三,申卖量三,申买价四,申买量四,申卖价四,申卖量四,申买价五,申买量五,申卖价五,申卖量五,当日均价,业务日期
20190102,ru1905,,,11280.0000,11290.0000,11305.0000,322472,11280.0000,11280.0000,11280.0000,246,27748800.0000,322468,0.0000,0.0000,12080.0000,10495.0000,0,0,08:59:00,500,11280.0000,10,11290.0000,10,0.0000,0,0.0000,0,0.0000,0,0.0000,0,0.0000,0,0.0000,0,0.0000,0,0.0000,0,112800.0000,20190102
可以发现几个问题:
基于csv格式的特点,开发载入tick数据到数据库的脚本,脚本功能如下:
脚本实现代码如下:
import os
import csv
from datetime import datetime, time
from vnpy.trader.constant import Exchange
from vnpy.trader.database import database_manager
from vnpy.trader.object import TickData
def run_load_csv():
"""
遍历同一文件夹内所有csv文件,并且载入到数据库中
"""
for file in os.listdir("."):
if not file.endswith(".csv"):
continue
print("载入文件:", file)
csv_load(file)
def csv_load(file):
"""
读取csv文件内容,并写入到数据库中
"""
with open(file, "r") as f:
reader = csv.DictReader(f)
ticks = []
start = None
count = 0
for item in reader:
# generate datetime
date = item["交易日"]
second = item["最后修改时间"]
millisecond = item["最后修改毫秒"]
standard_time = date + " " + second + "." + millisecond
dt = datetime.strptime(standard_time, "%Y%m%d %H:%M:%S.%f")
# filter
if dt.time() > time(15, 1) and dt.time() < time(20, 59):
continue
tick = TickData(
symbol="RU88",
datetime=dt,
exchange=Exchange.SHFE,
last_price=float(item["最新价"]),
volume=float(item["数量"]),
bid_price_1=float(item["申买价一"]),
bid_volume_1=float(item["申买量一"]),
ask_price_1=float(item["申卖价一"]),
ask_volume_1=float(item["申卖量一"]),
gateway_name="DB",
)
ticks.append(tick)
# do some statistics
count += 1
if not start:
start = tick.datetime
end = tick.datetime
database_manager.save_tick_data(ticks)
print("插入数据", start, "-", end, "总数量:", count)
if __name__ == "__main__":
run_load_csv()
效果展示
本教程使用MobaXterm、Xubuntu-destop、vnc4server来搭建阿里云ubuntu18.04。尽管过程有些复杂,但跟着图文教程一步一步去做,肯定能成功的。
MobaXterm是一款增强型远程连接工具,可以轻松地来使用Linux上的GNUUnix命令。这样一来,我们可以不用安装虚拟机来搭建虚拟环境,然后只要通过MobaXterm就可以使用大多数的Linux命令。本教程主要使用MobaXterm的SSH和VNC功能:SSH可以想象成Ubuntu的终端(无图形界面),VNC是Ubuntu的图形操作界面。
首先,从官网下载MobaXterm (https://mobaxterm.mobatek.net/download-home-edition.html)
下载完成后解压安装包,直接双击.exe文件进行安装。
安装完成后,启动MobaXterm,在主界面中单击导航栏左边第一个【Session】进入连接页面。(或者单击点击菜单栏【Sessions】->【new Session】按钮也行)
在弹出的新页面Session Settings中,单击导航栏最左边的【SSH】按钮。然后在Basic SSH Settings中输入云服务器的公网IP和账号。其中默认账号是root,输入root账号之前记得把左边小方框的√打上,端口号保留默认的22。然后点击最下方的【OK】按钮。
之后会弹出一个新页面:第一次连接,左边的黑框会提示输入云服务器的密码(密码输入输入界面不会有任何反应)。输入完按回车键后,若密码正确,会弹出一个小窗口提示是否保存密码,可以点击【Yes】按钮。
出现下图就表示阿里云Ubuntu连接成功:左边是云服务器的文件夹,右边的黑框是命令操作界面。到这里,就完成了使用MobaXterm远程连接云服务器了。当然,这种连接是基于SSH连接的,只能使用阿里云Ubuntu的终端功能,图形化界面还需要另外搭建。
为了更好管理界面,需要进行一下重命名:鼠标点击最左边的【Session】选项,显示刚刚创建的SSH连接,鼠标选定该连接,右键选择【Rename session】 会弹出Session settings界面,在里面的Session name可输入新的名字,如DEV_1。
输入完毕,钮点击左下方的【OK】即可改名成功。
同理按照上面的操作,输入相同的云服务器的公网IP和账号,创建第二个SSH连接,命名为DEV_2。这样我们就能同时使用2个终端了。
Ubuntu系统在安装软件前,需要更新其软件仓储列表。这也是于Windows系统的一大差异。
在Windows下安装软件,我们只需要有.exe文件,然后一直双击【下一步】即可完成。但Linux并非如此:每个Linux的发行版,比如Ubuntu,都会维护一个自己的软件仓库,我们常用的几乎所有软件都在这里面。这里面的软件绝对安全,而且绝对的能正常安装。
故在Ubuntu下,我们需要维护一个源列表,源列表里面都是一些网址信息,这每一条网址就是一个源,这个地址指向的数据标识着这台源服务器上有哪些软件可以安装使用。
所以,为了能够正常安装软件,需要更新软件包管理器里里面的软件列表。在Ubuntu终端输入命令sudo apt-get update
,会访问源列表里的每个网址,并读取软件列表,然后保存在本地电脑。
Xfce是一款针对Linux系统的现代化轻型开源桌面环境。其最大的优点是内存消耗小系统资源占用率很低。但是Xfce用起来并不方便,需要另外安装其他包,如支持中文显示,安装火狐浏览器等。
而Xubuntu-destop则免去这个麻烦,它整合了Xfce桌面环境和其他支持包,让用户搭建起来更加方面。安装方法也相对简单,在终端中输入命令sudo apt-get install xubuntu-desktop
即可。
VNC是一款基于RFB协议的屏幕画面分享及远程操作软件。它最大的特色在于跨平台性,即我们可以用Windows电脑控制Linux系统或苹果的Mac OS,反之亦然。
首先,安装VNC服务器:在终端下输入命令sudo apt-get install vnc4server
。
安装完毕后,在终端输入vncserver
运行服务器,首次运行需要设置密码(长度至少是6位)并且二次输入来确认。
VNC连接好后可以看到其默认端口是1(红色方框标识“:1”)。
然而,尽管连接上VNC,不代表客户能够立刻实现图形化界面操作,还需要配置xstartup文件和配置MobaXterm的VNC设置。
首先,用文本编辑器nano打开xstarup文件,在终端输入命令nano ~/.vnc/xstartup
,可以看到如下内容。
需要在最后一行 "x-window-manager &" 前面添加一个"#",以注释不再需要的配置。然后在文件最后加入一段配置信息:
sesion-manager & xfdesktop & xfce4-panel &
xfce4-menu-plugin &
xfsettingsd &
xfconfd &
xfwm4 &
修改完毕后,需要保存退出,nano保存退出的方法相对简单:按住“ctrl”和“x”键即可。
配置完xstarup后,还需要配置端口信息:先把默认的1号端口会话杀掉,然后生成新的会话,我们改成9号端口(因为1号端口容易被攻击),然后设置图形界面的分辨率位1920x1080。
注意:杀掉原先会话,建立新的会话,在每次启动VNC图形界面前都要做。
vncserver -kill :1
vncserver -geometry 1920x1080 :9
现在回到MobaXterm主界面,单击主页最上边的【Sessions】->【new Session】弹出【Sessions settings】界面,这一次选择【VNC】连接。
【IP address】为阿里云公网IP,【Port】为VNC连接端口,vncserver -geometry 1920x1080 :9意为在9号端口启动,故从默认的5900调整为5909。
在下面的【Bookmark settings】界面对该VNC连接进行重命名为“VNC”。输入完毕,钮点击左下方的【OK】按会弹出基于Xfce图形化界面。
IBus中文输入法是Ubutnu常用的中文输入法。安装方法也比较简单,在终端中输入命令sudo apt install ibus-pinyin
即可。
使用中文输入法之前需要配置中文语言包:
设置完中文语言包后,进入IBus输入法设置:
Vscode是微软出品的轻量级代码编辑器,安装和使用起来非常方便。
首先通过火狐浏览器打开百度,搜索“vscode”,第一个就是官网地址。在官网首页安装点击下载.deb版本。
Ubuntu下的Vscode安装包是.deb格式的,需要用使用dpkg命令来安装。进入下载文件所在的目录/root/Downloads,鼠标在空白处右键点击【Open Terminal Here】进入终端,输入下面命令安装Vscode。
sudo dpkg -i code_1.37.0-1565227985_amd64.deb
安装完Vscode之后,发现不能正常启动,因为Xfce和Vscode的兼容性问题,在终端中输入下面命令即可正常运行。(命令输入后界面没有任何反应)
sudo sed -i 's/BIG-REQUESTS/_IG-REQUESTS/' /usr/lib/x86_64-linux-gnu/libxcb.so.1
阿里云ubuntu18.04 已经安装了Python2.7以及Python3.6,并且默认启动的是Python2.7。
而vn.py是基于python3.7。故面临着一个问题:需要把新安装的Python3.7设置位系统默认的Python环境,并且pip3安装库需要对应Python3.7,而不是原来的3.6版本。
这样的Python版本管理起来非常复杂,所以我们使用了MiniConda(Python3.7 64位),它是Anaconda的简化版。安装MiniConda后,会自动设置其Python3.7为系统默认环境,而且提供了conda install的命令代替的pip3来安装其他库。
Miniconda安装也是非常简单的,首先在百度上搜索“miniconda”,第一个就是官网下载地址。打开官网页面后,选择【Linux】系统的Python3.7【64-bit】版本来下载。下载完成后,进入文件所在目录/root/Downloads可以看到.sh格式的Miniconda安装包。鼠标在空白处右击点击【Open Teminal Here】进入终端,然后输入命令bash Miniconda3-latest-Linux-x86_64.sh
进行安装。
安装完毕后重启MobaXterm后,终端输入Pyhton
可以看到Python默认版本已经变成Python3.7了
海龟策略的信号来源于唐奇安通道突破。简单的说就是若突破上轨则做多,突破下轨则做空。我们可以对信号进行改良,如换成布林带通道,金肯特纳通道等等,并且增加过滤条件和离场条件。
但是呢?新的问题又来了:若用了新的指标,需要通过不断调试来得到“最优”参数,这样会耗费大量的时间。
那么,有没有办法在尽量少得时间内,尽可能得到全局最优解或者次优解呢?
答案就是遗传算法啦!
具体原理详见:
遗传算法原理简介
一文读懂遗传算法工作原理(附Python实现)
遗传算法要做的事情并不复杂:
1.随机生成待优化的策略参数
def parameter_generate():
'''
根据设置的起始值,终止值和步进,随机生成待优化的策略参数
'''
parameter_list = []
p1 = random.randrange(4,50,2) #入场窗口
p2 = random.randrange(4,50,2) #出场窗口
p3 = random.randrange(4,50,2) #基于ATR窗口止损窗
p4 = random.randrange(18,40,2) #基于ATR的动态调仓
parameter_list.append(p1)
parameter_list.append(p2)
parameter_list.append(p3)
parameter_list.append(p4)
return parameter_list
def object_func(strategy_avg):
"""
本函数为优化目标函数,根据随机生成的策略参数,运行回测后自动返回2个结果指标:收益回撤比和夏普比率
"""
# 创建回测引擎对象
engine = BacktestingEngine()
# 设置回测使用的数据
engine.setBacktestingMode(engine.BAR_MODE) # 设置引擎的回测模式为K线
engine.setDatabase("VnTrader_Daily_Db", 'XBTHOUR') # 设置使用的历史数据库
engine.setStartDate('20170401') # 设置回测用的数据起始日期
engine.setEndDate('20181230') # 设置回测用的数据起始日期
# 配置回测引擎参数
engine.setSlippage(0.5)
engine.setRate(0.2/100)
engine.setSize(10)
engine.setPriceTick(0.5)
engine.setCapital(1000000)
setting = {'entryWindow': strategy_avg[0], #布林带窗口
'exitWindow': strategy_avg[1], #布林带通道阈值
'atrWindow': strategy_avg[2], #CCI窗口
'artWindowUnit': strategy_avg[3],} #ATR窗口
#加载策略
engine.initStrategy(TurtleTradingStrategy, setting)
# 运行回测,返回指定的结果指标
engine.runBacktesting() # 运行回测
#逐日回测
engine.calculateDailyResult()
backresult = engine.calculateDailyStatistics()[1]
returnDrawdownRatio = round(backresult['returnDrawdownRatio'],2) #收益回撤比
sharpeRatio= round(backresult['sharpeRatio'],2) #夏普比率
return returnDrawdownRatio , sharpeRatio
3.运行基于Deap库的遗传算法(具体步骤看代码中文注释)
#设置优化方向:最大化收益回撤比,最大化夏普比率
creator.create("FitnessMulti", base.Fitness, weights=(1.0, 1.0)) # 1.0 求最大值;-1.0 求最小值
creator.create("Individual", list, fitness=creator.FitnessMulti)
def optimize():
""""""
toolbox = base.Toolbox() #Toolbox是deap库内置的工具箱,里面包含遗传算法中所用到的各种函数
# 初始化
toolbox.register("individual", tools.initIterate, creator.Individual,parameter_generate) # 注册个体:随机生成的策略参数parameter_generate()
toolbox.register("population", tools.initRepeat, list, toolbox.individual) #注册种群:个体形成种群
toolbox.register("mate", tools.cxTwoPoint) #注册交叉:两点交叉
toolbox.register("mutate", tools.mutUniformInt,low = 4,up = 40,indpb=0.6) #注册变异:随机生成一定区间内的整数
toolbox.register("evaluate", object_func) #注册评估:优化目标函数object_func()
toolbox.register("select", tools.selNSGA2) #注册选择:NSGA-II(带精英策略的非支配排序的遗传算法)
#遗传算法参数设置
MU = 40 #设置每一代选择的个体数
LAMBDA = 160 #设置每一代产生的子女数
pop = toolbox.population(400) #设置族群里面的个体数量
CXPB, MUTPB, NGEN = 0.5, 0.35, 40 #分别为种群内部个体的交叉概率、变异概率、产生种群代数
hof = tools.ParetoFront() #解的集合:帕累托前沿(非占优最优集)
#解的集合的描述统计信息
#集合内平均值,标准差,最小值,最大值可以体现集合的收敛程度
#收敛程度低可以增加算法的迭代次数
stats = tools.Statistics(lambda ind: ind.fitness.values)
np.set_printoptions(suppress=True) #对numpy默认输出的科学计数法转换
stats.register("mean", np.mean, axis=0) #统计目标优化函数结果的平均值
stats.register("std", np.std, axis=0) #统计目标优化函数结果的标准差
stats.register("min", np.min, axis=0) #统计目标优化函数结果的最小值
stats.register("max", np.max, axis=0) #统计目标优化函数结果的最大值
#运行算法
algorithms.eaMuPlusLambda(pop, toolbox, MU, LAMBDA, CXPB, MUTPB, NGEN, stats,
halloffame=hof) #esMuPlusLambda是一种基于(μ+λ)选择策略的多目标优化分段遗传算法
return pop
夏普比率1.9,总收益率1958%,最大百分比回撤37%,收益回撤比达53。
其解集收敛程度如下:
在得到一个好的曲线后,还要检查一下这些参数是否符合市场逻辑,尽量去避免过拟合的情况。下面举个反例:
这里使用金肯特纳通道+基于固定百分比移动止损策略。不管从曲线的形态和收敛程度来看都是挺正常的
但是在这种优化后参数中我们观察到trailingPercent=18%,这意味着价格从最高点回落18个点才平仓离场。在正常情况,这会带来非常糟糕的盈亏比。
遗传算法本质上是一种加快策略研究的技术,相对于暴力穷举,它可以大大节省电脑运算时间。我们可以使用它,但不能过度依赖它,因为有可能输出的仅仅是一些很巧合的参数。所以,针对这些参数,需要做更加细致的回测。
毕竟,在策略研究中,细心与耐心也是非常重要的。
K线合成器
Q:CTA策略默认传入 1min K 线数据,在哪个函数中传入默认参数 1min?
A:BarGenerator里面的update_tick()用于把tick数据合成1分钟K线数据,update_bar()是用于把1分钟数据合成X分钟数据。可以参考布林带策略示例,那里提供1分钟K线合成为15分钟K线,并且基于15分钟K线来产生买卖信号。
Q:BarGenerator的最大值是不是只能60?
A:合成分钟线的时候,周期最大只能为60(基于60分钟整除来进行N分钟切分);合成小时线的时候,周期可以为任意值(基于多少个小时过去了,进行合成)
K线时间序列管理器
Q:ArrayManager的初始化函数默认size=100,size指的是什么?
A:size是指这个K线时间序列容器缓存的数据量的大小,理论上只要超过了策略中要计算的所有技术指标最长的那个周期,就够用了。比如你要算MA20 RSI14 CCI50,那么最少需要size=50,否则CCI计算的数据量就不够,一般情况下还会在size上加上一定的量,来避免talib中某些指标算法可能需要更长的数据,保证计算的正确性。
Q:策略使用5分钟K线,ArrayManager初始化 self.am = ArrayManager(100)。在初始化size这里输入100的话,请问vn.py会从数据库里面提取100根1分钟的bar还是500根1分钟的bar来初始化指标?
A:缓存到100跟5分钟K线后才会完成初始化状态。
Q:talib安装失败,怎么解决?
A:使用手动安装:
1.进入Unofficial Windows Binaries for Python Extension Packages中找到talib对应的版本(如py3.7,64位)
2.下载对应版本的文件TA_Lib?0.4.17?cp37?cp37m?win_amd64.whl
3.下载好whl文件之后,直接在命令行下安装文件即可,如下。成功后会显示“Sucessful installed TA_Lib?0.4.17”
pip install TA_Lib?0.4.17?cp37?cp37m?win_amd64.whl
Q:如何导出计算的技术指标数值
A:最简单的方法就是直接print了,或者也可以写入到文件里。
回测数据
Q:CTA回测组件提示K线已经下载完成,但是在sqlite数据库中查看不到记录。
A:数据被插入到当前用户目录的database.db中,如C:\Users\HOME\.vntrader\database.db
Q:实盘时vn.py数据是tick级数据推送,回测时数据是分钟级的,它会识别时间戳吗?
A:bar回测的模式,在策略内部将tick合成为bar或者将1分钟bar合成为x分钟的bar
Q:回测加载数据,显示载入数据量为0。
A:没有连接数据库,或者数据库无数据
Q:初始化策略,默认initDays=10,改成初始一定数量的分钟数是不是更加合理?
A:若载入充足的历史数据,就可以立刻交易了。
Q:数据商也提供逐笔数据如何用来回测?
A:用逐笔自行还原出完整的订单簿tick数据,所以用tick模式来回测。
Q:如何获取A股分钟线数据?
A:目前没有免费的下载渠道(交易所禁止),推荐通过米筐的RQData获取。
Q:如何更改数据库存放盘?
A:举个例子:在D盘创建目录D:\test;在D:\test下创建文件夹.vntrader;使用VN Station的VN Trader Pro切换到D:\test目录启动;此时启动的VN Trader运行时目录已经是D:\test(可以在标题栏看到)
参数优化
Q:用capital作为目标,输出结果全是0
A:因为在逐日统计回测中,capital代表的是起始资金,无法作为优化的目标。可以改用endBalance,sharpeRatio,maxDrawdown等目标优化参数试一下。
Q:engine.trades可以输出交易记录,但是记录里只有time,没有具体的date,请问哪里可以输出具体的交易日期和时间?
A:trades记录里的trade对象,有个额外的datetime字段是用来标识该成交的日期时间的。
实盘运行
Q:自定义策略需要放在哪里?
A:在当前运行的脚本目录先创建创建strategies文件夹,然后把策略文件放进去就可以了。 配置文件可以创建.vntrader, 然后把配置文件放进去。
Q:怎样在on_bar里output数据,以此检查策略是不是按照我的想法在运转
A:可以直接在策略初始化的时候打开一个txt文件句柄,然后回测过程中随时往里面写记录
Q:点击CTA策略出现了json错误
A:json读取错误,如把.vntrader下的cta_strategy_data.json删除可解决
Q:CTA模块中是否有自带的变量存储合约的最小变动价位数据?
A:策略发出的买卖指令的价格,会自动根据最小价格变动pricetick进行round取整,无需最小价格变动的数据了
Q:如何在无人值守脚本获得所有合约的信息?
A:调用main_engine.get_all_contracts函数
Q:用CTPtest抓所有合约, 郑交所的合同没有菜粕,苹果。
A:调用get_all_contracts前,sleep等待5秒,接收合约数据推送需要时间;某些测试环境里,是会缺少部分合约的。
Q:为什么使用VnTrader的cta策略组件进行日常实盘交易时,每天交易时段结束之后,一定要把VnTrader关闭,然后在下次开盘前15分钟在重启并初始化策略的参数?
A:关闭交易系统主要是为了清空系统内部(接口层、引擎层)的缓存状态,策略层的缓存状态一般倒是不太容易出问题的(除非你逻辑写错了)
Q:请问backtesting里的cross_limit_order和cross_stop_order什么意思
A:撮合限价单委托,撮合本地停止单(条件单)委托。讲最新的行情K线或者TICK和策略之前下达的所有委托进行检查,如果能够撮合成交,则返回并记录数据
Q:如何实现k线内成交?
A:用的是本地停止单
就比如当前价格是100点,策略发出信号,在下一根K线的120线发出买入:
若价格没突破到120点,继续挂着。
若价格从100涨到130,那么在120点的时候停止单变成市价单追上去保证尽可能成交。
若下一根K线的开盘价大于120,那么以开盘价来立刻成交。
Q:回测1h k线数据, 为什么每一笔订单成交记录都比委托记录晚一个小时 , 请问这个是在哪里设置的?
A:委托时间,是你发出委托请求的时间。成交时间,是你的成交发生的事件。CTA策略回测遵循T时刻发出的委托,永远只在T+1后才能进行撮合的规则(否则会有未来函数)。
实盘中,你的成交时间可能和委托时间非常接近,但是回测中受限于数据没法确定,只能用T+1那根K线的时间戳近似。
Q:如果停止单触发下单之后一段时间没有执行的话,会撤单吗,什么时候撤单,撤单之后还会追单吗?
A:不会,除非策略执行撤单操作。
Q:如果停止单触发下单之后,部分成交,接下来会撤单还是追单?
A:策略会收到这笔委托的回报,用户可以自行处理,不会自动撤单或者追单
Q:如果同时持有多空,pos是单向的,该怎么处理? 例如:如果持有1手多单,sell卖平的委托没有成交,紧接着的short卖开的单子成交了,pos 是多少?
A : 对于CTA策略模块来说,策略的持仓,是基于策略本身发出去的委托成交维护的逻辑持仓(而非账户底层的实际持仓),所以pos会为0
Q:假如策略下了1手多单,手动下了1手多单,pos 是多少 ?
A : 人手工下的单子无影响
Q:假如策略下了2手多单,手动平仓1手,pos 是多少 ?
A:人手工下的单子无影响
Q:两个策略都在跑同一个代码,应该是各自有各自的pos 吧 ?
A:对的,这两个逻辑持仓互相无关
Q:框架对pos值的更新,是在onTrade 和 onOrder推送动作前,还是推送动作后?
A:onTrade推送前更新,保证策略收到onTrade回调的时候,pos已经是最新数据。
Q:vt_setting.json的路径到底在哪?
A:在c:\users\administrator.vntrader目录下,是基于Python的pathlib实现的
Q:positionData中的 yd_volume 是指什么?
A:昨仓,这个主要针对中国市场的股票(只能卖出昨日的股票)和期货(昨仓平仓手续费不同)
Q:当前CTA模块是不是不能在策略里获取当前资金情况?
A:不能获取资金和账户级别的持仓。策略的仓位管理(风险分配)应该由交易员来做,而不是让程序自动做
Q:非交易时间报单,是直接返回拒单,还是返回提交中等开盘了再打出去?
A:在策略层,如on_bar()函数里面第一个逻辑应该是self.cancel_all(),目的是清空为成交的委托(包括本地停止单,限价单),保证当前时间点委托状态是唯一的。
若在非交易时间发单,服务器收到这个委托会推送一个拒单的委托回报。
Q:vn.py如何查询实盘账户的历史资金情况
A:大部分交易系统并未提供历史资金查询功能,一般都是只能查当前时间点的资金,所以需要你自己保存。
其他问题
Q: 请问下,vn.py中有期权回测的例子么?vn.py 关于期权,是不是就只有OptionMatser模块?
A:期权的波动率交易策略一般无需回测,更多依赖建模;可以用其他组件交易期权,比如CTA策略模块赌趋势,或者SpreadTrading模块赌价差,但这些本质都不是期权交易策略。
Q:如何根据资金量进行下单,而不是固定手数下单?
A:vn.py框架下不建议交易程序在实盘中去获取账户可用资金,并调整交易手数,这是很危险的事情。
CTP接口
Q:如何实现行情并发推送?
A:C++ API的回调函数只有一个推送线程;用户可以一次性订阅全市场的合约,某个合约有行情变动的时候才会推送.
Q:CTP能可否获取到指定经纪商的手续费?
A:这个手续费应该是连带期货公司的部分,但是不建议在交易程序中去访问这种数据,相关数据应该每天在启动策略前就获取好配置到策略里。
Q:郑商所的品种都收不到14:59这一根bar
A:郑商所的数据推送,没有3点后的最后一个tick用于标识收盘完成,所以要调用BarGenerator.generate函数来做最终的强制K线生成
Q:国内期货模拟,除了sinnow可以模拟,还有别的账号可以模拟吗,可以去期货公司申请模拟账号模拟吗?
A:SimNow是目前最稳定的仿真环境了,可以找期货公司申请,比如中信、上海中期等。
Q:sinnow连接有时会断开,然后传送的tick数据的时间就会延后,就是收盘时,本来应该平仓平不了,超过三点了还在传tick。
A:SimNow环境因为免费,用的人很多,所以服务器有时会卡。
Q:为什么有时候会不停的断开重连呢?
A:服务器关了或者人满了,或者账号密码错误。
Q:连接进 SimNow后,按”市价“下单被拒绝,按”限价“”FAK“”FOK"下单可以,请问,是否不支持“市价”下单。
A:SimNow不支持市价单,实盘支持。
Q:连接SimNow后,下单提示“提交中“无法成交也无法撤单
A:这种情况,一般是委托请求没有到CTP柜台(网络断了),或者CTP柜台挂了
Q:SimNow上不去,上去了注册又总是提示验证码失败,还有其他模拟的推荐吗?
A:SimNow是目前最推荐的仿真环境,建议换交易时间注册,以及SimNow主要支持移动和联通的手机号
Q:CTP配置无法连接:输入论坛登录名,账号,配置对应的交易服务器和行情服务器,点击连接,无任何反应
A:CTP的测试账号请通过SimNow获取,不是vn.py论坛的
Q:已有行情数据显示。 但是不能发单,如rb1905
A:检查是否漏填交易所或者上委托数量的字段。
Q:下单异常,第一种情况:一点击委托就是直接“已撤销”(委托栏里委托状态),双击撤单的时候,又会显示“交易撤单失败,代码25”。第二种情况:一点击委托就是直接“提交中”(委托栏里委托状态),等到双击撤单的时候,又会显示“交易撤单失败,代码25”
A:第一个情况应该是报单不符合服务端的要求,被拒单撤销了;第二个情况感觉是你的网络断了,报单请求发出但没有到服务器。可以顺着这两个方向检查。
Q:CTPTEST测试交易期货公司采集不到硬盘序列号,CPU序列号,BIOS序列号
A:数据采集是通过API内部的代码自动完成的,其他任何上层程序都无法影响。建议换机器。
Q:登录穿透式仿真账号问题,一直报4097的错
A:不要同时import CTPTEST和CTP这两个接口
Q:使用ctp和ctptest登录都显示不合法登录。错误代码3
A:账户密码错误
Q:CTP能否两个账号同时登录并且同时操作。
A:同一个接口只能登录一次。如果要同时登录两个CTP,需要自己扩展修改CtpGateway,然后加载两个CtpGateway;对于不同的接口,比如股票的XTP和期货的CTP,可以直接同时登录使用,并在策略中同时交易这两个接口的合约
IB接口
Q:如何链接盈透api ?
A:启动TWS;在配置中打开Socket连接功能;在vn.py中加载ibGateway,然后启动就行。
Q:IB接口连接,错误提示显示:couldn't connect to TWS. confirm that "enable activex and socket clients" is enabled aports for new installations of version 954.1 or newer: TWS:7497:IB gateway: 4002。
A:请检查TWS是否打开了socket访问功能。
Q:启动行情记录,则程序假死。
A:IB的行情订阅函数没有异步缓存逻辑,用DataRecorder脚本的话,会在连接TWS之前就进行了订阅请求,导致死掉。IbGatewa已经加上了历史数据查询获取功能,直接从IB查询K线数据进行初始化就行,不建议自己录制了。
富途接口
Q:futu_gateway里面的登陆信息设置,为何只要密码?
A:需要先下载和安装FUTU OPEN API:https://www.futunn.com/openAPI
华宝派
Q:华宝派如何申请试用/实盘呢?
A:请在华宝证券开户,然后联系客户经理申请使用华宝PI
交易复制
Q:当跟单帐户检测到被跟单帐户的仓位变化后,具体操作是什么?
A:TradeCopy的发布者账号,维护一份本地持仓数据表,当有成交推送时立即更新计算最新仓位;发布者每当收到成交推送时,或者每隔一定的时间间隔(默认1秒),会广播一次当前自己的仓位信息;订阅者收到广播推送的发布者仓位后,乘以自身的复制系数,作为目标仓位;订阅者根据目标仓位,和自身实际持仓的偏差,决定具体的下单操作(目标是将实际持仓同步到和目标持仓一致),如果有之前的委托,会先执行撤单。
RQData
Q:import rqdatac 失败,没有找到rqdatac包
A:运行以下命令安装
pip install --extra-index-url https://rquser:ricequant99@py.ricequant.com/simple/ rqdatac==1.0.0a66
Q:如何配置RQDAAT账户?
A:申请试用账号:https://www.ricequant.com/purchase
在VN Trader主界面上,点配置,rqdata.username rqdata.password输入
重启VN Trader就能用了
工作线程
Q:vn.py运行的时候,会启动哪些线程?
A:
1.主线程:带PyQt界面时运行Qt的循环,无界面时可以直接阻塞或者用while循环
2.事件引擎线程:处理事件引擎队列中的事件,并调用注册的处理函数处理,所以如果是CtaStrategy层,所有回调函数你可以认为都是单线程在驱动的(每次只有一个在调用)
3.API层线程:不同的API不一样了。
Q:eventEngine2.__queue很多数据没有处理、队列一直变大?
A:如果queue的大小只增加,不减少,只可能是没有启动EventEngine,导致事件没有处理持续挤压导致的。否则运行过程中即使没有注册处理函数,该事件的数据也会被抛弃掉,不会继续保存着。
行情记录
Q:行情记录后怎么查看和下载历史行情数据?
A:行情记录模块是将tick或者bar直接存入你配置的数据库的,你要单独查看,可以用数据库可视化工具连接本地数据库查看,如果在vn.py里回测使用,不需要下载,是默认从数据库里找相关数据的
Q:datarecoder 和 cta running 的 CTP能不能分开设置?
A:可以,另外新建一个目录,里面创建.vntrader文件夹,在这个目录用run.py或者VN Station启动VN Trader,CTP接口的登录信息就都是独立的了。
其他
Q:网站上下载的vnpy ,和Anaconda site-package里面的vnpy有什么区别?
A:进行运行的是ananconda里面的vnpy。如同在anaconda里面调用numpy一样。
Q:修改vnpy代码后需要更新到anaconda site-package对应的文件里?
A:python里import的vnpy就是site-packages里的,你可以修改下环境变量,把你clone的那个目录加入搜索路径,这样你修改了clone的那个vn.py,用的时候就自动改了
Q:一键安装完2.0.5 后,再另外安装anaconda3, spyder无法使用
A:假设你安装到c:\anaconda3。打开cmd,运行c:\anaconda3\scripts\activate,然后再运行python,就会进入anaconda环境了
Q:请问算法交易怎么用,可以策略生成下单指令,由算法下单吗?
A:目前AlgoTrading模块主要通过GUI和篮子委托文件的方式来实现算法下单
尽管可以通过扩展的方式,实现策略调用算法执行交易,但更建议在CTA策略中自行实现算法交易的逻辑,获得更好的细节控制能力
Q:vn.py中配置界面中email各项设置如何填写,有何用处?
A:设置如下
"email.server": "SMTP邮件服务器地址",
"email.port": SMTP邮件服务器端口号,
"email.username": "邮箱用户名",
"email.password": "邮箱密码",
"email.sender": "发送者邮箱",
"email.receiver": "接收者邮箱",
Q:找回vn.py社区的密码
A:在这个页面可以找回密码:https://www.vnpy.com/auth/reset-password
Q:注册了社区账号,但是登录报错。[WinError 10061] 由于目标计算机积极拒绝,无法连接
A:这个是代理服务器问题吧,换个网络试试。
Q:维恩的派和vn.py社区有什么关系?
A:维恩的派是vn.py社区2015-2018年的老论坛,但由于discuz的各种问题使用体验太差,现在已经停止使用,将在19年底正式下线。
Q:社区怎么上传照片的?
A:直接把图片拖动到编辑框中就能自动上传了
Q:有微信群,QQ群吗?
A:vn.py框架学习群:666359421 ; vn.py框架交流群:262656087
作者:张国平 ;来源:维恩的派论坛
在微信公众号 “量化投资与机器学习”, 看到一个推文“我就不用AI、ML模型预测股价,来点不一样的“, 链接如下:
我就不用AI、ML模型预测股价,来点不一样的!
交易思维是基于历史数据中,一组数据比如100天中,K线中最高点或者最低点相对于开始价位价差点差,再利用numpy的函数numpy.percentile(), 计算在比如95%机会,最高点或者最低点的点差数字。如果点差是5个点,就可以认为下一根K线也有95%概率有5个点受益。
尝试在VNPY实现。
思路整理:
回测设置
回测效果
代码如下:
# encoding: UTF-8
from __future__ import division
from vnpy.trader.vtConstant import EMPTY_STRING, EMPTY_FLOAT, OFFSET_OPEN,OFFSET_CLOSE
from vnpy.trader.app.ctaStrategy.ctaTemplate import (CtaTemplate,
BarGenerator,
ArrayManager)
import numpy as np
from datetime import datetime, time
########################################################################
class PercentileStrategy(CtaTemplate):
"""MACD策略Demo"""
className = 'PercentileStrategy'
author = u'BillyZhang'
fixedSize = 1
# 策略参数
calWindow = 15
percentile = 95
tickValueLimit = 5
Multiple = 0.8
# 策略变量
p = 0
tickValue = 0
tradeSign = 0
tickValueHigh = 0
tickValueLow = 0
longStop = 0 # 多头止损
shortStop = 0 # 空头止损
margin = 0
lowerLimit = 0
upperLimit = 50000
# 时间
initDays = 0
DAY_START = time(9, 10) # 日盘启动和停止时间
DAY_END = time(14, 55)
NIGHT_START = time(21, 10) # 夜盘启动和停止时间
NIGHT_END = time(10, 55)
# 参数列表,保存了参数的名称
paramList = ['name',
'className',
'author',
'vtSymbol',
'initDays',
'fixedSize',
'calWindow',
'percentile',
'tickValueLimit',
'Multiple'
]
# 变量列表,保存了变量的名称
varList = ['inited',
'trading',
'pos',
'longStop',
'shortStop',
'posPrice',
'lowerLimit',
'p',
'tickValue',
'tradeSign',
'tickValueHigh',
'tickValueLow'
]
# 同步列表,保存了需要保存到数据库的变量名称
syncList = ['pos',
'posPrice',
'longStop',
'shortStop'
]
# ----------------------------------------------------------------------
def __init__(self, ctaEngine, setting):
"""Constructor"""
super(PercentileStrategy, self).__init__(ctaEngine, setting)
self.am = ArrayManager(size = self.calWindow)
# 注意策略类中的可变对象属性(通常是list和dict等),在策略初始化时需要重新创建,
# 否则会出现多个策略实例之间数据共享的情况,有可能导致潜在的策略逻辑错误风险,
# 策略类中的这些可变对象属性可以选择不写,全都放在__init__下面,写主要是为了阅读
# 策略时方便(更多是个编程习惯的选择)
# ----------------------------------------------------------------------
def onInit(self):
"""初始化策略(必须由用户继承实现)"""
self.writeCtaLog(u'%s策略初始化' % self.name)
initData = self.loadBar(self.initDays)
for bar in initData:
self.onBar(bar)
self.putEvent()
# ----------------------------------------------------------------------
def onStart(self):
"""启动策略(必须由用户继承实现)"""
if self.pos == 0:
self.writeCtaLog(u'%s策略启动' % self.name)
# 当前无仓位,发送开仓委托
# 持有多头仓位
self.putEvent()
# ----------------------------------------------------------------------
def onStop(self):
"""停止策略(必须由用户继承实现)"""
self.writeCtaLog(u'%s策略停止' % self.name)
self.putEvent()
# ----------------------------------------------------------------------
def onTick(self, tick):
"""收到行情TICK推送(必须由用户继承实现)"""
if self.lowerLimit == 0 or self.upperLimit == 0:
self.lowerLimit = tick.lowerLimit
self.upperLimit = tick.upperLimit
self.bg.updateTick(tick)
# ----------------------------------------------------------------------
def onBar(self, bar):
"""收到Bar推送(必须由用户继承实现)"""
#如果是当然最后5分钟,略过
am = self.am
am.updateBar(bar)
if not am.inited:
return
# currentTime = datetime.now().time()
currentTime = time(9,20)
#计算p,和tickValue
MaxHigh = am.high / am.open
MaxLow = am.low / am.open
MaxClose = am.close / am.open
lpHigh = np.percentile(MaxHigh, 100 - self.percentile)
lpLow = np.percentile(MaxLow, self.percentile)
self.tickValueHigh = abs(bar.open - bar.open*lpHigh)
self.tickValueLow = abs(bar.open - bar.open * lpLow)
if self.tickValueHigh > self.tickValueLow and self.tickValueHigh > self.tickValueLimit:
self.tradeSign = 1
elif self.tickValueHigh < self.tickValueLow and self.tickValueLow > self.tickValueLimit:
self.tradeSign = -1
else:
self.tradeSign = 0
# 平当日仓位, 如果当前时间是结束前日盘15点28分钟,或者夜盘10点58分钟,如果有持仓,平仓。
if ((currentTime >= self.DAY_START and currentTime <= self.DAY_END) or
(currentTime >= self.NIGHT_START and currentTime <= self.NIGHT_END)):
if self.pos == 0:
if self.tradeSign == 0:
pass
elif self.tradeSign == 1 and bar.close > self.lowerLimit:
self.buy(bar.close + 5,self.fixedSize,False)
elif self.tradeSign == -1 and bar.close < self.upperLimit:
self.short(bar.close - 5,self.fixedSize,False)
elif self.pos > 0:
if self.tradeSign == 1 or self.tradeSign == 0:
pass
elif self.tradeSign == -1:
self.sell(bar.close-5, abs(self.pos), False)
elif self.pos < 0:
if self.tradeSign == -1 or self.tradeSign == 0:
pass
elif self.tradeSign ==1:
self.cover(bar.close+5, abs(self.pos), False)
else:
if self.pos > 0:
self.sell(bar.close-5, abs(self.pos), False)
elif self.pos < 0:
self.cover(bar.close+5, abs(self.pos), False)
elif self.pos == 0:
return
# ----------------------------------------------------------------------
def onOrder(self, order):
"""收到委托变化推送(必须由用户继承实现)"""
# 对于无需做细粒度委托控制的策略,可以忽略onOrder
pass
# ----------------------------------------------------------------------
def onTrade(self, trade):
# 发出状态更新事件
"""收到成交推送(必须由用户继承实现)"""
# 对于无需做细粒度委托控制的策略,可以忽略onOrder
if trade.offset == OFFSET_OPEN:
self.posPrice = trade.price
if self.tradeSign == 1:
self.sell(self.posPrice + self.tickValueHigh,abs(self.pos),False)
self.sell(self.posPrice - self.Multiple*self.tickValueHigh, abs(self.pos), True)
elif self.tradeSign == -1:
self.cover(self.posPrice - self.tickValueLow, abs(self.pos), False)
self.cover(self.posPrice + self.Multiple*self.tickValueLow, abs(self.pos),True)
elif trade.offset == OFFSET_CLOSE:
self.cancelAll()
self.tradeSign = 0
# 同步数据到数据库
self.saveSyncData()
# ----------------------------------------------------------------------
def onStopOrder(self, so):
"""停止单推送"""
pass
作者:时间 ;来源:维恩的派论坛 ;版本:v1.72
改写原因:行情记录多个合约的数据时,在下午收盘后依旧有很多tick数据未来的及插入数据库,尤其是在1.7.1版本时(mainEngine.dbUpdate()的模式下)
改写方式:
效果: 七个合约进行数据记录,基本实现生产和消费同步,测试时间不久,错误之处欢迎指正
代码如下:
# encoding: UTF-8
'''
本文件中实现了行情数据记录引擎,用于汇总TICK数据,并生成K线插入数据库。
使用DR_setting.json来配置需要收集的合约,以及主力合约代码。
'''
import json
import csv
import os
import copy
from collections import OrderedDict
from datetime import datetime, timedelta
from Queue import Queue, Empty
from threading import Thread
from vnpy.event import Event
from vnpy.trader.vtEvent import *
from vnpy.trader.vtFunction import todayDate, getJsonPath
from vnpy.trader.vtObject import VtSubscribeReq, VtLogData, VtBarData, VtTickData
from vnpy.trader.app.ctaStrategy.ctaTemplate import BarManager
from .drBase import *
from .language import text
########################################################################
# class DrEngine(object):
# """数据记录引擎"""
# settingFileName = 'DR_setting.json'
# settingFilePath = getJsonPath(settingFileName, __file__)
# # print u"数据记录配置文件地址:",settingFilePath
# #----------------------------------------------------------------------
# def __init__(self, mainEngine, eventEngine):
# """Constructor"""
# self.mainEngine = mainEngine
# self.eventEngine = eventEngine
# # 当前日期
# self.today = todayDate()
# # 主力合约代码映射字典,key为具体的合约代码(如IF1604),value为主力合约代码(如IF0000)
# self.activeSymbolDict = {}
# # Tick对象字典
# self.tickSymbolSet = set()
# # K线合成器字典
# self.bmDict = {}
# # 配置字典
# self.settingDict = OrderedDict()
# # 负责执行数据库插入的单独线程相关
# self.active = False # 工作状态
# self.queue = Queue() # 队列
# self.thread = Thread(target=self.run) # 线程
# # 载入设置,订阅行情
# self.loadSetting()
# # 启动数据插入线程
# self.start()
# # 注册事件监听
# self.registerEvent()
# #----------------------------------------------------------------------
# def loadSetting(self):
# """加载配置"""
# with open(self.settingFilePath) as f:
# drSetting = json.load(f)
# # 如果working设为False则不启动行情记录功能
# working = drSetting['working']
# if not working:
# return
# # Tick记录配置
# if 'tick' in drSetting:
# l = drSetting['tick']
# for setting in l:
# symbol = setting[0]
# gateway = setting[1]
# vtSymbol = symbol
# req = VtSubscribeReq()
# req.symbol = setting[0]
# # 针对LTS和IB接口,订阅行情需要交易所代码
# if len(setting)>=3:
# req.exchange = setting[2]
# vtSymbol = '.'.join([symbol, req.exchange])
# # 针对IB接口,订阅行情需要货币和产品类型
# if len(setting)>=5:
# req.currency = setting[3]
# req.productClass = setting[4]
# self.mainEngine.subscribe(req, gateway)
# #tick = VtTickData() # 该tick实例可以用于缓存部分数据(目前未使用)
# #self.tickDict[vtSymbol] = tick
# self.tickSymbolSet.add(vtSymbol)
# # 保存到配置字典中
# if vtSymbol not in self.settingDict:
# d = {
# 'symbol': symbol,
# 'gateway': gateway,
# 'tick': True
# }
# self.settingDict[vtSymbol] = d
# else:
# d = self.settingDict[vtSymbol]
# d['tick'] = True
# # 分钟线记录配置
# if 'bar' in drSetting:
# l = drSetting['bar']
# for setting in l:
# symbol = setting[0]
# gateway = setting[1]
# vtSymbol = symbol
# req = VtSubscribeReq()
# req.symbol = symbol
# if len(setting)>=3:
# req.exchange = setting[2]
# vtSymbol = '.'.join([symbol, req.exchange])
# if len(setting)>=5:
# req.currency = setting[3]
# req.productClass = setting[4]
# self.mainEngine.subscribe(req, gateway)
# # 保存到配置字典中
# if vtSymbol not in self.settingDict:
# d = {
# 'symbol': symbol,
# 'gateway': gateway,
# 'bar': True
# }
# self.settingDict[vtSymbol] = d
# else:
# d = self.settingDict[vtSymbol]
# d['bar'] = True
# # 创建BarManager对象
# self.bmDict[vtSymbol] = BarManager(self.onBar)
# # 主力合约记录配置
# if 'active' in drSetting:
# d = drSetting['active']
# self.activeSymbolDict = {vtSymbol:activeSymbol for activeSymbol, vtSymbol in d.items()}
# #----------------------------------------------------------------------
# def getSetting(self):
# """获取配置"""
# return self.settingDict, self.activeSymbolDict
# #----------------------------------------------------------------------
# def procecssTickEvent(self, event):
# """处理行情事件"""
# tick = event.dict_['data']
# vtSymbol = tick.vtSymbol
# # 生成datetime对象
# if not tick.datetime:
# tick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y%m%d %H:%M:%S.%f')
# self.onTick(tick)
# bm = self.bmDict.get(vtSymbol, None)
# if bm:
# bm.updateTick(tick)
# #----------------------------------------------------------------------
# def onTick(self, tick):
# """Tick更新"""
# vtSymbol = tick.vtSymbol
# if vtSymbol in self.tickSymbolSet:
# self.insertData(TICK_DB_NAME, vtSymbol, tick)
# if vtSymbol in self.activeSymbolDict:
# activeSymbol = self.activeSymbolDict[vtSymbol]
# self.insertData(TICK_DB_NAME, activeSymbol, tick)
# self.writeDrLog(text.TICK_LOGGING_MESSAGE.format(symbol=tick.vtSymbol,
# time=tick.time,
# last=tick.lastPrice,
# bid=tick.bidPrice1,
# ask=tick.askPrice1))
# #----------------------------------------------------------------------
# def onBar(self, bar):
# """分钟线更新"""
# vtSymbol = bar.vtSymbol
# self.insertData(MINUTE_DB_NAME, vtSymbol, bar)
# if vtSymbol in self.activeSymbolDict:
# activeSymbol = self.activeSymbolDict[vtSymbol]
# self.insertData(MINUTE_DB_NAME, activeSymbol, bar)
# self.writeDrLog(text.BAR_LOGGING_MESSAGE.format(symbol=bar.vtSymbol,
# time=bar.time,
# open=bar.open,
# high=bar.high,
# low=bar.low,
# close=bar.close))
# #----------------------------------------------------------------------
# def registerEvent(self):
# """注册事件监听"""
# self.eventEngine.register(EVENT_TICK, self.procecssTickEvent)
# #----------------------------------------------------------------------
# def insertData(self, dbName, collectionName, data):
# """插入数据到数据库(这里的data可以是VtTickData或者VtBarData)"""
# self.queue.put((dbName, collectionName, data.__dict__))
# #----------------------------------------------------------------------
# def run(self):
# """运行插入线程"""
# while self.active:
# try:
# dbName, collectionName, d = self.queue.get(block=True, timeout=1)
# flt = {'datetime': d['datetime']}
# self.mainEngine.dbUpdate(dbName, collectionName, d, flt, True)
# except Empty:
# pass
# #----------------------------------------------------------------------
# def start(self):
# """启动"""
# self.active = True
# self.thread.start()
# #----------------------------------------------------------------------
# def stop(self):
# """退出"""
# if self.active:
# self.active = False
# self.thread.join()
# #----------------------------------------------------------------------
# def writeDrLog(self, content):
# """快速发出日志事件"""
# log = VtLogData()
# log.logContent = content
# event = Event(type_=EVENT_DATARECORDER_LOG)
# event.dict_['data'] = log
# self.eventEngine.put(event)
#
########################################################################
# class DrEngine(object):
# """数据记录引擎"""
# settingFileName = 'DR_setting.json'
# settingFilePath = getJsonPath(settingFileName, __file__)
# print u"----------多线程行情数据记录-------------------"
# #----------------------------------------------------------------------
# def __init__(self, mainEngine, eventEngine):
# """Constructor"""
# self.mainEngine = mainEngine
# self.eventEngine = eventEngine
# # 当前日期
# self.today = todayDate()
# # 主力合约代码映射字典,key为具体的合约代码(如IF1604),value为主力合约代码(如IF0000)
# self.activeSymbolDict = {}
# # Tick对象字典
# self.tickSymbolSet = set()
# # K线合成器字典
# self.bmDict = {}
# # 配置字典
# self.settingDict = OrderedDict()
# # 负责执行数据库插入的单独线程相关
# self.active = False # 工作状态
# # self.queue = Queue() # 队列
# # self.thread = Thread(target=self.run) # 线程
# self.queue_dict=OrderedDict()
# self.threadlist=[]
# # 载入设置,订阅行情
# self.loadSetting()
# #加载多线程
# self.get_threadlist()
# # 启动数据插入线程
# self.start()
# # 注册事件监听
# self.registerEvent()
# #----------------------------------------------------------------------
# def loadSetting(self):
# """加载配置"""
# with open(self.settingFilePath) as f:
# drSetting = json.load(f)
# # 如果working设为False则不启动行情记录功能
# working = drSetting['working']
# if not working:
# return
# # Tick记录配置
# if 'tick' in drSetting:
# l = drSetting['tick']
# for setting in l:
# symbol = setting[0]
# gateway = setting[1]
# vtSymbol = symbol
# if symbol not in self.queue_dict:
# self.queue_dict[symbol]=Queue()
# req = VtSubscribeReq()
# req.symbol = setting[0]
# # 针对LTS和IB接口,订阅行情需要交易所代码
# if len(setting)>=3:
# req.exchange = setting[2]
# vtSymbol = '.'.join([symbol, req.exchange])
# # 针对IB接口,订阅行情需要货币和产品类型
# if len(setting)>=5:
# req.currency = setting[3]
# req.productClass = setting[4]
# self.mainEngine.subscribe(req, gateway)
# #tick = VtTickData() # 该tick实例可以用于缓存部分数据(目前未使用)
# #self.tickDict[vtSymbol] = tick
# self.tickSymbolSet.add(vtSymbol)
# # 保存到配置字典中
# if vtSymbol not in self.settingDict:
# d = {
# 'symbol': symbol,
# 'gateway': gateway,
# 'tick': True
# }
# self.settingDict[vtSymbol] = d
# else:
# d = self.settingDict[vtSymbol]
# d['tick'] = True
# # 分钟线记录配置
# if 'bar' in drSetting:
# l = drSetting['bar']
# for setting in l:
# symbol = setting[0]
# gateway = setting[1]
# vtSymbol = symbol
# if symbol not in self.queue_dict:
# self.queue_dict[symbol]=Queue()
# req = VtSubscribeReq()
# req.symbol = symbol
# if len(setting)>=3:
# req.exchange = setting[2]
# vtSymbol = '.'.join([symbol, req.exchange])
# if len(setting)>=5:
# req.currency = setting[3]
# req.productClass = setting[4]
# self.mainEngine.subscribe(req, gateway)
# # 保存到配置字典中
# if vtSymbol not in self.settingDict:
# d = {
# 'symbol': symbol,
# 'gateway': gateway,
# 'bar': True
# }
# self.settingDict[vtSymbol] = d
# else:
# d = self.settingDict[vtSymbol]
# d['bar'] = True
# # 创建BarManager对象
# self.bmDict[vtSymbol] = BarManager(self.onBar)
# # 主力合约记录配置
# if 'active' in drSetting:
# d = drSetting['active']
# self.activeSymbolDict = {vtSymbol:activeSymbol for activeSymbol, vtSymbol in d.items()}
# #----------------------------------------------------------------------
# def getSetting(self):
# """获取配置"""
# return self.settingDict, self.activeSymbolDict
# #----------------------------------------------------------------------
# def procecssTickEvent(self, event):
# """处理行情事件"""
# tick = event.dict_['data']
# vtSymbol = tick.vtSymbol
# # 生成datetime对象
# if not tick.datetime:
# tick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y%m%d %H:%M:%S.%f')
# self.onTick(tick)
# bm = self.bmDict.get(vtSymbol, None)
# if bm:
# bm.updateTick(tick)
# #----------------------------------------------------------------------
# def onTick(self, tick):
# """Tick更新"""
# vtSymbol = tick.vtSymbol
# if vtSymbol in self.tickSymbolSet:
# self.insertData(TICK_DB_NAME, vtSymbol, tick)
# if vtSymbol in self.activeSymbolDict:
# activeSymbol = self.activeSymbolDict[vtSymbol]
# self.insertData(TICK_DB_NAME, activeSymbol, tick)
# self.writeDrLog(text.TICK_LOGGING_MESSAGE.format(symbol=tick.vtSymbol,
# time=tick.time,
# last=tick.lastPrice,
# bid=tick.bidPrice1,
# ask=tick.askPrice1))
# #----------------------------------------------------------------------
# def onBar(self, bar):
# """分钟线更新"""
# vtSymbol = bar.vtSymbol
# self.insertData(MINUTE_DB_NAME, vtSymbol, bar)
# if vtSymbol in self.activeSymbolDict:
# activeSymbol = self.activeSymbolDict[vtSymbol]
# self.insertData(MINUTE_DB_NAME, activeSymbol, bar)
# self.writeDrLog(text.BAR_LOGGING_MESSAGE.format(symbol=bar.vtSymbol,
# time=bar.time,
# open=bar.open,
# high=bar.high,
# low=bar.low,
# close=bar.close))
# #----------------------------------------------------------------------
# def registerEvent(self):
# """注册事件监听"""
# self.eventEngine.register(EVENT_TICK, self.procecssTickEvent)
# #----------------------------------------------------------------------
# def insertData(self, dbName, collectionName, data):
# """插入数据到数据库(这里的data可以是VtTickData或者VtBarData)"""
# symbol=data.vtSymbol
# symbol_queue=self.queue_dict.get(symbol,None)
# if symbol_queue:
# symbol_queue.put((dbName, collectionName, data.__dict__))
# #----------------------------------------------------------------------
# def run(self,symbol):
# """运行插入线程"""
# while self.active:
# try:
# single_queue=self.queue_dict.get(symbol,None)
# if single_queue:
# print u"----{}----,队列大小为:{}".format(symbol,single_queue.qsize())
# dbName, collectionName, d = single_queue.get(block=True, timeout=1)
# flt = {'datetime': d['datetime']}
# self.mainEngine.dbUpdate(dbName, collectionName, d, flt, True)
# except Empty:
# pass
# #----------------------------------------------------------------------
# def get_threadlist(self):
# symbollist=self.queue_dict.keys()
# for symbol in symbollist:
# t=Thread(target=self.run,args=(symbol,))
# self.threadlist.append(t)
# def start(self):
# """启动"""
# self.active = True
# # self.thread.start()
# for t in self.threadlist:
# t.start()
# #----------------------------------------------------------------------
# def stop(self):
# """退出"""
# if self.active:
# self.active = False
# self.thread.join()
# #----------------------------------------------------------------------
# def writeDrLog(self, content):
# """快速发出日志事件"""
# log = VtLogData()
# log.logContent = content
# event = Event(type_=EVENT_DATARECORDER_LOG)
# event.dict_['data'] = log
# self.eventEngine.put(event)
########################################################################
class DrEngine(object):
"""数据记录引擎"""
settingFileName = 'DR_setting.json'
settingFilePath = getJsonPath(settingFileName, __file__)
#----------------------------------------------------------------------
def __init__(self, mainEngine, eventEngine):
"""Constructor"""
self.mainEngine = mainEngine
self.eventEngine = eventEngine
# 当前日期
self.today = todayDate()
# 主力合约代码映射字典,key为具体的合约代码(如IF1604),value为主力合约代码(如IF0000)
self.activeSymbolDict = {}
# Tick对象字典
self.tickSymbolSet = set()
# K线合成器字典
self.bmDict = {}
# 配置字典
self.settingDict = OrderedDict()
# 负责执行数据库插入的单独线程相关
self.active = False # 工作状态
# self.queue = Queue() # 队列
# self.thread = Thread(target=self.run) # 线程
self.queue_dict=OrderedDict()
self.threadlist=[]
# 载入设置,订阅行情
self.loadSetting()
#加载多线程
self.get_threadlist()
# 启动数据插入线程
self.start()
# 注册事件监听
self.registerEvent()
#----------------------------------------------------------------------
def loadSetting(self):
"""加载配置"""
with open(self.settingFilePath) as f:
drSetting = json.load(f)
# 如果working设为False则不启动行情记录功能
working = drSetting['working']
if not working:
return
# Tick记录配置
if 'tick' in drSetting:
l = drSetting['tick']
for setting in l:
symbol = setting[0]
gateway = setting[1]
vtSymbol = symbol
if symbol not in self.queue_dict:
self.queue_dict[symbol]=Queue()
req = VtSubscribeReq()
req.symbol = setting[0]
# 针对LTS和IB接口,订阅行情需要交易所代码
if len(setting)>=3:
req.exchange = setting[2]
vtSymbol = '.'.join([symbol, req.exchange])
# 针对IB接口,订阅行情需要货币和产品类型
if len(setting)>=5:
req.currency = setting[3]
req.productClass = setting[4]
self.mainEngine.subscribe(req, gateway)
#tick = VtTickData() # 该tick实例可以用于缓存部分数据(目前未使用)
#self.tickDict[vtSymbol] = tick
self.tickSymbolSet.add(vtSymbol)
# 保存到配置字典中
if vtSymbol not in self.settingDict:
d = {
'symbol': symbol,
'gateway': gateway,
'tick': True
}
self.settingDict[vtSymbol] = d
else:
d = self.settingDict[vtSymbol]
d['tick'] = True
# 分钟线记录配置
if 'bar' in drSetting:
l = drSetting['bar']
for setting in l:
symbol = setting[0]
gateway = setting[1]
vtSymbol = symbol
if symbol not in self.queue_dict:
self.queue_dict[symbol]=Queue()
req = VtSubscribeReq()
req.symbol = symbol
if len(setting)>=3:
req.exchange = setting[2]
vtSymbol = '.'.join([symbol, req.exchange])
if len(setting)>=5:
req.currency = setting[3]
req.productClass = setting[4]
self.mainEngine.subscribe(req, gateway)
# 保存到配置字典中
if vtSymbol not in self.settingDict:
d = {
'symbol': symbol,
'gateway': gateway,
'bar': True
}
self.settingDict[vtSymbol] = d
else:
d = self.settingDict[vtSymbol]
d['bar'] = True
# 创建BarManager对象
self.bmDict[vtSymbol] = BarManager(self.onBar)
# 主力合约记录配置
if 'active' in drSetting:
d = drSetting['active']
self.activeSymbolDict = {vtSymbol:activeSymbol for activeSymbol, vtSymbol in d.items()}
#----------------------------------------------------------------------
def getSetting(self):
"""获取配置"""
return self.settingDict, self.activeSymbolDict
#----------------------------------------------------------------------
def procecssTickEvent(self, event):
"""处理行情事件"""
tick = event.dict_['data']
vtSymbol = tick.vtSymbol
# 生成datetime对象
if not tick.datetime:
tick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y%m%d %H:%M:%S.%f')
self.onTick(tick)
bm = self.bmDict.get(vtSymbol, None)
if bm:
bm.updateTick(tick)
#----------------------------------------------------------------------
def onTick(self, tick):
"""Tick更新"""
vtSymbol = tick.vtSymbol
if vtSymbol in self.tickSymbolSet:
self.insertData(TICK_DB_NAME, vtSymbol, tick)
# if vtSymbol in self.activeSymbolDict:
# activeSymbol = self.activeSymbolDict[vtSymbol]
# self.insertData(TICK_DB_NAME, activeSymbol, tick)
self.writeDrLog(text.TICK_LOGGING_MESSAGE.format(symbol=tick.vtSymbol,
time=tick.time,
last=tick.lastPrice,
bid=tick.bidPrice1,
ask=tick.askPrice1))
#----------------------------------------------------------------------
def onBar(self, bar):
"""分钟线更新"""
vtSymbol = bar.vtSymbol
self.insertData(MINUTE_DB_NAME, vtSymbol, bar)
# if vtSymbol in self.activeSymbolDict:
# activeSymbol = self.activeSymbolDict[vtSymbol]
# self.insertData(MINUTE_DB_NAME, activeSymbol, bar)
self.writeDrLog(text.BAR_LOGGING_MESSAGE.format(symbol=bar.vtSymbol,
time=bar.time,
open=bar.open,
high=bar.high,
low=bar.low,
close=bar.close))
#----------------------------------------------------------------------
def registerEvent(self):
"""注册事件监听"""
self.eventEngine.register(EVENT_TICK, self.procecssTickEvent)
#----------------------------------------------------------------------
def insertData(self, dbName, collectionName, data):
"""插入数据到数据库(这里的data可以是VtTickData或者VtBarData)"""
symbol=data.vtSymbol
symbol_queue=self.queue_dict.get(symbol,None)
if symbol_queue:
symbol_queue.put((dbName, collectionName, data.__dict__))
#----------------------------------------------------------------------
def run(self,symbol):
"""运行插入线程"""
while self.active:
try:
single_queue=self.queue_dict.get(symbol,None)
if single_queue:
# sys.stdout.write(u"----{}----,队列大小为:{} \r".format(symbol,single_queue.qsize()))
# print u"----{}----,队列大小为:{}".format(symbol,single_queue.qsize())
dbName, collectionName, d =single_queue.get(block=True, timeout=1)
flt = {'datetime': d['datetime']}
# self.mainEngine.dbUpdate(dbName, collectionName, d, flt, True)
self.mainEngine.dbInsert(dbName, collectionName, d)
#同样的数据插入主力连续合约数据集
if symbol in self.activeSymbolDict:
activeSymbol = self.activeSymbolDict[symbol]
# self.mainEngine.dbUpdate(dbName, activeSymbol, d, flt, True)
self.mainEngine.dbInsert(dbName, activeSymbol, d)
except Empty:
pass
#----------------------------------------------------------------------
def get_threadlist(self):
symbollist=self.queue_dict.keys()
for symbol in symbollist:
t=Thread(target=self.run,args=(symbol,))
self.threadlist.append(t)
def get_total_tasknumber(self):
total_size=0
symbollist=self.queue_dict.keys()
for symbol in symbollist:
total_size=total_size+self.queue_dict[symbol].qsize()
return total_size
def start(self):
"""启动"""
self.active = True
# self.thread.start()
for t in self.threadlist:
t.start()
#----------------------------------------------------------------------
def stop(self):
"""退出"""
if self.active:
self.active = False
self.thread.join()
#----------------------------------------------------------------------
def writeDrLog(self, content):
"""快速发出日志事件"""
log = VtLogData()
log.logContent = content
event = Event(type_=EVENT_DATARECORDER_LOG)
event.dict_['data'] = log
self.eventEngine.put(event)
作者:张国平 ;来源:维恩的派论坛
1.主要分析两个在类ctaTemplate的中的函数,onTrade和onOrder,其实两个很相似,被别的其他实例调用,推入更新的Trade和Order实例,并执行函数内的代码。对于Tick级别的交易,还是还是会经常用到这两个。下面是在ctaTemplate中的定义。
def onOrder(self, order):
"""收到委托变化推送(必须由用户继承实现)"""
# 对于无需做细粒度委托控制的策略,可以忽略onOrder
pass
# ----------------------------------------------------------------------
def onTrade(self, trade):
"""收到成交推送(必须由用户继承实现)"""
# 对于无需做细粒度委托控制的策略,可以忽略onOrder
pass
2.先去看看order和trade是什么样的类,两个都在vtObject.py里面。理论上来说,在tick级别中高频策略,当order和trade发生变化后,使用onOrder/onTrade 传递更新给策略;函数onOrder/onTrade里面一般定义一些对应不同状态进行的对应操作。
1) VtTradeData包含是成交的数据,其中最关键就是vtOrderID,可以和之前发送交易返回的vtOrderID做对应,用来对应的交易订单。其他诸如direction/offset/price/volume都是很重要;可以用来更新postion数据。
2) 类VtOrderData和之前VtQrderReq很像,但是不一样,这个是记录委托信息状态,req是交易请求,其中最关键的就是status,订单状态;这里有四个状态(ALLTRADED全部成交,PARTTRADED部分成交, NOTTRADED未成交,和CANCLLED拒单),这些属性在ctpGateway.py定义的。
class VtTradeData(VtBaseData):
"""成交数据类"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
super(VtTradeData, self).__init__()
# 代码编号相关
self.symbol = EMPTY_STRING # 合约代码
self.exchange = EMPTY_STRING # 交易所代码
self.vtSymbol = EMPTY_STRING # 合约在vt系统中的唯一代码,通常是 合约代码.交易所代码
self.tradeID = EMPTY_STRING # 成交编号
self.vtTradeID = EMPTY_STRING # 成交在vt系统中的唯一编号,通常是 Gateway名.成交编号
self.orderID = EMPTY_STRING # 订单编号
self.vtOrderID = EMPTY_STRING # 订单在vt系统中的唯一编号,通常是 Gateway名.订单编号
# 成交相关
self.direction = EMPTY_UNICODE # 成交方向
self.offset = EMPTY_UNICODE # 成交开平仓
self.price = EMPTY_FLOAT # 成交价格
self.volume = EMPTY_INT # 成交数量
self.tradeTime = EMPTY_STRING # 成交时间
########################################################################
class VtOrderData(VtBaseData):
"""订单数据类"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
super(VtOrderData, self).__init__()
# 代码编号相关
self.symbol = EMPTY_STRING # 合约代码
self.exchange = EMPTY_STRING # 交易所代码
self.vtSymbol = EMPTY_STRING # 合约在vt系统中的唯一代码,通常是 合约代码.交易所代码
self.orderID = EMPTY_STRING # 订单编号
self.vtOrderID = EMPTY_STRING # 订单在vt系统中的唯一编号,通常是 Gateway名.订单编号
# 报单相关
self.direction = EMPTY_UNICODE # 报单方向
self.offset = EMPTY_UNICODE # 报单开平仓
self.price = EMPTY_FLOAT # 报单价格
self.totalVolume = EMPTY_INT # 报单总数量
self.tradedVolume = EMPTY_INT # 报单成交数量
self.status = EMPTY_UNICODE # 报单状态
self.orderTime = EMPTY_STRING # 发单时间
self.cancelTime = EMPTY_STRING # 撤单时间
# CTP/LTS相关
self.frontID = EMPTY_INT # 前置机编号
self.sessionID = EMPTY_INT # 连接编号
3.之前提到数次通过onOrder/onTrade传递最新Order/Trade状态,这个负责处理的是一个系列过程,上层推手就是类ctaEngine,下面主要说下函数processOrderEvent,处理委托推送。其中传入的event是一个事件对象,由一个type_说明类型,和一个字典dict_存储具体的事件数据组成。可以理解为是上面vtObject的一个包装盒,eventEngine只要根据标签type_,就可以把具体数据传给对应的下层处理者。这个关于event具体的后面再分析。
这个函数,首先读取了event字典中包好的order,因为存在手动发起交易情况, 如果这个vtOrder是之前通过策略发出的,则调用callStrategyFunc来把这个order回传到对应strategy.onOrder方法,如果是手动发出指令就算了。同时也分析状态,如果在委托完成状态,也更新strategyOrderDict字典,移除这个。
def processOrderEvent(self, event):
"""处理委托推送"""
order = event.dict_['data']
vtOrderID = order.vtOrderID
if vtOrderID in self.orderStrategyDict:
strategy = self.orderStrategyDict[vtOrderID]
# 如果委托已经完成(拒单、撤销、全成),则从活动委托集合中移除
if order.status in self.STATUS_FINISHED:
s = self.strategyOrderDict[strategy.name]
if vtOrderID in s:
s.remove(vtOrderID)
self.callStrategyFunc(strategy, strategy.onOrder, order)
4.在往上追溯就到eventEngine,首先在ctaEngine初始化时候,会分配eventEngine实例,再通过下面代码注册处理事件,当某类事件收到时候,调用对应的方法,比如事件类型EVENT_ORDER, 对应的方法是self.processOrderEvent。
class ctaEngine
def registerEvent(self):
"""注册事件监听"""
self.eventEngine.register(EVENT_TICK, self.processTickEvent)
self.eventEngine.register(EVENT_ORDER, self.processOrderEvent)
self.eventEngine.register(EVENT_TRADE, self.processTradeEvent)
class eventEngine
def register(self, type_, handler):
"""注册事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无defaultDict会自动创建新的list
handlerList = self.__handlers[type_]
# 若要注册的处理器不在该事件的处理器列表中,则注册该事件
if handler not in handlerList:
handlerList.append(handler)
在eventEngine中的register函数就是处理的方法通过__handlers字典来对应,__handlers是defaultdict(list),是一种特殊的字典,最大特点就是如果同一个key值插入不同value,他不会像就普通dict用新的替代,而且变成{key:[value1,value2,……]}这样存储。这样就可以让同一个type,可以有对应多个接收handler。
这里有两个eventEngine, 按照官方说法:
5.上面说了eventEngine的组成Event,然后还有一个后面处理函数def __process(self, event)。 在一个内部队列__queue中不停抓起event,通过检索字典__handlers来分配到对应的函数处理。那么谁放入新的event呢,就是一个调用put(event)函数向事件队列插入事件。这个时候发现一个特殊的EVENT_TIMER ,看了半天,感觉可以理解为是一个节奏控制器,每一秒去做一次process;那么对于高频来说,可能换成500毫秒更合适。
下面是VNPY定义的EVENT事件。
# 系统相关
EVENT_TIMER = 'eTimer' # 计时器事件,每隔1秒发送一次
EVENT_LOG = 'eLog' # 日志事件,全局通用
# Gateway相关
EVENT_TICK = 'eTick.' # TICK行情事件,可后接具体的vtSymbol
EVENT_TRADE = 'eTrade.' # 成交回报事件
EVENT_ORDER = 'eOrder.' # 报单回报事件
EVENT_POSITION = 'ePosition.' # 持仓回报事件
EVENT_ACCOUNT = 'eAccount.' # 账户回报事件
EVENT_CONTRACT = 'eContract.' # 合约基础信息回报事件
EVENT_ERROR = 'eError.' # 错误回报事件
6.现在想着是谁在不停的给这个内部队列放入order/trick状态的event呢, 而在ctpGate这个类中,在其父类vtGate中有onOrder方法,很规范的打包order到evet,然后放到队列里面。还有分析后发现在Mainengine对整个eventEngine进行管理,并通过addGateway通过中把在事件引擎和交易接口管理。
def onOrder(self, order):
"""订单变化推送"""
# 通用事件
event1 = Event(type_=EVENT_ORDER)
event1.dict_['data'] = order
self.eventEngine.put(event1)
# 特定订单编号的事件
event2 = Event(type_=EVENT_ORDER+order.vtOrderID)
event2.dict_['data'] = order
self.eventEngine.put(event2)
7.在至上是class CtpTdApi(TdApi)这个类的,读取data中的order相关数据,创建order,推送到上面的这个onOrder里面; 在往上就有点头大了,这个data信息应该是从编译底层返回的。
def onRtnOrder(self, data):
"""报单回报"""
# 更新最大报单编号
newref = data['OrderRef']
self.orderRef = max(self.orderRef, int(newref))
# 创建报单数据对象
order = VtOrderData()
order.gatewayName = self.gatewayName
# 保存代码和报单号
order.symbol = data['InstrumentID']
order.exchange = exchangeMapReverse[data['ExchangeID']]
order.vtSymbol = order.symbol #'.'.join([order.symbol, order.exchange])
order.orderID = data['OrderRef']
# CTP的报单号一致性维护需要基于frontID, sessionID, orderID三个字段
# 但在本接口设计中,已经考虑了CTP的OrderRef的自增性,避免重复
# 唯一可能出现OrderRef重复的情况是多处登录并在非常接近的时间内(几乎同时发单)
# 考虑到VtTrader的应用场景,认为以上情况不会构成问题
order.vtOrderID = '.'.join([self.gatewayName, order.orderID])
order.direction = directionMapReverse.get(data['Direction'], DIRECTION_UNKNOWN)
order.offset = offsetMapReverse.get(data['CombOffsetFlag'], OFFSET_UNKNOWN)
order.status = statusMapReverse.get(data['OrderStatus'], STATUS_UNKNOWN)
# 价格、报单量等数值
order.price = data['LimitPrice']
order.totalVolume = data['VolumeTotalOriginal']
order.tradedVolume = data['VolumeTraded']
order.orderTime = data['InsertTime']
order.cancelTime = data['CancelTime']
order.frontID = data['FrontID']
order.sessionID = data['SessionID']
# 推送
self.gateway.onOrder(order)
总体来看,eventEngine这个是一个总的驱动,在内部queue这个传送带,分发做了字典里面类型标记的Event实例给对应的处理对象;ctpGateway这个通过put把新的event放入queue中。
vn.py实现了2个维度上的单位头寸限制,分别是单品种头寸上限是4,单个方向整体头寸上限是10。那么现在测试一下适用于国内期货品种的新海龟策略,其最优单位头寸限制数值是否与原版海龟策略一致。
实现步骤:
在回测中修改单位头寸设置:打开海龟策略文件turtleStrategy.py找到第10、第11行代码,直接修改其数值即可。
MAX_PRODUCT_POS = 4 # 单品种最大持仓
MAX_DIRECTION_POS = 10 # 单方向最大持仓
基于该分类一共得到6个测试组合,其回测效果如图所示。
这六个备选组合,按照从左到右,从高到低顺序,其夏普比率分别是1.68、1.68、1.6、1.5、1.44、1.4。可以观察到,其数据变化呈现倒“U”型,在单个方向整体头寸上限为9时,夏普比率达到顶峰,然后不管是头寸上限增加还是降低,夏普比率都会下降。
另外,从单方向头寸从14增大至无穷大(即限制取消)时,夏普比率降低得并不严重,仅仅为0.04,这说明针对国内期货品种的海龟策略,其自然达到的单方向头寸单位是大于14的,但是其差距并不是很大。
然后对单品种头寸上限为4的类型进行测试,其回测效果如图所示。
这六个备选组合,按照同样的观察顺序,其夏普比率分别是1.67、1.69、1.65、1.61、1.61、1.61。其夏普比率变化同样呈现倒“U”型,但是较单位头寸上限为3的更加扁平。在单个方向整体头寸上限为9时,夏普比率达到顶峰,然后不管是头寸上限增加还是降低,夏普比率都会下降。
另外,从单方向头寸从14增大至无穷大(即限制取消)时,夏普比率降低了0.14。
最后对单品种头寸上限为5的类型进行测试,其回测效果如图所示。
可以观察到,分类3的所有组合与分类2的完全一致,这表明单品种头寸单位自然到达的上限是4,再往上增加其限制已无任何意义。
综上所述,基于海龟组合,单品种头寸单位上限为4,单个方向整体头寸为9时效果最优,其最终版本的海龟策略夏普比率达到1.69。
作者:爱茶语 ;来源:维恩的派论坛
1:在ctpGateway.py里面的def onRspQryTradingAccount(self, data, error, n, last):下面加上
import csv
import os
import datetime
from time import sleep
#通过CTP接口查询账户资金
vnTrader_dir = 'C:\ProgramData\\Anaconda2\\Lib\\site-packages\\vnpy-1.7.1-py2.7.egg\\vnpy\\trader\\app\\ctaStrategy\\AccountInfo'# AccountInfo 所在路径(这是我放CSV的路径,大家自行修改)
today = datetime.datetime.now().strftime("%Y-%m-%d")
# 文件名称设置为今天名称, 每次只推送一条合约信息
path = vnTrader_dir + '\\AccountInfo_' + today + '.csv'
if not os.path.exists(path): # 如果文件不存在,需要写header
with open(path, 'wb') as f: # 用wb读不会产生\r\n的换行问题
w = csv.DictWriter(f, data.keys())
w.writeheader()
w.writerow(data)
else: # 文件存在,不需要写header
with open(path,'ab') as f: #二进制追加形式写入
w = csv.DictWriter(f, data.keys())
w.writerow(data)
sleep(60)
#return #每天只查询一次,文件存在不写入
2.在策略里面加上 """通过csv读取账户可用资金"""
import csv
import datetime
import time
today = datetime.datetime.now().strftime("%Y-%m-%d")
csv_file = file(r'C:\\ProgramData\\Anaconda2\\Lib\\site-packages\\vnpy-1.7.1- py2.7.egg\\vnpy\\trader\\app\\ctaStrategy\\AccountInfo\\AccountInfo_'+today+'.csv', 'rb')
reader = csv.DictReader(csv_file)
Available = float([row['Available'] for row in reader ][-1])
3.在需要调仓的XBAR周期下面加上self.open_pos = int(self.Available/(self.bar.close100.15)*0.1) #螺纹钢为例,每手乘数10,保证金按15%算,10%仓位
理论上,基于日线数据的中低频趋势跟踪策略,手续费和滑点是可以忽略不计的。为了验证这个结论,进行对比测试,如图所示。
右图为包含手续费和滑点版本的海龟策略,其中手续费设置为交易所手续费的1.1倍,滑点设置为期货合约的最小价格变动。
其结果是总手续费为635364,总滑点为636794,加上手续费和滑点后,其影响在于:
综上所述,手续费和滑点对于海龟策略影响不大,在测试中可以忽略不计。
作者:爱茶语 ;来源:维恩的派论坛
open_pos = 20 #交易手数
#拆单追价参数
disassembleSingle = 10 #拆单下单量阈值
disassembleTimeTotal = 15 #拆单总时间15秒
disassembleInterval = 5 #拆单时间间隔5秒,拆单次数为disassembleTimeTotal/disassembleInterval
chaseInterval = 10 #未成交追单时间间隔10秒 def __init__(self, ctaEngine, setting):
"""Constructor"""
super(APADXTICKStrategy, self).__init__(ctaEngine, setting)
"""
如果是多合约实例的话,变量需要放在__init__里面
"""
# 策略变量
self.bar = None # K线对象
self.barMinute = EMPTY_STRING # K线当前的分钟
self.minutebar = None # minuteK线对象
self.bufferCount = 0 # 目前已经缓存了的数据的计数
self.barList = []
self.order = {} #委托订单号
self.order_second = None #当前委托秒数
self.XMhighArray = np.zeros(self.bufferSize) # X分钟K线最高价的数组
self.XMlowArray = np.zeros(self.bufferSize) # X分钟K线最低价的数组
self.XMcloseArray = np.zeros(self.bufferSize) # X分钟K线收盘价的数组
self.XMopenArray = np.zeros(self.bufferSize) # X分钟K线开盘价的数组
self.dayCloseArray = np.zeros(self.bufferSize) # 日线收盘价的数组
self.dayHighArray = np.zeros(self.bufferSize) # 日线最高价的数组
self.dayLowArray = np.zeros(self.bufferSize) # 日线最低价的数组
self.LongEnterable1 = False
self.ShortEnterable1 = False
self.SellEnterable1 = False
self.CoverEnterable1 = False
self.longTriger = False
self.shortTriger = False
self.sellTriger = False
self.coverTriger = False
self.chaselongTriger = False
self.chasesellTriger = False
self.chaseshortTriger = False
self.chasecoverTriger = False
self.orderFinished = False
#指标初始化
self.lastPrice = EMPTY_INT
self.buyPrice = EMPTY_INT
self.shortPrice = EMPTY_INT
self.dayClose = EMPTY_INT
self.dayHigh = EMPTY_INT
self.dayLow = EMPTY_INT
self.tickSecond = EMPTY_INT
self.sellPrice = EMPTY_INT
self.coverPrice = EMPTY_INT
self.adx = EMPTY_INT
self.longtradeVolume = 0
self.shorttradeVolume = 0
self.selltradeVolume = 0
self.covertradeVolume = 0
def onInit(self):
self.writeCtaLog('{}策略初始化'.format(self.name))
# 载入历史数据,并采用回放计算的方式初始化策略数值
initData = self.loadTick(self.initDays)
for Tick in initData:
self.onTick(Tick)
self.putEvent()
#----------------------------------------------------------------------
def onStart(self):
"""启动策略(必须由用户继承实现)"""
self.writeCtaLog('{}策略启动'.format(self.name))
self.putEvent()
#----------------------------------------------------------------------
def onStop(self):
"""停止策略(必须由用户继承实现)"""
self.writeCtaLog('{}策略停止'.format(self.name))
self.putEvent()
#----------------------------------------------------------------------
def onTick(self, tick):
"""收到行情TICK推送(必须由用户继承实现)"""
self.tickSecond = tick.datetime.second
self.lastPrice = tick.lastPrice
#委托单全部成交或撤销才能发开仓单
if not self.order:
self.OrderFinished = True
elif self.order['status'] == '未成交' or self.order['status'] == '部分成交':
self.OrderFinished = False
else:
self.OrderFinished = True
#拆单开仓模块
if self.LongEnterable1 and tick.lastPrice >= self.buyPrice and self.pos >= 0:
#如果开仓仓位大于拆单阈值disassembleSingle,分intself.disassembleTimeTotal/self.disassembleInterval次数完成开仓,间隔disassembleInterval秒
#self.open_pos - abs(self.pos) < int(self.open_pos/(self.disassembleTimeTotal/self.disassembleInterval)) 开仓手数减去当前仓位小于拆单手数时只发开仓手数减去当前仓位的单子,防止总开仓手数大于self.opne_pos
if self.open_pos > self.disassembleSingle and self.open_pos - abs(self.pos) > int(self.open_pos/(self.disassembleTimeTotal/self.disassembleInterval)):
self.longTriger =False
if self.tickSecond % self.disassembleInterval == 0 and self.tickSecond != 0 and (not self.longTriger) and (self.orderFinished):
self.buy(tick.bidPrice1,int(self.open_pos/(self.disassembleTimeTotal/self.disassembleInterval)))
self.longTriger =True
elif self.open_pos > self.disassembleSingle and self.open_pos - abs(self.pos) < int(self.open_pos/(self.disassembleTimeTotal/self.disassembleInterval)):
self.longTriger =False
if self.tickSecond % self.disassembleInterval == 0 and self.tickSecond != 0 and (not self.longTriger) and (self.orderFinished):
self.buy(tick.bidPrice1,int(self.open_pos - abs(self.pos)))
self.longTriger =True
elif abs(self.pos) ==0 and self.open_pos <= self.disassembleSingle and (not self.longTriger) and (self.orderFinished):
self.buy(tick.bidPrice1,self.open_pos)
self.longTriger = True
if self.ShortEnterable1 and tick.lastPrice <= self.shortPrice and self.pos <= 0:
if self.open_pos > self.disassembleSingle and self.open_pos - abs(self.pos) > int(self.open_pos/(self.disassembleTimeTotal/self.disassembleInterval)):
self.shortTriger =False
if self.tickSecond % self.disassembleInterval == 0 and self.tickSecond != 0 and (not self.shortTriger) and (self.orderFinished):
self.short(tick.askPrice1,int(self.open_pos/(self.disassembleTimeTotal/self.disassembleInterval)))
self.shortTriger =True
elif self.open_pos > self.disassembleSingle and self.open_pos - abs(self.pos) < int(self.open_pos/(self.disassembleTimeTotal/self.disassembleInterval)):
self.shortTriger =False
if self.tickSecond % self.disassembleInterval == 0 and self.tickSecond != 0 and (not self.shortTriger) and (self.orderFinished):
self.short(tick.askPrice1,int(self.open_pos - abs(self.pos)))
self.shortTriger =True
elif abs(self.pos) ==0 and self.open_pos <= self.disassembleSingle and (not self.shortTriger) and (self.orderFinished):
self.short(tick.askPrice1,self.open_pos)
self.shortTriger = True
#拆单平仓模块
if copy.deepcopy(self.SellEnterable1) and self.pos > 0:
if abs(self.pos) > self.disassembleSingle:
self.sellTriger=False
if self.tickSecond % self.disassembleInterval == 0 and self.tickSecond != 0 and (not self.sellTriger) and (self.orderFinished):
self.sell(tick.askPrice1,int(abs(self.pos)/(self.disassembleTimeTotal/self.disassembleInterval)))
self.sellTriger = True
elif abs(self.pos) <= self.disassembleSingle and (not self.sellTriger) and (self.orderFinished):
self.sell(tick.askPrice1,abs(self.pos))
self.sellTriger = True
elif copy.deepcopy(self.CoverEnterable1) and self.pos < 0:
if abs(self.pos) > self.disassembleSingle:
self.coverTriger=False
if self.tickSecond % self.disassembleInterval == 0 and self.tickSecond != 0 and (not self.coverTriger) and (self.orderFinished):
self.cover(tick.bidPrice1,int(abs(self.pos)/(self.disassembleTimeTotal/self.disassembleInterval)))
self.coverTriger = True
elif abs(self.pos) <= self.disassembleSingle and (not self.coverTriger) and (self.orderFinished):
self.cover(tick.bidPrice1,abs(self.pos))
self.coverTriger = True
#委托超过chaseInterval秒未成交撤单追价
if not self.order:
pass
elif self.order['direction'] == '多' and self.order['offset'] == '开仓' and self.order['status'] == '未成交':
if self.tickSecond % self.chaseInterval == 0 and self.tickSecond != 0:
self.longtradeVolume = self.order['totalVolume']
if self.longtradeVolume > 0 and (not self.chaselongTriger):
# 撤销之前发出的尚未成交的委托(包括限价单和停止单)
self.cancelAll()
self.chaselongTriger = True
elif self.order['direction'] == '空' and (self.order['offset'] == '平仓' or self.order['offset'] == '平今') and self.order['status'] == '未成交':
if self.tickSecond % self.chaseInterval == 0 and self.tickSecond != 0:
self.selltradeVolume = self.order['totalVolume']
if self.selltradeVolume > 0 and (not self.chasesellTriger):
# 撤销之前发出的尚未成交的委托(包括限价单和停止单)
self.cancelAll()
self.chasesellTriger = True
elif self.order['direction'] == '空' and self.order['offset'] == '开仓' and self.order['status'] == '未成交':
if self.tickSecond % self.chaseInterval == 0 and self.tickSecond != 0:
self.shorttradeVolume = self.order['totalVolume']
if self.shorttradeVolume > 0 and (not self.chaseshortTriger):
# 撤销之前发出的尚未成交的委托(包括限价单和停止单)
self.cancelAll()
self.chaseshortTriger = True
elif self.order['direction'] == '多' and (self.order['offset'] == '平仓' or self.order['offset'] == '平今') and self.order['status'] == '未成交':
if self.tickSecond % self.chaseInterval == 0 and self.tickSecond != 0:
self.covertradeVolume = self.order['totalVolume']
if self.covertradeVolume > 0 and (not self.chasecoverTriger):
# 撤销之前发出的尚未成交的委托(包括限价单和停止单)
self.cancelAll()
self.chasecoverTriger =True
#self.chaseInterval+1秒追价,避免追加单被撤
if self.chaselongTriger:
if self.tickSecond % (self.chaseInterval+1) == 0 and self.tickSecond != 0:
self.buy(tick.askPrice1,self.longtradeVolume)
self.chaselongTriger = False
if self.chasesellTriger:
if self.tickSecond % (self.chaseInterval+1) == 0 and self.tickSecond != 0:
self.sell(tick.bidPrice1,self.selltradeVolume)
self.chasesellTriger = False
if self.chaseshortTriger:
if self.tickSecond % (self.chaseInterval+1) == 0 and self.tickSecond != 0:
self.short(tick.bidPrice1,self.shorttradeVolume)
self.chaseshortTriger = False
if self.chasecoverTriger:
if self.tickSecond % (self.chaseInterval+1) == 0 and self.tickSecond != 0:
self.cover(tick.askPrice1,self.covertradeVolume)
self.chasecoverTriger = False
#成交触发后,触发器设置为未触发
if self.pos == 0:
self.CoverEnterable1 = False
self.SellEnterable1 = False
self.coverTriger = False
self.sellTriger = False
if abs(self.pos) !=0:
self.longTriger = False
self.shortTriger = False
#----------------------------------------------------------------------
def onOrder(self, order):
"""收到委托变化推送(必须由用户继承实现)"""
self.order = order.__dict__
上一笔盈利过滤的意思是,若上一次交易为盈利的,则当前的交易信号无效,即当前不进行交易。
基于20日唐奇安通道,费思认为若上一次突破是盈利的,那么新突破点可能离当前价格,因为有可能跑到55日突破点上面去;若上一次突破点亏损,那么新突破点将更加接近当前价格。
若基于传统日内中高频CTA策略的视角,这是非常主观的解释:费思认为这几乎不可能连续2次出现大行情,但是事实是分钟级别的CTA策略有可能出现2次大行情,故首次盈利过滤的作用仅仅是节省手续费和滑点,但是错过了潜在的巨大收益。
但是呢,基于日线级别中低频CTA策略,其首次盈利过滤是否有效还需要进行验证。
本次测试的基准是剔除长周期版本的“新”海龟策略,海龟组合是在上面的章节中挑选的投资组合(回望周期为3年;筛选标准是回归夏普比率>0.6),然后把海龟策略分为含义上一笔盈利过滤和不含上一笔盈利过滤这两个版本,测试效果如图所示。
从上一笔盈利对比图中可以看出:
增加过滤器后,总成交笔数降低了,年化收益和夏普比率都提高了,因此可以推断海龟策略独有的上一笔盈利过滤是对于日线级别的趋势跟踪策略是非常有效的,其作用在于降低无效交易(即假突破)所造成的亏损,在整体上提高的策略的胜率。
(该结论在统计学来看,就是在日线级别中,连续出现2次大行情的概率极其低,应对方案就是剔除低概率事件,从而提高整体的胜率。)
作者:viponedream ;来源:维恩的派论坛
针对懒人,打算挂上去就不管它,让它自己运行,一个星期,一个月,甚至更久。
目前想到的就是几个地方要改一下,欢迎大家探讨怎样用vnpy来做隔夜。
一,定时连接CTP我采用的死方法。在uiMainWindow.py中的 updateStatusBar中,增加时间判断,到点了自动重连CTP。
# 计时器,
# 晚上21:00是夜盘时间,提前20分钟连接 CTP
dt = datetime.now()
if dt.hour == 20 or dt.hour == 8:
if dt.minute == 40 and dt.second == 0:
self.connectCtp()
二,历史持仓,即以前的老仓位。策略一开始要不要处理老仓位?目前是不管的,只管策略自己开的那些仓位。
这样中途停机造成问题。所以还是把老仓位一起赋给策略吧。
有人是把历史仓位存在一个文件中。我觉得没什么必要(不是单品种多策略的话)。
直接在策略中onPosition中读取历史持仓了。(要先在ctaEngine中增加一个处理函数,把onPosition推到相应的策略去)
def onPosition(self, pos):
# 更新仓位,把手动的开平仓同步更新到策略中。这样,就只能一个品种用一个策略了。
# 有历史仓位则在策略开始时,把历史仓位赋给策略仓位。没有历史仓位则不必。
if self.isPrePosHaved or self.isAlreadyTraded: # 还没有开过仓,或,还没有获取历史仓位
return
elif pos.position != 0:
if pos.direction == DIRECTION_LONG:
self.pos = pos.position
else:
self.pos = -pos.position
self.lastEntryPrice = pos.price
self.isPrePosHaved = True
print (u'{0} {1} 历史持仓 {2} 开仓均价 {3}'.format(datetime.now(), self.vtSymbol, self.pos, pos.price ))
pass
三,今仓,昨仓重置。
原来的系统是日内的,碰到换日就会出错。所以要重置一下。
这个要修改两个地方。
先在 CtpTdApi的 init 中加上
self.posBufferDict = {} # 缓存持仓数据的字典
self.symbolExchangeDict = {} # 保存合约代码和交易所的印射关系
self.symbolSizeDict = {} # 保存合约代码和合约大小的印射关系
# 加上下面这一行
self.posBufferDictShfe = {} # 缓存SHFE持仓数据的字典1,是在CTPGATEWAY.PY 修改这个回调。
def onRspQryInvestorPosition(self, data, error, n, last):
"""持仓查询回报"""
#print datetime.now(),u'持仓回报', data
# 更新持仓缓存,并获取VT系统中持仓对象的返回值
exchange = self.symbolExchangeDict.get(data['InstrumentID'], EXCHANGE_UNKNOWN)
size = self.symbolSizeDict.get(data['InstrumentID'], 1)
# 获取缓存字典中的持仓缓存,若无则创建并初始化
positionName = '.'.join([data['InstrumentID'], data['PosiDirection']])
if exchange == EXCHANGE_SHFE: #上期所的专门一个 dict
if positionName in self.posBufferDictShfe:
posBuffer = self.posBufferDictShfe[positionName]
else:
posBuffer = PositionBuffer(data, self.gatewayName)
self.posBufferDictShfe[positionName] = posBuffer
else: # 其它交易所
if positionName in self.posBufferDict:
posBuffer = self.posBufferDict[positionName]
else:
posBuffer = PositionBuffer(data, self.gatewayName)
self.posBufferDict[positionName] = posBuffer
# 缓存持仓信息
if exchange == EXCHANGE_SHFE:
posBuffer.updateShfeBuffer(data, size) # 不必 return
if last: # 上期所的只有最后一个才全部返回持仓信息
for positionName in self.posBufferDictShfe:
pos = self.posBufferDictShfe[positionName].pos
self.gateway.onPosition(pos)
self.posBufferDictShfe = {} # 返回后就重置
else: # 其它交易所直接返回
pos = posBuffer.updateBuffer(data, size)
self.gateway.onPosition(pos)
2,在ctaEngine
增加两个事件处理函数
#----------------------------------------------------------------------
def registerEvent(self):
# 加上以下两个
self.eventEngine.register(EVENT_POSITION, self.processExchangePositionEvent)
self.eventEngine.register(EVENT_TIMER, self.onTimer)
def onTimer(self, event):
dt = datetime.now()
if dt.hour == 21 and dt.minute==0 and dt.second==1: # 夜盘开盘第一时间重置
self.posBufferDict = {} # 清空则默认使用平昨
if self.tickStrategyDict:
for key in self.tickStrategyDict:
for strategy in self.tickStrategyDict[key]:
strategy.onTimer()
pass
def processExchangePositionEvent(self, event):
pos = event.dict_['data']
if pos.vtSymbol in self.tickStrategyDict:
for strategy in self.tickStrategyDict[pos.vtSymbol]:
if pos.direction == DIRECTION_LONG:
strategy.exchangePos = pos.position
else:
strategy.exchangePos = -pos.position
strategy.onPosition(pos)
以上修改,策略已经可以连续跑起来了。强调的是:一个品种只能一个策略,并且只能单方向,不能同时有多和有空。# 涉及到的变量,自己加的。
self.isPrePosHaved self.isAlreadyTraded 自己加在策略上或者ctaTemplate都行。
onTimer()也自己加上的。