陈丹 wrote:
b.下面这段代码里面resultl_path和result_path是不是有一个手误写错了..
resultl_path = os.path.abspath( os.path.join('put your own path') ) path = pathlib.Path(resultl_path) if path.exists() and path.is_file(): df.to_csv(result_path,mode='a', header=None) else: df.to_csv(resultl_path,columns=col_keys)b. redis的python包需要低版本的才行(2.x) ,不然插入数据的时候会有类型错误问题.
我个人没有发现这个问题,不过遇到问题的话,把数据str()下就好,我现场是python 3.7 ,安装的最新redis
好贴,强烈支持,决定好好撸一帖
import os
import sys
import importlib as imp
imp.reload(sys)
ROOT_PATH = os.path.abspath(os.path.join(
os.path.dirname('__file__'), '..', '..', '..'))
sys.path.append(ROOT_PATH)
import multiprocessing
from time import time
from datetime import datetime
import random
import pandas as pd
import numpy as np
import pathlib
from deap import creator, base, tools, algorithms
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", list, fitness=creator.FitnessMax)
import redis
redis_pool = redis.ConnectionPool(host='127.0.0.1', port=6379, max_connections=99)
from vnpy.app.cta_strategy import BacktestingEngine, OptimizationSetting
from vnpy.app.cta_strategy.strategies.turtle_strategy_for_BTC import TurtleForBTCStrategy
from vnpy.app.cta_strategy.strategies.boll_channel_evolution_strategy import BollChannelEvolutionStrategy
from vnpy.app.cta_strategy.base import OptimizeForBacktesting
class GeneticOptimizeStrategy(object):
def __init__(self):
self.settings_ga = []
self.s = OptimizationSetting()
self.back = BacktestingEngine()
self.settings_param = {}
def backtesting_setting(self, vt_symbol, interval, start, end, rate, slippage, size, pricetick, capital, strategy, target):
self.settings_param["vt_symbol"] = vt_symbol
self.settings_param["interval"] = interval
self.settings_param["start"] = start
self.settings_param["end"] = end
self.settings_param["rate"] = rate
self.settings_param["slippage"] = slippage
self.settings_param["size"] = size
self.settings_param["pricetick"] = pricetick
self.settings_param["capital"] = capital
self.settings_param["strategy"] = strategy
self.settings_param["target"] = target
return self.settings_param
def add_parameter(self, name, start, end, step):
self.s.add_parameter(name, start, end, step)
def generate_setting_ga(self):
settings = self.s.generate_setting()
for d in settings:
param = [tuple(i) for i in d.items()]
self.settings_ga.append(param)
return self.settings_ga
def generate_parameter(self):
""""""
return random.choice(self.settings_ga)
def mutArrayGroup(self, individual, indpb):
size = len(individual)
paralist = self.generate_parameter()
for i in range(size):
if random.random() < indpb:
individual[i] = paralist[i]
return individual,
def object_func(self, strategy_avg):
""""""
return self._object_func(tuple(strategy_avg))
def optimize(self):
""""""
start = time()
toolbox = base.Toolbox()
pool = multiprocessing.Pool(multiprocessing.cpu_count())
toolbox.register("map", pool.map)
# 初始化
toolbox.register("individual", tools.initIterate, creator.Individual, self.generate_parameter)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("mate", tools.cxTwoPoint)
toolbox.register("mutate", self.mutArrayGroup, indpb=1)
toolbox.register("evaluate", self.object_func)
# toolbox.register("select", tools.selNSGA2)
toolbox.register("select", tools.selBest)
MU =100
LAMBDA = 80
POP = 100
pop = toolbox.population(POP)
CXPB, MUTPB, NGEN = 0.95, 0.05, 30
hof = tools.ParetoFront()
stats = tools.Statistics(lambda ind: ind.fitness.values)
np.set_printoptions(suppress=True)
stats.register("mean", np.mean, axis=0)
stats.register("std", np.std, axis=0)
stats.register("min", np.min, axis=0)
stats.register("max", np.max, axis=0)
print("开始运行遗传算法,每代族群总数:%s, 优良品种筛选个数:%s,迭代次数:%s,交叉概率:%s,突变概率:%s" % (POP, MU, NGEN, CXPB, MUTPB))
algorithms.eaMuPlusLambda(pop, toolbox, MU, LAMBDA, CXPB, MUTPB, NGEN, stats, halloffame=hof, verbose=True)
pool.close()
end = time()
cost = int((end - start))
print("遗传算法优化完成,耗时%s秒" % (cost))
print("----------输出帕累托前沿解集,解集数量%s----------" % (len(hof)))
# return hof
for i in range(len(hof)):
solution = hof[i]
print(solution)
def _object_func(self, strategy_avg):
#######################################################
# redis 查询
conn = redis.Redis(connection_pool=redis_pool)
target = conn.get(str(strategy_avg))
if target is not None:
print(f'redis读取成功!!!')
return float(target),
#######################################################
engine = self.back
engine.set_parameters(
vt_symbol=self.settings_param["vt_symbol"],
interval=self.settings_param["interval"],
start=self.settings_param["start"],
end=self.settings_param["end"],
rate=self.settings_param["rate"],
slippage=self.settings_param["slippage"],
size=self.settings_param["size"],
pricetick=self.settings_param["pricetick"],
capital=self.settings_param["capital"],
optimize_mem=self.settings_param["optimize_mem"]
)
setting = dict(strategy_avg)
engine.add_strategy(self.settings_param["strategy"], setting)
engine.load_data()
engine.run_backtesting()
engine.calculate_result()
result = engine.calculate_statistics(output=False)
target = round(result[self.settings_param["target"]], 5)
#######################################################
# 插入 redis
conn.setnx(str(strategy_avg), target) # 存在则不增加
print('redis写入成功')
#######################################################
return target,
def get_result_from_redis(self):
conn = redis.Redis(connection_pool=redis_pool)
df = pd.DataFrame() # redis全部读入df
keys = conn.keys()
redis_keys = [tuple(eval(k)) for k in keys]
col_keys = [ parameter[0] for parameter in redis_keys[0]]
# 存入parameter进入df
for key,i in zip(redis_keys[0],range(len(redis_keys[0]))):
df[key[0]] = np.array([k[i][1] for k in redis_keys])
# 存result
result_temp = conn.mget(keys)
df['result'] = np.array([float(_.decode(encoding='utf8')) for _ in result_temp])
col_keys.append('result')
# 存入cdv
file_name = '_'.join([
str(self.settings_param["strategy"]),
str(self.settings_param["start"]),
str(self.settings_param["end"]),
'.csv'])
resultl_path = os.path.abspath(
os.path.join('put your own path')
)
path = pathlib.Path(resultl_path)
if path.exists() and path.is_file():
df.to_csv(result_path,mode='a', header=None)
else:
df.to_csv(resultl_path,columns=col_keys)
# conn.flushall() 看情况是否需要清空redis
def main():
GE = GeneticOptimizeStrategy()
GE.backtesting_setting(
vt_symbol="BTC/USD.BITFINEX",
interval="1m",
start=datetime(2018, 8, 11),
end=datetime(2019, 12, 31),
rate=0.2 / 100,
slippage=0.3,
size=3,
pricetick=0.1,
capital=1_000_000,
strategy=BollChannelEvolutionStrategy,
target="sharpe_ratio",
)
GE.add_parameter('boll_window', 50, 780,10)
GE.add_parameter('boll_dev', 1.5, 3.0, 0.2)
GE.add_parameter('cci_window', 100, 300, 10)
GE.add_parameter('atr_window', 200, 600, 10)
GE.add_parameter('sl_multiplier', 0.1, 3.0, 0.2)
GE.generate_setting_ga()
GE.generate_parameter()
GE.optimize()
GE.get_result_from_redis()
if __name__ == "__main__":
main()
请问最后搞定了吗?我试了下貌似不行
回测是怎么用的哇,回测没有办法推送account事件啊
有结论吗关于这个问题,我按照要求设置了还是没有日志文件啊
在相应的ui里改widget.py文件设置,只能一个个自己改了