VeighNa量化社区
你的开源社区量化交易平台
KeKe's Avatar
Member
离线
187 帖子
声望: 51

BarGenerator里面的update_tick()用于把tick数据合成1分钟K线数据,update_bar()是用于把1分钟数据合成X分钟数据。
可以参考2.0版本cta_strategy里面的布林带策略示例,那里提供1分钟K线合成为15分钟K线,并且基于15分钟K线来产生买卖信号

目前v1.9版本已经移除OANDA接口。

能说说为什么要用OANDA来做交易吗?

以后可以考虑重新添加OANDA到v1.9和2.0版本

请提供一下报错的截图

既然有回测引擎,可以按照1.9版本来尝试写一个用于策略测试的文件。
 
下面提供一下示例:
enter image description here

这1套仅仅用于盘中测试,若是在收盘以后测试会导致这种情况。
所以在盘外可以用第二套地址测试:交易前置:180.168.146.187:10030,行情前置:180.168.146.187:10031;

海龟策略的信号来源于唐奇安通道突破。简单的说就是若突破上轨则做多,突破下轨则做空。我们可以对信号进行改良,如换成布林带通道,金肯特纳通道等等,并且增加过滤条件和离场条件。

但是呢?新的问题又来了:若用了新的指标,需要通过不断调试来得到“最优”参数,这样会耗费大量的时间。

那么,有没有办法在尽量少得时间内,尽可能得到全局最优解或者次优解呢?

答案就是遗传算法啦!

 
 
 

遗传算法原理


具体原理详见:
遗传算法原理简介

一文读懂遗传算法工作原理(附Python实现)

 

遗传算法要做的事情并不复杂:

  1. 随机生成一大推策略参数(称之为族群,族群内的个体对应某一组策略参数)
  2. 族群内个体间进行3类活动:个体间两两交叉互换;个体某个参数发生变异;个体繁殖(即直接复制参数)
  3. 形成子代
  4. 通过目标函数(如最大化夏普比率,最大化总盈亏等)对母代族群和子代族群进行评分
  5. 通过特点筛选标准(如NSGA-Ⅱ)从母代和子代中筛选个体,形成第二代族群。(类似进化论中的“自然选择”)
  6. 新的族群在特定的评分标准和筛选标准中不断迭代(即重复1-5步骤),得到最优解/次优解。

 
 
 

遗传算法代码示例


1.随机生成待优化的策略参数

def parameter_generate():
    '''
    根据设置的起始值,终止值和步进,随机生成待优化的策略参数
    '''
    parameter_list = []
    p1 = random.randrange(4,50,2)      #入场窗口
    p2 = random.randrange(4,50,2)      #出场窗口
    p3 = random.randrange(4,50,2)      #基于ATR窗口止损窗
    p4 = random.randrange(18,40,2)     #基于ATR的动态调仓 

    parameter_list.append(p1)
    parameter_list.append(p2)
    parameter_list.append(p3)
    parameter_list.append(p4)

    return parameter_list

 

  1. 设置目标优化函数(收益回撤比和夏普比率)
def object_func(strategy_avg):
    """
    本函数为优化目标函数,根据随机生成的策略参数,运行回测后自动返回2个结果指标:收益回撤比和夏普比率
    """
    # 创建回测引擎对象
    engine = BacktestingEngine()
    # 设置回测使用的数据                       
    engine.setBacktestingMode(engine.BAR_MODE)      # 设置引擎的回测模式为K线
    engine.setDatabase("VnTrader_Daily_Db", 'XBTHOUR')  # 设置使用的历史数据库
    engine.setStartDate('20170401')                 # 设置回测用的数据起始日期
    engine.setEndDate('20181230')                   # 设置回测用的数据起始日期

    # 配置回测引擎参数
    engine.setSlippage(0.5)                        
    engine.setRate(0.2/100)                     
    engine.setSize(10)                            
    engine.setPriceTick(0.5)                      
    engine.setCapital(1000000) 

    setting = {'entryWindow': strategy_avg[0],       #布林带窗口
               'exitWindow': strategy_avg[1],        #布林带通道阈值
               'atrWindow': strategy_avg[2],         #CCI窗口
               'artWindowUnit': strategy_avg[3],}    #ATR窗口               

    #加载策略          
    engine.initStrategy(TurtleTradingStrategy, setting)    
    # 运行回测,返回指定的结果指标   
    engine.runBacktesting()          # 运行回测
    #逐日回测   
    engine.calculateDailyResult()
    backresult = engine.calculateDailyStatistics()[1] 

    returnDrawdownRatio = round(backresult['returnDrawdownRatio'],2)  #收益回撤比
    sharpeRatio= round(backresult['sharpeRatio'],2)                   #夏普比率
    return returnDrawdownRatio , sharpeRatio

 

3.运行基于Deap库的遗传算法(具体步骤看代码中文注释)

#设置优化方向:最大化收益回撤比,最大化夏普比率
creator.create("FitnessMulti", base.Fitness, weights=(1.0, 1.0)) # 1.0 求最大值;-1.0 求最小值
creator.create("Individual", list, fitness=creator.FitnessMulti)

def optimize():
    """"""   
    toolbox = base.Toolbox()  #Toolbox是deap库内置的工具箱,里面包含遗传算法中所用到的各种函数

    # 初始化     
    toolbox.register("individual", tools.initIterate, creator.Individual,parameter_generate) # 注册个体:随机生成的策略参数parameter_generate()                                          
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)               #注册种群:个体形成种群                                    
    toolbox.register("mate", tools.cxTwoPoint)                                               #注册交叉:两点交叉  
    toolbox.register("mutate", tools.mutUniformInt,low = 4,up = 40,indpb=0.6)                #注册变异:随机生成一定区间内的整数
    toolbox.register("evaluate", object_func)                                                #注册评估:优化目标函数object_func()    
    toolbox.register("select", tools.selNSGA2)                                               #注册选择:NSGA-II(带精英策略的非支配排序的遗传算法)


    #遗传算法参数设置
    MU = 40                                  #设置每一代选择的个体数
    LAMBDA = 160                             #设置每一代产生的子女数
    pop = toolbox.population(400)            #设置族群里面的个体数量
    CXPB, MUTPB, NGEN = 0.5, 0.35, 40        #分别为种群内部个体的交叉概率、变异概率、产生种群代数
    hof = tools.ParetoFront()                #解的集合:帕累托前沿(非占优最优集)

    #解的集合的描述统计信息
    #集合内平均值,标准差,最小值,最大值可以体现集合的收敛程度
    #收敛程度低可以增加算法的迭代次数
    stats = tools.Statistics(lambda ind: ind.fitness.values)
    np.set_printoptions(suppress=True)            #对numpy默认输出的科学计数法转换
    stats.register("mean", np.mean, axis=0)       #统计目标优化函数结果的平均值
    stats.register("std", np.std, axis=0)         #统计目标优化函数结果的标准差
    stats.register("min", np.min, axis=0)         #统计目标优化函数结果的最小值
    stats.register("max", np.max, axis=0)         #统计目标优化函数结果的最大值

    #运行算法
    algorithms.eaMuPlusLambda(pop, toolbox, MU, LAMBDA, CXPB, MUTPB, NGEN, stats,
                              halloffame=hof)     #esMuPlusLambda是一种基于(μ+λ)选择策略的多目标优化分段遗传算法

    return pop

 
 
 

遗传算法效果


夏普比率1.9,总收益率1958%,最大百分比回撤37%,收益回撤比达53。

enter image description here
其解集收敛程度如下:
enter image description here

在得到一个好的曲线后,还要检查一下这些参数是否符合市场逻辑,尽量去避免过拟合的情况。下面举个反例:

这里使用金肯特纳通道+基于固定百分比移动止损策略。不管从曲线的形态和收敛程度来看都是挺正常的

enter image description here
enter image description here
但是在这种优化后参数中我们观察到trailingPercent=18%,这意味着价格从最高点回落18个点才平仓离场。在正常情况,这会带来非常糟糕的盈亏比。
enter image description here

 
 
 

总结

遗传算法本质上是一种加快策略研究的技术,相对于暴力穷举,它可以大大节省电脑运算时间。我们可以使用它,但不能过度依赖它,因为有可能输出的仅仅是一些很巧合的参数。所以,针对这些参数,需要做更加细致的回测。

毕竟,在策略研究中,细心与耐心也是非常重要的。

策略相关


K线合成器

Q:CTA策略默认传入 1min K 线数据,在哪个函数中传入默认参数 1min?
A:BarGenerator里面的update_tick()用于把tick数据合成1分钟K线数据,update_bar()是用于把1分钟数据合成X分钟数据。可以参考布林带策略示例,那里提供1分钟K线合成为15分钟K线,并且基于15分钟K线来产生买卖信号。

Q:BarGenerator的最大值是不是只能60?
A:合成分钟线的时候,周期最大只能为60(基于60分钟整除来进行N分钟切分);合成小时线的时候,周期可以为任意值(基于多少个小时过去了,进行合成)

 

K线时间序列管理器

Q:ArrayManager的初始化函数默认size=100,size指的是什么?
A:size是指这个K线时间序列容器缓存的数据量的大小,理论上只要超过了策略中要计算的所有技术指标最长的那个周期,就够用了。比如你要算MA20 RSI14 CCI50,那么最少需要size=50,否则CCI计算的数据量就不够,一般情况下还会在size上加上一定的量,来避免talib中某些指标算法可能需要更长的数据,保证计算的正确性。

Q:策略使用5分钟K线,ArrayManager初始化 self.am = ArrayManager(100)。在初始化size这里输入100的话,请问vn.py会从数据库里面提取100根1分钟的bar还是500根1分钟的bar来初始化指标?
A:缓存到100跟5分钟K线后才会完成初始化状态。

Q:talib安装失败,怎么解决?
A:使用手动安装:
1.进入Unofficial Windows Binaries for Python Extension Packages中找到talib对应的版本(如py3.7,64位)
2.下载对应版本的文件TA_Lib?0.4.17?cp37?cp37m?win_amd64.whl
3.下载好whl文件之后,直接在命令行下安装文件即可,如下。成功后会显示“Sucessful installed TA_Lib?0.4.17”

pip install TA_Lib?0.4.17?cp37?cp37m?win_amd64.whl

Q:如何导出计算的技术指标数值
A:最简单的方法就是直接print了,或者也可以写入到文件里。

 

回测数据

Q:CTA回测组件提示K线已经下载完成,但是在sqlite数据库中查看不到记录。
A:数据被插入到当前用户目录的database.db中,如C:\Users\HOME\.vntrader\database.db

Q:实盘时vn.py数据是tick级数据推送,回测时数据是分钟级的,它会识别时间戳吗?
A:bar回测的模式,在策略内部将tick合成为bar或者将1分钟bar合成为x分钟的bar

Q:回测加载数据,显示载入数据量为0。
A:没有连接数据库,或者数据库无数据

Q:初始化策略,默认initDays=10,改成初始一定数量的分钟数是不是更加合理?
A:若载入充足的历史数据,就可以立刻交易了。

Q:数据商也提供逐笔数据如何用来回测?
A:用逐笔自行还原出完整的订单簿tick数据,所以用tick模式来回测。

Q:如何获取A股分钟线数据?
A:目前没有免费的下载渠道(交易所禁止),推荐通过米筐的RQData获取。

Q:如何更改数据库存放盘?
A:举个例子:在D盘创建目录D:\test;在D:\test下创建文件夹.vntrader;使用VN Station的VN Trader Pro切换到D:\test目录启动;此时启动的VN Trader运行时目录已经是D:\test(可以在标题栏看到)

 

参数优化
Q:用capital作为目标,输出结果全是0
A:因为在逐日统计回测中,capital代表的是起始资金,无法作为优化的目标。可以改用endBalance,sharpeRatio,maxDrawdown等目标优化参数试一下。

Q:engine.trades可以输出交易记录,但是记录里只有time,没有具体的date,请问哪里可以输出具体的交易日期和时间?
A:trades记录里的trade对象,有个额外的datetime字段是用来标识该成交的日期时间的。

 

实盘运行

Q:自定义策略需要放在哪里?
A:在当前运行的脚本目录先创建创建strategies文件夹,然后把策略文件放进去就可以了。 配置文件可以创建.vntrader, 然后把配置文件放进去。

Q:怎样在on_bar里output数据,以此检查策略是不是按照我的想法在运转
A:可以直接在策略初始化的时候打开一个txt文件句柄,然后回测过程中随时往里面写记录

Q:点击CTA策略出现了json错误
A:json读取错误,如把.vntrader下的cta_strategy_data.json删除可解决

Q:CTA模块中是否有自带的变量存储合约的最小变动价位数据?
A:策略发出的买卖指令的价格,会自动根据最小价格变动pricetick进行round取整,无需最小价格变动的数据了

Q:如何在无人值守脚本获得所有合约的信息?
A:调用main_engine.get_all_contracts函数

Q:用CTPtest抓所有合约, 郑交所的合同没有菜粕,苹果。
A:调用get_all_contracts前,sleep等待5秒,接收合约数据推送需要时间;某些测试环境里,是会缺少部分合约的。

Q:为什么使用VnTrader的cta策略组件进行日常实盘交易时,每天交易时段结束之后,一定要把VnTrader关闭,然后在下次开盘前15分钟在重启并初始化策略的参数?
A:关闭交易系统主要是为了清空系统内部(接口层、引擎层)的缓存状态,策略层的缓存状态一般倒是不太容易出问题的(除非你逻辑写错了)

Q:请问backtesting里的cross_limit_order和cross_stop_order什么意思
A:撮合限价单委托,撮合本地停止单(条件单)委托。讲最新的行情K线或者TICK和策略之前下达的所有委托进行检查,如果能够撮合成交,则返回并记录数据

Q:如何实现k线内成交?
A:用的是本地停止单
就比如当前价格是100点,策略发出信号,在下一根K线的120线发出买入:
若价格没突破到120点,继续挂着。
若价格从100涨到130,那么在120点的时候停止单变成市价单追上去保证尽可能成交。
若下一根K线的开盘价大于120,那么以开盘价来立刻成交。

Q:回测1h k线数据, 为什么每一笔订单成交记录都比委托记录晚一个小时 , 请问这个是在哪里设置的?
A:委托时间,是你发出委托请求的时间。成交时间,是你的成交发生的事件。CTA策略回测遵循T时刻发出的委托,永远只在T+1后才能进行撮合的规则(否则会有未来函数)。
实盘中,你的成交时间可能和委托时间非常接近,但是回测中受限于数据没法确定,只能用T+1那根K线的时间戳近似。

Q:如果停止单触发下单之后一段时间没有执行的话,会撤单吗,什么时候撤单,撤单之后还会追单吗?
A:不会,除非策略执行撤单操作。

Q:如果停止单触发下单之后,部分成交,接下来会撤单还是追单?
A:策略会收到这笔委托的回报,用户可以自行处理,不会自动撤单或者追单

Q:如果同时持有多空,pos是单向的,该怎么处理? 例如:如果持有1手多单,sell卖平的委托没有成交,紧接着的short卖开的单子成交了,pos 是多少?
A : 对于CTA策略模块来说,策略的持仓,是基于策略本身发出去的委托成交维护的逻辑持仓(而非账户底层的实际持仓),所以pos会为0

Q:假如策略下了1手多单,手动下了1手多单,pos 是多少 ?
A : 人手工下的单子无影响

Q:假如策略下了2手多单,手动平仓1手,pos 是多少 ?
A:人手工下的单子无影响

Q:两个策略都在跑同一个代码,应该是各自有各自的pos 吧 ?
A:对的,这两个逻辑持仓互相无关

Q:框架对pos值的更新,是在onTrade 和 onOrder推送动作前,还是推送动作后?
A:onTrade推送前更新,保证策略收到onTrade回调的时候,pos已经是最新数据。

Q:vt_setting.json的路径到底在哪?
A:在c:\users\administrator.vntrader目录下,是基于Python的pathlib实现的

Q:positionData中的 yd_volume 是指什么?
A:昨仓,这个主要针对中国市场的股票(只能卖出昨日的股票)和期货(昨仓平仓手续费不同)

Q:当前CTA模块是不是不能在策略里获取当前资金情况?
A:不能获取资金和账户级别的持仓。策略的仓位管理(风险分配)应该由交易员来做,而不是让程序自动做

Q:非交易时间报单,是直接返回拒单,还是返回提交中等开盘了再打出去?
A:在策略层,如on_bar()函数里面第一个逻辑应该是self.cancel_all(),目的是清空为成交的委托(包括本地停止单,限价单),保证当前时间点委托状态是唯一的。
若在非交易时间发单,服务器收到这个委托会推送一个拒单的委托回报。

Q:vn.py如何查询实盘账户的历史资金情况
A:大部分交易系统并未提供历史资金查询功能,一般都是只能查当前时间点的资金,所以需要你自己保存。

 

其他问题

Q: 请问下,vn.py中有期权回测的例子么?vn.py 关于期权,是不是就只有OptionMatser模块?
A:期权的波动率交易策略一般无需回测,更多依赖建模;可以用其他组件交易期权,比如CTA策略模块赌趋势,或者SpreadTrading模块赌价差,但这些本质都不是期权交易策略。

Q:如何根据资金量进行下单,而不是固定手数下单?
A:vn.py框架下不建议交易程序在实盘中去获取账户可用资金,并调整交易手数,这是很危险的事情。

 
 
 

接口登录相关


CTP接口

Q:如何实现行情并发推送?
A:C++ API的回调函数只有一个推送线程;用户可以一次性订阅全市场的合约,某个合约有行情变动的时候才会推送.

Q:CTP能可否获取到指定经纪商的手续费?
A:这个手续费应该是连带期货公司的部分,但是不建议在交易程序中去访问这种数据,相关数据应该每天在启动策略前就获取好配置到策略里。

Q:郑商所的品种都收不到14:59这一根bar
A:郑商所的数据推送,没有3点后的最后一个tick用于标识收盘完成,所以要调用BarGenerator.generate函数来做最终的强制K线生成

Q:国内期货模拟,除了sinnow可以模拟,还有别的账号可以模拟吗,可以去期货公司申请模拟账号模拟吗?
A:SimNow是目前最稳定的仿真环境了,可以找期货公司申请,比如中信、上海中期等。

Q:sinnow连接有时会断开,然后传送的tick数据的时间就会延后,就是收盘时,本来应该平仓平不了,超过三点了还在传tick。
A:SimNow环境因为免费,用的人很多,所以服务器有时会卡。

Q:为什么有时候会不停的断开重连呢?
A:服务器关了或者人满了,或者账号密码错误。

Q:连接进 SimNow后,按”市价“下单被拒绝,按”限价“”FAK“”FOK"下单可以,请问,是否不支持“市价”下单。
A:SimNow不支持市价单,实盘支持。

Q:连接SimNow后,下单提示“提交中“无法成交也无法撤单
A:这种情况,一般是委托请求没有到CTP柜台(网络断了),或者CTP柜台挂了

Q:SimNow上不去,上去了注册又总是提示验证码失败,还有其他模拟的推荐吗?
A:SimNow是目前最推荐的仿真环境,建议换交易时间注册,以及SimNow主要支持移动和联通的手机号

Q:CTP配置无法连接:输入论坛登录名,账号,配置对应的交易服务器和行情服务器,点击连接,无任何反应
A:CTP的测试账号请通过SimNow获取,不是vn.py论坛的

Q:已有行情数据显示。 但是不能发单,如rb1905
A:检查是否漏填交易所或者上委托数量的字段。

Q:下单异常,第一种情况:一点击委托就是直接“已撤销”(委托栏里委托状态),双击撤单的时候,又会显示“交易撤单失败,代码25”。第二种情况:一点击委托就是直接“提交中”(委托栏里委托状态),等到双击撤单的时候,又会显示“交易撤单失败,代码25”
A:第一个情况应该是报单不符合服务端的要求,被拒单撤销了;第二个情况感觉是你的网络断了,报单请求发出但没有到服务器。可以顺着这两个方向检查。

Q:CTPTEST测试交易期货公司采集不到硬盘序列号,CPU序列号,BIOS序列号
A:数据采集是通过API内部的代码自动完成的,其他任何上层程序都无法影响。建议换机器。

Q:登录穿透式仿真账号问题,一直报4097的错
A:不要同时import CTPTEST和CTP这两个接口

Q:使用ctp和ctptest登录都显示不合法登录。错误代码3
A:账户密码错误

Q:CTP能否两个账号同时登录并且同时操作。
A:同一个接口只能登录一次。如果要同时登录两个CTP,需要自己扩展修改CtpGateway,然后加载两个CtpGateway;对于不同的接口,比如股票的XTP和期货的CTP,可以直接同时登录使用,并在策略中同时交易这两个接口的合约

 

IB接口

Q:如何链接盈透api ?
A:启动TWS;在配置中打开Socket连接功能;在vn.py中加载ibGateway,然后启动就行。

Q:IB接口连接,错误提示显示:couldn't connect to TWS. confirm that "enable activex and socket clients" is enabled aports for new installations of version 954.1 or newer: TWS:7497:IB gateway: 4002。
A:请检查TWS是否打开了socket访问功能。

Q:启动行情记录,则程序假死。
A:IB的行情订阅函数没有异步缓存逻辑,用DataRecorder脚本的话,会在连接TWS之前就进行了订阅请求,导致死掉。IbGatewa已经加上了历史数据查询获取功能,直接从IB查询K线数据进行初始化就行,不建议自己录制了。

 

富途接口
Q:futu_gateway里面的登陆信息设置,为何只要密码?
A:需要先下载和安装FUTU OPEN API:https://www.futunn.com/openAPI

 

华宝派
Q:华宝派如何申请试用/实盘呢?
A:请在华宝证券开户,然后联系客户经理申请使用华宝PI

 
 
 

模块应用相关


交易复制
Q:当跟单帐户检测到被跟单帐户的仓位变化后,具体操作是什么?
A:TradeCopy的发布者账号,维护一份本地持仓数据表,当有成交推送时立即更新计算最新仓位;发布者每当收到成交推送时,或者每隔一定的时间间隔(默认1秒),会广播一次当前自己的仓位信息;订阅者收到广播推送的发布者仓位后,乘以自身的复制系数,作为目标仓位;订阅者根据目标仓位,和自身实际持仓的偏差,决定具体的下单操作(目标是将实际持仓同步到和目标持仓一致),如果有之前的委托,会先执行撤单。

 

RQData
Q:import rqdatac 失败,没有找到rqdatac包
A:运行以下命令安装

pip install --extra-index-url https://rquser:ricequant99@py.ricequant.com/simple/ rqdatac==1.0.0a66

Q:如何配置RQDAAT账户?
A:申请试用账号:https://www.ricequant.com/purchase
在VN Trader主界面上,点配置,rqdata.username rqdata.password输入
重启VN Trader就能用了

 

工作线程
Q:vn.py运行的时候,会启动哪些线程?
A:
1.主线程:带PyQt界面时运行Qt的循环,无界面时可以直接阻塞或者用while循环
2.事件引擎线程:处理事件引擎队列中的事件,并调用注册的处理函数处理,所以如果是CtaStrategy层,所有回调函数你可以认为都是单线程在驱动的(每次只有一个在调用)
3.API层线程:不同的API不一样了。

Q:eventEngine2.__queue很多数据没有处理、队列一直变大?
A:如果queue的大小只增加,不减少,只可能是没有启动EventEngine,导致事件没有处理持续挤压导致的。否则运行过程中即使没有注册处理函数,该事件的数据也会被抛弃掉,不会继续保存着。

 

行情记录
Q:行情记录后怎么查看和下载历史行情数据?
A:行情记录模块是将tick或者bar直接存入你配置的数据库的,你要单独查看,可以用数据库可视化工具连接本地数据库查看,如果在vn.py里回测使用,不需要下载,是默认从数据库里找相关数据的

Q:datarecoder 和 cta running 的 CTP能不能分开设置?
A:可以,另外新建一个目录,里面创建.vntrader文件夹,在这个目录用run.py或者VN Station启动VN Trader,CTP接口的登录信息就都是独立的了。

 
其他

Q:网站上下载的vnpy ,和Anaconda site-package里面的vnpy有什么区别?
A:进行运行的是ananconda里面的vnpy。如同在anaconda里面调用numpy一样。

Q:修改vnpy代码后需要更新到anaconda site-package对应的文件里?
A:python里import的vnpy就是site-packages里的,你可以修改下环境变量,把你clone的那个目录加入搜索路径,这样你修改了clone的那个vn.py,用的时候就自动改了

Q:一键安装完2.0.5 后,再另外安装anaconda3, spyder无法使用
A:假设你安装到c:\anaconda3。打开cmd,运行c:\anaconda3\scripts\activate,然后再运行python,就会进入anaconda环境了

Q:请问算法交易怎么用,可以策略生成下单指令,由算法下单吗?
A:目前AlgoTrading模块主要通过GUI和篮子委托文件的方式来实现算法下单
尽管可以通过扩展的方式,实现策略调用算法执行交易,但更建议在CTA策略中自行实现算法交易的逻辑,获得更好的细节控制能力

Q:vn.py中配置界面中email各项设置如何填写,有何用处?
A:设置如下
"email.server": "SMTP邮件服务器地址",
"email.port": SMTP邮件服务器端口号,
"email.username": "邮箱用户名",
"email.password": "邮箱密码",
"email.sender": "发送者邮箱",
"email.receiver": "接收者邮箱",

 
 
 

社区操作相关


Q:找回vn.py社区的密码
A:在这个页面可以找回密码:https://www.vnpy.com/auth/reset-password

Q:注册了社区账号,但是登录报错。[WinError 10061] 由于目标计算机积极拒绝,无法连接
A:这个是代理服务器问题吧,换个网络试试。

Q:维恩的派和vn.py社区有什么关系?
A:维恩的派是vn.py社区2015-2018年的老论坛,但由于discuz的各种问题使用体验太差,现在已经停止使用,将在19年底正式下线。

Q:社区怎么上传照片的?
A:直接把图片拖动到编辑框中就能自动上传了

Q:有微信群,QQ群吗?
A:vn.py框架学习群:666359421 ; vn.py框架交流群:262656087

那么改用“endBalance”,sharpeRatio,maxDrawdown等试一下。因为在逐日统计回测中,capital代表的是起始资金

把fixedSize添加到paraList上啊

 

paramList = ['name',
             'className',
             'author',
             'vtSymbol',
             'k1',
             'k2',
             'fixedSize'] 

作者:张国平 ;来源:维恩的派论坛

 
在微信公众号 “量化投资与机器学习”, 看到一个推文“我就不用AI、ML模型预测股价,来点不一样的“, 链接如下:
我就不用AI、ML模型预测股价,来点不一样的!

交易思维是基于历史数据中,一组数据比如100天中,K线中最高点或者最低点相对于开始价位价差点差,再利用numpy的函数numpy.percentile(), 计算在比如95%机会,最高点或者最低点的点差数字。如果点差是5个点,就可以认为下一根K线也有95%概率有5个点受益。

尝试在VNPY实现。

思路整理:

  • 入场:如果最近N(30)个D分钟k线,通过下面代码计算,分析对于概率prb比如90%,如果存在一个点差大于TickValueLimit一个值TickValue,说明过去N个分钟,有P的概率,bar开始下单,在bar中有最高点或者最低点获得TickValue。那么在下个bar开始时候,买入。
  • 出场,如果到达持有价格POSprice +/- TickValue, 则卖出;重新进行入场分析。如果这个bar中间没到达目标价格,在bar结束时候分析是否还满足入场条件,如果继续满足则持有,否则平仓,如果是反向,则反向开单。
  • 止损,如果在持有时候,下跌到反向POSPrice +/- Multiple * TickValue 价格时候,平仓。Multiple 随着时间增加逐渐减少。

 
 
回测设置

  • RqData日线级别数据:IF99
  • 手续费:万0.3
  • 合约规模:300
  • 滑点:0.2
  • 最小价格变化:0.2
  • 本金:100万

 
 

回测效果
enter image description here

 
 
代码如下:

# encoding: UTF-8
from __future__ import division

from vnpy.trader.vtConstant import EMPTY_STRING, EMPTY_FLOAT, OFFSET_OPEN,OFFSET_CLOSE
from vnpy.trader.app.ctaStrategy.ctaTemplate import (CtaTemplate,
                                                     BarGenerator,
                                                     ArrayManager)
import numpy as np
from datetime import datetime, time


########################################################################
class PercentileStrategy(CtaTemplate):
    """MACD策略Demo"""
    className = 'PercentileStrategy'
    author = u'BillyZhang'
    fixedSize = 1
    # 策略参数
    calWindow = 15
    percentile = 95
    tickValueLimit = 5
    Multiple = 0.8


    # 策略变量
    p = 0
    tickValue = 0
    tradeSign = 0
    tickValueHigh = 0
    tickValueLow = 0


    longStop = 0  # 多头止损
    shortStop = 0  # 空头止损
    margin = 0
    lowerLimit = 0
    upperLimit = 50000

    # 时间
    initDays = 0
    DAY_START = time(9, 10)  # 日盘启动和停止时间
    DAY_END = time(14, 55)
    NIGHT_START = time(21, 10)  # 夜盘启动和停止时间
    NIGHT_END = time(10, 55)

    # 参数列表,保存了参数的名称
    paramList = ['name',
                 'className',
                 'author',
                 'vtSymbol',
                 'initDays',
                 'fixedSize',
                 'calWindow',
                 'percentile',
                 'tickValueLimit',
                 'Multiple'
                 ]

    # 变量列表,保存了变量的名称
    varList = ['inited',
               'trading',
               'pos',
               'longStop',
               'shortStop',
               'posPrice',
               'lowerLimit',
               'p',
               'tickValue',
               'tradeSign',
               'tickValueHigh',
               'tickValueLow'
                ]

    # 同步列表,保存了需要保存到数据库的变量名称
    syncList = ['pos',
                'posPrice',
                'longStop',
                'shortStop'
                ]

    # ----------------------------------------------------------------------
    def __init__(self, ctaEngine, setting):
        """Constructor"""
        super(PercentileStrategy, self).__init__(ctaEngine, setting)
        self.am = ArrayManager(size = self.calWindow)


        # 注意策略类中的可变对象属性(通常是list和dict等),在策略初始化时需要重新创建,
        # 否则会出现多个策略实例之间数据共享的情况,有可能导致潜在的策略逻辑错误风险,
        # 策略类中的这些可变对象属性可以选择不写,全都放在__init__下面,写主要是为了阅读
        # 策略时方便(更多是个编程习惯的选择)

    # ----------------------------------------------------------------------
    def onInit(self):
        """初始化策略(必须由用户继承实现)"""
        self.writeCtaLog(u'%s策略初始化' % self.name)

        initData = self.loadBar(self.initDays)
        for bar in initData:
            self.onBar(bar)
        self.putEvent()

    # ----------------------------------------------------------------------
    def onStart(self):
        """启动策略(必须由用户继承实现)"""
        if self.pos == 0:
            self.writeCtaLog(u'%s策略启动' % self.name)

        # 当前无仓位,发送开仓委托
        # 持有多头仓位

        self.putEvent()

    # ----------------------------------------------------------------------
    def onStop(self):
        """停止策略(必须由用户继承实现)"""
        self.writeCtaLog(u'%s策略停止' % self.name)
        self.putEvent()

    # ----------------------------------------------------------------------
    def onTick(self, tick):
        """收到行情TICK推送(必须由用户继承实现)"""
        if self.lowerLimit == 0 or self.upperLimit == 0:
            self.lowerLimit = tick.lowerLimit
            self.upperLimit = tick.upperLimit
        self.bg.updateTick(tick)

    # ----------------------------------------------------------------------
    def onBar(self, bar):
        """收到Bar推送(必须由用户继承实现)"""
        #如果是当然最后5分钟,略过

        am = self.am
        am.updateBar(bar)
        if not am.inited:
            return
        # currentTime = datetime.now().time()
        currentTime = time(9,20)
        #计算p,和tickValue
        MaxHigh = am.high / am.open
        MaxLow = am.low / am.open
        MaxClose = am.close / am.open
        lpHigh = np.percentile(MaxHigh, 100 - self.percentile)
        lpLow = np.percentile(MaxLow,  self.percentile)

        self.tickValueHigh = abs(bar.open - bar.open*lpHigh)
        self.tickValueLow = abs(bar.open - bar.open * lpLow)

        if self.tickValueHigh > self.tickValueLow and self.tickValueHigh > self.tickValueLimit:
            self.tradeSign = 1
        elif self.tickValueHigh < self.tickValueLow and self.tickValueLow > self.tickValueLimit:
            self.tradeSign = -1
        else:
            self.tradeSign = 0

        # 平当日仓位, 如果当前时间是结束前日盘15点28分钟,或者夜盘10点58分钟,如果有持仓,平仓。
        if ((currentTime >= self.DAY_START and currentTime <= self.DAY_END) or
            (currentTime >= self.NIGHT_START and currentTime <= self.NIGHT_END)):
            if self.pos == 0:
                if self.tradeSign == 0:
                    pass
                elif self.tradeSign == 1 and bar.close > self.lowerLimit:
                    self.buy(bar.close + 5,self.fixedSize,False)
                elif self.tradeSign == -1 and bar.close < self.upperLimit:
                    self.short(bar.close - 5,self.fixedSize,False)
            elif self.pos > 0:
                if self.tradeSign == 1 or self.tradeSign == 0:
                    pass
                elif self.tradeSign == -1:
                    self.sell(bar.close-5, abs(self.pos), False)
            elif self.pos < 0:
                if self.tradeSign == -1 or self.tradeSign == 0:
                    pass
                elif self.tradeSign ==1:
                    self.cover(bar.close+5, abs(self.pos), False)

        else:
            if self.pos > 0:
                self.sell(bar.close-5, abs(self.pos), False)
            elif self.pos < 0:
                self.cover(bar.close+5, abs(self.pos), False)
            elif self.pos == 0:
                return

    # ----------------------------------------------------------------------
    def onOrder(self, order):
        """收到委托变化推送(必须由用户继承实现)"""
        # 对于无需做细粒度委托控制的策略,可以忽略onOrder
        pass

    # ----------------------------------------------------------------------
    def onTrade(self, trade):
        # 发出状态更新事件
        """收到成交推送(必须由用户继承实现)"""
        # 对于无需做细粒度委托控制的策略,可以忽略onOrder
        if trade.offset == OFFSET_OPEN:
            self.posPrice = trade.price
            if self.tradeSign == 1:
                self.sell(self.posPrice + self.tickValueHigh,abs(self.pos),False)
                self.sell(self.posPrice - self.Multiple*self.tickValueHigh, abs(self.pos), True)
            elif self.tradeSign == -1:
                self.cover(self.posPrice - self.tickValueLow, abs(self.pos), False)
                self.cover(self.posPrice + self.Multiple*self.tickValueLow, abs(self.pos),True)
        elif trade.offset == OFFSET_CLOSE:
            self.cancelAll()
            self.tradeSign = 0
            # 同步数据到数据库
        self.saveSyncData()

    # ----------------------------------------------------------------------
    def onStopOrder(self, so):
        """停止单推送"""
        pass

作者:时间 ;来源:维恩的派论坛 ;版本:v1.72
 

改写原因:行情记录多个合约的数据时,在下午收盘后依旧有很多tick数据未来的及插入数据库,尤其是在1.7.1版本时(mainEngine.dbUpdate()的模式下)

 

改写方式:

  1. 主要是在陈老师的drengine上进行更改
  2. 每个队列负责记录一个合约,每个线程负责一个合约的插入
  3. 主力合约数据没有单独推送到队列,而是跟随tick 或bar数据直接插入数据库
  4. 采用1.7.2版本的(mainEngine.dbInsert()模式)

 
效果: 七个合约进行数据记录,基本实现生产和消费同步,测试时间不久,错误之处欢迎指正

 
 
代码如下:

# encoding: UTF-8

'''
本文件中实现了行情数据记录引擎,用于汇总TICK数据,并生成K线插入数据库。

使用DR_setting.json来配置需要收集的合约,以及主力合约代码。
'''

import json
import csv
import os
import copy
from collections import OrderedDict
from datetime import datetime, timedelta
from Queue import Queue, Empty
from threading import Thread

from vnpy.event import Event
from vnpy.trader.vtEvent import *
from vnpy.trader.vtFunction import todayDate, getJsonPath
from vnpy.trader.vtObject import VtSubscribeReq, VtLogData, VtBarData, VtTickData
from vnpy.trader.app.ctaStrategy.ctaTemplate import BarManager

from .drBase import *
from .language import text


########################################################################
# class DrEngine(object):
#     """数据记录引擎"""

#     settingFileName = 'DR_setting.json'
#     settingFilePath = getJsonPath(settingFileName, __file__)
#     # print u"数据记录配置文件地址:",settingFilePath  

#     #----------------------------------------------------------------------
#     def __init__(self, mainEngine, eventEngine):
#         """Constructor"""
#         self.mainEngine = mainEngine
#         self.eventEngine = eventEngine

#         # 当前日期
#         self.today = todayDate()

#         # 主力合约代码映射字典,key为具体的合约代码(如IF1604),value为主力合约代码(如IF0000)
#         self.activeSymbolDict = {}

#         # Tick对象字典
#         self.tickSymbolSet = set()

#         # K线合成器字典
#         self.bmDict = {}

#         # 配置字典
#         self.settingDict = OrderedDict()

#         # 负责执行数据库插入的单独线程相关
#         self.active = False                     # 工作状态
#         self.queue = Queue()                    # 队列
#         self.thread = Thread(target=self.run)   # 线程

#         # 载入设置,订阅行情
#         self.loadSetting()

#         # 启动数据插入线程
#         self.start()

#         # 注册事件监听
#         self.registerEvent()  

#     #----------------------------------------------------------------------
#     def loadSetting(self):
#         """加载配置"""
#         with open(self.settingFilePath) as f:
#             drSetting = json.load(f)

#             # 如果working设为False则不启动行情记录功能
#             working = drSetting['working']
#             if not working:
#                 return

#             # Tick记录配置
#             if 'tick' in drSetting:
#                 l = drSetting['tick']

#                 for setting in l:
#                     symbol = setting[0]
#                     gateway = setting[1]
#                     vtSymbol = symbol

#                     req = VtSubscribeReq()
#                     req.symbol = setting[0]

#                     # 针对LTS和IB接口,订阅行情需要交易所代码
#                     if len(setting)>=3:
#                         req.exchange = setting[2]
#                         vtSymbol = '.'.join([symbol, req.exchange])

#                     # 针对IB接口,订阅行情需要货币和产品类型
#                     if len(setting)>=5:
#                         req.currency = setting[3]
#                         req.productClass = setting[4]

#                     self.mainEngine.subscribe(req, gateway)

#                     #tick = VtTickData()           # 该tick实例可以用于缓存部分数据(目前未使用)
#                     #self.tickDict[vtSymbol] = tick
#                     self.tickSymbolSet.add(vtSymbol)

#                     # 保存到配置字典中
#                     if vtSymbol not in self.settingDict:
#                         d = {
#                             'symbol': symbol,
#                             'gateway': gateway,
#                             'tick': True
#                         }
#                         self.settingDict[vtSymbol] = d
#                     else:
#                         d = self.settingDict[vtSymbol]
#                         d['tick'] = True

#             # 分钟线记录配置
#             if 'bar' in drSetting:
#                 l = drSetting['bar']

#                 for setting in l:
#                     symbol = setting[0]
#                     gateway = setting[1]
#                     vtSymbol = symbol

#                     req = VtSubscribeReq()
#                     req.symbol = symbol                    

#                     if len(setting)>=3:
#                         req.exchange = setting[2]
#                         vtSymbol = '.'.join([symbol, req.exchange])

#                     if len(setting)>=5:
#                         req.currency = setting[3]
#                         req.productClass = setting[4]                    

#                     self.mainEngine.subscribe(req, gateway)  

#                     # 保存到配置字典中
#                     if vtSymbol not in self.settingDict:
#                         d = {
#                             'symbol': symbol,
#                             'gateway': gateway,
#                             'bar': True
#                         }
#                         self.settingDict[vtSymbol] = d
#                     else:
#                         d = self.settingDict[vtSymbol]
#                         d['bar'] = True     

#                     # 创建BarManager对象
#                     self.bmDict[vtSymbol] = BarManager(self.onBar)

#             # 主力合约记录配置
#             if 'active' in drSetting:
#                 d = drSetting['active']
#                 self.activeSymbolDict = {vtSymbol:activeSymbol for activeSymbol, vtSymbol in d.items()}

#     #----------------------------------------------------------------------
#     def getSetting(self):
#         """获取配置"""
#         return self.settingDict, self.activeSymbolDict

#     #----------------------------------------------------------------------
#     def procecssTickEvent(self, event):
#         """处理行情事件"""
#         tick = event.dict_['data']
#         vtSymbol = tick.vtSymbol

#         # 生成datetime对象
#         if not tick.datetime:
#             tick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y%m%d %H:%M:%S.%f')            

#         self.onTick(tick)

#         bm = self.bmDict.get(vtSymbol, None)
#         if bm:
#             bm.updateTick(tick)

#     #----------------------------------------------------------------------
#     def onTick(self, tick):
#         """Tick更新"""
#         vtSymbol = tick.vtSymbol

#         if vtSymbol in self.tickSymbolSet:
#             self.insertData(TICK_DB_NAME, vtSymbol, tick)

#             if vtSymbol in self.activeSymbolDict:
#                 activeSymbol = self.activeSymbolDict[vtSymbol]
#                 self.insertData(TICK_DB_NAME, activeSymbol, tick)


#             self.writeDrLog(text.TICK_LOGGING_MESSAGE.format(symbol=tick.vtSymbol,
#                                                              time=tick.time, 
#                                                              last=tick.lastPrice, 
#                                                              bid=tick.bidPrice1, 
#                                                              ask=tick.askPrice1))

#     #----------------------------------------------------------------------
#     def onBar(self, bar):
#         """分钟线更新"""
#         vtSymbol = bar.vtSymbol

#         self.insertData(MINUTE_DB_NAME, vtSymbol, bar)

#         if vtSymbol in self.activeSymbolDict:
#             activeSymbol = self.activeSymbolDict[vtSymbol]
#             self.insertData(MINUTE_DB_NAME, activeSymbol, bar)                    

#         self.writeDrLog(text.BAR_LOGGING_MESSAGE.format(symbol=bar.vtSymbol, 
#                                                         time=bar.time, 
#                                                         open=bar.open, 
#                                                         high=bar.high, 
#                                                         low=bar.low, 
#                                                         close=bar.close))        

#     #----------------------------------------------------------------------
#     def registerEvent(self):
#         """注册事件监听"""
#         self.eventEngine.register(EVENT_TICK, self.procecssTickEvent)

#     #----------------------------------------------------------------------
#     def insertData(self, dbName, collectionName, data):
#         """插入数据到数据库(这里的data可以是VtTickData或者VtBarData)"""
#         self.queue.put((dbName, collectionName, data.__dict__))

#     #----------------------------------------------------------------------
#     def run(self):
#         """运行插入线程"""
#         while self.active:
#             try:
#                 dbName, collectionName, d = self.queue.get(block=True, timeout=1)
#                 flt = {'datetime': d['datetime']}
#                 self.mainEngine.dbUpdate(dbName, collectionName, d, flt, True)
#             except Empty:
#                 pass

#     #----------------------------------------------------------------------
#     def start(self):
#         """启动"""
#         self.active = True
#         self.thread.start()

#     #----------------------------------------------------------------------
#     def stop(self):
#         """退出"""
#         if self.active:
#             self.active = False
#             self.thread.join()

#     #----------------------------------------------------------------------
#     def writeDrLog(self, content):
#         """快速发出日志事件"""
#         log = VtLogData()
#         log.logContent = content
#         event = Event(type_=EVENT_DATARECORDER_LOG)
#         event.dict_['data'] = log
#         self.eventEngine.put(event)   
#     




########################################################################
# class DrEngine(object):
#     """数据记录引擎"""

#     settingFileName = 'DR_setting.json'
#     settingFilePath = getJsonPath(settingFileName, __file__)
#     print u"----------多线程行情数据记录-------------------"  

#     #----------------------------------------------------------------------
#     def __init__(self, mainEngine, eventEngine):
#         """Constructor"""
#         self.mainEngine = mainEngine
#         self.eventEngine = eventEngine

#         # 当前日期
#         self.today = todayDate()

#         # 主力合约代码映射字典,key为具体的合约代码(如IF1604),value为主力合约代码(如IF0000)
#         self.activeSymbolDict = {}

#         # Tick对象字典
#         self.tickSymbolSet = set()

#         # K线合成器字典
#         self.bmDict = {}

#         # 配置字典
#         self.settingDict = OrderedDict()

#         # 负责执行数据库插入的单独线程相关
#         self.active = False                     # 工作状态
#         # self.queue = Queue()                    # 队列
#         # self.thread = Thread(target=self.run)   # 线程
#         self.queue_dict=OrderedDict()
#         self.threadlist=[]

#         # 载入设置,订阅行情
#         self.loadSetting()

#         #加载多线程
#         self.get_threadlist()
#         # 启动数据插入线程
#         self.start()

#         # 注册事件监听
#         self.registerEvent()  

#     #----------------------------------------------------------------------
#     def loadSetting(self):
#         """加载配置"""
#         with open(self.settingFilePath) as f:
#             drSetting = json.load(f)

#             # 如果working设为False则不启动行情记录功能
#             working = drSetting['working']
#             if not working:
#                 return

#             # Tick记录配置
#             if 'tick' in drSetting:
#                 l = drSetting['tick']

#                 for setting in l:
#                     symbol = setting[0]
#                     gateway = setting[1]
#                     vtSymbol = symbol
#                     if symbol not in self.queue_dict:
#                         self.queue_dict[symbol]=Queue()

#                     req = VtSubscribeReq()
#                     req.symbol = setting[0]

#                     # 针对LTS和IB接口,订阅行情需要交易所代码
#                     if len(setting)>=3:
#                         req.exchange = setting[2]
#                         vtSymbol = '.'.join([symbol, req.exchange])

#                     # 针对IB接口,订阅行情需要货币和产品类型
#                     if len(setting)>=5:
#                         req.currency = setting[3]
#                         req.productClass = setting[4]

#                     self.mainEngine.subscribe(req, gateway)

#                     #tick = VtTickData()           # 该tick实例可以用于缓存部分数据(目前未使用)
#                     #self.tickDict[vtSymbol] = tick
#                     self.tickSymbolSet.add(vtSymbol)

#                     # 保存到配置字典中
#                     if vtSymbol not in self.settingDict:
#                         d = {
#                             'symbol': symbol,
#                             'gateway': gateway,
#                             'tick': True
#                         }
#                         self.settingDict[vtSymbol] = d
#                     else:
#                         d = self.settingDict[vtSymbol]
#                         d['tick'] = True

#             # 分钟线记录配置
#             if 'bar' in drSetting:
#                 l = drSetting['bar']

#                 for setting in l:
#                     symbol = setting[0]
#                     gateway = setting[1]
#                     vtSymbol = symbol
#                     if symbol not in self.queue_dict:
#                         self.queue_dict[symbol]=Queue()

#                     req = VtSubscribeReq()
#                     req.symbol = symbol                    

#                     if len(setting)>=3:
#                         req.exchange = setting[2]
#                         vtSymbol = '.'.join([symbol, req.exchange])

#                     if len(setting)>=5:
#                         req.currency = setting[3]
#                         req.productClass = setting[4]                    

#                     self.mainEngine.subscribe(req, gateway)  

#                     # 保存到配置字典中
#                     if vtSymbol not in self.settingDict:
#                         d = {
#                             'symbol': symbol,
#                             'gateway': gateway,
#                             'bar': True
#                         }
#                         self.settingDict[vtSymbol] = d
#                     else:
#                         d = self.settingDict[vtSymbol]
#                         d['bar'] = True     

#                     # 创建BarManager对象
#                     self.bmDict[vtSymbol] = BarManager(self.onBar)

#             # 主力合约记录配置
#             if 'active' in drSetting:
#                 d = drSetting['active']
#                 self.activeSymbolDict = {vtSymbol:activeSymbol for activeSymbol, vtSymbol in d.items()}

#     #----------------------------------------------------------------------
#     def getSetting(self):
#         """获取配置"""
#         return self.settingDict, self.activeSymbolDict

#     #----------------------------------------------------------------------
#     def procecssTickEvent(self, event):
#         """处理行情事件"""
#         tick = event.dict_['data']
#         vtSymbol = tick.vtSymbol

#         # 生成datetime对象
#         if not tick.datetime:
#             tick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y%m%d %H:%M:%S.%f')            

#         self.onTick(tick)

#         bm = self.bmDict.get(vtSymbol, None)
#         if bm:
#             bm.updateTick(tick)

#     #----------------------------------------------------------------------
#     def onTick(self, tick):
#         """Tick更新"""
#         vtSymbol = tick.vtSymbol

#         if vtSymbol in self.tickSymbolSet:
#             self.insertData(TICK_DB_NAME, vtSymbol, tick)

#             if vtSymbol in self.activeSymbolDict:
#                 activeSymbol = self.activeSymbolDict[vtSymbol]
#                 self.insertData(TICK_DB_NAME, activeSymbol, tick)


#             self.writeDrLog(text.TICK_LOGGING_MESSAGE.format(symbol=tick.vtSymbol,
#                                                              time=tick.time, 
#                                                              last=tick.lastPrice, 
#                                                              bid=tick.bidPrice1, 
#                                                              ask=tick.askPrice1))

#     #----------------------------------------------------------------------
#     def onBar(self, bar):
#         """分钟线更新"""
#         vtSymbol = bar.vtSymbol

#         self.insertData(MINUTE_DB_NAME, vtSymbol, bar)

#         if vtSymbol in self.activeSymbolDict:
#             activeSymbol = self.activeSymbolDict[vtSymbol]
#             self.insertData(MINUTE_DB_NAME, activeSymbol, bar)                    

#         self.writeDrLog(text.BAR_LOGGING_MESSAGE.format(symbol=bar.vtSymbol, 
#                                                         time=bar.time, 
#                                                         open=bar.open, 
#                                                         high=bar.high, 
#                                                         low=bar.low, 
#                                                         close=bar.close))        

#     #----------------------------------------------------------------------
#     def registerEvent(self):
#         """注册事件监听"""
#         self.eventEngine.register(EVENT_TICK, self.procecssTickEvent)

#     #----------------------------------------------------------------------
#     def insertData(self, dbName, collectionName, data):
#         """插入数据到数据库(这里的data可以是VtTickData或者VtBarData)"""
#         symbol=data.vtSymbol
#         symbol_queue=self.queue_dict.get(symbol,None)
#         if symbol_queue:
#             symbol_queue.put((dbName, collectionName, data.__dict__))

#     #----------------------------------------------------------------------
#     def run(self,symbol):
#         """运行插入线程"""
#         while self.active:
#             try:
#                 single_queue=self.queue_dict.get(symbol,None)
#                 if single_queue:
#                     print u"----{}----,队列大小为:{}".format(symbol,single_queue.qsize())
#                     dbName, collectionName, d = single_queue.get(block=True, timeout=1)
#                     flt = {'datetime': d['datetime']}
#                     self.mainEngine.dbUpdate(dbName, collectionName, d, flt, True)
#             except Empty:
#                 pass

#     #----------------------------------------------------------------------
#     def get_threadlist(self):
#         symbollist=self.queue_dict.keys()
#         for symbol in symbollist:
#             t=Thread(target=self.run,args=(symbol,))
#             self.threadlist.append(t)

#     def start(self):
#         """启动"""
#         self.active = True
#         # self.thread.start()
#         for t in self.threadlist:
#             t.start()

#     #----------------------------------------------------------------------
#     def stop(self):
#         """退出"""
#         if self.active:
#             self.active = False
#             self.thread.join()

#     #----------------------------------------------------------------------
#     def writeDrLog(self, content):
#         """快速发出日志事件"""
#         log = VtLogData()
#         log.logContent = content
#         event = Event(type_=EVENT_DATARECORDER_LOG)
#         event.dict_['data'] = log
#         self.eventEngine.put(event)   




########################################################################
class DrEngine(object):
    """数据记录引擎"""

    settingFileName = 'DR_setting.json'
    settingFilePath = getJsonPath(settingFileName, __file__)  

    #----------------------------------------------------------------------
    def __init__(self, mainEngine, eventEngine):
        """Constructor"""
        self.mainEngine = mainEngine
        self.eventEngine = eventEngine

        # 当前日期
        self.today = todayDate()

        # 主力合约代码映射字典,key为具体的合约代码(如IF1604),value为主力合约代码(如IF0000)
        self.activeSymbolDict = {}

        # Tick对象字典
        self.tickSymbolSet = set()

        # K线合成器字典
        self.bmDict = {}

        # 配置字典
        self.settingDict = OrderedDict()

        # 负责执行数据库插入的单独线程相关
        self.active = False                     # 工作状态
        # self.queue = Queue()                    # 队列
        # self.thread = Thread(target=self.run)   # 线程
        self.queue_dict=OrderedDict()
        self.threadlist=[]

        # 载入设置,订阅行情
        self.loadSetting()

        #加载多线程
        self.get_threadlist()
        # 启动数据插入线程
        self.start()

        # 注册事件监听
        self.registerEvent()  

    #----------------------------------------------------------------------
    def loadSetting(self):
        """加载配置"""
        with open(self.settingFilePath) as f:
            drSetting = json.load(f)

            # 如果working设为False则不启动行情记录功能
            working = drSetting['working']
            if not working:
                return

            # Tick记录配置
            if 'tick' in drSetting:
                l = drSetting['tick']

                for setting in l:
                    symbol = setting[0]
                    gateway = setting[1]
                    vtSymbol = symbol
                    if symbol not in self.queue_dict:
                        self.queue_dict[symbol]=Queue()

                    req = VtSubscribeReq()
                    req.symbol = setting[0]

                    # 针对LTS和IB接口,订阅行情需要交易所代码
                    if len(setting)>=3:
                        req.exchange = setting[2]
                        vtSymbol = '.'.join([symbol, req.exchange])

                    # 针对IB接口,订阅行情需要货币和产品类型
                    if len(setting)>=5:
                        req.currency = setting[3]
                        req.productClass = setting[4]

                    self.mainEngine.subscribe(req, gateway)

                    #tick = VtTickData()           # 该tick实例可以用于缓存部分数据(目前未使用)
                    #self.tickDict[vtSymbol] = tick
                    self.tickSymbolSet.add(vtSymbol)

                    # 保存到配置字典中
                    if vtSymbol not in self.settingDict:
                        d = {
                            'symbol': symbol,
                            'gateway': gateway,
                            'tick': True
                        }
                        self.settingDict[vtSymbol] = d
                    else:
                        d = self.settingDict[vtSymbol]
                        d['tick'] = True

            # 分钟线记录配置
            if 'bar' in drSetting:
                l = drSetting['bar']

                for setting in l:
                    symbol = setting[0]
                    gateway = setting[1]
                    vtSymbol = symbol
                    if symbol not in self.queue_dict:
                        self.queue_dict[symbol]=Queue()

                    req = VtSubscribeReq()
                    req.symbol = symbol                    

                    if len(setting)>=3:
                        req.exchange = setting[2]
                        vtSymbol = '.'.join([symbol, req.exchange])

                    if len(setting)>=5:
                        req.currency = setting[3]
                        req.productClass = setting[4]                    

                    self.mainEngine.subscribe(req, gateway)  

                    # 保存到配置字典中
                    if vtSymbol not in self.settingDict:
                        d = {
                            'symbol': symbol,
                            'gateway': gateway,
                            'bar': True
                        }
                        self.settingDict[vtSymbol] = d
                    else:
                        d = self.settingDict[vtSymbol]
                        d['bar'] = True     

                    # 创建BarManager对象
                    self.bmDict[vtSymbol] = BarManager(self.onBar)

            # 主力合约记录配置
            if 'active' in drSetting:
                d = drSetting['active']
                self.activeSymbolDict = {vtSymbol:activeSymbol for activeSymbol, vtSymbol in d.items()}

    #----------------------------------------------------------------------
    def getSetting(self):
        """获取配置"""
        return self.settingDict, self.activeSymbolDict

    #----------------------------------------------------------------------
    def procecssTickEvent(self, event):
        """处理行情事件"""
        tick = event.dict_['data']
        vtSymbol = tick.vtSymbol

        # 生成datetime对象
        if not tick.datetime:
            tick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y%m%d %H:%M:%S.%f')            

        self.onTick(tick)

        bm = self.bmDict.get(vtSymbol, None)
        if bm:
            bm.updateTick(tick)

    #----------------------------------------------------------------------
    def onTick(self, tick):
        """Tick更新"""
        vtSymbol = tick.vtSymbol

        if vtSymbol in self.tickSymbolSet:
            self.insertData(TICK_DB_NAME, vtSymbol, tick)

            # if vtSymbol in self.activeSymbolDict:
            #     activeSymbol = self.activeSymbolDict[vtSymbol]
            #     self.insertData(TICK_DB_NAME, activeSymbol, tick)


            self.writeDrLog(text.TICK_LOGGING_MESSAGE.format(symbol=tick.vtSymbol,
                                                             time=tick.time, 
                                                             last=tick.lastPrice, 
                                                             bid=tick.bidPrice1, 
                                                             ask=tick.askPrice1))

    #----------------------------------------------------------------------
    def onBar(self, bar):
        """分钟线更新"""
        vtSymbol = bar.vtSymbol

        self.insertData(MINUTE_DB_NAME, vtSymbol, bar)

        # if vtSymbol in self.activeSymbolDict:
        #     activeSymbol = self.activeSymbolDict[vtSymbol]
        #     self.insertData(MINUTE_DB_NAME, activeSymbol, bar)                    

        self.writeDrLog(text.BAR_LOGGING_MESSAGE.format(symbol=bar.vtSymbol, 
                                                        time=bar.time, 
                                                        open=bar.open, 
                                                        high=bar.high, 
                                                        low=bar.low, 
                                                        close=bar.close))        

    #----------------------------------------------------------------------
    def registerEvent(self):
        """注册事件监听"""
        self.eventEngine.register(EVENT_TICK, self.procecssTickEvent)

    #----------------------------------------------------------------------
    def insertData(self, dbName, collectionName, data):
        """插入数据到数据库(这里的data可以是VtTickData或者VtBarData)"""
        symbol=data.vtSymbol
        symbol_queue=self.queue_dict.get(symbol,None)
        if symbol_queue:
            symbol_queue.put((dbName, collectionName, data.__dict__))

    #----------------------------------------------------------------------
    def run(self,symbol):
        """运行插入线程"""
        while self.active:
            try:
                single_queue=self.queue_dict.get(symbol,None)
                if single_queue:
                    # sys.stdout.write(u"----{}----,队列大小为:{} \r".format(symbol,single_queue.qsize()))
                    # print u"----{}----,队列大小为:{}".format(symbol,single_queue.qsize())
                    dbName, collectionName, d =single_queue.get(block=True, timeout=1)
                    flt = {'datetime': d['datetime']}
                    # self.mainEngine.dbUpdate(dbName, collectionName, d, flt, True)
                    self.mainEngine.dbInsert(dbName, collectionName, d)
                    #同样的数据插入主力连续合约数据集
                    if symbol in self.activeSymbolDict:
                        activeSymbol = self.activeSymbolDict[symbol]
                        # self.mainEngine.dbUpdate(dbName, activeSymbol, d, flt, True)
                        self.mainEngine.dbInsert(dbName, activeSymbol, d)

            except Empty:
                pass

    #----------------------------------------------------------------------
    def get_threadlist(self):
        symbollist=self.queue_dict.keys()
        for symbol in symbollist:
            t=Thread(target=self.run,args=(symbol,))
            self.threadlist.append(t)

    def get_total_tasknumber(self):
        total_size=0
        symbollist=self.queue_dict.keys()
        for symbol in symbollist:
            total_size=total_size+self.queue_dict[symbol].qsize()
        return total_size

    def start(self):
        """启动"""
        self.active = True
        # self.thread.start()
        for t in self.threadlist:
            t.start()

    #----------------------------------------------------------------------
    def stop(self):
        """退出"""
        if self.active:
            self.active = False
            self.thread.join()

    #----------------------------------------------------------------------
    def writeDrLog(self, content):
        """快速发出日志事件"""
        log = VtLogData()
        log.logContent = content
        event = Event(type_=EVENT_DATARECORDER_LOG)
        event.dict_['data'] = log
        self.eventEngine.put(event)

作者:张国平 ;来源:维恩的派论坛

 

1.主要分析两个在类ctaTemplate的中的函数,onTrade和onOrder,其实两个很相似,被别的其他实例调用,推入更新的Trade和Order实例,并执行函数内的代码。对于Tick级别的交易,还是还是会经常用到这两个。下面是在ctaTemplate中的定义。

            def onOrder(self, order):
                """收到委托变化推送(必须由用户继承实现)"""
                # 对于无需做细粒度委托控制的策略,可以忽略onOrder
                pass

            # ----------------------------------------------------------------------
            def onTrade(self, trade):
                """收到成交推送(必须由用户继承实现)"""
                # 对于无需做细粒度委托控制的策略,可以忽略onOrder
                pass

 
 

2.先去看看order和trade是什么样的类,两个都在vtObject.py里面。理论上来说,在tick级别中高频策略,当order和trade发生变化后,使用onOrder/onTrade 传递更新给策略;函数onOrder/onTrade里面一般定义一些对应不同状态进行的对应操作。

1) VtTradeData包含是成交的数据,其中最关键就是vtOrderID,可以和之前发送交易返回的vtOrderID做对应,用来对应的交易订单。其他诸如direction/offset/price/volume都是很重要;可以用来更新postion数据。

2) 类VtOrderData和之前VtQrderReq很像,但是不一样,这个是记录委托信息状态,req是交易请求,其中最关键的就是status,订单状态;这里有四个状态(ALLTRADED全部成交,PARTTRADED部分成交, NOTTRADED未成交,和CANCLLED拒单),这些属性在ctpGateway.py定义的。

        class VtTradeData(VtBaseData):
            """成交数据类"""

            #----------------------------------------------------------------------
            def __init__(self):
                """Constructor"""
                super(VtTradeData, self).__init__()
                        # 代码编号相关
                self.symbol = EMPTY_STRING              # 合约代码
                self.exchange = EMPTY_STRING            # 交易所代码
                self.vtSymbol = EMPTY_STRING            # 合约在vt系统中的唯一代码,通常是 合约代码.交易所代码
                self.tradeID = EMPTY_STRING             # 成交编号
                self.vtTradeID = EMPTY_STRING           # 成交在vt系统中的唯一编号,通常是 Gateway名.成交编号

                self.orderID = EMPTY_STRING             # 订单编号
                self.vtOrderID = EMPTY_STRING           # 订单在vt系统中的唯一编号,通常是 Gateway名.订单编号
                   # 成交相关
                self.direction = EMPTY_UNICODE          # 成交方向
                self.offset = EMPTY_UNICODE             # 成交开平仓
                self.price = EMPTY_FLOAT                # 成交价格
                self.volume = EMPTY_INT                 # 成交数量
                self.tradeTime = EMPTY_STRING           # 成交时间


        ########################################################################
        class VtOrderData(VtBaseData):
            """订单数据类"""

            #----------------------------------------------------------------------
            def __init__(self):
                """Constructor"""
                super(VtOrderData, self).__init__()

                # 代码编号相关
                self.symbol = EMPTY_STRING              # 合约代码
                self.exchange = EMPTY_STRING            # 交易所代码
                self.vtSymbol = EMPTY_STRING            # 合约在vt系统中的唯一代码,通常是 合约代码.交易所代码
                self.orderID = EMPTY_STRING             # 订单编号
                self.vtOrderID = EMPTY_STRING           # 订单在vt系统中的唯一编号,通常是 Gateway名.订单编号

                # 报单相关
                self.direction = EMPTY_UNICODE          # 报单方向
                self.offset = EMPTY_UNICODE             # 报单开平仓
                self.price = EMPTY_FLOAT                # 报单价格
                self.totalVolume = EMPTY_INT            # 报单总数量
                self.tradedVolume = EMPTY_INT           # 报单成交数量
                self.status = EMPTY_UNICODE             # 报单状态    
                self.orderTime = EMPTY_STRING           # 发单时间
                self.cancelTime = EMPTY_STRING          # 撤单时间

                # CTP/LTS相关
                self.frontID = EMPTY_INT                # 前置机编号
                self.sessionID = EMPTY_INT              # 连接编号

 
 

3.之前提到数次通过onOrder/onTrade传递最新Order/Trade状态,这个负责处理的是一个系列过程,上层推手就是类ctaEngine,下面主要说下函数processOrderEvent,处理委托推送。其中传入的event是一个事件对象,由一个type_说明类型,和一个字典dict_存储具体的事件数据组成。可以理解为是上面vtObject的一个包装盒,eventEngine只要根据标签type_,就可以把具体数据传给对应的下层处理者。这个关于event具体的后面再分析。

这个函数,首先读取了event字典中包好的order,因为存在手动发起交易情况, 如果这个vtOrder是之前通过策略发出的,则调用callStrategyFunc来把这个order回传到对应strategy.onOrder方法,如果是手动发出指令就算了。同时也分析状态,如果在委托完成状态,也更新strategyOrderDict字典,移除这个。

        def processOrderEvent(self, event):
                """处理委托推送"""
                order = event.dict_['data']
                vtOrderID = order.vtOrderID

                if vtOrderID in self.orderStrategyDict:
                    strategy = self.orderStrategyDict[vtOrderID]            

                    # 如果委托已经完成(拒单、撤销、全成),则从活动委托集合中移除
                    if order.status in self.STATUS_FINISHED:
                        s = self.strategyOrderDict[strategy.name]
                        if vtOrderID in s:
                            s.remove(vtOrderID)

                    self.callStrategyFunc(strategy, strategy.onOrder, order)

 
 

4.在往上追溯就到eventEngine,首先在ctaEngine初始化时候,会分配eventEngine实例,再通过下面代码注册处理事件,当某类事件收到时候,调用对应的方法,比如事件类型EVENT_ORDER, 对应的方法是self.processOrderEvent。

           class ctaEngine
            def registerEvent(self):
                """注册事件监听"""
                self.eventEngine.register(EVENT_TICK, self.processTickEvent)
                self.eventEngine.register(EVENT_ORDER, self.processOrderEvent)
                self.eventEngine.register(EVENT_TRADE, self.processTradeEvent)

        class eventEngine
            def register(self, type_, handler):
                """注册事件处理函数监听"""
                # 尝试获取该事件类型对应的处理函数列表,若无defaultDict会自动创建新的list
                handlerList = self.__handlers[type_]

                # 若要注册的处理器不在该事件的处理器列表中,则注册该事件
                if handler not in handlerList:
                    handlerList.append(handler)

在eventEngine中的register函数就是处理的方法通过__handlers字典来对应,__handlers是defaultdict(list),是一种特殊的字典,最大特点就是如果同一个key值插入不同value,他不会像就普通dict用新的替代,而且变成{key:[value1,value2,……]}这样存储。这样就可以让同一个type,可以有对应多个接收handler。

这里有两个eventEngine, 按照官方说法:

  • EventEngine类使用了PyQt中的QTimer来实现定时器功能,由PyQt应用主线程中的Qt事件循环来定时触发(无需新开单独的线程),适合在带有图形界面的应用程序中使用(如examples/VnTrader);
  • EventEngine2类则是使用了一个单独的线程来实现定时器功能,适合在无图形界面的应用程序中使用(如examples/CtaTrading)。

 
 

5.上面说了eventEngine的组成Event,然后还有一个后面处理函数def __process(self, event)。 在一个内部队列__queue中不停抓起event,通过检索字典__handlers来分配到对应的函数处理。那么谁放入新的event呢,就是一个调用put(event)函数向事件队列插入事件。这个时候发现一个特殊的EVENT_TIMER ,看了半天,感觉可以理解为是一个节奏控制器,每一秒去做一次process;那么对于高频来说,可能换成500毫秒更合适。

下面是VNPY定义的EVENT事件。

        # 系统相关
        EVENT_TIMER = 'eTimer'                  # 计时器事件,每隔1秒发送一次
        EVENT_LOG = 'eLog'                      # 日志事件,全局通用

        # Gateway相关
        EVENT_TICK = 'eTick.'                   # TICK行情事件,可后接具体的vtSymbol
        EVENT_TRADE = 'eTrade.'                 # 成交回报事件
        EVENT_ORDER = 'eOrder.'                 # 报单回报事件
        EVENT_POSITION = 'ePosition.'           # 持仓回报事件
        EVENT_ACCOUNT = 'eAccount.'             # 账户回报事件
        EVENT_CONTRACT = 'eContract.'           # 合约基础信息回报事件
        EVENT_ERROR = 'eError.'                 # 错误回报事件

 
 

6.现在想着是谁在不停的给这个内部队列放入order/trick状态的event呢, 而在ctpGate这个类中,在其父类vtGate中有onOrder方法,很规范的打包order到evet,然后放到队列里面。还有分析后发现在Mainengine对整个eventEngine进行管理,并通过addGateway通过中把在事件引擎和交易接口管理。

            def onOrder(self, order):
                """订单变化推送"""
                # 通用事件
                event1 = Event(type_=EVENT_ORDER)
                event1.dict_['data'] = order
                self.eventEngine.put(event1)

                # 特定订单编号的事件
                event2 = Event(type_=EVENT_ORDER+order.vtOrderID)
                event2.dict_['data'] = order
                self.eventEngine.put(event2)

 
 

7.在至上是class CtpTdApi(TdApi)这个类的,读取data中的order相关数据,创建order,推送到上面的这个onOrder里面; 在往上就有点头大了,这个data信息应该是从编译底层返回的。

         def onRtnOrder(self, data):
                """报单回报"""
                # 更新最大报单编号
                newref = data['OrderRef']
                self.orderRef = max(self.orderRef, int(newref))

                # 创建报单数据对象
                order = VtOrderData()
                order.gatewayName = self.gatewayName

                # 保存代码和报单号
                order.symbol = data['InstrumentID']
                order.exchange = exchangeMapReverse[data['ExchangeID']]
                order.vtSymbol = order.symbol #'.'.join([order.symbol, order.exchange])

                order.orderID = data['OrderRef']
                # CTP的报单号一致性维护需要基于frontID, sessionID, orderID三个字段
                # 但在本接口设计中,已经考虑了CTP的OrderRef的自增性,避免重复
                # 唯一可能出现OrderRef重复的情况是多处登录并在非常接近的时间内(几乎同时发单)
                # 考虑到VtTrader的应用场景,认为以上情况不会构成问题
                order.vtOrderID = '.'.join([self.gatewayName, order.orderID])        

                order.direction = directionMapReverse.get(data['Direction'], DIRECTION_UNKNOWN)
                order.offset = offsetMapReverse.get(data['CombOffsetFlag'], OFFSET_UNKNOWN)
                order.status = statusMapReverse.get(data['OrderStatus'], STATUS_UNKNOWN)            

                # 价格、报单量等数值
                order.price = data['LimitPrice']
                order.totalVolume = data['VolumeTotalOriginal']
                order.tradedVolume = data['VolumeTraded']
                order.orderTime = data['InsertTime']
                order.cancelTime = data['CancelTime']
                order.frontID = data['FrontID']
                order.sessionID = data['SessionID']

                # 推送
                self.gateway.onOrder(order)

 

总体来看,eventEngine这个是一个总的驱动,在内部queue这个传送带,分发做了字典里面类型标记的Event实例给对应的处理对象;ctpGateway这个通过put把新的event放入queue中。

vn.py实现了2个维度上的单位头寸限制,分别是单品种头寸上限是4,单个方向整体头寸上限是10。那么现在测试一下适用于国内期货品种的新海龟策略,其最优单位头寸限制数值是否与原版海龟策略一致。

 

实现步骤:

  • 对单品种头寸上限分类,直接分成3、4、5,
  • 然后基于各自的单品种头寸上限再对当个方向的整体头寸上限进行细分,分别是8、9、10、12、14、无限大。
  • 故一共构成了3*6 = 18个测试组合。

 

在回测中修改单位头寸设置:打开海龟策略文件turtleStrategy.py找到第10、第11行代码,直接修改其数值即可。

MAX_PRODUCT_POS = 4 # 单品种最大持仓
MAX_DIRECTION_POS = 10 # 单方向最大持仓

 
 

1)分类1:单品种头寸上限等于3

基于该分类一共得到6个测试组合,其回测效果如图所示。
enter image description here
enter image description here
 

这六个备选组合,按照从左到右,从高到低顺序,其夏普比率分别是1.68、1.68、1.6、1.5、1.44、1.4。可以观察到,其数据变化呈现倒“U”型,在单个方向整体头寸上限为9时,夏普比率达到顶峰,然后不管是头寸上限增加还是降低,夏普比率都会下降。

另外,从单方向头寸从14增大至无穷大(即限制取消)时,夏普比率降低得并不严重,仅仅为0.04,这说明针对国内期货品种的海龟策略,其自然达到的单方向头寸单位是大于14的,但是其差距并不是很大。

 
 

2)分类2:单品种头寸上限等于4

然后对单品种头寸上限为4的类型进行测试,其回测效果如图所示。

enter image description here
enter image description here

这六个备选组合,按照同样的观察顺序,其夏普比率分别是1.67、1.69、1.65、1.61、1.61、1.61。其夏普比率变化同样呈现倒“U”型,但是较单位头寸上限为3的更加扁平。在单个方向整体头寸上限为9时,夏普比率达到顶峰,然后不管是头寸上限增加还是降低,夏普比率都会下降。

另外,从单方向头寸从14增大至无穷大(即限制取消)时,夏普比率降低了0.14。

 
 

3)分类3:单品种头寸上限等于5

最后对单品种头寸上限为5的类型进行测试,其回测效果如图所示。

enter image description here
enter image description here
 

可以观察到,分类3的所有组合与分类2的完全一致,这表明单品种头寸单位自然到达的上限是4,再往上增加其限制已无任何意义。

综上所述,基于海龟组合,单品种头寸单位上限为4,单个方向整体头寸为9时效果最优,其最终版本的海龟策略夏普比率达到1.69。

作者:爱茶语 ;来源:维恩的派论坛

 

1:在ctpGateway.py里面的def onRspQryTradingAccount(self, data, error, n, last):下面加上

        import csv

        import os

        import datetime
        from time import sleep

        #通过CTP接口查询账户资金

        vnTrader_dir = 'C:\ProgramData\\Anaconda2\\Lib\\site-packages\\vnpy-1.7.1-py2.7.egg\\vnpy\\trader\\app\\ctaStrategy\\AccountInfo'# AccountInfo 所在路径(这是我放CSV的路径,大家自行修改)

        today = datetime.datetime.now().strftime("%Y-%m-%d")


        # 文件名称设置为今天名称, 每次只推送一条合约信息

        path = vnTrader_dir + '\\AccountInfo_' + today + '.csv'

        if not os.path.exists(path): # 如果文件不存在,需要写header

            with open(path, 'wb') as f: # 用wb读不会产生\r\n的换行问题

                w = csv.DictWriter(f, data.keys())

                w.writeheader()

                w.writerow(data)

        else: # 文件存在,不需要写header

                with open(path,'ab') as f:  #二进制追加形式写入

                 w = csv.DictWriter(f, data.keys())

                 w.writerow(data)
                 sleep(60)

                #return    #每天只查询一次,文件存在不写入

 
 

2.在策略里面加上 """通过csv读取账户可用资金"""

import csv
import datetime
import time

    today = datetime.datetime.now().strftime("%Y-%m-%d")

    csv_file = file(r'C:\\ProgramData\\Anaconda2\\Lib\\site-packages\\vnpy-1.7.1-        py2.7.egg\\vnpy\\trader\\app\\ctaStrategy\\AccountInfo\\AccountInfo_'+today+'.csv', 'rb')

    reader = csv.DictReader(csv_file)

    Available = float([row['Available'] for row in reader ][-1])

 
 
3.在需要调仓的XBAR周期下面加上self.open_pos = int(self.Available/(self.bar.close100.15)*0.1) #螺纹钢为例,每手乘数10,保证金按15%算,10%仓位

理论上,基于日线数据的中低频趋势跟踪策略,手续费和滑点是可以忽略不计的。为了验证这个结论,进行对比测试,如图所示。

 
 

右图为包含手续费和滑点版本的海龟策略,其中手续费设置为交易所手续费的1.1倍,滑点设置为期货合约的最小价格变动。

 

其结果是总手续费为635364,总滑点为636794,加上手续费和滑点后,其影响在于:

  • 结束资金从77605953降低至76333794,
  • 年化收益从44.34%降低至44.09%,
  • 百分百最大回撤从-25.61%上升到-26.23%,
  • 夏普比率从1.65降低至1.62,
  • 资金曲线形状基本无变化。

 
综上所述,手续费和滑点对于海龟策略影响不大,在测试中可以忽略不计。

enter image description here

作者:爱茶语 ;来源:维恩的派论坛

 
 

open_pos = 20   #交易手数
    #拆单追价参数
    disassembleSingle = 10   #拆单下单量阈值
    disassembleTimeTotal = 15 #拆单总时间15秒
    disassembleInterval = 5  #拆单时间间隔5秒,拆单次数为disassembleTimeTotal/disassembleInterval
    chaseInterval = 10   #未成交追单时间间隔10秒    def __init__(self, ctaEngine, setting):
        """Constructor"""
        super(APADXTICKStrategy, self).__init__(ctaEngine, setting)
        """
        如果是多合约实例的话,变量需要放在__init__里面
        """
        # 策略变量
        self.bar = None                  # K线对象
        self.barMinute = EMPTY_STRING    # K线当前的分钟
        self.minutebar = None        # minuteK线对象
        self.bufferCount = 0                     # 目前已经缓存了的数据的计数
        self.barList = []
        self.order = {}     #委托订单号
        self.order_second = None #当前委托秒数
        self.XMhighArray = np.zeros(self.bufferSize)    # X分钟K线最高价的数组
        self.XMlowArray = np.zeros(self.bufferSize)     # X分钟K线最低价的数组
        self.XMcloseArray = np.zeros(self.bufferSize)   # X分钟K线收盘价的数组
        self.XMopenArray = np.zeros(self.bufferSize)   # X分钟K线开盘价的数组
        self.dayCloseArray = np.zeros(self.bufferSize)   # 日线收盘价的数组
        self.dayHighArray = np.zeros(self.bufferSize)   # 日线最高价的数组
        self.dayLowArray = np.zeros(self.bufferSize)   # 日线最低价的数组

        self.LongEnterable1 = False 
        self.ShortEnterable1 = False
        self.SellEnterable1 = False
        self.CoverEnterable1 = False
        self.longTriger = False
        self.shortTriger = False
        self.sellTriger = False
        self.coverTriger = False

        self.chaselongTriger = False
        self.chasesellTriger = False
        self.chaseshortTriger = False
        self.chasecoverTriger = False     

        self.orderFinished = False
        #指标初始化 
        self.lastPrice = EMPTY_INT
        self.buyPrice = EMPTY_INT
        self.shortPrice = EMPTY_INT
        self.dayClose = EMPTY_INT
        self.dayHigh = EMPTY_INT
        self.dayLow = EMPTY_INT
        self.tickSecond = EMPTY_INT
        self.sellPrice = EMPTY_INT
        self.coverPrice = EMPTY_INT
        self.adx = EMPTY_INT

        self.longtradeVolume = 0
        self.shorttradeVolume = 0
        self.selltradeVolume = 0
        self.covertradeVolume = 0
    def onInit(self):
        self.writeCtaLog('{}策略初始化'.format(self.name))

        # 载入历史数据,并采用回放计算的方式初始化策略数值
        initData = self.loadTick(self.initDays)
        for Tick in initData:
            self.onTick(Tick)

        self.putEvent()

    #----------------------------------------------------------------------
    def onStart(self):
        """启动策略(必须由用户继承实现)"""
        self.writeCtaLog('{}策略启动'.format(self.name))
        self.putEvent()

    #----------------------------------------------------------------------
    def onStop(self):
        """停止策略(必须由用户继承实现)"""
        self.writeCtaLog('{}策略停止'.format(self.name))
        self.putEvent()

    #----------------------------------------------------------------------
    def onTick(self, tick):
        """收到行情TICK推送(必须由用户继承实现)"""
        self.tickSecond = tick.datetime.second
        self.lastPrice = tick.lastPrice
        #委托单全部成交或撤销才能发开仓单
        if not self.order:
            self.OrderFinished = True
        elif self.order['status'] == '未成交' or self.order['status'] == '部分成交':
            self.OrderFinished = False
        else:
            self.OrderFinished = True
        #拆单开仓模块
        if self.LongEnterable1 and tick.lastPrice >= self.buyPrice and self.pos >= 0:

            #如果开仓仓位大于拆单阈值disassembleSingle,分intself.disassembleTimeTotal/self.disassembleInterval次数完成开仓,间隔disassembleInterval秒
            #self.open_pos - abs(self.pos) < int(self.open_pos/(self.disassembleTimeTotal/self.disassembleInterval)) 开仓手数减去当前仓位小于拆单手数时只发开仓手数减去当前仓位的单子,防止总开仓手数大于self.opne_pos
            if self.open_pos > self.disassembleSingle and self.open_pos - abs(self.pos) > int(self.open_pos/(self.disassembleTimeTotal/self.disassembleInterval)):
                self.longTriger =False
                if self.tickSecond % self.disassembleInterval == 0 and self.tickSecond != 0 and (not self.longTriger) and (self.orderFinished):                      
                    self.buy(tick.bidPrice1,int(self.open_pos/(self.disassembleTimeTotal/self.disassembleInterval)))
                    self.longTriger =True

            elif self.open_pos > self.disassembleSingle and self.open_pos - abs(self.pos) < int(self.open_pos/(self.disassembleTimeTotal/self.disassembleInterval)):
                self.longTriger =False
                if self.tickSecond % self.disassembleInterval == 0 and self.tickSecond != 0 and (not self.longTriger) and (self.orderFinished):                      
                    self.buy(tick.bidPrice1,int(self.open_pos - abs(self.pos)))
                    self.longTriger =True 

            elif abs(self.pos) ==0 and self.open_pos <= self.disassembleSingle and (not self.longTriger) and (self.orderFinished):              
                    self.buy(tick.bidPrice1,self.open_pos)
                    self.longTriger = True              

        if self.ShortEnterable1 and tick.lastPrice <= self.shortPrice  and self.pos <= 0:
            if self.open_pos > self.disassembleSingle  and self.open_pos - abs(self.pos) > int(self.open_pos/(self.disassembleTimeTotal/self.disassembleInterval)):
                self.shortTriger =False
                if self.tickSecond % self.disassembleInterval == 0 and self.tickSecond != 0 and (not self.shortTriger) and (self.orderFinished):
                    self.short(tick.askPrice1,int(self.open_pos/(self.disassembleTimeTotal/self.disassembleInterval)))
                    self.shortTriger =True

            elif self.open_pos > self.disassembleSingle and self.open_pos - abs(self.pos) < int(self.open_pos/(self.disassembleTimeTotal/self.disassembleInterval)):
                self.shortTriger =False
                if self.tickSecond % self.disassembleInterval == 0 and self.tickSecond != 0 and (not self.shortTriger) and (self.orderFinished):
                    self.short(tick.askPrice1,int(self.open_pos - abs(self.pos)))
                    self.shortTriger =True                

            elif abs(self.pos) ==0 and self.open_pos <= self.disassembleSingle and (not self.shortTriger) and (self.orderFinished):
                    self.short(tick.askPrice1,self.open_pos)
                    self.shortTriger = True
        #拆单平仓模块
        if copy.deepcopy(self.SellEnterable1) and self.pos > 0:            
            if abs(self.pos) > self.disassembleSingle:
                self.sellTriger=False
                if self.tickSecond % self.disassembleInterval == 0 and self.tickSecond != 0 and (not self.sellTriger) and (self.orderFinished):  
                    self.sell(tick.askPrice1,int(abs(self.pos)/(self.disassembleTimeTotal/self.disassembleInterval)))

                    self.sellTriger = True

            elif abs(self.pos) <= self.disassembleSingle and (not self.sellTriger) and (self.orderFinished): 
                    self.sell(tick.askPrice1,abs(self.pos))
                    self.sellTriger = True
        elif copy.deepcopy(self.CoverEnterable1) and self.pos < 0:
            if abs(self.pos) > self.disassembleSingle:
                self.coverTriger=False
                if self.tickSecond % self.disassembleInterval == 0 and self.tickSecond != 0 and (not self.coverTriger) and (self.orderFinished): 

                    self.cover(tick.bidPrice1,int(abs(self.pos)/(self.disassembleTimeTotal/self.disassembleInterval)))                   
                    self.coverTriger = True                                  
            elif abs(self.pos) <= self.disassembleSingle and (not self.coverTriger) and (self.orderFinished):                   
                    self.cover(tick.bidPrice1,abs(self.pos))
                    self.coverTriger = True


        #委托超过chaseInterval秒未成交撤单追价
        if not self.order:
            pass 
        elif self.order['direction'] == '多' and self.order['offset'] == '开仓' and self.order['status'] == '未成交':
            if self.tickSecond % self.chaseInterval == 0 and self.tickSecond != 0:
                self.longtradeVolume = self.order['totalVolume']
                if self.longtradeVolume > 0 and (not self.chaselongTriger):
                    # 撤销之前发出的尚未成交的委托(包括限价单和停止单)
                    self.cancelAll()
                    self.chaselongTriger = True



        elif self.order['direction'] == '空' and (self.order['offset'] == '平仓' or self.order['offset'] == '平今') and self.order['status'] == '未成交':
            if self.tickSecond % self.chaseInterval == 0 and self.tickSecond != 0:
                self.selltradeVolume = self.order['totalVolume']
                if self.selltradeVolume > 0 and (not self.chasesellTriger):
                    # 撤销之前发出的尚未成交的委托(包括限价单和停止单)
                    self.cancelAll()
                    self.chasesellTriger = True


        elif self.order['direction'] == '空' and self.order['offset'] == '开仓' and self.order['status'] == '未成交':
            if self.tickSecond % self.chaseInterval == 0 and self.tickSecond != 0:

                self.shorttradeVolume = self.order['totalVolume']
                if self.shorttradeVolume > 0 and (not self.chaseshortTriger):
                    # 撤销之前发出的尚未成交的委托(包括限价单和停止单)
                    self.cancelAll()                  
                    self.chaseshortTriger = True


        elif self.order['direction'] == '多' and (self.order['offset'] == '平仓' or self.order['offset'] == '平今') and self.order['status'] == '未成交':
            if self.tickSecond % self.chaseInterval == 0 and self.tickSecond != 0:
                self.covertradeVolume = self.order['totalVolume']
                if self.covertradeVolume > 0 and (not self.chasecoverTriger):
                    # 撤销之前发出的尚未成交的委托(包括限价单和停止单)
                    self.cancelAll()
                    self.chasecoverTriger =True
        #self.chaseInterval+1秒追价,避免追加单被撤
        if self.chaselongTriger:
            if self.tickSecond % (self.chaseInterval+1) == 0 and self.tickSecond != 0:
                self.buy(tick.askPrice1,self.longtradeVolume)
                self.chaselongTriger = False          

        if self.chasesellTriger:
            if self.tickSecond % (self.chaseInterval+1) == 0 and self.tickSecond != 0:
                self.sell(tick.bidPrice1,self.selltradeVolume)
                self.chasesellTriger = False                 
        if self.chaseshortTriger:
            if self.tickSecond % (self.chaseInterval+1) == 0 and self.tickSecond != 0:
                self.short(tick.bidPrice1,self.shorttradeVolume)
                self.chaseshortTriger = False

        if self.chasecoverTriger:
            if self.tickSecond % (self.chaseInterval+1) == 0 and self.tickSecond != 0:
                self.cover(tick.askPrice1,self.covertradeVolume)
                self.chasecoverTriger = False 

        #成交触发后,触发器设置为未触发
        if self.pos == 0:
            self.CoverEnterable1 = False
            self.SellEnterable1 = False
            self.coverTriger = False
            self.sellTriger = False               
        if abs(self.pos) !=0:
            self.longTriger = False
            self.shortTriger = False

#----------------------------------------------------------------------
    def onOrder(self, order):
        """收到委托变化推送(必须由用户继承实现)"""
        self.order = order.__dict__

上一笔盈利过滤的意思是,若上一次交易为盈利的,则当前的交易信号无效,即当前不进行交易。

 

基于20日唐奇安通道,费思认为若上一次突破是盈利的,那么新突破点可能离当前价格,因为有可能跑到55日突破点上面去;若上一次突破点亏损,那么新突破点将更加接近当前价格。
 
若基于传统日内中高频CTA策略的视角,这是非常主观的解释:费思认为这几乎不可能连续2次出现大行情,但是事实是分钟级别的CTA策略有可能出现2次大行情,故首次盈利过滤的作用仅仅是节省手续费和滑点,但是错过了潜在的巨大收益。
 
但是呢,基于日线级别中低频CTA策略,其首次盈利过滤是否有效还需要进行验证。
 
本次测试的基准是剔除长周期版本的“新”海龟策略,海龟组合是在上面的章节中挑选的投资组合(回望周期为3年;筛选标准是回归夏普比率>0.6),然后把海龟策略分为含义上一笔盈利过滤和不含上一笔盈利过滤这两个版本,测试效果如图所示。
 
enter image description here
 

从上一笔盈利对比图中可以看出:

  • 不含上一笔盈利过滤版本:总成交笔数1325,年化收益41.23%,百分比最大回撤-22.32%,夏普比率达1.4。
  • 含上一笔盈利过滤版本:总成交笔数为1090,年化收益44.34%,百分比最大回撤-25.61%,夏普比率达1.5。

 

增加过滤器后,总成交笔数降低了,年化收益和夏普比率都提高了,因此可以推断海龟策略独有的上一笔盈利过滤是对于日线级别的趋势跟踪策略是非常有效的,其作用在于降低无效交易(即假突破)所造成的亏损,在整体上提高的策略的胜率。

 

(该结论在统计学来看,就是在日线级别中,连续出现2次大行情的概率极其低,应对方案就是剔除低概率事件,从而提高整体的胜率。)

 

  • 在日内中高频CTA策略中,该过滤器无效或许是Tick或者分别级别K线所包含的噪声更多,故提高连续出现2次大行情的概率;
  • 但是日线级别数据所包含的噪声更少,更能够有助于判断趋势,而趋势本身就非常难以出现连续2次的大行情。

作者:viponedream ;来源:维恩的派论坛
 
针对懒人,打算挂上去就不管它,让它自己运行,一个星期,一个月,甚至更久。
目前想到的就是几个地方要改一下,欢迎大家探讨怎样用vnpy来做隔夜。
 
 
一,定时连接CTP我采用的死方法。在uiMainWindow.py中的 updateStatusBar中,增加时间判断,到点了自动重连CTP。

# 计时器,
# 晚上21:00是夜盘时间,提前20分钟连接 CTP
dt = datetime.now()
if dt.hour == 20  or dt.hour == 8:
    if dt.minute == 40  and dt.second == 0:
        self.connectCtp()

 
 
二,历史持仓,即以前的老仓位。策略一开始要不要处理老仓位?目前是不管的,只管策略自己开的那些仓位。

这样中途停机造成问题。所以还是把老仓位一起赋给策略吧。

有人是把历史仓位存在一个文件中。我觉得没什么必要(不是单品种多策略的话)。

直接在策略中onPosition中读取历史持仓了。(要先在ctaEngine中增加一个处理函数,把onPosition推到相应的策略去)

 

def  onPosition(self, pos):
    # 更新仓位,把手动的开平仓同步更新到策略中。这样,就只能一个品种用一个策略了。
    # 有历史仓位则在策略开始时,把历史仓位赋给策略仓位。没有历史仓位则不必。

    if  self.isPrePosHaved  or self.isAlreadyTraded:         # 还没有开过仓,或,还没有获取历史仓位
        return
    elif pos.position != 0:
        if pos.direction == DIRECTION_LONG:
            self.pos = pos.position
        else:
            self.pos = -pos.position
        self.lastEntryPrice = pos.price
        self.isPrePosHaved = True

    print  (u'{0} {1}  历史持仓 {2}  开仓均价 {3}'.format(datetime.now(),  self.vtSymbol,  self.pos,  pos.price   ))
    pass

 
 

三,今仓,昨仓重置。

 

原来的系统是日内的,碰到换日就会出错。所以要重置一下。
这个要修改两个地方。
先在 CtpTdApi的 init 中加上

self.posBufferDict = {}             # 缓存持仓数据的字典
self.symbolExchangeDict = {}        # 保存合约代码和交易所的印射关系
self.symbolSizeDict = {}            # 保存合约代码和合约大小的印射关系
# 加上下面这一行
self.posBufferDictShfe = {}         # 缓存SHFE持仓数据的字典1,是在CTPGATEWAY.PY 修改这个回调。
def onRspQryInvestorPosition(self, data, error, n, last):
    """持仓查询回报"""
#print   datetime.now(),u'持仓回报', data
    # 更新持仓缓存,并获取VT系统中持仓对象的返回值
    exchange = self.symbolExchangeDict.get(data['InstrumentID'], EXCHANGE_UNKNOWN)
    size = self.symbolSizeDict.get(data['InstrumentID'], 1)

    # 获取缓存字典中的持仓缓存,若无则创建并初始化
    positionName = '.'.join([data['InstrumentID'], data['PosiDirection']])

    if exchange == EXCHANGE_SHFE:               #上期所的专门一个 dict
        if positionName in self.posBufferDictShfe:
            posBuffer = self.posBufferDictShfe[positionName]
        else:
            posBuffer = PositionBuffer(data, self.gatewayName)
            self.posBufferDictShfe[positionName] = posBuffer
    else:                                       # 其它交易所
        if positionName in self.posBufferDict:
            posBuffer = self.posBufferDict[positionName]
        else:
            posBuffer = PositionBuffer(data, self.gatewayName)
            self.posBufferDict[positionName] = posBuffer

    # 缓存持仓信息
    if exchange == EXCHANGE_SHFE:
        posBuffer.updateShfeBuffer(data, size)  # 不必 return
        if last:        # 上期所的只有最后一个才全部返回持仓信息
            for positionName in self.posBufferDictShfe:
                pos = self.posBufferDictShfe[positionName].pos
                self.gateway.onPosition(pos)
            self.posBufferDictShfe = {}      # 返回后就重置

    else:           # 其它交易所直接返回
        pos = posBuffer.updateBuffer(data, size)
        self.gateway.onPosition(pos)

 
2,在ctaEngine
增加两个事件处理函数
 

#----------------------------------------------------------------------
def registerEvent(self):
    # 加上以下两个
    self.eventEngine.register(EVENT_POSITION, self.processExchangePositionEvent)
    self.eventEngine.register(EVENT_TIMER, self.onTimer)

    def onTimer(self, event):
        dt = datetime.now()
        if dt.hour == 21 and dt.minute==0  and dt.second==1:       # 夜盘开盘第一时间重置
            self.posBufferDict = {}         # 清空则默认使用平昨

        if self.tickStrategyDict:
            for key in self.tickStrategyDict:
                for strategy in self.tickStrategyDict[key]:
                    strategy.onTimer()
        pass

    def processExchangePositionEvent(self, event):
        pos = event.dict_['data']
        if pos.vtSymbol in self.tickStrategyDict:
            for strategy in self.tickStrategyDict[pos.vtSymbol]:
                if pos.direction == DIRECTION_LONG:
                    strategy.exchangePos = pos.position
                else:
                    strategy.exchangePos = -pos.position
                strategy.onPosition(pos)

 
以上修改,策略已经可以连续跑起来了。强调的是:一个品种只能一个策略,并且只能单方向,不能同时有多和有空。# 涉及到的变量,自己加的。
self.isPrePosHaved self.isAlreadyTraded 自己加在策略上或者ctaTemplate都行。
onTimer()也自己加上的。

原版海龟策略的出入场交易信号分成2个版本:

  • 短周期版本:1)入场信号:若期货指数价格突破20日最高价/最低价,买入/卖出1个头寸单位。过滤条件是上一次突破是盈利性突破,则当前入市信号无效(其保障性突破点为在55日通道入市)。 2)离场信号:采用10日唐奇安通道突破退出法则,即多头价格跌破10日最低点离场,空头价格超过10日最高点离场。

  • 长周期版本:1)入场信号若价格突破55日最高价/最低价,买入/卖出1个头寸单位。对于长周期版本来说,所有突破都被视为有效信号,无论上一次突破是亏损性还是盈利性的。2)离场信号:采用20日唐奇安通道突破退出法则,即多头价格跌破20日最低点离场,空头价格超过10日最高点离场。

 

《海龟交易法则》的作者费思认为这个2版本的出入场信号都是有效的,海龟们可以选择其中一个版本进行交易,也可以把资金分割成2部分,一部分使用短周期信号,另一部分使用长周期信号。

 

但是这都是30年前在美国市场有效的版本,照帮到当今的国内期货市场不能够保证其有效性,故需要对其进行检验。
检验方法是把海龟策略的信号系统拆分成3个:标准版本(长周期+短周期),短周期版本和长周期版本。然后通过上一章节挑选出来的海龟组合进行验证,其效果如图所示。

 

enter image description here

 

结果显示:标准版本年化收益45.11%,百分比最大回撤-29.46%,夏普比率达1.5。长周期版本年化收益39.96%,百分比最大回撤-25.81%,夏普比率达1.44。短周期版本年化收益44.34%,百分比最大回撤-25.61%,夏普比率达1.65。因此可以推测出是长周期版本整体效果较差,拖累了短周期版本信号效果,从而拉低了整体的夏普比率。

 

海龟组合的角度来看,应该剔除长周期版本的出入场信号,仅仅保留短周期版本。

 

为了再一次验证其结论的有效性,逐个对单品种进行测试,然后统计其3个版本的夏普比率。为了表述起来更加直观,本次使用打分法,步骤如下:

  1. 根据夏普比率的测试效果对3个版本的海龟策略进行排序。
  2. 对于不同排名附加权重不同,如排名第一得1分,排名第二得0.8分,排名第三得0分。
  3. 统计3个版本海龟策略的总分
     

单品种的下面但品质长短周期的分数表,图中显示
 

  • 中金所短周期版本总分最高,为2.8分,远高于标准版本(1分)和长周期版本(0.8分),反过来说短周期版本得分最低。
  • 上期所长周期全品种组合得分最低(5分),热门品种组合也是最低(2.8分);
  • 郑商所长周期的全品种组合为5分,热门品种组合为2.8分;
  • 大商所长周期的全品种组合为4.6分,热门品种组合为1分。
     

enter image description here

 
综上所述,国内四大交易所长周期在热门品种组合和全品种组合得分均最低,故判断长周期出入场信号无效,应该剔除。新的海龟策略仅仅保留短周期信号,并且以后的验证也是基于短周期版本的海龟策略。

rqdata报错原因是版本升级(从rqdatac==1.0.0a29 升级到rqdatac==1.0.0a40),然后米筐那边修改了源代码,缺少了某些字段。

解决方案
打开文件夹“examples\DataService\RqdataDataService”下的dataService.py文件,找到第31行

rq.init(USERNAME, PASSWORD)

修改成下面的即可

rq.init(USERNAME, PASSWORD, ('rqdatad-pro.ricequant.com', 16011))
© 2015-2022 微信 18391752892
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】