VeighNa量化社区
你的开源社区量化交易平台
hxxjava's Avatar
Member
离线
419 帖子
声望: 155

1. 为什么要修改?

原因见这个帖子:https://www.vnpy.com/forum/topic/4461-shuo-shi-r-breakerce-lue-de-wen-ti

2. 修改步骤:

2.1 添加文件vnpy\usertools\trade_hour.py

内容如下:

"""
本文件主要实现合约的交易时间段
作者:hxxjava
日期:2020-8-1
"""
from typing import Callable,List,Dict, Tuple, Union
from enum import Enum

import datetime
import pytz
CHINA_TZ = pytz.timezone("Asia/Shanghai")

from vnpy.trader.utility import extract_vt_symbol
from vnpy.trader.constant import Interval

from rqdatac.utils import to_date
import rqdatac as rq


def get_listed_date(symbol:str):
    ''' 
    获得上市日期 
    '''
    info = rq.instruments(symbol)
    return to_date(info.listed_date)

def get_de_listed_date(symbol:str):
    ''' 
    获得交割日期 
    '''
    info = rq.instruments(symbol)
    return to_date(info.de_listed_date)

class Timeunit(Enum):
    """ 
    时间单位 
    """
    SECOND = '1s'
    MINUTE = '1m'
    HOUR = '1h'

class TradeHours(object):
    """ 合约交易时间段 """
    def __init__(self,symbol:str):
        self.symbol = symbol.upper()
        self.init()

    def init(self):
        """ 
        初始化交易日字典及交易时间段数据列表 
        """
        self.listed_date = get_listed_date(self.symbol)
        self.de_listed_date = get_de_listed_date(self.symbol)

        self.trade_date_index = {}   # 合约的交易日索引字典
        self.trade_index_date = {}   # 交易天数与交易日字典

        trade_dates = rq.get_trading_dates(self.listed_date,self.de_listed_date) # 合约的所有的交易日
        days = 0
        for td in trade_dates:
            self.trade_date_index[td] = days
            self.trade_index_date[days] = td
            days += 1

        trading_hours = rq.get_trading_hours(self.symbol,date=self.listed_date,frequency='tick',expected_fmt='datetime')

        self.time_dn_pairs = self._get_trading_times_dn(trading_hours)

        trading_hours0 = [(CHINA_TZ.localize(start),CHINA_TZ.localize(stop)) for start,stop in trading_hours]
        self.trade_date_index[self.listed_date] = (0,trading_hours0)
        for day in range(1,days):
            td = self.trade_index_date[day]
            trade_datetimes = []
            for (start,dn1),(stop,dn2) in self.time_dn_pairs:
                #start:开始时间,dn1:相对交易日前推天数,
                #stop :开始时间,dn2:相对开始时间后推天数     
                d = self.trade_index_date[day+dn1]
                start_dt = CHINA_TZ.localize(datetime.datetime.combine(d,start))
                stop_dt = CHINA_TZ.localize(datetime.datetime.combine(d,stop))
                trade_datetimes.append((start_dt,stop_dt+datetime.timedelta(days=dn2)))
            self.trade_date_index[td] = (day,trade_datetimes)

    def _get_trading_times_dn(self,trading_hours:List[Tuple[datetime.datetime,datetime.datetime]]): 
        """ 
        交易时间跨天处理,不推荐外部使用 。
        产生的结果:[((start1,dn11),(stop1,dn21)),((start2,dn12),(stop2,dn22)),...,((startN,dn1N),(stopN,dn2N))]
        其中:
            startN:开始时间,dn1N:相对交易日前推天数,
            stopN:开始时间,dn2N:相对开始时间后推天数      
        """
        ilen = len(trading_hours)
        if ilen == 0:
            return []
        start_stops = []
        for start,stop in trading_hours:
            start_stops.insert(0,(start.time(),stop.time()))

        pre_start,pre_stop = start_stops[0]
        dn1 = 0
        dn2 = 1 if pre_start > pre_stop else 0
        time_dn_pairs = [((pre_start,dn1),(pre_stop,dn2))]
        for start,stop in start_stops[1:]:
            if start > pre_start:
                dn1 -= 1
            dn2 = 1 if start > stop else 0
            time_dn_pairs.insert(0,((start,dn1),(stop,dn2)))
            pre_start,pre_stop = start,stop

        return time_dn_pairs

    def get_date_tradetimes(self,date:datetime.date):
        """ 
        得到合约date日期的交易时间段 
        """
        idx,trade_times = self.trade_date_index.get(date,(None,[]))
        return idx,trade_times

    def get_trade_datetimes(self,dt:datetime,allday:bool=False):
        """ 
        得到合约date日期的交易时间段 
        """
        # 得到最早的交易时间
        idx0,trade_times0 = self.get_date_tradetimes(self.listed_date)
        start0,stop0 = trade_times0[0]
        if dt < start0:
            return None,[]

        # 首先找到dt日期自上市以来的交易天数
        date,dn = dt.date(),0
        days = None
        while date < self.de_listed_date:
            days,ths = self.trade_date_index.get(date,(None,[]))
            if not days:
                dn += 1
                date = (dt+datetime.timedelta(days=dn)).date()
            else:
                break
        # 如果超出交割日也没有找到,那这就不是一个有效的交易时间
        if days is None:
            return (None,[])

        index_3 = [days,days+1,days-1]  # 前后三天的

        date_3d = []
        for day in index_3: 
            date = self.trade_index_date.get(day,None)
            date_3d.append(date)

        # print(date_3d)

        for date in date_3d:
            if not date:
                # print(f"{date} is not trade date")
                continue

            idx,trade_dts = self.get_date_tradetimes(date)
            # print(f"{date} tradetimes {trade_dts}")
            ilen = len(trade_dts)
            if ilen > 0:
                start0,stop = trade_dts[0]      # start0 是date交易日的开始时间
                start,stop0 = trade_dts[-1]
            if dt<start0 or dt>stop0:
                continue

            for start,stop in trade_dts:
                if dt>=start and dt < stop:
                    if allday:
                        return idx,trade_dts
                    else:
                        return idx,[(start,stop)]

        return None,[]

    def get_trade_time_perday(self):
        """ 
        计算每日的交易总时长(单位:分钟) 
        """
        TTPD = datetime.timedelta(0,0,0)

        datetimes = []
        today = datetime.datetime.now().date()

        for (start,dn1),(stop,dn2) in self.time_dn_pairs:
            start_dt = CHINA_TZ.localize(datetime.datetime.combine(today,start)) + datetime.timedelta(days=dn1)
            stop_dt = CHINA_TZ.localize(datetime.datetime.combine(today,stop)) + datetime.timedelta(days=dn2)
            time_delta = stop_dt - start_dt
            TTPD = TTPD + time_delta
        return int(TTPD.seconds/60)

    def get_trade_time_inday(self,dt:datetime,unit:Timeunit=Timeunit.MINUTE):
        """ 
        计算dt在交易日内的分钟数 
        unit: '1s':second;'1m':minute;'1h';1h
        """
        TTID = datetime.timedelta(0,0,0)

        day,trade_times = self.get_trade_datetimes(dt,allday=True)
        if not trade_times:
            return None

        for start,stop in trade_times:
            if dt > stop:
                time_delta = stop - start
                TTID += time_delta
            elif dt > start:
                time_delta = dt - start
                TTID += time_delta     
                break
            else:
                break          

        if unit == Timeunit.SECOND:
            return TTID.seconds
        elif unit == Timeunit.MINUTE:
            return int(TTID.seconds/60) 
        elif unit == Timeunit.HOUR:
            return int(TTID.seconds/3600) 
        else:
            return TTID

    def get_day_tradetimes(self,dt:datetime):
        """ 
        得到合约日盘的交易时间段 
        """
        index,trade_times = self.get_trade_datetimes(dt,allday=True)
        trade_times1 = []
        if trade_times:
            for start_dt,stop_dt in trade_times:
                if start_dt.time() < datetime.time(18,0,0):
                    trade_times1.append((start_dt,stop_dt))
            return index,trade_times1
        return (index,trade_times1)

    def get_night_tradetimes(self,dt:datetime):
        """ 
        得到合约夜盘的交易时间段 
        """
        index,trade_times = self.get_trade_datetimes(dt,allday=True)
        trade_times1 = []
        if trade_times:
            for start_dt,stop_dt in trade_times:
                if start_dt.time() > datetime.time(18,0,0):
                    trade_times1.append((start_dt,stop_dt))
            return index,trade_times1
        return (index,trade_times1)

    def convet_to_datetime(self,day:int,minutes:int):
        """ 
        计算minutes在第day交易日内的datetime形式的时间 
        """
        date = self.trade_index_date.get(day,None)
        if date is None:
            return None
        idx,trade_times = self.trade_date_index.get(date,(None,[]))
        if not trade_times:     # 不一定必要
            return None
        for (start,stop) in trade_times:
            timedelta = stop - start 
            if minutes < int(timedelta.seconds/60):
                return start + datetime.timedelta(minutes=minutes)
            else:
                minutes -= int(timedelta.seconds/60)
        return None

    def get_bar_window(self,dt:datetime,window:int,interval:Interval=Interval.MINUTE):
        """ 
        计算dt所在K线的起止时间 
        """
        bar_windows = (None,None)

        day,trade_times = self.get_trade_datetimes(dt,allday=True)
        if not trade_times:
            # print(f"day={day} trade_times={trade_times}")
            return bar_windows

        # 求每个交易日的交易时间分钟数
        TTPD = self.get_trade_time_perday()

        # 求dt在交易日内的分钟数
        TTID = self.get_trade_time_inday(dt,unit=Timeunit.MINUTE)

        # 得到dt时刻K线的起止时间 
        total_minites = day*TTPD + TTID

        # 计算K线宽度(分钟数)
        if interval == Interval.MINUTE:
            bar_width = window
        elif interval == Interval.HOUR:
            bar_width = 60*window
        elif interval == Interval.DAILY:
            bar_width = TTPD*window
        elif interval == Interval.WEEKLY:
            bar_width = TTPD*window*5
        else:
            return bar_windows

        # 求K线的开始时间的和结束的分钟形式
        start_m = int(total_minites/bar_width)*bar_width
        stop_m = start_m + bar_width

        # 计算K开始时间的datetime形式
        start_d = int(start_m / TTPD)
        minites = start_m % TTPD
        start_dt = self.convet_to_datetime(start_d,minites)
        # print(f"start_d={start_d} minites={minites}---->{start_dt}")

        # 计算K结束时间的datetime形式
        stop_d = int(stop_m / TTPD)
        minites = stop_m % TTPD
        stop_dt = self.convet_to_datetime(stop_d,minites)
        # print(f"stop_d={stop_d} minites={minites}---->{stop_dt}")

        return start_dt,stop_dt

    def get_date_start_stop(self,dt:datetime):
        """
        获得dt所在交易日的开始和停止时间
        """
        index,trade_times = self.get_trade_datetimes(dt,allday=True)
        if trade_times:
            valid_dt = False
            for t1,t2 in trade_times:
                if t1 < dt and dt < t2:
                    valid_dt = True
                    break
            if valid_dt:
                start_dt = trade_times[0][0]
                stop_dt = trade_times[-1][1]
                return True,(start_dt,stop_dt)
        return False,(None,None)

    def get_day_start_stop(self,dt:datetime):
        """
        获得dt所在交易日日盘的开始和停止时间
        """
        index,trade_times = self.get_day_tradetimes(dt)
        if trade_times:
            valid_dt = False
            for t1,t2 in trade_times:
                if t1 < dt and dt < t2:
                    valid_dt = True
                    break
            if valid_dt:
                start_dt = trade_times[0][0]
                stop_dt = trade_times[-1][1]
                return True,(start_dt,stop_dt)
        return False,(None,None)

    def get_night_start_stop(self,dt:datetime):
        """
        获得dt所在交易日夜盘的开始和停止时间
        """
        index,trade_times = self.get_night_tradetimes(dt)
        if trade_times:
            valid_dt = False
            for t1,t2 in trade_times:
                if t1 < dt and dt < t2:
                    valid_dt = True
                    break
            if valid_dt:
                start_dt = trade_times[0][0]
                stop_dt = trade_times[-1][1]
                return True,(start_dt,stop_dt)
        return False,(None,None)


if __name__ == "__main__":
    rq.init('xxxxx','******',("rqdatad-pro.ricequant.com",16011))

    # vt_symbols = ["rb2010.SHFE","ag2012.SHFE","i2010.DCE"]
    vt_symbols = ["ag2012.SHFE"]
    date0 = datetime.date(2020,8,31)
    dt0 = CHINA_TZ.localize(datetime.datetime(2020,8,31,9,20,15))
    for vt_symbol in vt_symbols:
        symbol,exchange = extract_vt_symbol(vt_symbol)
        th = TradeHours(symbol)
        # trade_hours = th.get_date_tradetimes(date0)
        # print(f"\n{vt_symbol} {date0} trade_hours={trade_hours}")

        days,trade_hours = th.get_trade_datetimes(dt0,allday=True)

        print(f"\n{vt_symbol} {dt0} days:{days} trade_hours={trade_hours}")

        if trade_hours:
            day_start = trade_hours[0][0]
            day_end = trade_hours[-1][1]
            print(f"day_start={day_start} day_end={day_end}")
            exit_time = day_end + datetime.timedelta(minutes=-5)
            print(f"exit_time={exit_time}")

        dt1 = CHINA_TZ.localize(datetime.datetime(2020,8,31,9,20,15))
        dt2 = CHINA_TZ.localize(datetime.datetime(2020,9,1,1,1,15))

        for dt in [dt1,dt2]:
            in_trade,(start,stop) = th.get_date_start_stop(dt)
            if (in_trade):
                print(f"\n{vt_symbol} 时间 {dt} 交易日起止:{start,stop}")
            else:
                print(f"\n{vt_symbol} 时间 {dt} 非交易时间")

            in_day,(start,stop) = th.get_day_start_stop(dt)
            if (in_day):
                print(f"\n{vt_symbol} 时间 {dt} 日盘起止:{start,stop}")
            else:
                print(f"\n{vt_symbol} 时间 {dt} 非日盘时间")

            in_night,(start,stop) = th.get_night_start_stop(dt)
            if in_night:
                print(f"\n{vt_symbol} 时间 {dt} 夜盘起止:{start,stop}")
            else:
                print(f"\n{vt_symbol} 时间 {dt} 非夜盘时间")

2.2 修改策略文件 RBreakerStrategy.py

代码如下:

from datetime import datetime,time,timedelta
from vnpy.app.cta_strategy import (
    CtaTemplate,
    StopOrder,
    TickData,
    BarData,
    TradeData,
    OrderData,
    BarGenerator,
    ArrayManager
)

from vnpy.trader.utility import extract_vt_symbol
from vnpy.usertools.trade_hour import TradeHours

class RBreakStrategy2(CtaTemplate):
    """"""
    author = "KeKe"

    setup_coef = 0.25
    break_coef = 0.2
    enter_coef_1 = 1.07
    enter_coef_2 = 0.07
    fixed_size = 1
    donchian_window = 30

    trailing_long = 0.4
    trailing_short = 0.4
    multiplier = 3

    buy_break = 0   # 突破买入价
    sell_setup = 0  # 观察卖出价
    sell_enter = 0  # 反转卖出价
    buy_enter = 0   # 反转买入价
    buy_setup = 0   # 观察买入价
    sell_break = 0  # 突破卖出价

    intra_trade_high = 0
    intra_trade_low = 0

    day_high = 0
    day_open = 0
    day_close = 0
    day_low = 0
    tend_high = 0
    tend_low = 0

    parameters = ["setup_coef", "break_coef", "enter_coef_1", "enter_coef_2", "fixed_size", "donchian_window"]
    variables = ["buy_break", "sell_setup", "sell_enter", "buy_enter", "buy_setup", "sell_break"]

    def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
        """"""
        super(RBreakStrategy2, self).__init__(
            cta_engine, strategy_name, vt_symbol, setting
        )

        self.bg = BarGenerator(self.on_bar)
        self.am = ArrayManager()
        self.bars = []

        symbol,exchange = vt_symbol.split('.')
        self.trade_hour = TradeHours(symbol)
        self.trade_datetimes = None
        self.exit_time = None

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("策略初始化")
        self.load_bar(10)

    def on_start(self):
        """
        Callback when strategy is started.
        """
        self.write_log("策略启动")

    def on_stop(self):
        """
        Callback when strategy is stopped.
        """
        self.write_log("策略停止")

    def on_tick(self, tick: TickData):
        """
        Callback of new tick data update.
        """
        self.bg.update_tick(tick)

    def is_new_day(self,dt:datetime):
        """
        判断dt时间是否在当天的交易时间段内
        """
        if not self.trade_datetimes: 
            return True
        day_start = self.trade_datetimes[0][0]
        day_end = self.trade_datetimes[-1][1]
        if day_start<=dt and dt < day_end:
            return False
        return True

    def on_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        self.cancel_all()

        am = self.am
        am.update_bar(bar)
        if not am.inited:
            return

        # 判断是否是下一交易日
        self.new_day = self.is_new_day(bar.datetime)
        if self.new_day:
            # 计算下一交易日的交易时间段
            days,self.trade_datetimes = self.trade_hour.get_trade_datetimes(bar.datetime,allday=True) 

            # 计算退出时间
            # print(f"trade_datetimes={self.trade_datetimes}")
            if self.trade_datetimes:
                day_end = self.trade_datetimes[-1][1]
                self.exit_time = day_end + timedelta(minutes=-5)

        if not self.trade_datetimes:
            # 不是个有效的K线,不可以处理,
            # 为什么会有K线推送?因为非交易时段接口的行为是不可理喻的
            return

        self.bars.append(bar)
        if len(self.bars) <= 2:
            return
        else:
            self.bars.pop(0)
        last_bar = self.bars[-2]

        # New Day
        if self.new_day:    # 如果是新交易日
            if self.day_open:
                self.buy_setup = self.day_low - self.setup_coef * (self.day_high - self.day_close)  # 观察买入价
                self.sell_setup = self.day_high + self.setup_coef * (self.day_close - self.day_low)  # 观察卖出价

                self.buy_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_high  # 反转买入价
                self.sell_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_low  # 反转卖出价

                self.buy_break = self.buy_setup + self.break_coef * (self.sell_setup - self.buy_setup)  # 突破买入价
                self.sell_break = self.sell_setup - self.break_coef * (self.sell_setup - self.buy_setup)  # 突破卖出价

            self.day_open = bar.open_price
            self.day_high = bar.high_price
            self.day_close = bar.close_price
            self.day_low = bar.low_price

        # Today
        else:
            self.day_high = max(self.day_high, bar.high_price)
            self.day_low = min(self.day_low, bar.low_price)
            self.day_close = bar.close_price

        if not self.sell_setup:
            return

        self.tend_high, self.tend_low = am.donchian(self.donchian_window)

        if bar.datetime < self.exit_time:

            if self.pos == 0:
                self.intra_trade_low = bar.low_price
                self.intra_trade_high = bar.high_price

                if self.tend_high > self.sell_setup:
                    long_entry = max(self.buy_break, self.day_high)
                    self.buy(long_entry, self.fixed_size, stop=True)

                    self.short(self.sell_enter, self.multiplier * self.fixed_size, stop=True)

                elif self.tend_low < self.buy_setup:
                    short_entry = min(self.sell_break, self.day_low)
                    self.short(short_entry, self.fixed_size, stop=True)

                    self.buy(self.buy_enter, self.multiplier * self.fixed_size, stop=True)

            elif self.pos > 0:
                self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
                long_stop = self.intra_trade_high * (1 - self.trailing_long / 100)
                self.sell(long_stop, abs(self.pos), stop=True)

            elif self.pos < 0:
                self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
                short_stop = self.intra_trade_low * (1 + self.trailing_short / 100)
                self.cover(short_stop, abs(self.pos), stop=True)

        # Close existing position
        else:
            if self.pos > 0:
                self.sell(bar.close_price * 0.99, abs(self.pos))
            elif self.pos < 0:
                self.cover(bar.close_price * 1.01, abs(self.pos))

        self.put_event()

    def on_order(self, order: OrderData):
        """
        Callback of new order data update.
        """
        pass

    def on_trade(self, trade: TradeData):
        """
        Callback of new trade data update.
        """
        self.put_event()

    def on_stop_order(self, stop_order: StopOrder):
        """
        Callback of stop order update.
        """
        pass

1. 先看看R-Breaker策略的init():

    def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
        """"""
        super(DynaRBreakStrategy, self).__init__(
            cta_engine, strategy_name, vt_symbol, setting
        )

        self.bg = BarGenerator(self.on_bar)   # 这是1分钟K线生成器
        self.am = ArrayManager()
        self.bars = []

可以看出它的self.bg是1分钟K线的产生器。

2. 再看看R-Breaker策略的on_bar()代码:

    def on_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        self.cancel_all()

        am = self.am
        am.update_bar(bar)
        if not am.inited:
            return

        self.bars.append(bar)
        if len(self.bars) <= 2:
            return
        else:
            self.bars.pop(0)
        last_bar = self.bars[-2]

        # New Day
        if last_bar.datetime.date() != bar.datetime.date():       # 这样可能只可以判断夜盘和日盘
            if self.day_open:

                self.buy_setup = self.day_low - self.setup_coef * (self.day_high - self.day_close)  # 观察买入价
                self.sell_setup = self.day_high + self.setup_coef * (self.day_close - self.day_low)  # 观察卖出价

                self.buy_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_high  # 反转买入价
                self.sell_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_low  # 反转卖出价

                self.buy_break = self.buy_setup + self.break_coef * (self.sell_setup - self.buy_setup)  # 突破买入价
                self.sell_break = self.sell_setup - self.break_coef * (self.sell_setup - self.buy_setup)  # 突破卖出价

            self.day_open = bar.open_price
            self.day_high = bar.high_price
            self.day_close = bar.close_price
            self.day_low = bar.low_price

        # Today
        else:
            self.day_high = max(self.day_high, bar.high_price)
            self.day_low = min(self.day_low, bar.low_price)
            self.day_close = bar.close_price

        if not self.sell_setup:
            return

        self.tend_high, self.tend_low = am.donchian(self.donchian_window)

        if bar.datetime.time() < self.exit_time:   # self.exit_time==14:55,----》0:00~14:55是可以下单的

            if self.pos == 0:
                self.intra_trade_low = bar.low_price
                self.intra_trade_high = bar.high_price

                if self.tend_high > self.sell_setup:
                    long_entry = max(self.buy_break, self.day_high)
                    self.buy(long_entry, self.fixed_size, stop=True)

                    self.short(self.sell_enter, self.multiplier * self.fixed_size, stop=True)

                elif self.tend_low < self.buy_setup:
                    short_entry = min(self.sell_break, self.day_low)
                    self.short(short_entry, self.fixed_size, stop=True)

                    self.buy(self.buy_enter, self.multiplier * self.fixed_size, stop=True)

            elif self.pos > 0:
                self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
                long_stop = self.intra_trade_high * (1 - self.trailing_long / 100)
                self.sell(long_stop, abs(self.pos), stop=True)

            elif self.pos < 0:
                self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
                short_stop = self.intra_trade_low * (1 + self.trailing_short / 100)
                self.cover(short_stop, abs(self.pos), stop=True)

        # Close existing position
        else:      # 14:55平仓,夜盘是不做的吗???
            if self.pos > 0:
                self.sell(bar.close_price * 0.99, abs(self.pos))
            elif self.pos < 0:
                self.cover(bar.close_price * 1.01, abs(self.pos))

        self.put_event()

3. 这样判断是否是跨日有点不对劲:

# New Day
if last_bar.datetime.date() != bar.datetime.date():

我们知道很多期货品种都可能有夜盘,而且日K线的起始时间是上一交易日的晚上21:00,这就不用多说了。也就是说连续的2个1分钟bar是没有办法判断跨交易日的。
如果这样可能判断错误,那么这几个变量:
self.day_open = bar.open_price
self.day_high = bar.high_price
self.day_close = bar.close_price
self.day_low = bar.low_price
就有点名不副实了。

4. 不做夜盘是否是策略设计的本意???

目前的这个R-Breaker又不是完全没有夜盘,因为 0:00~14:55 可以下单的,14:55平仓,夜盘是不做的,是有什么说法吗?
要知道象白银之类的合约的夜盘是21:00-2:30,从21:00到次日00:00有3小时之多呢,大概1/3的时间被剔除出可以交易时间,是什么原因呢?

5. 怎么能够做夜盘 ?

可以考虑合约的交易时间段来判断交易日的开始和结束,陈老师觉得是否有必要呢?这是我的一点点意见,不知道是否正确,望批评指正!

1. 问题:显示2组相同的成交单

发生条件:登录vnpy后,运行策略,连续2天没有重新启动。

description

2. 解决方法:

修改vnpy\trader\ui\widget.py中的TradeMonitor:

class TradeMonitor(BaseMonitor):
    """
    Monitor for trade data.
    """

    event_type = EVENT_TRADE
    data_key = "tradeid"    # hxxjava chanage
    sorting = True

    headers: Dict[str, dict] = {
        "tradeid": {"display": "成交号 ", "cell": BaseCell, "update": False},
        "orderid": {"display": "委托号", "cell": BaseCell, "update": False},
        "symbol": {"display": "代码", "cell": BaseCell, "update": False},
        "exchange": {"display": "交易所", "cell": EnumCell, "update": False},
        "direction": {"display": "方向", "cell": DirectionCell, "update": False},
        "offset": {"display": "开平", "cell": EnumCell, "update": False},
        "price": {"display": "价格", "cell": BaseCell, "update": False},
        "volume": {"display": "数量", "cell": BaseCell, "update": False},
        "datetime": {"display": "时间", "cell": TimeCell, "update": False},
        "gateway_name": {"display": "接口", "cell": BaseCell, "update": False},
    }

1. 合约信息中包含保证金率

1.1 合约信息查询命令:

ReqQryInstrument : 请求查询合约,填空可以查询到所有合约。
响应:OnRspQryInstrument
◇ 1.函数原型
virtual int ReqQryInstrument(CThostFtdcQryInstrumentField *pQryInstrument, int nRequestID) = 0;
◇ 2.参数
pQryInstrument:查询合约
struct CThostFtdcQryInstrumentField
{
    TThostFtdcInstrumentIDType InstrumentID; ///合约代码
    TThostFtdcExchangeIDType ExchangeID; ///交易所代码
    TThostFtdcExchangeInstIDType ExchangeInstID; ///合约在交易所的代码
    TThostFtdcInstrumentIDType ProductID;///产品代码
};
nRequestID:请求ID,对应响应里的nRequestID,无递增规则,由用户自行维护。
◇ 3.返回
0,代表成功。
-1,表示网络连接失败;
-2,表示未处理请求超过许可数;
-3,表示每秒发送请求数超过许可数。

1.2 合约信息查询结果:

请求查询合约响应,当执行ReqQryInstrument后,该方法被调用。
◇ 1.函数原型
virtual void OnRspQryInstrument(CThostFtdcInstrumentField *pInstrument, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast) {};
◇ 2.参数pInstrument:
合约
struct CThostFtdcInstrumentField
{
    TThostFtdcInstrumentIDType InstrumentID;///合约代码
    TThostFtdcExchangeIDType ExchangeID; ///交易所代码
    TThostFtdcInstrumentNameType InstrumentName; ///合约名称
    TThostFtdcExchangeInstIDType ExchangeInstID;///合约在交易所的代码
    TThostFtdcInstrumentIDType ProductID; ///产品代码
    TThostFtdcProductClassType ProductClass; ///产品类型
    TThostFtdcYearType DeliveryYear; ///交割年份
    TThostFtdcMonthType DeliveryMonth;///交割月
    TThostFtdcVolumeType MaxMarketOrderVolume; ///市价单最大下单量
    TThostFtdcVolumeType MinMarketOrderVolume;///市价单最小下单量
    TThostFtdcVolumeType MaxLimitOrderVolume; ///限价单最大下单量
    TThostFtdcVolumeType MinLimitOrderVolume; ///限价单最小下单量
    TThostFtdcVolumeMultipleType VolumeMultiple; ///合约数量乘数
    TThostFtdcPriceType PriceTick; ///最小变动价位
    TThostFtdcDateType CreateDate; ///创建日
    TThostFtdcDateType OpenDate; ///上市日
    TThostFtdcDateType ExpireDate;///到期日
    TThostFtdcDateType StartDelivDate; ///开始交割日
    TThostFtdcDateType EndDelivDate; ///结束交割日
    TThostFtdcInstLifePhaseType InstLifePhase; ///合约生命周期状态
    TThostFtdcBoolType IsTrading;///当前是否交易
    TThostFtdcPositionTypeType PositionType; ///持仓类型
    TThostFtdcPositionDateTypeType PositionDateType;///持仓日期类型
    TThostFtdcRatioType LongMarginRatio;///多头保证金率
    TThostFtdcRatioType ShortMarginRatio; ///空头保证金率
    TThostFtdcMaxMarginSideAlgorithmType MaxMarginSideAlgorithm;///是否使用大额单边保证金算法
    TThostFtdcInstrumentIDType UnderlyingInstrID;///基础商品代码
    TThostFtdcPriceType StrikePrice;///执行价
    TThostFtdcOptionsTypeType OptionsType;///期权类型
    TThostFtdcUnderlyingMultipleType UnderlyingMultiple; ///合约基础商品乘数
    TThostFtdcCombinationTypeType CombinationType;///组合类型
};
VolumeMultiple:合约乘数(同交易所)
PriceTick:最小变动价位(同交易所)
IsTrading:是否活跃(同交易所)
DeliveryYear:交割年份(同交易所)
DeliveryMonth:交割月(同交易所)
OpenDate:上市日(同交易所)
CreateDate:创建日(同交易所)
ExpireDate:到期日(同交易所)
StartDeliveDate:开始交割日(同交易所)
EndDelivDate:结束交割日(同交易所)

同交易所表示这些字段每天更新自交易所,其余字段为柜台设置值。如果发现有些字段值有误,则以此来判断是交易所问题还是CTP柜台设置问题。
pRspInfo:响应信息
struct CThostFtdcRspInfoField
{
    TThostFtdcErrorIDType ErrorID; ///错误代码
    TThostFtdcErrorMsgType ErrorMsg;///错误信息
};
nRequestID:返回用户操作请求的ID,该ID 由用户在操作请求时指定。
bIsLast:指示该次返回是否为针对nRequestID的最后一次返回。

2. 保证金率查询结果中包含保证金

2.1 保证金率查询命令

ReqQryInstrumentMarginRate
请求查询合约保证金率,对应响应OnRspQryInstrumentMarginRate。如果InstrumentID填空,则返回持仓对应的合约保证金率,否则返回相应InstrumentID的保证金率。
目前无法通过一次查询得到所有合约保证金率,如果要查询所有,则需要通过多次查询得到。

◇ 1.函数原型
virtual int ReqQryInstrumentMarginRate(CThostFtdcQryInstrumentMarginRateField *pQryInstrumentMarginRate, int nRequestID) = 0;
◇ 2.参数pQryInstrumentMarginRate:
查询合约保证金率
struct CThostFtdcQryInstrumentMarginRateField
{
    ///经纪公司代码
    TThostFtdcBrokerIDType BrokerID;
    ///投资者代码
    TThostFtdcInvestorIDType InvestorID;
    ///合约代码
    TThostFtdcInstrumentIDType InstrumentID;
    ///投机套保标志
    TThostFtdcHedgeFlagType HedgeFlag;
    ///交易所代码
    TThostFtdcExchangeIDType ExchangeID;
    ///投资单元代码
    TThostFtdcInvestUnitIDType InvestUnitID;
};
nRequestID:请求ID,对应响应里的nRequestID,无递增规则,由用户自行维护。
◇ 3.返回
0,代表成功。
-1,表示网络连接失败;
-2,表示未处理请求超过许可数;
-3,表示每秒发送请求数超过许可数。

2.2 保证金率查询结果

OnRspQryInstrumentMarginRate
请求查询合约保证金率响应,当执行ReqQryInstrumentMarginRate后,该方法被调用。

◇ 1.函数原型
virtual void OnRspQryInstrumentMarginRate(CThostFtdcInstrumentMarginRateField *pInstrumentMarginRate, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast) {};

◇ 2.参数    ///:
合约保证金率
struct CThostFtdcInstrumentMarginRateField
{
    TThostFtdcInstrumentIDType InstrumentID;///合约代码
    TThostFtdcInvestorRangeType InvestorRange;///投资者范围
    TThostFtdcBrokerIDType BrokerID; ///经纪公司代码
    TThostFtdcInvestorIDType InvestorID;///投资者代码
    TThostFtdcHedgeFlagType HedgeFlag; ///投机套保标志
    TThostFtdcRatioType LongMarginRatioByMoney;///多头保证金率
    TThostFtdcMoneyType LongMarginRatioByVolume;///多头保证金费
    TThostFtdcRatioType ShortMarginRatioByMoney; ///空头保证金率
    TThostFtdcMoneyType ShortMarginRatioByVolume; ///空头保证金费
    TThostFtdcBoolType IsRelative;///是否相对交易所收取
    TThostFtdcExchangeIDType ExchangeID;///交易所代码
    TThostFtdcInvestUnitIDType InvestUnitID; ///投资单元代码
};
pRspInfo:响应信息

struct CThostFtdcRspInfoField
{
    TThostFtdcErrorIDType ErrorID;///错误代码
    TThostFtdcErrorMsgType ErrorMsg;///错误信息
};
nRequestID:返回用户操作请求的ID,该ID 由用户在操作请求时指定。

bIsLast:指示该次返回是否为针对nRequestID的最后一次返回。

3. 手续费(率)查询结果中包含手续费

3.1 手续费(率)查询命令

ReqQryInstrumentCommissionRate
请求查询合约手续费率,对应响应OnRspQryInstrumentCommissionRate。如果InstrumentID填空,则返回持仓对应的合约手续费率。
目前无法通过一次查询得到所有合约手续费率,如果要查询所有,则需要通过多次查询得到。
◇ 1.函数原型
virtual int ReqQryInstrumentCommissionRate(CThostFtdcQryInstrumentCommissionRateField *pQryInstrumentCommissionRate, int nRequestID) = 0;
◇ 2.参数pQryInstrumentCommissionRate:
查询手续费率
struct CThostFtdcQryInstrumentCommissionRateField
{
    TThostFtdcBrokerIDType BrokerID; ///经纪公司代码
    TThostFtdcInvestorIDType InvestorID;///投资者代码
    TThostFtdcInstrumentIDType InstrumentID;///合约代码
    TThostFtdcExchangeIDType ExchangeID;///交易所代码
    TThostFtdcInvestUnitIDType InvestUnitID;///投资单元代码
};

InstrumentID:返回手续费率对应的合约。
但是如果在柜台没有设置具体合约的手续费率,则默认会返回产品的手续费率,InstrumentID就为对应产品ID。
nRequestID:请求ID,对应响应里的nRequestID,无递增规则,由用户自行维护。
◇ 3.返回
0,代表成功。
-1,表示网络连接失败;
-2,表示未处理请求超过许可数;
-3,表示每秒发送请求数超过许可数。

3.3 手续费(率)查询结果

OnRspQryInstrumentCommissionRate
请求查询合约手续费率响应,当执行ReqQryInstrumentCommissionRate后,该方法被调用。

◇ 1.函数原型
virtual void OnRspQryInstrumentCommissionRate(CThostFtdcInstrumentCommissionRateField *pInstrumentCommissionRate, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast) {};
◇ 2.参数pInstrumentCommissionRate:合约手续费率
struct CThostFtdcInstrumentCommissionRateField
{
    TThostFtdcInstrumentIDType InstrumentID; ///合约代码
    TThostFtdcInvestorRangeType InvestorRange; ///投资者范围
    TThostFtdcBrokerIDType BrokerID;///经纪公司代码
    TThostFtdcInvestorIDType InvestorID; ///投资者代码
    TThostFtdcRatioType OpenRatioByMoney; ///开仓手续费率
    TThostFtdcRatioType OpenRatioByVolume; ///开仓手续费
    TThostFtdcRatioType CloseRatioByMoney;///平仓手续费率
    TThostFtdcRatioType CloseRatioByVolume;///平仓手续费
    TThostFtdcRatioType CloseTodayRatioByMoney;///平今手续费率
    TThostFtdcRatioType CloseTodayRatioByVolume;///平今手续费
    TThostFtdcExchangeIDType ExchangeID; ///交易所代码
    TThostFtdcBizTypeType BizType;///业务类型    
    TThostFtdcInvestUnitIDType InvestUnitID;///投资单元代码
};

pRspInfo:
响应信息
struct CThostFtdcRspInfoField
{
    TThostFtdcErrorIDType ErrorID; ///错误代码
    TThostFtdcErrorMsgType ErrorMsg; ///错误信息
};
nRequestID:返回用户操作请求的ID,该ID 由用户在操作请求时指定。

bIsLast:指示该次返回是否为针对nRequestID的最后一次返回。

4. 合约+保证金率+手续费(率)= 完整的合约参数

令:
合约查询结果 = C
保证金率查询结果 = M
手续费查询结果 = S
则:

合约乘数:

C["VolumeMultiple"]

保证金率:

if M["Is_Relative"] == 1:
    多头保证金率 = C["LongMarginRatio"] + M["LongMarginRatioByMoney"] 
    空头保证金率 = C["ShortMarginRatio"] + M["ShortMarginRatioByMoney"] 

else:
多头保证金率 = M["LongMarginRatioByMoney"]
空头保证金率 = M["ShortMarginRatioByMoney"]

手续费(率):

        if S.open_ratio_bymoney == 0.0:
            开仓手续费= [FeeType.LOT,S["OpenRatioByVolume"] ]
            平仓手续费= [FeeType.LOT,S["CloseRatioByVolume"] ]
            平今手续费= [FeeType.LOT,S["CloseTodayRatioByVolume"] ]
        else:
            开仓手续费 = [FeeType.RATE,S["OpenRatioByMoney"] ]
            平仓手续费 = [FeeType.RATE,S["CloseRatioByMoney"] ]
            平今手续费 = [FeeType.RATE,S["CloseTodayRatioByMoney"] ]      

1、TdApi.reqQryInstrument()命令的返回

在回调函数TdApi.onQryInstrument()得到的返回结果是这样的:
以rb2010为例:

{
    'InstrumentID': 'rb2010', 
    'ExchangeID': 'SHFE', 
    'InstrumentName': '螺纹钢2010', 
    'ExchangeInstID': 'rb2010', 
    'ProductID': 'rb', 
    'ProductClass': '1', 
    'DeliveryYear': 2020, 
    'DeliveryMonth': 10, 
    'MaxMarketOrderVolume': 30, 
    'MinMarketOrderVolume': 1, 
    'MaxLimitOrderVolume': 500, 
    'MinLimitOrderVolume': 1, 
    'VolumeMultiple': 10, 
    'PriceTick': 1.0, 
    'CreateDate': '20190912', 
    'OpenDate': '20191016', 
    'ExpireDate': '20201015', 
    'StartDelivDate': '20201016', 
    'EndDelivDate': '20201022', 
    'InstLifePhase': '1', 
    'IsTrading': 1, 
    'PositionType': '2', 
    'PositionDateType': '1', 
    'LongMarginRatio': 0.1, 
    'ShortMarginRatio': 0.1, 
    'MaxMarginSideAlgorithm': '1', 
    'UnderlyingInstrID': '', 
    'StrikePrice': 0.0, 
    'OptionsType': '\x00', 
    'UnderlyingMultiple': 0.0, 
    'CombinationType': '0'
}

2、TdApi.reqQryInstrumentMarginRate()命令的返回

在回调函数TdApi.onQryInstrumentMarginRate()中得到返回结果这样是的:
以rb2010为例:

{
    'InstrumentID' : rb2010,
    'InvestorRange' : 1,
    'BrokerID' : 9999,
    'InvestorID' : 147102,
    'HedgeFlag' : 1,
    'LongMarginRatioByMoney' : 0.1,
    'LongMarginRatioByVolume' : 0.0,
    'ShortMarginRatioByMoney' : 0.1,
    'ShortMarginRatioByVolume' : 0.0,
    'IsRelative' : 0,
    'ExchangeID' : ,
    'InvestUnitID' : 
}

3、两个返回值都有关于保证金的字段

TdApi.reqQryInstrument()命令的返回中包括:

  • LongMarginRatio :多头保证金率
  • ShortMarginRatio:空头保证金率

TdApi.reqQryInstrumentMarginRate()命令的返回包括:

  • LongMarginRatioByMoney :多头保证金率
  • LongMarginRatioByVolume :多头保证金费
  • ShortMarginRatioByMoney :空头保证金率
  • ShortMarginRatioByVolume:空头保证金费
  • IsRelative:是否相对交易所收取

4、问题

1 这两个命令的返回值都包含中合约的保证金率,哪个是期货公司的实收的保证金率 ?
2 两条都是TdApi的命令,都是连接开户的期货公司的交易服务器的,还有必要执行TdApi.reqQryInstrumentMarginRate()专门获取吗 ?

交易所会公布保证金(率)和手续费(率)

  • 可是这是对它的会员有意义,比如:期货公司,大型金融机构。
  • 一般个人不可以直接在交易所开户

你所在的期货公司会对你另外加收保证金(率)和手续费(率)

  • 你所在的期货公司,会根据你的资金实力、操作水平另外加收保证金(为了防止风险),加收手续费(期货公司收入),这是可以理解的
  • 可是这些保证金比例和手续费通常是会调整的,而且可能是大幅度地调整,比如:rb2010有可能昨天是8%,今天就是15%,昨天还可以开仓20手钱,今天你就资金不足啦!

问题:哪家期货公司可以通过接口获取合约的实收保证金率和手续费(率)?

  • 怎么从你开户的期货公司服务器的CTP接口,拿到你自己的保证金比例和手续费?
  • 最好是通过接口编程的方法解决,手工解决的也行,有经验的分享下,先谢谢!!!

VNPY中的BarGenerator的构造函数是这样的:

class BarGenerator:
    ......
    def __init__(self,on_bar: Callable,window: int = 0,on_window_bar: Callable = None,interval: Interval = Interval.MINUTE ):
    ... ...

    def update_bar(self, bar: BarData) -> None:
        """
        Update 1 minute bar into generator
        """
        # If not inited, creaate window bar object
        if not self.window_bar:
            # Generate timestamp for bar data
            if self.interval == Interval.MINUTE:
                dt = bar.datetime.replace(second=0, microsecond=0)
            else:
                dt = bar.datetime.replace(minute=0, second=0, microsecond=0)

            self.window_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                datetime=dt,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price
            )
        # Otherwise, update high/low price into window bar
        else:
            self.window_bar.high_price = max(
                self.window_bar.high_price, bar.high_price)
            self.window_bar.low_price = min(
                self.window_bar.low_price, bar.low_price)

        # Update close price/volume into window bar
        self.window_bar.close_price = bar.close_price
        self.window_bar.volume += int(bar.volume)
        self.window_bar.open_interest = bar.open_interest

        # Check if window bar completed
        finished = False

        if self.interval == Interval.MINUTE:
            # x-minute bar
            if not (bar.datetime.minute + 1) % self.window:
                finished = True
        elif self.interval == Interval.HOUR:
            if self.last_bar and bar.datetime.hour != self.last_bar.datetime.hour:
                # 1-hour bar
                if self.window == 1:
                    finished = True
                # x-hour bar
                else:
                    self.interval_count += 1

                    if not self.interval_count % self.window:
                        finished = True
                        self.interval_count = 0

        if finished:
            self.on_window_bar(self.window_bar)
            self.window_bar = None

        # Cache last bar object
        self.last_bar = bar

而Interval类型是这样的:

class Interval(Enum):
    """
    Interval of bar data.
    """
    MINUTE = "1m"
    HOUR = "1h"
    DAILY = "d"
    WEEKLY = "w"

结论:

BarGenerator虽然传入的Interval类型,但是它只考虑的Interval.MINUTE和Interval.HOUR两个单位,而它合成N分钟和N小时的K线是没有问题的。
也就是说,你不可以是Interval.DAILY和Interval.WEEKLY做单位,因为它使用的米筐接口的1分钟历史数据,没有使用米筐的1h和1d数据。

因此你不可以这么创建月K线

1) self.bgm = BarGenerator(self.on_bar, 4, self.on_month_bar,interval=Interval.WEEKLY)
2) self.bgm = BarGenerator(self.on_bar,20, self.on_month_bar,interval=Interval.DAILY)
因为BarGenerator没有考虑 Interval.DAILY和Interval.WEEKLY时间间隔

创建日线以上周期K线必须考虑合约交易时间段:

交易时间段决定日K线小时数

使用Interval.MINUTE作为参数时,window不可以超过59,它表示合成不了成功1小时的K线,而Interval.HOUR作为参数时,是对1小时K线进行计数,然后把的self.interval_count % self.window作为条件来判断是否查询window小时K线是否结束的,它可以用来表达日线以上周期K线。
所以创建日线以上周期K线,你最大只可以使用Interval.HOUR为单位,而且它又是参考自然时间的生成机制:

举例:
rb2010的交易时间段 :21:00-23:30(4根小时K线)09:00-10:15 10:30-11:30(3根小时K线) 13:30-15:00(2根小时K线),因此需要9根1小时K线合成
ag2012的交易时间段 :21:00-02:30 (6根小时K线)09:00-10:15 10:30-11:30 (3根小时K线)13:30-15:00(2根小时K线),因此需要11根1小时K线合成
IF88 沪深主力连续 交易时间段:09:30-11:30(3根小时K线),13:00-15:00(2根小时K线),每日时长:5小时,因此需要6根1小时K线合成
T2009 10年期国债2009 交易时间段:09:30-11:30(3根小时K线),13:00-15:15(3根小时K线),因此需要6根1小时K线合成
TS2103 2年期国债2103 交易时间段:09:30-11:30(3根小时K线),13:00-15:15(3根小时K线),因此需要6根1小时K线合成

如果看不明白上面的叙述,就静下心来慢慢想一些吧,想不明白就看看BarGenerator的update_bar()函数代码就明白了。

下面仅以rb2010和ag2012合约为例来说明,其他周期的类似。

日K线产生器

rb2010的日K线产生器:
self.bgm = BarGenerator(self.on_bar, 9, self.on_day_bar,interval=Interval.HOUR)
ag2012的日K线产生器:
self.bgm = BarGenerator(self.on_bar, 11, self.on_day_bar,interval=Interval.HOUR)

5日K线合成周K线

rb2010的周K线产生器:
self.bgm = BarGenerator(self.on_bar, 45, self.on_week_bar,interval=Interval.HOUR)
ag2012的周K线产生器:
self.bgm = BarGenerator(self.on_bar, 55, self.on_week_bar,interval=Interval.HOUR)

4周K线合成月K线

rb2010的月K线产生器:
self.bgm = BarGenerator(self.on_bar, 180, self.on_month_bar,interval=Interval.HOUR)
ag2012的月K线产生器:
self.bgm = BarGenerator(self.on_bar, 220, self.on_month_bar,interval=Interval.HOUR)

注意:

这些合成的日K能够保证是日内对齐的,但周K和月K线并不能够保证是周内和月内对齐的,它取决你什么时候启动你的策略。

更好的创建方法:

更好的创建方法先对BarGenerator进行扩展,实现考虑交易时间段的日K、周K的生成机制,当然创建时需要传入交易时间段参数。这里就不在说了,以后可以专门讨论。

VNPY的源代码目录众多,分门别类非常多,许多文件的内容都很类似,甚至函数名称都是一样的,如果不做特别设置,查询代码会把一些你不使用的app、api、gateway下的文件都查询出来,很是烦人,经过本人琢磨,发现这么设置就不错:

创建VNPY工作区

随便怎么命名都可以,我就为其取名VNPY,如下图所示:

1 把vnpy的源代码目录加入工作区
2 把自己的策略源代码目录加入工作区
description

查询代码设置步骤

  • 1 点击查找图标;
  • 2 输入查找字符串;
  • 3 设置需要包含的目录或者文件,只输入文件夹名称会包含下面的文件和子文件夹下的文件。需要输入多个文件夹时,用逗号割开;
  • 4 设置需要排除的文件,与步骤3类似,可以使用通配符,如图所示:

经过这样设置以后,当你输入任何要查找字符串的时候,就不会显示那些你不感兴趣的app、gateway下的子目录的文件了。本人目前感兴趣的是trader,event,chart,rpc,app\cta_strategy,app\rpc_service,gateway\ctp,api,user_tools这些子目录中的文件,你也可以根据自己的需要特别设定,以提高代码查询的效率。
description

1 几乎所有例子的仓位都是self.fixed_size=1

进阶课程里看过陈晓优老师讲的很多策略例子,都是讲如何买和如何卖,如何止盈和止损的,又是仿真、优化、实盘的,看到人是心潮澎湃!
于是看是着手仿照例子编写自己的策略,策略经过计算买卖信号后,总是要下单的,那我下多大仓位呢? 回过头这时候参考例子才发现,几乎所有的仓位都是self.fixed_size=1,就没有讲如何动态决定仓位的例子!
于是VNPY的QQ群里问前辈、先知,无人回答,再在论坛里问老师,终于回答了:”不建议动态仓位,这么重要的事情必须交给手工完成!“,这个答复让我有点懵——做量化交易你让为手动决定仓位???

2 是不建议动态仓位,还是无法动态仓位?

动态仓位需要什么条件:

  • 开仓价格(Price)
  • 可用资金(Money)
  • 合约乘数(Size)
  • 保证金(率)(Margin)
  • 手续费(率)(FeeRate)
  • 开仓的量 (N)
  • 开仓资金(KM)

假设是按手续费率(而不是按手收费),那么开仓资金为:

KM = Price*N*Size*Margin (1+ FeeRate)

那么必须符合添加KM < Money,进而推出

 N < Money/(Price*Size*Margin (1+ FeeRate))。

当然你不可能满仓干,也许还要一个最大开仓资金比例R,例如:

N = int(Money*R/(Price*Size*Margin (1+ FeeRate)))。

当R=40%表示用你账户里40%的资金,可以动态开仓的手数。这样不就可以动态开仓了吗?
当然实际开仓时的仓位计算可能比这复杂多了,比如你可以考虑交易合约的波动水平,需要考虑投资者愿意承担的风险水平等等,但不管怎么变化,策略动态开仓都必须要有如下这几个参数:

  • 开仓价格(Price)
  • 合约乘数(Size)
  • 可用资金(Money)
  • 保证金(率)(Margin)
  • 手续费(率)(FeeRate)

3 用户策略怎么可以与”钱“无关?

经过艰苦和漫长的代码研读和梳理,发现CTA策略交易中只有pos、trading和inited这些策略成员,没有与资金相关的东西。我们来看看这几个动态下单必须具备的参数是否提供了:

  • 开仓价格(Price):策略从合约的行情数据中提取
  • 合约乘数(Size):main_engine中可以使用vt_symbol作为参数提取,函数是main_engine.get_contract()
  • 可用资金(Money):目前有,可以通过 main_engine.get_account()函数提供 但是是当你运行多个策略的时候,这些策略是共用同一个账户的,无法直接使用。
  • 保证金(率)(Margin):目前有,但是这是各个交易所公布的统一的保证金(率),不是你开户的期货各种给你的保证金(率),如rb2010.SHFE的保证金是率8%,可是也许你的开户的期货公司给你的保证金率为却为12%或者15%,怎么用?
  • 手续费(率)(FeeRate):目前没有,而且这个费率回是因人而异的,这样是看你怎么和你的期货经纪人怎么谈判的,所以也没有办法用!

结论:

目前VNPY的CTA策略因为缺少上述几个关键参数,无法实现动态仓位交易。是不能也,而非不可以!

4 为CTA账户引入策略账户!

作为交易赚钱的CTA策略,怎么可以不与这些资金相关的参数打交道?因人而异的保证金和手续费不应该成为不提供这些参数的理由!
当多个策略在同时运行的时候,你的实际账户权益的消长,到底是哪个策略赚的,哪个策略赔的都无法说清楚,运行3天后就已经是一本糊涂账了,这怎么可以!

虽然有上面的困难,但是办法总比困难多!可以参考文华财经8.3或者库安9.0的办法(熟悉文华财经客户端的人应该都知道),它们的方法是用模组账户的方法来为每个用户模组创建一个虚拟的模组账户,很好地解决用户算法对资金、保证金和手续费等参数的设定!

策略账户的功能:

  1. 分配交易的初始资金,还可以做出金入金等虚拟操作,目的就是控制策略的交易规模。
  2. 设置各个合约实际使用的保证金(率)
  3. 设置各个合约可以使用的手续费(率),包括开仓、平仓和平今仓手续费(率)
  4. 可以记录策略产出的委托单
  5. 可以记录策略初始的成交单
  6. 可以为策略提供合约的当前可用资金、保证金(率)和手续费(率)
  7. 算策略的历史交易盈亏和当前交易的浮动盈亏
  8. 提供当前策略账户的权益和可用资金
  9. 提供策略自创建以来的所有历史委托单和成交单查询,解决目前CTA策略之知道最新活动委托单,当日成交单和未平仓的仓位,而不知道历史交易的情况的问题。

如何实现策略账户 ?

策略账户的已经基本上实现了,目前只在测试中,且看我一步一步慢慢为大家分享......

5. 最新进展

有兴趣的可以先看看 策略账户界面展示

K线的种类可以这样划分:

按K线周期长度:

  • 1 常规周期K线
  • 2 自定义K周期线

按K线参考起始时间划分:

  • 1 等自然时长K线
  • 2 等交易时长K线
    • 1)参考交易日起点的等交易时长K线
    • 2)参考日盘起点和夜盘起点的等交易时长K线
    • 3)参考合约上市日起点的等交易时长K线

一、按照K线周期划分

  • 1 常规周期K线:如1,5,10,15,20,30分钟K线,1,2,4小时K线,日K线,周K线、月K线,年K线等。它们通常在常见的金融终端的GUI界面都会出现。因为是常规周期K线,为了加快软件的加载、显示和速度,常常把它们直接在服务器的数据库中做存储。这样做的好处是K线数据的处理和加载简单,显示速度快,缺点是需要为每个用户可能用到的周都准备一个表。
  • 2 自定义周期K线:在常规周期之外,用户自定义时间周期的K线,如7,19分钟周期K线。

VNPY对常规周期K线和自定义周期K线的处理是一视同仁的,统一使用BarGenerator()就可以了,它只使用1分钟K线数据,根据需要产生N分钟K线,至于产生出来是常规周期K线,还是自定义周期K线,主要看N是多少了。这样做的好处是自由!缺点是处理复杂些、速度慢些(不过反正不用手工处理,让计算机做,无所谓)。

二、按K线起始时间的参考起点划分

1 等自然时长K线

它是按照自然时间来计算一根K线是否结束的。如对于30分钟K线,rb2010合约在每个交易日都会产生这样的一组K线:
起止时间 交易时长
21:00-21:30——30分钟
21:30-22:00——30分钟
22:00-22:30——30分钟
22:30-23:00——30分钟
23:00-23:30——30分钟
09:00-09:30——30分钟
09:30-10:00——30分钟
10:00-10:30——15分钟(?)
10:30-11:00——30分钟
11:00-11:30——30分钟
13:30-14:00——30分钟
14:00-14:30——30分钟
14:30-15:00——30分钟
目前vnpy的BarGenerator()就是这种K线参考机制。优点点是实现简单,无需考虑交易时段;缺点是明天都会有特别30分钟K是个另类:10:00-10:30的K线,其实只交易了15分钟!

2 等交易时长K线

熟悉文华财经8.3的应该熟悉下图的设置:
description

1)参考交易日起点

以合约的日交易时间的起点为参考,然后以等交易时间宽度产生K线。如文华财经的常规周期K线划分中的 “交易时间机制”就是这种情况。
如RB2010交易时间段:'21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00'。以交易日起点方式的30分钟K线分别是
起止时间 交易时长
21:00-21:30——30分钟
21:30-22:00——30分钟
22:00-22:30——30分钟
22:30-23:00——30分钟
23:00-23:30——30分钟
09:00-09:30——30分钟
09:30-10:00——30分钟
10:00-10:45——30分钟
10:45-11:15——30分钟
11:15-11:45——30分钟
13:45-14:15——30分钟
14:15-14:45——30分钟
14:45-15:00——15分钟(?)

这种K线时间参考机制的优点是不会模糊每日开市时的跳空行情,缺点是每日都可能产生一根交易时间不足30分钟的K线。

2)参考日盘起点和夜盘起点

分别以合约的日盘交易时间起点和夜盘交易时间起点为参考,然后以等交易时间宽度产生K线。如文华财经的常规周期K线划分中的 “交易时间机制改进型”就是这种情况。
如RB2010交易时间段:夜盘:'21:00-02:30, 日盘:09:00-10:15,10:30-11:30,13:30-15:00'。以交易日起点方式的60分钟K线分别是
夜盘K线:
起止时间 交易时长
21:00-22:00——60分钟
22:00-23:00——60分钟
23:00-00:00——60分钟
00:00-01:00——60分钟
01:00-02:00——60分钟
02:00-02:30——30分钟(?)
日盘K线:
起止时间 交易时长
09:00-10:00——60分钟
10:00-11:15——60分钟
11:15-11:15——60分钟
11:15-14:15——60分钟
14:15-15:00——45分钟(?)
这种K线时间参考机制的优点是不会模糊各种日盘和夜盘开市时的跳空行情,缺点是每日都可能产生交易时长不足60分钟的K线。

3)参考合约上市日起点

以合约上市日的交易时间起点为参考,取出节假日,考虑交易时段,然后以等交易时间宽度产生K线。如FixedBarGenerator对N分钟K线的处理就是这种情况。FixedBarGenerator的原理参见具体实现代码参见在前面的帖子里面已经描述得比较清楚了,这里不再重复。

参考合约上市日起点、等交易时间的好处是:所有的K线的交易时长都是想等的,它们的成交量是可类比的。因为交易时长不等的K线产生的所谓“地量”或者“天量”,有多大意义是可想而知的。这种K线产生机制的缺点是:可能产生跨交易日,跨日盘和夜盘的K线,这样可能造成观察不到交易日开盘跳空,日盘和夜盘开市时的跳空现象,因为可能那根K线还没有结束。

对声称“三行代码搞定国内期货10:15-10:30的K线“的看法:

总有人声称“三行代码搞定国内期货10:15-10:30的K线“,并且上了精华版。三行代码可以搞定?那只是按下葫芦起了瓢的解决方法。
同样是国内期货,IF就没有这样的休市时间段,难倒你都要有这么做?自定义周期7分钟,19分钟K线你有怎么办?还有 再说了,就是改也最好是对BarGenerator扩展定制一个新的K线生成器,二不能直接修改BarGenerator的代码,修改它会导致在不同的合约、某些周期长度上出错,而你却浑然不知!
举例下面这些合约,都是没有10:15-10:30休市时间段的

合约代码:IC2008 名称:中证500指数2008 交易时间段:09:31-11:30,13:01-15:00
合约代码:IC888 名称:中证主力连续价差平滑 交易时间段:09:31-11:30,13:01-15:00
合约代码:IC99 名称:中证指数连续 交易时间段:09:31-11:30,13:01-15:00
合约代码:IC2009 名称:中证500指数2009 交易时间段:09:31-11:30,13:01-15:00
合约代码:IC2103 名称:中证500指数2103 交易时间段:09:31-11:30,13:01-15:00
合约代码:IC88A3 名称:中证次次主力连续 交易时间段:09:31-11:30,13:01-15:00
合约代码:IC88 名称:中证主力连续 交易时间段:09:31-11:30,13:01-15:00
合约代码:IC889 名称:中证主力连续价差平滑(后复权) 交易时间段:09:31-11:30,13:01-15:00
合约代码:IC2012 名称:中证500指数2012 交易时间段:09:31-11:30,13:01-15:00
合约代码:IC88A2 名称:中证次主力连续 交易时间段:09:31-11:30,13:01-15:00
合约代码:IF888 名称:沪深主力连续价差平滑 交易时间段:09:31-11:30,13:01-15:00
合约代码:IF2009 名称:IF2009 交易时间段:09:31-11:30,13:01-15:00
合约代码:IF2103 名称:IF2103 交易时间段:09:31-11:30,13:01-15:00
合约代码:IF2012 名称:IF2012 交易时间段:09:31-11:30,13:01-15:00
合约代码:IF889 名称:沪深主力连续价差平滑(后复权) 交易时间段:09:31-11:30,13:01-15:00
合约代码:IF2008 名称:IF2008 交易时间段:09:31-11:30,13:01-15:00
合约代码:IF88A2 名称:沪深次主力连续 交易时间段:09:31-11:30,13:01-15:00
合约代码:IF88 名称:沪深主力连续 交易时间段:09:31-11:30,13:01-15:00
合约代码:IF88A3 名称:沪深次次主力连续 交易时间段:09:31-11:30,13:01-15:00
合约代码:IF99 名称:沪深指数连续 交易时间段:09:31-11:30,13:01-15:00
合约代码:IH2103 名称:上证50指数2103 交易时间段:09:31-11:30,13:01-15:00
合约代码:IH88 名称:上证主力连续 交易时间段:09:31-11:30,13:01-15:00
合约代码:IH888 名称:上证主力连续价差平滑 交易时间段:09:31-11:30,13:01-15:00
合约代码:IH2012 名称:上证50指数2012 交易时间段:09:31-11:30,13:01-15:00
合约代码:IH2009 名称:上证50指数2009 交易时间段:09:31-11:30,13:01-15:00
合约代码:IH2008 名称:上证50指数2008 交易时间段:09:31-11:30,13:01-15:00
合约代码:IH88A3 名称:上证次次主力连续 交易时间段:09:31-11:30,13:01-15:00
合约代码:IH88A2 名称:上证次主力连续 交易时间段:09:31-11:30,13:01-15:00
合约代码:IH889 名称:上证主力连续价差平滑(后复权) 交易时间段:09:31-11:30,13:01-15:00
合约代码:IH99 名称:上证指数连续 交易时间段:09:31-11:30,13:01-15:00
合约代码:T2009 名称:10年期国债2009 交易时间段:09:31-11:30,13:01-15:15
合约代码:T888 名称:年期国债主力连续价差平滑 交易时间段:09:31-11:30,13:01-15:15
合约代码:T889 名称:年期国债主力连续价差平滑(后复权) 交易时间段:09:31-11:30,13:01-15:15
合约代码:T99 名称:年期国债指数连续 交易时间段:09:31-11:30,13:01-15:15
合约代码:T88 名称:年期国债主力连续 交易时间段:09:31-11:30,13:01-15:15
合约代码:T2012 名称:10年期国债2012 交易时间段:09:31-11:30,13:01-15:15
合约代码:T2103 名称:10年期国债2103 交易时间段:09:31-11:30,13:01-15:15
合约代码:TF2009 名称:5年期国债2009 交易时间段:09:31-11:30,13:01-15:15
合约代码:TF2012 名称:5年期国债2012 交易时间段:09:31-11:30,13:01-15:15
合约代码:TF88 名称:年期国债主力连续 交易时间段:09:31-11:30,13:01-15:15
合约代码:TS888 名称:年期国债主力连续价差平滑 交易时间段:09:31-11:30,13:01-15:15
合约代码:TF889 名称:年期国债主力连续价差平滑(后复权) 交易时间段:09:31-11:30,13:01-15:15
合约代码:TF99 名称:年期国债指数连续 交易时间段:09:31-11:30,13:01-15:15
合约代码:TF888 名称:年期国债主力连续价差平滑 交易时间段:09:31-11:30,13:01-15:15
合约代码:TF2103 名称:5年期国债2103 交易时间段:09:31-11:30,13:01-15:15
合约代码:TS2012 名称:2年期国债2012 交易时间段:09:31-11:30,13:01-15:15
合约代码:TS889 名称:年期国债主力连续价差平滑(后复权) 交易时间段:09:31-11:30,13:01-15:15
合约代码:TS99 名称:年期国债指数连续 交易时间段:09:31-11:30,13:01-15:15
合约代码:TS88 名称:年期国债主力连续 交易时间段:09:31-11:30,13:01-15:15
合约代码:TS2009 名称:2年期国债2009 交易时间段:09:31-11:30,13:01-15:15
合约代码:TS2103 名称:2年期国债2103 交易时间段:09:31-11:30,13:01-15:15

总结:

鱼和熊掌不可兼得,每种K线产生机制都有优点和缺点,至于您选择哪一种来产生你的K线,就看你的取舍了。明白与不明白个中的道理却是大相径庭的,所谓胸有成竹,才能够临危不乱。谨以本贴分享本人的一点浅薄之见,希望能够帮助到您!

CTA策略的交易活动包含哪些?

CTA策略的交易活动可以使用下面这些来描述:

  • 策略被接受的停止单(如果使用了停止单);
  • 策略被接受的委托单;
  • 策略收到的成交单;
  • 策略的合约持仓;
  • 策略的权益统计。

CTA策略的交易活动的呈现形式

  • 委托单以粉色的实心上箭头(买开)和空心下箭头(卖平)标识,蓝色的实心下箭头(卖开)和空心上箭头(买平)来标识,标注在发生K线的接受时刻。因为停止单实际上是转化为合约当时的涨停价或跌停价的委托单,所以停止单也是以限价委托单的形式来标注的;
  • 成交单以红色的实心上箭头(买开)和空心下箭头(卖平)标识,绿色的实心下箭头(卖开)和空心上箭头(买平)来标识,标注在发生K线的接受时刻;
  • 策略持仓以标题变量的方式显示在主题的标题栏中,随光标所在K线的变化而变化。包含:多头累积持仓,空头持仓,以及在当根K线时浮动盈亏,冻结仓位等信息;
  • 策略的权益统计也是以标题变量的方式显示在主题的标题栏中,随光标所在K线的变化而变化。包含策略自创建以来的权益变化信息。
  • 所有在主图标题栏中的显示信息,可以在策略中配置的,以便只显示你感兴趣的标题项。

几个概念:

  • 一次完整交易:策略的持仓从零开始到非零,再从非零仓位变成零仓位的过程。
  • 策略权益:初始分配资金+各次已完成交易的盈亏+未完成交易的浮动盈亏。
  • 策略账户:每个策略实例对应一个策略账户。策略账户中包含为策略分配的初始资金、历史委托单、历史成交单、历史持仓。利用这些信息,策略账户可以提供策略账户的当前权益,策略账户可用资金。

需要完成的工作

  • 实现一个策略交易监视器。它是一个rpc_server,具有按策略实例名称的方式对策略的委托单、成交单、持仓进行记录、查询、持仓统计的功能。可以在本地运行,也可以单独运行在远程。
  • 实现一个包含rpc_client的cta_template的扩展模版。它可以为其派生的用户策略提供下面的功能:
    1)策略初始化时创建K线图表窗口
    2)可以为用户策略提供K线主图指标、附图指标
    3)策略运行时,把on_order()、on_trade()收到的OrderData,TradeData等数据保存到策略交易监视器;
    4)把on_order()、on_trade()收到的OrderData,TradeData等数据推送到K线图表
  • 实现一个包含从ChartWidget派生类的K线图表
    1)它的主图和附图是用户策略配置的
    2)配置的主图包含一个K线主图,一个可以显示历史交易的交易主图

登录时就报,来自qt.network.ssl,QSslSocket,如何解决?

description

VNPY系统CTA策略中的BarGenerater的问题:

当K线周期为1,5,15分钟或者1日,1周时,它是没有问题的
当K线周期为10,20,30分钟或者一些自定义分钟周期如7分钟,还有1、2,4小时,由于合约的交易时段的不规则,导致某些K线的周期于其时间发生的交易时间不想等。
你开启CTA策略的时机是随机的,这导致self.laod_bar(20)的执行也是随机的,它从米筐或者数据库加载的1分钟数据也是随机,最终导致你所产生的上述K线也是随机的。那你已经这样的K线数据计算出来的指标在某种程度上可能也随机的。

问题出在合约的交易时间上

不赘述原因了,举例吧:
RB2010交易时间段:'21:01-23:00,09:01-10:15,10:31-11:30,13:31-15:00'
假如你的K线每天从21:01开始计算K线,
如果K线的周期为10分钟,那么在10:10-10:20的那个K线其实只交易了5分钟
如果K线的周期为20分钟,那么在10:00-10:20的那个K线其实只交易了15分钟,在10:20-10:40的那个K线其实只交易了10分钟
如果K线的周期为30分钟,那么在10:00-10:30的那个K线其实只交易了15分钟
如果K线的周期为60分钟,那么在10:00-11:00的那个K线其实只交易了45分钟
如果K线的周期为120分钟,那么在10:00-14:00的那个K线其实只交易了105分钟

K线应该是从初始化策略之时往前计算,还是从上市日期开始计算?

如果BarGenerator采用等交易时长产生K线,策略初始化时通过load_bar(n),读取1分钟历史K线,目前BarGenerator时是从n日之前的第一个1分钟K线区合约其他周期的K线的。
这导某些周期K线随n值不同,K线的起止时间会变化。而如果采用从上市日期开始计算等交易时长的K线位置,则无论何时初始化策略,K线的起止时间都是一样的。

你希望那种K线:

1 等自然时长K线——无需考虑交易时段
2 等交易时长K线——需要考虑交易时段
3 从上市日起算K线——起止位置固定
4 从策略初始化时起算K线——起止位置不固定

第6帖子已经实现目标——固定交易时长位置固定的K线图

description

首先说明:这不只是DemoStrategy的问题,其他策略也一样存在

我只是拿DemoStrategy的例子来说明问题。

通常一个策略需要在on_init()函数中调用self.load_bar()

例如:

陈老师在CTA策略实战进阶中的课程里讲的DemoStrategy策略:

class DemoStrategy(CtaTemplate):
    """ 一个演示策略 """
    author = "hxx"

    fast_window = 10
    slow_window = 20

    fast_ma0 = 0
    fast_ma1 = 0
    slow_ma0 = 0
    slow_ma1 = 0

    parameters = [
        "fast_window",
        "slow_window"
    ]

    variables = [
        "fast_ma0",
        "fast_ma1",
        "slow_ma0",
        "slow_ma1",
    ]

    def __init__(
        self,
        cta_engine: Any,
        strategy_name: str,
        vt_symbol: str,
        setting: dict 
    ):
        """构造函数"""
        super().__init__(cta_engine,strategy_name,vt_symbol,setting)

        self.bg = NewBarGenerator(
            on_bar=self.on_bar,
            window=7,
            on_window_bar=on_7min_bar,
            interval=Interval.Minute)

        self.am = NewArrayManager()


    def on_init(self):
        """"""
        self.write_log("策略初始化")

        self.load_bar(10)       # 加载10日的7分钟K线

    def on_start(self):
        """策略启动"""
        self.write_log("策略启动")

    def on_stop(self):
        """ 策略停止 """
        self.write_log(" 策略停止 ")

    def on_tick(self,tick:TickData):
        """ Tick更新 """
        self.bg.update_tick(tick) 

    def on_bar(self, bar: BarData):
        """K线更新"""
        self.bg.update_bar(bar)

    def on_7min_bar(self, bar: BarData):
        """K线更新"""
        am = self.am
        am.update_bar(bar)
        if not am.inited:
            return

        """ 计算均线 """
        fast_ma = am.sma(self.fast_window,True)
        self.fast_ma0 = fast_ma[-1]
        self.fast_ma1 = fast_ma[-2]

        slow_ma = am.sma(self.slow_window,True)
        self.slow_ma0 = slow_ma[-1]
        self.slow_ma1 = slow_ma[-2]

        """ 定义金叉和死叉 """

        cross_over = (self.fast_ma0>= self.fast_ma1 and
                      self.slow_ma0<self.slow_ma1)  

        cross_below = (self.slow_ma0>self.slow_ma1 and 
                      self.slow_ma0<=self.slow_ma1)

        if cross_over:
            price = bar.close_price + 5

            if not self.pos:
                self.buy(price,1)
            elif self.pos < 0:
                self.cover(price,1)
                self.buy(price,1)
        elif cross_below:
            price = bar.close_price - 5

            if not self.pos:
                self.short(price,1)
            elif self.pos>0:
                self.sell(price,1)
                self.short(price,1)

        # 更新图形界面 
        self.put_event()

self.load_bar()是CtaTemplate策略模版的方法:

    def load_bar(
        self,
        days: int,
        interval: Interval = Interval.MINUTE,
        callback: Callable = None,
        use_database: bool = False
    ):
        """
        Load historical bar data for initializing strategy.
        """
        if not callback:
            callback = self.on_bar

        self.cta_engine.load_bar(
            self.vt_symbol,
            days,
            interval,
            callback,
            use_database
        )

CtaTemplate.load_bar()又调用vnpy.app.cta_strategy.engine里的CtaEngine的load_bar()

CtaEngine的load_bar()的代码是这样的:

    def load_bar(
        self,
        vt_symbol: str,
        days: int,
        interval: Interval,
        callback: Callable[[BarData], None],
        use_database: bool
    ):
        """"""
        symbol, exchange = extract_vt_symbol(vt_symbol)
        end = datetime.now(get_localzone())
        start = end - timedelta(days)
        bars = []

        # Pass gateway and RQData if use_database set to True
        if not use_database:
            # Query bars from gateway if available
            contract = self.main_engine.get_contract(vt_symbol)

            if contract and contract.history_data:
                req = HistoryRequest(
                    symbol=symbol,
                    exchange=exchange,
                    interval=interval,
                    start=start,
                    end=end
                )
                bars = self.main_engine.query_history(req, contract.gateway_name)

            # Try to query bars from RQData, if not found, load from database.
            else:
                bars = self.query_bar_from_rq(symbol, exchange, interval, start, end)

        if not bars:
            bars = database_manager.load_bar_data(
                symbol=symbol,
                exchange=exchange,
                interval=interval,
                start=start,
                end=end,
            )

        for bar in bars:
            callback(bar)

可能的问题:

CtaEngine.load_bar()对历史数据的处理逻辑是:
1 首先从rqdatac获取历史分钟数据,利用DemoStrategyd策略的on_bar()合成出7分钟K线,然后调用on_7min_bar()。这样self.am是一个默认缓冲为100个FIFO数组队列,利用它执行策略的买卖信号计算和下单。它的end是当前时间,start是10天前的时间。策略初始化后,从行情接口可以不断地收到tick数据,然后执行DemoStrategyd策略on_tick()-->on_bar()-->on_7min_bar()
2 如果rqdatac没有读取到历史分钟数据,那么就从本地数据库中读取。问题来了:

1) 如果本地数据库中有start-end之间的数据,可是最后的数据时间与当前时间有空档,例如最后保存的有30分钟空档,而初始化后行情接口虽然也是按照on_tick()-->on_bar()-->on_7min_bar()的过程不断地更新self.am,可是我们知道它所管理的缓冲K线可能是不连续的,这样会导致计算的错误!
2) 如果本地数据库中有start-end之间的数据,可是这些数据本身内部可能好几段历史返分钟数据,本身中间就有空档,导致self.am所管理的缓冲K线可能是不连续的,这样会导致计算的错误!
3) 本地数据可能是从rqdatac读取保存的,有谁的动作那么快,这边保存了历史数据,那边立刻启动策略,立刻初始化?所以当rqdatac没有历史数据,转而从本地数据库读取,这本身就会造成历史K线与新合成处理的数据空档。

建议:在实盘环境下,CtaEngine.load_bar()不要保留从本地数据库读取历史数据

理由如下:

1 实盘策略不要从数据库读取数据的选择,因为容易造成无法补救的K线数据空档;
2 它发生在rqdatac无法读取的情况下,自然无法再从rqdata补齐数据;
3 而行情接口通常不提供历史分钟K线数据功能;
4 可以没有数据,没有当然不会计算买卖点,也就不会交易。
5 如果简单使用有空档K线数据,发出了错误的买卖信号,进行了错误的交易,这是不应该的!

比如:本地数据库最后一条数据是2天前的1分钟K线数据,而当前的rqdatac是不通的,可是行情接口是没有问题的,那么合成出来的新K线与历史K线之间会有多大的跳空,这个是无法想象的,依据这样的有空档的新、旧K数据来计算、交易,谁知道会出什么问题?

上一主题失败了

上一主题,“为K线图表添砖加瓦——让CTA策略的运行看得见”,可以说是失败了,原因已经找到,是因为掉到类变量和实例变量的坑里了。具体过程参加这个主题:
https://www.vnpy.com/forum/topic/3860-wei-kxian-tu-biao-tian-zhuan-jia-wa-rang-ctace-lue-de-yun-xing-kan-de-jian

问题已经解决了,并且做了改进

1 解决了不同合约同时运行_kx_strategy策略时,K线图会互相影响的问题;
2 去掉策略管理器中的“K线图表”按钮,保持与原来的界面一致,在_kx_strategy策略中增加一个show_chart参数项目,如果想显示K线图,为它配置为True,否则不会显示K线图;
3 增加策略被移除时,删除该策略的K线图表功能
4 K线图表中的显示内容在_kx_strategy策略中配置,而不是一个固定的主图和附图搭配。参照我的init_kx_chart()方法,您也可以为自己的策略配置自己的K线主图和附图指标;
5 添加最后一根了临时K线的显示

改进后的实现方法

先备份文件

vnpy\app\cta_strategy\base.py
vnpy\app\cta_strategy\engine.py
vnpy\app\cta_strategy\ui\widget.py
vnpy\app\cta_backtester\engine.py

修改vnpy\app\cta_strategy\base.py

"""
Defines constants and objects used in CtaStrategy App.
"""

from dataclasses import dataclass, field
from enum import Enum
from datetime import timedelta

from vnpy.trader.constant import Direction, Offset, Interval

APP_NAME = "CtaStrategy"
STOPORDER_PREFIX = "STOP"


class StopOrderStatus(Enum):
    WAITING = "等待中"
    CANCELLED = "已撤销"
    TRIGGERED = "已触发"


class EngineType(Enum):
    LIVE = "实盘"
    BACKTESTING = "回测"


class BacktestingMode(Enum):
    BAR = 1
    TICK = 2


@dataclass
class StopOrder:
    vt_symbol: str
    direction: Direction
    offset: Offset
    price: float
    volume: float
    stop_orderid: str
    strategy_name: str
    lock: bool = False
    vt_orderids: list = field(default_factory=list)
    status: StopOrderStatus = StopOrderStatus.WAITING


EVENT_CTA_LOG = "eCtaLog"
EVENT_CTA_STRATEGY = "eCtaStrategy"
EVENT_CTA_STOPORDER = "eCtaStopOrder"

EVENT_CTA_TICK = "eCtaTick"                     # hxxjava add
EVENT_CTA_HISTORY_BAR = "eCtaHistoryBar"        # hxxjava add
EVENT_CTA_BAR = "eCtaBar"                       # hxxjava add
EVENT_CTA_ORDER = "eCtaOrder"                   # hxxjava add  
EVENT_CTA_TRADE = "eCtaTrade"                   # hxxjava add


INTERVAL_DELTA_MAP = {
    Interval.MINUTE: timedelta(minutes=1),
    Interval.HOUR: timedelta(hours=1),
    Interval.DAILY: timedelta(days=1),
}

修改vnpy\app\cta_strategy\engine.py

""""""

import importlib
import os
import traceback
from collections import defaultdict
from pathlib import Path
from typing import Any, Callable
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor
from copy import copy
from tzlocal import get_localzone

from vnpy.event import Event, EventEngine
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.trader.object import (
    OrderRequest,
    SubscribeRequest,
    HistoryRequest,
    LogData,
    TickData,
    BarData,
    ContractData
)
from vnpy.trader.event import (
    EVENT_TICK,
    EVENT_ORDER,
    EVENT_TRADE,
    EVENT_POSITION
)
from vnpy.trader.constant import (
    Direction,
    OrderType,
    Interval,
    Exchange,
    Offset,
    Status
)
from vnpy.trader.utility import load_json, save_json, extract_vt_symbol, round_to
from vnpy.trader.database import database_manager
from vnpy.trader.rqdata import rqdata_client
from vnpy.trader.converter import OffsetConverter

from .base import (
    APP_NAME,
    EVENT_CTA_LOG,
    EVENT_CTA_STRATEGY,
    EVENT_CTA_STOPORDER,
    EngineType,
    StopOrder,
    StopOrderStatus,
    STOPORDER_PREFIX
)
from .template import CtaTemplate


STOP_STATUS_MAP = {
    Status.SUBMITTING: StopOrderStatus.WAITING,
    Status.NOTTRADED: StopOrderStatus.WAITING,
    Status.PARTTRADED: StopOrderStatus.TRIGGERED,
    Status.ALLTRADED: StopOrderStatus.TRIGGERED,
    Status.CANCELLED: StopOrderStatus.CANCELLED,
    Status.REJECTED: StopOrderStatus.CANCELLED
}


class CtaEngine(BaseEngine):
    """"""

    engine_type = EngineType.LIVE  # live trading engine

    setting_filename = "cta_strategy_setting.json"
    data_filename = "cta_strategy_data.json"

    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super(CtaEngine, self).__init__(
            main_engine, event_engine, APP_NAME)

        self.strategy_setting = {}  # strategy_name: dict
        self.strategy_data = {}     # strategy_name: dict

        self.classes = {}           # class_name: stategy_class
        self.strategies = {}        # strategy_name: strategy

        self.symbol_strategy_map = defaultdict(
            list)                   # vt_symbol: strategy list
        self.orderid_strategy_map = {}  # vt_orderid: strategy
        self.strategy_orderid_map = defaultdict(
            set)                    # strategy_name: orderid list

        self.stop_order_count = 0   # for generating stop_orderid
        self.stop_orders = {}       # stop_orderid: stop_order

        self.init_executor = ThreadPoolExecutor(max_workers=1)

        self.rq_client = None
        self.rq_symbols = set()

        self.vt_tradeids = set()    # for filtering duplicate trade

        self.offset_converter = OffsetConverter(self.main_engine)

    def init_engine(self):
        """
        """
        self.init_rqdata()
        self.load_strategy_class()
        self.load_strategy_setting()
        self.load_strategy_data()
        self.register_event()
        self.write_log("CTA策略引擎初始化成功")

    def close(self):
        """"""
        self.stop_all_strategies()

    def register_event(self):
        """"""
        self.event_engine.register(EVENT_TICK, self.process_tick_event)
        self.event_engine.register(EVENT_ORDER, self.process_order_event)
        self.event_engine.register(EVENT_TRADE, self.process_trade_event)
        self.event_engine.register(EVENT_POSITION, self.process_position_event)

    def init_rqdata(self):
        """
        Init RQData client.
        """
        result = rqdata_client.init()
        if result:
            self.write_log("RQData数据接口初始化成功")

    def query_bar_from_rq(
        self, symbol: str, exchange: Exchange, interval: Interval, start: datetime, end: datetime
    ):
        """
        Query bar data from RQData.
        """
        req = HistoryRequest(
            symbol=symbol,
            exchange=exchange,
            interval=interval,
            start=start,
            end=end
        )
        data = rqdata_client.query_history(req)
        return data

    def process_tick_event(self, event: Event):
        """"""
        tick = event.data

        strategies = self.symbol_strategy_map[tick.vt_symbol]
        if not strategies:
            return

        self.check_stop_order(tick)

        for strategy in strategies:
            if strategy.inited:
                self.call_strategy_func(strategy, strategy.on_tick, tick)

    def process_order_event(self, event: Event):
        """"""
        order = event.data

        self.offset_converter.update_order(order)

        strategy = self.orderid_strategy_map.get(order.vt_orderid, None)
        if not strategy:
            return

        # Remove vt_orderid if order is no longer active.
        vt_orderids = self.strategy_orderid_map[strategy.strategy_name]
        if order.vt_orderid in vt_orderids and not order.is_active():
            vt_orderids.remove(order.vt_orderid)

        # For server stop order, call strategy on_stop_order function
        if order.type == OrderType.STOP:
            so = StopOrder(
                vt_symbol=order.vt_symbol,
                direction=order.direction,
                offset=order.offset,
                price=order.price,
                volume=order.volume,
                stop_orderid=order.vt_orderid,
                strategy_name=strategy.strategy_name,
                status=STOP_STATUS_MAP[order.status],
                vt_orderids=[order.vt_orderid],
            )
            self.call_strategy_func(strategy, strategy.on_stop_order, so)

        # Call strategy on_order function
        self.call_strategy_func(strategy, strategy.on_order, order)

    def process_trade_event(self, event: Event):
        """"""
        trade = event.data

        # Filter duplicate trade push
        if trade.vt_tradeid in self.vt_tradeids:
            return
        self.vt_tradeids.add(trade.vt_tradeid)

        self.offset_converter.update_trade(trade)

        strategy = self.orderid_strategy_map.get(trade.vt_orderid, None)
        if not strategy:
            return

        # Update strategy pos before calling on_trade method
        if trade.direction == Direction.LONG:
            strategy.pos += trade.volume
        else:
            strategy.pos -= trade.volume

        self.call_strategy_func(strategy, strategy.on_trade, trade)

        # Sync strategy variables to data file
        self.sync_strategy_data(strategy)

        # Update GUI
        self.put_strategy_event(strategy)

    def process_position_event(self, event: Event):
        """"""
        position = event.data

        self.offset_converter.update_position(position)

    def check_stop_order(self, tick: TickData):
        """"""
        for stop_order in list(self.stop_orders.values()):
            if stop_order.vt_symbol != tick.vt_symbol:
                continue

            long_triggered = (
                stop_order.direction == Direction.LONG and tick.last_price >= stop_order.price
            )
            short_triggered = (
                stop_order.direction == Direction.SHORT and tick.last_price <= stop_order.price
            )

            if long_triggered or short_triggered:
                strategy = self.strategies[stop_order.strategy_name]

                # To get excuted immediately after stop order is
                # triggered, use limit price if available, otherwise
                # use ask_price_5 or bid_price_5
                if stop_order.direction == Direction.LONG:
                    if tick.limit_up:
                        price = tick.limit_up
                    else:
                        price = tick.ask_price_5
                else:
                    if tick.limit_down:
                        price = tick.limit_down
                    else:
                        price = tick.bid_price_5

                contract = self.main_engine.get_contract(stop_order.vt_symbol)

                vt_orderids = self.send_limit_order(
                    strategy,
                    contract,
                    stop_order.direction,
                    stop_order.offset,
                    price,
                    stop_order.volume,
                    stop_order.lock
                )

                # Update stop order status if placed successfully
                if vt_orderids:
                    # Remove from relation map.
                    self.stop_orders.pop(stop_order.stop_orderid)

                    strategy_vt_orderids = self.strategy_orderid_map[strategy.strategy_name]
                    if stop_order.stop_orderid in strategy_vt_orderids:
                        strategy_vt_orderids.remove(stop_order.stop_orderid)

                    # Change stop order status to cancelled and update to strategy.
                    stop_order.status = StopOrderStatus.TRIGGERED
                    stop_order.vt_orderids = vt_orderids

                    self.call_strategy_func(
                        strategy, strategy.on_stop_order, stop_order
                    )
                    self.put_stop_order_event(stop_order)

    def send_server_order(
        self,
        strategy: CtaTemplate,
        contract: ContractData,
        direction: Direction,
        offset: Offset,
        price: float,
        volume: float,
        type: OrderType,
        lock: bool
    ):
        """
        Send a new order to server.
        """
        # Create request and send order.
        original_req = OrderRequest(
            symbol=contract.symbol,
            exchange=contract.exchange,
            direction=direction,
            offset=offset,
            type=type,
            price=price,
            volume=volume,
        )

        # Convert with offset converter
        req_list = self.offset_converter.convert_order_request(original_req, lock)

        # Send Orders
        vt_orderids = []

        for req in req_list:
            req.reference = strategy.strategy_name      # Add strategy name as order reference

            vt_orderid = self.main_engine.send_order(
                req, contract.gateway_name)

            # Check if sending order successful
            if not vt_orderid:
                continue

            vt_orderids.append(vt_orderid)

            self.offset_converter.update_order_request(req, vt_orderid)

            # Save relationship between orderid and strategy.
            self.orderid_strategy_map[vt_orderid] = strategy
            self.strategy_orderid_map[strategy.strategy_name].add(vt_orderid)

        return vt_orderids

    def send_limit_order(
        self,
        strategy: CtaTemplate,
        contract: ContractData,
        direction: Direction,
        offset: Offset,
        price: float,
        volume: float,
        lock: bool
    ):
        """
        Send a limit order to server.
        """
        return self.send_server_order(
            strategy,
            contract,
            direction,
            offset,
            price,
            volume,
            OrderType.LIMIT,
            lock
        )

    def send_server_stop_order(
        self,
        strategy: CtaTemplate,
        contract: ContractData,
        direction: Direction,
        offset: Offset,
        price: float,
        volume: float,
        lock: bool
    ):
        """
        Send a stop order to server.

        Should only be used if stop order supported
        on the trading server.
        """
        return self.send_server_order(
            strategy,
            contract,
            direction,
            offset,
            price,
            volume,
            OrderType.STOP,
            lock
        )

    def send_local_stop_order(
        self,
        strategy: CtaTemplate,
        direction: Direction,
        offset: Offset,
        price: float,
        volume: float,
        lock: bool
    ):
        """
        Create a new local stop order.
        """
        self.stop_order_count += 1
        stop_orderid = f"{STOPORDER_PREFIX}.{self.stop_order_count}"

        stop_order = StopOrder(
            vt_symbol=strategy.vt_symbol,
            direction=direction,
            offset=offset,
            price=price,
            volume=volume,
            stop_orderid=stop_orderid,
            strategy_name=strategy.strategy_name,
            lock=lock
        )

        self.stop_orders[stop_orderid] = stop_order

        vt_orderids = self.strategy_orderid_map[strategy.strategy_name]
        vt_orderids.add(stop_orderid)

        self.call_strategy_func(strategy, strategy.on_stop_order, stop_order)
        self.put_stop_order_event(stop_order)

        return [stop_orderid]

    def cancel_server_order(self, strategy: CtaTemplate, vt_orderid: str):
        """
        Cancel existing order by vt_orderid.
        """
        order = self.main_engine.get_order(vt_orderid)
        if not order:
            self.write_log(f"撤单失败,找不到委托{vt_orderid}", strategy)
            return

        req = order.create_cancel_request()
        self.main_engine.cancel_order(req, order.gateway_name)

    def cancel_local_stop_order(self, strategy: CtaTemplate, stop_orderid: str):
        """
        Cancel a local stop order.
        """
        stop_order = self.stop_orders.get(stop_orderid, None)
        if not stop_order:
            return
        strategy = self.strategies[stop_order.strategy_name]

        # Remove from relation map.
        self.stop_orders.pop(stop_orderid)

        vt_orderids = self.strategy_orderid_map[strategy.strategy_name]
        if stop_orderid in vt_orderids:
            vt_orderids.remove(stop_orderid)

        # Change stop order status to cancelled and update to strategy.
        stop_order.status = StopOrderStatus.CANCELLED

        self.call_strategy_func(strategy, strategy.on_stop_order, stop_order)
        self.put_stop_order_event(stop_order)

    def send_order(
        self,
        strategy: CtaTemplate,
        direction: Direction,
        offset: Offset,
        price: float,
        volume: float,
        stop: bool,
        lock: bool
    ):
        """
        """
        contract = self.main_engine.get_contract(strategy.vt_symbol)
        if not contract:
            self.write_log(f"委托失败,找不到合约:{strategy.vt_symbol}", strategy)
            return ""

        # Round order price and volume to nearest incremental value
        price = round_to(price, contract.pricetick)
        volume = round_to(volume, contract.min_volume)

        if stop:
            if contract.stop_supported:
                return self.send_server_stop_order(strategy, contract, direction, offset, price, volume, lock)
            else:
                return self.send_local_stop_order(strategy, direction, offset, price, volume, lock)
        else:
            return self.send_limit_order(strategy, contract, direction, offset, price, volume, lock)

    def cancel_order(self, strategy: CtaTemplate, vt_orderid: str):
        """
        """
        if vt_orderid.startswith(STOPORDER_PREFIX):
            self.cancel_local_stop_order(strategy, vt_orderid)
        else:
            self.cancel_server_order(strategy, vt_orderid)

    def cancel_all(self, strategy: CtaTemplate):
        """
        Cancel all active orders of a strategy.
        """
        vt_orderids = self.strategy_orderid_map[strategy.strategy_name]
        if not vt_orderids:
            return

        for vt_orderid in copy(vt_orderids):
            self.cancel_order(strategy, vt_orderid)

    def get_engine_type(self):
        """"""
        return self.engine_type

    def get_pricetick(self, strategy: CtaTemplate):
        """
        Return contract pricetick data.
        """
        contract = self.main_engine.get_contract(strategy.vt_symbol)

        if contract:
            return contract.pricetick
        else:
            return None

    def load_bar(
        self,
        vt_symbol: str,
        days: int,
        interval: Interval,
        callback: Callable[[BarData], None],
        use_database: bool
    ):
        """"""
        symbol, exchange = extract_vt_symbol(vt_symbol)
        end = datetime.now(get_localzone())
        start = end - timedelta(days)
        bars = []

        # Pass gateway and RQData if use_database set to True
        if not use_database:
            # Query bars from gateway if available
            contract = self.main_engine.get_contract(vt_symbol)

            if contract and contract.history_data:
                req = HistoryRequest(
                    symbol=symbol,
                    exchange=exchange,
                    interval=interval,
                    start=start,
                    end=end
                )
                bars = self.main_engine.query_history(req, contract.gateway_name)

            # Try to query bars from RQData, if not found, load from database.
            else:
                bars = self.query_bar_from_rq(symbol, exchange, interval, start, end)

        if not bars:
            bars = database_manager.load_bar_data(
                symbol=symbol,
                exchange=exchange,
                interval=interval,
                start=start,
                end=end,
            )

        for bar in bars:
            callback(bar)

    def load_tick(
        self,
        vt_symbol: str,
        days: int,
        callback: Callable[[TickData], None]
    ):
        """"""
        symbol, exchange = extract_vt_symbol(vt_symbol)
        end = datetime.now()
        start = end - timedelta(days)

        ticks = database_manager.load_tick_data(
            symbol=symbol,
            exchange=exchange,
            start=start,
            end=end,
        )

        for tick in ticks:
            callback(tick)

    def call_strategy_func(
        self, strategy: CtaTemplate, func: Callable, params: Any = None
    ):
        """
        Call function of a strategy and catch any exception raised.
        """
        try:
            if params:
                func(params)
            else:
                func()
        except Exception:
            strategy.trading = False
            strategy.inited = False

            msg = f"触发异常已停止\n{traceback.format_exc()}"
            self.write_log(msg, strategy)

    def add_strategy(
        self, class_name: str, strategy_name: str, vt_symbol: str, setting: dict
    ):
        """
        Add a new strategy.
        """
        if strategy_name in self.strategies:
            self.write_log(f"创建策略失败,存在重名{strategy_name}")
            return

        strategy_class = self.classes.get(class_name, None)
        if not strategy_class:
            self.write_log(f"创建策略失败,找不到策略类{class_name}")
            return

        strategy = strategy_class(self, strategy_name, vt_symbol, setting)
        self.strategies[strategy_name] = strategy

        # Add vt_symbol to strategy map.
        strategies = self.symbol_strategy_map[vt_symbol]
        strategies.append(strategy)

        # Update to setting file.
        self.update_strategy_setting(strategy_name, setting)

        self.put_strategy_event(strategy)

    def init_strategy(self, strategy_name: str):
        """
        Init a strategy.
        """
        self.init_executor.submit(self._init_strategy, strategy_name)

    def _init_strategy(self, strategy_name: str):
        """
        Init strategies in queue.
        """
        strategy = self.strategies[strategy_name]

        if strategy.inited:
            self.write_log(f"{strategy_name}已经完成初始化,禁止重复操作")
            return

        self.write_log(f"{strategy_name}开始执行初始化")

        # Call on_init function of strategy
        self.call_strategy_func(strategy, strategy.on_init)

        # Restore strategy data(variables)
        data = self.strategy_data.get(strategy_name, None)
        if data:
            for name in strategy.variables:
                value = data.get(name, None)
                if value:
                    setattr(strategy, name, value)

        # Subscribe market data
        contract = self.main_engine.get_contract(strategy.vt_symbol)
        if contract:
            req = SubscribeRequest(
                symbol=contract.symbol, exchange=contract.exchange)
            self.main_engine.subscribe(req, contract.gateway_name)
        else:
            self.write_log(f"行情订阅失败,找不到合约{strategy.vt_symbol}", strategy)

        # Put event to update init completed status.
        strategy.inited = True
        self.put_strategy_event(strategy)
        self.write_log(f"{strategy_name}初始化完成")

    def start_strategy(self, strategy_name: str):
        """
        Start a strategy.
        """
        strategy = self.strategies[strategy_name]
        if not strategy.inited:
            self.write_log(f"策略{strategy.strategy_name}启动失败,请先初始化")
            return

        if strategy.trading:
            self.write_log(f"{strategy_name}已经启动,请勿重复操作")
            return

        self.call_strategy_func(strategy, strategy.on_start)
        strategy.trading = True

        self.put_strategy_event(strategy)

    def stop_strategy(self, strategy_name: str):
        """
        Stop a strategy.
        """
        strategy = self.strategies[strategy_name]
        if not strategy.trading:
            return

        # Call on_stop function of the strategy
        self.call_strategy_func(strategy, strategy.on_stop)

        # Change trading status of strategy to False
        strategy.trading = False

        # Cancel all orders of the strategy
        self.cancel_all(strategy)

        # Sync strategy variables to data file
        self.sync_strategy_data(strategy)

        # Update GUI
        self.put_strategy_event(strategy)

    def edit_strategy(self, strategy_name: str, setting: dict):
        """
        Edit parameters of a strategy.
        """
        strategy = self.strategies[strategy_name]
        strategy.update_setting(setting)

        self.update_strategy_setting(strategy_name, setting)
        self.put_strategy_event(strategy)

    def remove_strategy(self, strategy_name: str):
        """
        Remove a strategy.
        """
        strategy = self.strategies[strategy_name]
        if strategy.trading:
            self.write_log(f"策略{strategy.strategy_name}移除失败,请先停止")
            return

        # Remove setting
        self.remove_strategy_setting(strategy_name)

        # Remove from symbol strategy map
        strategies = self.symbol_strategy_map[strategy.vt_symbol]
        strategies.remove(strategy)

        # Remove from active orderid map
        if strategy_name in self.strategy_orderid_map:
            vt_orderids = self.strategy_orderid_map.pop(strategy_name)

            # Remove vt_orderid strategy map
            for vt_orderid in vt_orderids:
                if vt_orderid in self.orderid_strategy_map:
                    self.orderid_strategy_map.pop(vt_orderid)

        # Remove from strategies
        self.strategies.pop(strategy_name)

        return True

    def load_strategy_class(self):
        """
        Load strategy class from source code.
        """
        path1 = Path(__file__).parent.joinpath("strategies")
        self.load_strategy_class_from_folder(
            path1, "vnpy.app.cta_strategy.strategies")

        path2 = Path.cwd().joinpath("strategies")
        self.load_strategy_class_from_folder(path2, "strategies")

    def load_strategy_class_from_folder(self, path: Path, module_name: str = ""):
        """
        Load strategy class from certain folder.
        """
        for dirpath, dirnames, filenames in os.walk(str(path)):
            for filename in filenames:
                if filename.split(".")[-1] in ("py", "pyd", "so"):
                    strategy_module_name = ".".join([module_name, filename.split(".")[0]])
                    self.load_strategy_class_from_module(strategy_module_name)

    def load_strategy_class_from_module(self, module_name: str):
        """
        Load strategy class from module file.
        """
        try:
            module = importlib.import_module(module_name)
            # print(f"{module_name}'s module:{module}")   # hxxjava add

            for name in dir(module):
                # print(f"name:{name}")   # hxxjava add
                value = getattr(module, name)
                if (isinstance(value, type) and issubclass(value, CtaTemplate) and value is not CtaTemplate):
                    self.classes[value.__name__] = value
                    # print(f"value.__name__:{value.__name__}")   # hxxjava add
        except:  # noqa
            msg = f"策略文件{module_name}加载失败,触发异常:\n{traceback.format_exc()}"
            self.write_log(msg)

    def load_strategy_data(self):
        """
        Load strategy data from json file.
        """
        self.strategy_data = load_json(self.data_filename)

    def sync_strategy_data(self, strategy: CtaTemplate):
        """
        Sync strategy data into json file.
        """
        data = strategy.get_variables()
        data.pop("inited")      # Strategy status (inited, trading) should not be synced.
        data.pop("trading")

        self.strategy_data[strategy.strategy_name] = data
        save_json(self.data_filename, self.strategy_data)

    def get_all_strategy_class_names(self):
        """
        Return names of strategy classes loaded.
        """
        return list(self.classes.keys())

    def get_strategy_class_parameters(self, class_name: str):
        """
        Get default parameters of a strategy class.
        """
        strategy_class = self.classes[class_name]

        parameters = {}
        for name in strategy_class.parameters:
            parameters[name] = getattr(strategy_class, name)

        return parameters

    def get_strategy_parameters(self, strategy_name):
        """
        Get parameters of a strategy.
        """
        strategy = self.strategies[strategy_name]
        return strategy.get_parameters()

    def init_all_strategies(self):
        """
        """
        for strategy_name in self.strategies.keys():
            self.init_strategy(strategy_name)

    def start_all_strategies(self):
        """
        """
        for strategy_name in self.strategies.keys():
            self.start_strategy(strategy_name)

    def stop_all_strategies(self):
        """
        """
        for strategy_name in self.strategies.keys():
            self.stop_strategy(strategy_name)

    def load_strategy_setting(self):
        """
        Load setting file.
        """
        self.strategy_setting = load_json(self.setting_filename)

        for strategy_name, strategy_config in self.strategy_setting.items():
            self.add_strategy(
                strategy_config["class_name"],
                strategy_name,
                strategy_config["vt_symbol"],
                strategy_config["setting"]
            )

    def update_strategy_setting(self, strategy_name: str, setting: dict):
        """
        Update setting file.
        """
        strategy = self.strategies[strategy_name]

        self.strategy_setting[strategy_name] = {
            "class_name": strategy.__class__.__name__,
            "vt_symbol": strategy.vt_symbol,
            "setting": setting,
        }
        save_json(self.setting_filename, self.strategy_setting)

    def remove_strategy_setting(self, strategy_name: str):
        """
        Update setting file.
        """
        if strategy_name not in self.strategy_setting:
            return

        self.strategy_setting.pop(strategy_name)
        save_json(self.setting_filename, self.strategy_setting)

    def put_stop_order_event(self, stop_order: StopOrder):
        """
        Put an event to update stop order status.
        """
        event = Event(EVENT_CTA_STOPORDER, stop_order)
        self.event_engine.put(event)

    def put_strategy_event(self, strategy: CtaTemplate):
        """
        Put an event to update strategy status.
        """
        data = strategy.get_data()
        event = Event(EVENT_CTA_STRATEGY, data)
        self.event_engine.put(event)

    #--------------------------------------------------------------------------------------------------
    def get_position_detail(self, vt_symbol:str):
        """
        查询long_pos,short_pos(持仓),long_pnl,short_pnl(盈亏),active_order(未成交字典)
        收到PositionHolding类数据
        """
        try:
            return self.offset_converter.get_position_holding(vt_symbol)
        except:
            self.write_log(f"当前获取持仓信息为:{self.offset_converter.get_position_holding(vt_symbol)},等待获取持仓信息")
            position_detail = OrderedDict()     
            position_detail.active_orders = {} 
            position_detail.long_pos = 0
            position_detail.long_pnl = 0
            position_detail.long_yd = 0
            position_detail.long_td = 0
            position_detail.long_pos_frozen = 0
            position_detail.long_price = 0
            position_detail.short_pos = 0
            position_detail.short_pnl = 0          
            position_detail.short_yd = 0
            position_detail.short_td = 0
            position_detail.short_price = 0
            position_detail.short_pos_frozen = 0
            return position_detail

    def write_log(self, msg: str, strategy: CtaTemplate = None):
        """
        Create cta engine log event.
        """
        if strategy:
            msg = f"{strategy.strategy_name}: {msg}"

        log = LogData(msg=msg, gateway_name="CtaStrategy")
        event = Event(type=EVENT_CTA_LOG, data=log)
        self.event_engine.put(event)

    def send_email(self, msg: str, strategy: CtaTemplate = None):
        """
        Send email to default receiver.
        """
        if strategy:
            subject = f"{strategy.strategy_name}"
        else:
            subject = "CTA策略引擎"

        self.main_engine.send_email(subject, msg)

修改vnpy\app\cta_strategy\ui\widget.py

from vnpy.event import Event, EventEngine
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import QtCore, QtGui, QtWidgets
from vnpy.trader.ui.widget import (
    BaseCell,
    EnumCell,
    MsgCell,
    TimeCell,
    BaseMonitor
)

from ..base import (
    APP_NAME,
    EVENT_CTA_LOG,
    EVENT_CTA_STOPORDER,
    EVENT_CTA_STRATEGY,
)

from ..engine import CtaEngine

from vnpy.usertools.kx_chart import NewChartWidget  # hxxjava add

class CtaManager(QtWidgets.QWidget):
    """"""

    signal_log = QtCore.pyqtSignal(Event)
    signal_strategy = QtCore.pyqtSignal(Event)

    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        super(CtaManager, self).__init__()

        self.main_engine = main_engine
        self.event_engine = event_engine
        self.cta_engine = main_engine.get_engine(APP_NAME)

        self.managers = {}

        self.init_ui()
        self.register_event()
        self.cta_engine.init_engine()
        self.update_class_combo()

    def init_ui(self):
        """"""
        self.setWindowTitle("CTA策略")

        # Create widgets
        self.class_combo = QtWidgets.QComboBox()

        add_button = QtWidgets.QPushButton("添加策略")
        add_button.clicked.connect(self.add_strategy)

        init_button = QtWidgets.QPushButton("全部初始化")
        init_button.clicked.connect(self.cta_engine.init_all_strategies)

        start_button = QtWidgets.QPushButton("全部启动")
        start_button.clicked.connect(self.cta_engine.start_all_strategies)

        stop_button = QtWidgets.QPushButton("全部停止")
        stop_button.clicked.connect(self.cta_engine.stop_all_strategies)

        clear_button = QtWidgets.QPushButton("清空日志")
        clear_button.clicked.connect(self.clear_log)

        self.scroll_layout = QtWidgets.QVBoxLayout()
        self.scroll_layout.addStretch()

        scroll_widget = QtWidgets.QWidget()
        scroll_widget.setLayout(self.scroll_layout)

        scroll_area = QtWidgets.QScrollArea()
        scroll_area.setWidgetResizable(True)
        scroll_area.setWidget(scroll_widget)

        self.log_monitor = LogMonitor(self.main_engine, self.event_engine)

        self.stop_order_monitor = StopOrderMonitor(
            self.main_engine, self.event_engine
        )

        # Set layout
        hbox1 = QtWidgets.QHBoxLayout()
        hbox1.addWidget(self.class_combo)
        hbox1.addWidget(add_button)
        hbox1.addStretch()
        hbox1.addWidget(init_button)
        hbox1.addWidget(start_button)
        hbox1.addWidget(stop_button)
        hbox1.addWidget(clear_button)

        grid = QtWidgets.QGridLayout()
        grid.addWidget(scroll_area, 0, 0, 2, 1)
        grid.addWidget(self.stop_order_monitor, 0, 1)
        grid.addWidget(self.log_monitor, 1, 1)

        vbox = QtWidgets.QVBoxLayout()
        vbox.addLayout(hbox1)
        vbox.addLayout(grid)

        self.setLayout(vbox)

    def update_class_combo(self):
        """"""
        self.class_combo.addItems(
            self.cta_engine.get_all_strategy_class_names()
        )

    def register_event(self):
        """"""
        self.signal_strategy.connect(self.process_strategy_event)

        self.event_engine.register(
            EVENT_CTA_STRATEGY, self.signal_strategy.emit
        )

    def process_strategy_event(self, event):
        """
        Update strategy status onto its monitor.
        """
        data = event.data
        strategy_name = data["strategy_name"]

        if strategy_name in self.managers:
            manager = self.managers[strategy_name]
            manager.update_data(data)
        else:
            manager = StrategyManager(self, self.cta_engine, data)
            self.scroll_layout.insertWidget(0, manager)
            self.managers[strategy_name] = manager

    def remove_strategy(self, strategy_name):
        """"""
        manager = self.managers.pop(strategy_name)
        manager.deleteLater()

    def add_strategy(self):
        """"""
        class_name = str(self.class_combo.currentText())
        if not class_name:
            return

        parameters = self.cta_engine.get_strategy_class_parameters(class_name)
        editor = SettingEditor(parameters, class_name=class_name)
        n = editor.exec_()

        if n == editor.Accepted:
            setting = editor.get_setting()
            vt_symbol = setting.pop("vt_symbol")
            strategy_name = setting.pop("strategy_name")

            self.cta_engine.add_strategy(
                class_name, strategy_name, vt_symbol, setting
            )

    def clear_log(self):
        """"""
        self.log_monitor.setRowCount(0)

    def show(self):
        """"""
        self.showMaximized()



class StrategyManager(QtWidgets.QFrame):
    """
    Manager for a strategy
    """

    def __init__(
        self, cta_manager: CtaManager, cta_engine: CtaEngine, data: dict
    ):
        """"""
        super(StrategyManager, self).__init__()

        self.cta_manager = cta_manager
        self.cta_engine = cta_engine

        self.strategy_name = data["strategy_name"]
        self._data = data

        self.init_ui()

    def init_ui(self):
        """"""
        self.setFixedHeight(300)
        self.setFrameShape(self.Box)
        self.setLineWidth(1)

        self.init_button = QtWidgets.QPushButton("初始化")
        self.init_button.clicked.connect(self.init_strategy)

        self.start_button = QtWidgets.QPushButton("启动")
        self.start_button.clicked.connect(self.start_strategy)
        self.start_button.setEnabled(False)

        self.stop_button = QtWidgets.QPushButton("停止")
        self.stop_button.clicked.connect(self.stop_strategy)
        self.stop_button.setEnabled(False)

        self.edit_button = QtWidgets.QPushButton("编辑")
        self.edit_button.clicked.connect(self.edit_strategy)

        self.remove_button = QtWidgets.QPushButton("移除")
        self.remove_button.clicked.connect(self.remove_strategy)

        strategy_name = self._data["strategy_name"]
        vt_symbol = self._data["vt_symbol"]
        class_name = self._data["class_name"]
        author = self._data["author"]

        label_text = (
            f"{strategy_name}  -  {vt_symbol}  ({class_name} by {author})"
        )
        label = QtWidgets.QLabel(label_text)
        label.setAlignment(QtCore.Qt.AlignCenter)

        self.parameters_monitor = DataMonitor(self._data["parameters"])
        self.variables_monitor = DataMonitor(self._data["variables"])

        hbox = QtWidgets.QHBoxLayout()
        hbox.addWidget(self.init_button)
        hbox.addWidget(self.start_button)
        hbox.addWidget(self.stop_button)
        hbox.addWidget(self.edit_button)
        hbox.addWidget(self.remove_button)

        vbox = QtWidgets.QVBoxLayout()
        vbox.addWidget(label)
        vbox.addLayout(hbox)
        vbox.addWidget(self.parameters_monitor)
        vbox.addWidget(self.variables_monitor)
        self.setLayout(vbox)

    def update_data(self, data: dict):
        """"""
        self._data = data

        self.parameters_monitor.update_data(data["parameters"])
        self.variables_monitor.update_data(data["variables"])

        # Update button status
        variables = data["variables"]
        inited = variables["inited"]
        trading = variables["trading"]

        if not inited:
            return
        self.init_button.setEnabled(False)

        if trading:
            self.start_button.setEnabled(False)
            self.stop_button.setEnabled(True)
            self.edit_button.setEnabled(False)
            self.remove_button.setEnabled(False)
        else:
            self.start_button.setEnabled(True)
            self.stop_button.setEnabled(False)
            self.edit_button.setEnabled(True)
            self.remove_button.setEnabled(True)

    def init_strategy(self):
        """"""
        self.open_kx_chart()    # hxxjava add
        self.cta_engine.init_strategy(self.strategy_name)

    def start_strategy(self):
        """"""
        self.cta_engine.start_strategy(self.strategy_name)

    def stop_strategy(self):
        """"""
        self.cta_engine.stop_strategy(self.strategy_name)

    def edit_strategy(self):
        """"""
        strategy_name = self._data["strategy_name"]

        parameters = self.cta_engine.get_strategy_parameters(strategy_name)
        editor = SettingEditor(parameters, strategy_name=strategy_name)
        n = editor.exec_()

        if n == editor.Accepted:
            setting = editor.get_setting()
            self.cta_engine.edit_strategy(strategy_name, setting)

    def remove_strategy(self):
        """"""
        result = self.cta_engine.remove_strategy(self.strategy_name)

        # Only remove strategy gui manager if it has been removed from engine
        if result:
            self.cta_manager.remove_strategy(self.strategy_name)

            if self.kx_chart:           # hxxjava add
                self.kx_chart.close()
                self.kx_chart = None

    def open_kx_chart(self):    # hxxjava add
        strategy = self.cta_engine.strategies[self.strategy_name]
        setting = self.cta_engine.strategy_setting[self.strategy_name]['setting']
        show_chart = setting.get("show_chart",None)

        self.kx_chart = None
        if show_chart:
            event_engine = self.cta_engine.event_engine
            kx_interval = setting.get("kx_interval",None)
            self.kx_chart = NewChartWidget(event_engine = event_engine,strategy_name = self.strategy_name)
            self.kx_chart.setWindowTitle(f"K线图表:{self.strategy_name},周期:{kx_interval}")

            strategy.init_kx_chart(self.kx_chart)

            self.kx_chart.register_event()  # 注册消息
            self.kx_chart.show()    # 显示K线图

class DataMonitor(QtWidgets.QTableWidget):
    """
    Table monitor for parameters and variables.
    """

    def __init__(self, data: dict):
        """"""
        super(DataMonitor, self).__init__()

        self._data = data
        self.cells = {}

        self.init_ui()

    def init_ui(self):
        """"""
        labels = list(self._data.keys())
        self.setColumnCount(len(labels))
        self.setHorizontalHeaderLabels(labels)

        self.setRowCount(1)
        self.verticalHeader().setSectionResizeMode(
            QtWidgets.QHeaderView.Stretch
        )
        self.verticalHeader().setVisible(False)
        self.setEditTriggers(self.NoEditTriggers)

        for column, name in enumerate(self._data.keys()):
            value = self._data[name]

            cell = QtWidgets.QTableWidgetItem(str(value))
            cell.setTextAlignment(QtCore.Qt.AlignCenter)

            self.setItem(0, column, cell)
            self.cells[name] = cell

    def update_data(self, data: dict):
        """"""
        for name, value in data.items():
            cell = self.cells[name]
            cell.setText(str(value))


class StopOrderMonitor(BaseMonitor):
    """
    Monitor for local stop order.
    """

    event_type = EVENT_CTA_STOPORDER
    data_key = "stop_orderid"
    sorting = True

    headers = {
        "stop_orderid": {"display": "停止委托号","cell": BaseCell,"update": False,},
        "vt_orderids": {"display": "限价委托号", "cell": BaseCell, "update": True},
        "vt_symbol": {"display": "本地代码", "cell": BaseCell, "update": False},
        "direction": {"display": "方向", "cell": EnumCell, "update": False},
        "offset": {"display": "开平", "cell": EnumCell, "update": False},
        "price": {"display": "价格", "cell": BaseCell, "update": False},
        "volume": {"display": "数量", "cell": BaseCell, "update": False},
        "status": {"display": "状态", "cell": EnumCell, "update": True},
        "lock": {"display": "锁仓", "cell": BaseCell, "update": False},
        "strategy_name": {"display": "策略名", "cell": BaseCell, "update": False},
    }


class LogMonitor(BaseMonitor):
    """
    Monitor for log data.
    """

    event_type = EVENT_CTA_LOG
    data_key = ""
    sorting = False

    headers = {
        "time": {"display": "时间", "cell": TimeCell, "update": False},
        "msg": {"display": "信息", "cell": MsgCell, "update": False},
    }

    def init_ui(self):
        """
        Stretch last column.
        """
        super(LogMonitor, self).init_ui()

        self.horizontalHeader().setSectionResizeMode(
            1, QtWidgets.QHeaderView.Stretch
        )

    def insert_new_row(self, data):
        """
        Insert a new row at the top of table.
        """
        super(LogMonitor, self).insert_new_row(data)
        self.resizeRowToContents(0)


class SettingEditor(QtWidgets.QDialog):
    """
    For creating new strategy and editing strategy parameters.
    """

    def __init__(
        self, parameters: dict, strategy_name: str = "", class_name: str = ""
    ):
        """"""
        super(SettingEditor, self).__init__()

        self.parameters = parameters
        self.strategy_name = strategy_name
        self.class_name = class_name

        self.edits = {}

        self.init_ui()

    def init_ui(self):
        """"""
        form = QtWidgets.QFormLayout()

        # Add vt_symbol and name edit if add new strategy
        if self.class_name:
            self.setWindowTitle(f"添加策略:{self.class_name}")
            button_text = "添加"
            parameters = {"strategy_name": "", "vt_symbol": ""}
            parameters.update(self.parameters)
        else:
            self.setWindowTitle(f"参数编辑:{self.strategy_name}")
            button_text = "确定"
            parameters = self.parameters

        for name, value in parameters.items():
            type_ = type(value)

            edit = QtWidgets.QLineEdit(str(value))
            if type_ is int:
                validator = QtGui.QIntValidator()
                edit.setValidator(validator)
            elif type_ is float:
                validator = QtGui.QDoubleValidator()
                edit.setValidator(validator)    

            form.addRow(f"{name} {type_}", edit)

            self.edits[name] = (edit, type_)

        button = QtWidgets.QPushButton(button_text)
        button.clicked.connect(self.accept)
        form.addRow(button)

        widget = QtWidgets.QWidget()
        widget.setLayout(form)

        scroll = QtWidgets.QScrollArea()
        scroll.setWidgetResizable(True)
        scroll.setWidget(widget)

        vbox = QtWidgets.QVBoxLayout()
        vbox.addWidget(scroll)
        self.setLayout(vbox)

    def get_setting(self):
        """"""
        setting = {}

        if self.class_name:
            setting["class_name"] = self.class_name

        for name, tp in self.edits.items():
            edit, type_ = tp
            value_text = edit.text()

            if type_ == bool:
                if value_text == "True":
                    value = True
                else:
                    value = False
            else:
                value = type_(value_text)

            setting[name] = value

        return setting

修改vnpy\app\cta_backtester\engine.py

import os
import importlib
import traceback
from datetime import datetime
from threading import Thread
from pathlib import Path
from inspect import getfile

from vnpy.event import Event, EventEngine
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.trader.constant import Interval
from vnpy.trader.utility import extract_vt_symbol
from vnpy.trader.object import HistoryRequest
from vnpy.trader.rqdata import rqdata_client
from vnpy.trader.database import database_manager
from vnpy.app.cta_strategy import CtaTemplate
from vnpy.app.cta_strategy.backtesting import BacktestingEngine, OptimizationSetting

APP_NAME = "CtaBacktester"

EVENT_BACKTESTER_LOG = "eBacktesterLog"
EVENT_BACKTESTER_BACKTESTING_FINISHED = "eBacktesterBacktestingFinished"
EVENT_BACKTESTER_OPTIMIZATION_FINISHED = "eBacktesterOptimizationFinished"

from vnpy.app.cta_strategy.base import EngineType     # hxxjava add

class BacktesterEngine(BaseEngine):
    """
    For running CTA strategy backtesting.
    """
    engine_type = EngineType.BACKTESTING  # hxxjava add --- 供策略回测时使用

    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super().__init__(main_engine, event_engine, APP_NAME)

        self.classes = {}
        self.backtesting_engine = None
        self.thread = None

        # Backtesting reuslt
        self.result_df = None
        self.result_statistics = None

        # Optimization result
        self.result_values = None

    def init_engine(self):
        """"""
        self.write_log("初始化CTA回测引擎")

        self.backtesting_engine = BacktestingEngine()
        # Redirect log from backtesting engine outside.
        self.backtesting_engine.output = self.write_log

        self.load_strategy_class()
        self.write_log("策略文件加载完成")

        self.init_rqdata()

    def init_rqdata(self):
        """
        Init RQData client.
        """
        result = rqdata_client.init()
        if result:
            self.write_log("RQData数据接口初始化成功")

    def write_log(self, msg: str):
        """"""
        event = Event(EVENT_BACKTESTER_LOG)
        event.data = msg
        self.event_engine.put(event)

    def load_strategy_class(self):
        """
        Load strategy class from source code.
        """
        app_path = Path(__file__).parent.parent
        path1 = app_path.joinpath("cta_strategy", "strategies")
        self.load_strategy_class_from_folder(
            path1, "vnpy.app.cta_strategy.strategies")

        path2 = Path.cwd().joinpath("strategies")
        self.load_strategy_class_from_folder(path2, "strategies")

    def load_strategy_class_from_folder(self, path: Path, module_name: str = ""):
        """
        Load strategy class from certain folder.
        """
        for dirpath, dirnames, filenames in os.walk(path):
            for filename in filenames:
                # Load python source code file
                if filename.endswith(".py"):
                    strategy_module_name = ".".join(
                        [module_name, filename.replace(".py", "")])
                    self.load_strategy_class_from_module(strategy_module_name)
                # Load compiled pyd binary file
                elif filename.endswith(".pyd"):
                    strategy_module_name = ".".join(
                        [module_name, filename.split(".")[0]])
                    self.load_strategy_class_from_module(strategy_module_name)

    def load_strategy_class_from_module(self, module_name: str):
        """
        Load strategy class from module file.
        """
        try:
            module = importlib.import_module(module_name)
            importlib.reload(module)

            for name in dir(module):
                value = getattr(module, name)
                if (isinstance(value, type) and issubclass(value, CtaTemplate) and value is not CtaTemplate):
                    self.classes[value.__name__] = value
        except:  # noqa
            msg = f"策略文件{module_name}加载失败,触发异常:\n{traceback.format_exc()}"
            self.write_log(msg)

    def reload_strategy_class(self):
        """"""
        self.classes.clear()
        self.load_strategy_class()
        self.write_log("策略文件重载刷新完成")

    def get_strategy_class_names(self):
        """"""
        return list(self.classes.keys())

    def run_backtesting(
        self,
        class_name: str,
        vt_symbol: str,
        interval: str,
        start: datetime,
        end: datetime,
        rate: float,
        slippage: float,
        size: int,
        pricetick: float,
        capital: int,
        inverse: bool,
        setting: dict
    ):
        """"""
        self.result_df = None
        self.result_statistics = None

        engine = self.backtesting_engine
        engine.clear_data()

        engine.set_parameters(
            vt_symbol=vt_symbol,
            interval=interval,
            start=start,
            end=end,
            rate=rate,
            slippage=slippage,
            size=size,
            pricetick=pricetick,
            capital=capital,
            inverse=inverse
        )

        strategy_class = self.classes[class_name]
        engine.add_strategy(
            strategy_class,
            setting
        )

        engine.load_data()
        engine.run_backtesting()
        self.result_df = engine.calculate_result()
        self.result_statistics = engine.calculate_statistics(output=False)

        # Clear thread object handler.
        self.thread = None

        # Put backtesting done event
        event = Event(EVENT_BACKTESTER_BACKTESTING_FINISHED)
        self.event_engine.put(event)

    def start_backtesting(
        self,
        class_name: str,
        vt_symbol: str,
        interval: str,
        start: datetime,
        end: datetime,
        rate: float,
        slippage: float,
        size: int,
        pricetick: float,
        capital: int,
        inverse: bool,
        setting: dict
    ):
        if self.thread:
            self.write_log("已有任务在运行中,请等待完成")
            return False

        self.write_log("-" * 40)
        self.thread = Thread(
            target=self.run_backtesting,
            args=(
                class_name,
                vt_symbol,
                interval,
                start,
                end,
                rate,
                slippage,
                size,
                pricetick,
                capital,
                inverse,
                setting
            )
        )
        self.thread.start()

        return True

    def get_result_df(self):
        """"""
        return self.result_df

    def get_result_statistics(self):
        """"""
        return self.result_statistics

    def get_result_values(self):
        """"""
        return self.result_values

    def get_default_setting(self, class_name: str):
        """"""
        strategy_class = self.classes[class_name]
        return strategy_class.get_class_parameters()

    def run_optimization(
        self,
        class_name: str,
        vt_symbol: str,
        interval: str,
        start: datetime,
        end: datetime,
        rate: float,
        slippage: float,
        size: int,
        pricetick: float,
        capital: int,
        inverse: bool,
        optimization_setting: OptimizationSetting,
        use_ga: bool
    ):
        """"""
        if use_ga:
            self.write_log("开始遗传算法参数优化")
        else:
            self.write_log("开始多进程参数优化")

        self.result_values = None

        engine = self.backtesting_engine
        engine.clear_data()

        engine.set_parameters(
            vt_symbol=vt_symbol,
            interval=interval,
            start=start,
            end=end,
            rate=rate,
            slippage=slippage,
            size=size,
            pricetick=pricetick,
            capital=capital,
            inverse=inverse
        )

        strategy_class = self.classes[class_name]
        engine.add_strategy(
            strategy_class,
            {}
        )

        if use_ga:
            self.result_values = engine.run_ga_optimization(
                optimization_setting,
                output=False
            )
        else:
            self.result_values = engine.run_optimization(
                optimization_setting,
                output=False
            )

        # Clear thread object handler.
        self.thread = None
        self.write_log("多进程参数优化完成")

        # Put optimization done event
        event = Event(EVENT_BACKTESTER_OPTIMIZATION_FINISHED)
        self.event_engine.put(event)

    def start_optimization(
        self,
        class_name: str,
        vt_symbol: str,
        interval: str,
        start: datetime,
        end: datetime,
        rate: float,
        slippage: float,
        size: int,
        pricetick: float,
        capital: int,
        inverse: bool,
        optimization_setting: OptimizationSetting,
        use_ga: bool
    ):
        if self.thread:
            self.write_log("已有任务在运行中,请等待完成")
            return False

        self.write_log("-" * 40)
        self.thread = Thread(
            target=self.run_optimization,
            args=(
                class_name,
                vt_symbol,
                interval,
                start,
                end,
                rate,
                slippage,
                size,
                pricetick,
                capital,
                inverse,
                optimization_setting,
                use_ga
            )
        )
        self.thread.start()

        return True

    def run_downloading(
        self,
        vt_symbol: str,
        interval: str,
        start: datetime,
        end: datetime
    ):
        """
        Query bar data from RQData.
        """
        self.write_log(f"{vt_symbol}-{interval}开始下载历史数据")

        try:
            symbol, exchange = extract_vt_symbol(vt_symbol)
        except ValueError:
            self.write_log(f"{vt_symbol}解析失败,请检查交易所后缀")
            self.thread = None
            return

        req = HistoryRequest(
            symbol=symbol,
            exchange=exchange,
            interval=Interval(interval),
            start=start,
            end=end
        )

        contract = self.main_engine.get_contract(vt_symbol)

        try:
            # If history data provided in gateway, then query
            if contract and contract.history_data:
                data = self.main_engine.query_history(
                    req, contract.gateway_name
                )
            # Otherwise use RQData to query data
            else:
                data = rqdata_client.query_history(req)

            if data:
                database_manager.save_bar_data(data)
                self.write_log(f"{vt_symbol}-{interval}历史数据下载完成")
            else:
                self.write_log(f"数据下载失败,无法获取{vt_symbol}的历史数据")
        except Exception:
            msg = f"数据下载失败,触发异常:\n{traceback.format_exc()}"
            self.write_log(msg)

        # Clear thread object handler.
        self.thread = None

    def start_downloading(
        self,
        vt_symbol: str,
        interval: str,
        start: datetime,
        end: datetime
    ):
        if self.thread:
            self.write_log("已有任务在运行中,请等待完成")
            return False

        self.write_log("-" * 40)
        self.thread = Thread(
            target=self.run_downloading,
            args=(
                vt_symbol,
                interval,
                start,
                end
            )
        )
        self.thread.start()

        return True

    def get_all_trades(self):
        """"""
        return self.backtesting_engine.get_all_trades()

    def get_all_orders(self):
        """"""
        return self.backtesting_engine.get_all_orders()

    def get_all_daily_results(self):
        """"""
        return self.backtesting_engine.get_all_daily_results()

    def get_history_data(self):
        """"""
        return self.backtesting_engine.history_data

    def get_strategy_class_file(self, class_name: str):
        """"""
        strategy_class = self.classes[class_name]
        file_path = getfile(strategy_class)
        return file_path

修改用户策略KxMonitor.py

from typing import Any,List,Dict,Tuple
import copy

from vnpy.app.cta_strategy import (
    CtaTemplate,
    BarGenerator,
    ArrayManager,
    StopOrder,
    Direction
)

from vnpy.trader.engine import MainEngine,EventEngine
from vnpy.app.cta_strategy.engine import CtaEngine
from vnpy.event.engine import Event

from vnpy.trader.object import (
    LogData,
    TickData,
    BarData,
    TradeData,
    OrderData,
)

from vnpy.app.cta_strategy import StopOrder
from vnpy.app.cta_strategy.base import EngineType
from vnpy.trader.constant import Interval

from vnpy.app.cta_strategy.base import (
    APP_NAME,
    EVENT_CTA_LOG,
    EVENT_CTA_TICK,
    EVENT_CTA_HISTORY_BAR,
    EVENT_CTA_BAR,
    EVENT_CTA_ORDER,
    EVENT_CTA_TRADE,    
    EVENT_CTA_STOPORDER,
    EVENT_CTA_STRATEGY,
)

from vnpy.usertools.kx_chart import (   # hxxjava add
    NewChartWidget,
    CandleItem,
    VolumeItem, 
    LineItem,
    SmaItem,
    RsiItem,
    MacdItem,
)

from vnpy.usertools.kx_chart import NewChartWidget  # hxxjava add

class _kx_strategy(CtaTemplate):
    """"""

    author = "hxxjava"
    kx_interval = 1
    show_chart = False  # 显示K线图表 

    parameters = [
        "kx_interval",
        "show_chart"
    ]

    kx_count:int = 0
    cta_manager = None

    variables = ["kx_count"]

    def __init__(
        self,
        cta_engine: Any,
        strategy_name: str,
        vt_symbol: str,
        setting: dict,
    ):
        super().__init__(cta_engine,strategy_name,vt_symbol,setting)
        self.bg = BarGenerator(self.on_bar,self.kx_interval,self.on_Nmin_bar)
        self.am = ArrayManager()

        cta_engine:CtaEngine = self.cta_engine
        self.engine_type = cta_engine.engine_type
        self.even_engine = cta_engine.main_engine.event_engine

        # 必须在这里声明,因为它们是实例变量
        self.all_bars:List[BarData] = []    
        self.current_tick:[TickData] = None
        self.current_bar:[BarData] = None
        self.last_tick:[TickData] = None

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.load_bar(20)

        if len(self.all_bars)>0:
            self.send_event(EVENT_CTA_HISTORY_BAR,self.all_bars)

    def on_start(self):
        """ """
        self.write_log("已开始")

    def on_stop(self):
        """"""
        self.write_log("_kx_strategy 已停止")

    def on_tick(self, tick: TickData):
        """
        Callback of new tick data update.
        """
        self.current_tick = tick    # 记录最新tick 

        if self.inited:     
            # 先产生当前临时K线
            self.cur_bar = self.get_cur_bar(tick)    
            if self.cur_bar:
                # 发送当前临时K线更新消息
                self.send_event(EVENT_CTA_BAR,self.cur_bar)

        # 再更新tick,产生1分钟K线乃至N 分钟线
        self.bg.update_tick(tick)

        self.send_event(EVENT_CTA_TICK,tick)  
        self.last_tick = tick

    def on_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        if self.inited:
            self.write_log(f"I got a 1min BarData")
        self.bg.update_bar(bar)

    def on_Nmin_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        self.all_bars.append(bar)
        self.kx_count = len(self.all_bars)

        if self.inited:
            self.write_log(f"I got a {self.kx_interval}min BarData")
            self.send_event(EVENT_CTA_BAR,bar)

            if self.current_tick:
                # 当新N分钟K线产生的时候,立即产生新的临时K线
                self.current_bar = None
                self.get_cur_bar(self.current_tick)

        self.put_event()

    def on_trade(self, trade: TradeData):
        """
        Callback of new trade data update.
        """
        self.send_event(EVENT_CTA_TRADE,trade)

    def on_order(self, order: OrderData):
        """
        Callback of new order data update.
        """
        self.send_event(EVENT_CTA_ORDER,order)

    def on_stop_order(self, stop_order: StopOrder):
        """
        Callback of stop order update.
        """
        self.send_event(EVENT_CTA_STOPORDER,stop_order)

    def get_cur_bar(self,tick:TickData)->BarData:
        """
        产生临时K线,每个tick都会更新。除非把self.window_bar赋值为None,
        不会产生新的K线,只会更新K线的量和加。
        注意:self.last_tick是在BarGenerator中声明和改变的
        """
        if not self.inited or not self.last_tick:
            return None

        if self.last_tick and tick.datetime < self.last_tick.datetime:
            return None

        if not self.current_bar:
            # Generate timestamp for bar data
            if self.bg.interval == Interval.MINUTE:
                dt = tick.datetime.replace(second=0, microsecond=0)
            else:
                dt = tick.datetime.replace(minute=0, second=0, microsecond=0)

            self.current_bar = BarData(
                symbol=tick.symbol,
                exchange=tick.exchange,
                datetime=dt,
                gateway_name=tick.gateway_name,
                open_price=tick.last_price,
                high_price=tick.last_price,
                low_price=tick.last_price,
            )
        # Otherwise, update high/low price into window bar
        else:
            self.current_bar.high_price = max(self.current_bar.high_price, tick.last_price)
            self.current_bar.low_price = min(self.current_bar.low_price, tick.last_price)

        # Update last price/volume into window bar
        self.current_bar.close_price = tick.last_price
        volume_change = tick.volume - self.last_tick.volume
        self.current_bar.volume += volume_change
        self.current_bar.open_interest = tick.open_interest

        return copy.deepcopy(self.current_bar)

    def send_event(self,event_type:str,data:Any):        
        if self.engine_type==EngineType.LIVE and self.show_chart:     # "如果显示K线图表"
            self.even_engine.put(Event(event_type,(self.strategy_name,data)))

    def init_kx_chart(self,kx_chart:NewChartWidget=None):    # hxxjava add ----- 提供给外部调用
        # self.write_log("init_kx_chart executed !!!")
        if kx_chart:
            kx_chart.add_plot("candle", hide_x_axis=True)
            kx_chart.add_plot("volume", maximum_height=150)
            kx_chart.add_plot("rsi", maximum_height=150)
            kx_chart.add_plot("macd", maximum_height=150)
            kx_chart.add_item(CandleItem, "candle", "candle")
            kx_chart.add_item(VolumeItem, "volume", "volume")

            kx_chart.add_item(LineItem, "line", "candle")
            kx_chart.add_item(SmaItem, "sma", "candle")
            kx_chart.add_item(RsiItem, "rsi", "rsi")
            kx_chart.add_item(MacdItem, "macd", "macd")
            kx_chart.add_last_price_line()
            kx_chart.add_cursor()

添加可以显示的策略

description

启动VnTrader,进入策略管理界面,完成如下步骤:
1)从策略下拉框中选择_kx_strategy策略
2)点击添加策略按钮进入3界面
3)输入策略名称、vt_symbol、kx_interval和show_chart的值,注意kx_interval这里是你想要的K线周期,单位是分钟。show_chart参数为True标识需要显示K线图表,其他值则不显示。
4)初始化策略,如果参数为True的话,完成后显示K线图表窗口,并且显示20日里的历史K线图
5)按启动按钮启动策略,如果是交易时段,则K线图表就会显示最新收到的K线。提示还会实时显示未完成的临时K线

K线图表运行时的界面

rb2010合约的19分钟K线图

description

ag2012合约的5分钟K线图

description

问题:

用户实现了自己的CTA策略,可能会放在多个合约上跑。用户策略里会声母一系列的策略成员变量,这些策略成员变量应该是每个策略实例是不同的。可是我发现事实不是这样的!——不同的合约竟然共用着一个了的策略成员变量!

策略代码

下面代码保存在用户策略文件夹下的test_strategy.py文件中

from typing import Any,List,Dict,Tuple
import copy

from vnpy.app.cta_strategy import (
    CtaTemplate,
    BarGenerator,
    ArrayManager,
    StopOrder,
    Direction
)

from vnpy.trader.engine import MainEngine,EventEngine
from vnpy.app.cta_strategy.engine import CtaEngine
from vnpy.event.engine import Event

from vnpy.trader.object import (
    LogData,
    TickData,
    BarData,
    TradeData,
    OrderData,
)


class test_strategy(CtaTemplate):
    """"""

    author = "hxxjava"
    kx_interval = 1
    parameters = [
        "kx_interval"
    ]

    kx_count:int = 0
    all_bars:List[BarData] =[]

    variables = ["kx_count"]

    relate_names:List[str] = []

    def __init__(
        self,
        cta_engine: Any,
        strategy_name: str,
        vt_symbol: str,
        setting: dict,
    ):
        super().__init__(cta_engine,strategy_name,vt_symbol,setting)
        self.bg = BarGenerator(self.on_bar,self.kx_interval,self.on_Nmin_bar)
        self.am = ArrayManager()

        self.relate_names.append(vt_symbol)

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("test_strategy 初始化")
        self.load_bar(20)

        self.write_log(f"relate_names={self.relate_names} !!!")

    def on_start(self):
        """ """
        self.write_log(f"test_strategy 已开始 self.kx_interval={self.kx_interval}",)

    def on_stop(self):
        """"""
        self.write_log("test_strategy 已停止")

    def on_tick(self, tick: TickData):
        """
        Callback of new tick data update.
        """
        self.bg.update_tick(tick)

    def on_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        if self.inited:
            self.write_log(f"I got a 1min BarData")
        self.bg.update_bar(bar)

    def on_Nmin_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        self.all_bars.append(bar)
        self.kx_count = len(self.all_bars)

        if self.inited:
            self.write_log(f"I got a {self.kx_interval}min BarData {self.kx_count}")

    def on_trade(self, trade: TradeData):
        """
        Callback of new trade data update.
        """

    def on_order(self, order: OrderData):
        """
        Callback of new order data update.
        """

    def on_stop_order(self, stop_order: StopOrder):
        """
        Callback of stop order update.
        """

观察到的问题:

创建rb2010.SHFE合约的test_strategy的实例,kx_interval为5

description

创建ag2012.SHFE合约的test_strategy的实例,kx_interval为10

description

init函数中的日志输出尽然是这样的

用来存放该策略里20日的 all_bars数组中保存有ag2012.SHFE的所有20日里的所有10分钟K线数据
description

再次初始化test-rb2010策略

用来存放该策略里20日的 all_bars数组中,不只是保存有rb2010的所有5分钟K线数据,也保存有ag2012的10分钟K线数据。

description

问题的原因:

不同的用户策略应该拥有各自不同的成员变量:
all_bars
relate_names
可是从实际的测试结果看,它们却是相同的,这是不应该的!
原因发生在cta_engine里的这个函数中:

    def add_strategy(
        self, class_name: str, strategy_name: str, vt_symbol: str, setting: dict
    ):
        """
        Add a new strategy.
        """
        if strategy_name in self.strategies:
            self.write_log(f"创建策略失败,存在重名{strategy_name}")
            return

        strategy_class = self.classes.get(class_name, None)
        if not strategy_class:
            self.write_log(f"创建策略失败,找不到策略类{class_name}")
            return

        strategy = strategy_class(self, strategy_name, vt_symbol, setting)
        self.strategies[strategy_name] = strategy

        # Add vt_symbol to strategy map.
        strategies = self.symbol_strategy_map[vt_symbol]
        strategies.append(strategy)

        # Update to setting file.
        self.update_strategy_setting(strategy_name, setting)

        self.put_strategy_event(strategy)

这里的的代码可以看出,不同的策略实例使用相同的策略类时,不是立即创建新的策略类实例,而是从self.classes字典中,根据class_name查询得到的。

因此出现了本次测试中出现的,不同的策略实例使用同一个策略类实例,当然它们所引用的列表成员变量也是同一个的情况!

本主题已经失败了,正确的做法请参见:https://www.vnpy.com/forum/topic/3920-wei-kxian-tu-biao-tian-zhuan-jia-wa-rang-ctace-lue-de-yun-xing-kan-de-jian-1

因为没有办法删除自己的主题,所以另外开了一个(1)的主题。

需要为CTA策略配上K线图表

VNPY系统确实是非常好的开发平台!可是做CTA交易的朋友一定有这样的感觉,策略在安静地运行,你却看不见它的K线,也无法直观地看到它的交易情况。经常来回地在第三方的行情客户端来回切换,为的是看看都对策略运动什么样一个情况。如果能够为每个CTA策略都给出一个观看图表的机会,那该有多好!
本人经过一周时间的努力下,已经基本让CTA策略可以看的见了。

实现思路

1 实现一个给予ChartWizard的K线图NewChartWidget,它可以接受历史K线数据、tick数据、bar线数据、order数据、trader数据的推送
2 定义五个与行情相关的消息类型

为K线图表添砖加瓦——让CTA策略看得见(1)

EVENT_CTA_HISTORY_BAR  —— 历史K线消息
EVENT_CTA_TICK —— TICK消息
EVENT_CTA_BAR —— BAR消息
EVENT_CTA_ORDER  —— 委托单消息
EVENT_CTA_TRADE  —— 成交单
EVENT_CTA_STOPORDER —— 停止单消息(系统本来就有)

3 利用策略的on_start(),on_tick(),on_bar(),on_order(),on_stop_order(),on_trade()接口,发送这些消息
4 可以接受实时的临时K线的显示
5 可以显示行情,也已显示委托、成交的发生位置

先看看效果图:

description

操作简便

经过修改后,在CTA策略管理界面中,如上图所示,每个策略都会增加一个 “K线图表” 按钮,在已经新建好策略后,只有策略初始化后“K线图表” 按钮和“开始”按钮同时有效,如果你想观看该策略的K线图,先点击“K线图表” 按钮,就会显示一个空的K线图窗口,再和原来一样按“开始”按钮,K线图立即就把策略在初始化阶段读取的历史K线显示处理,当然你的你所关心的主图指标和附图指标可以是可以显示的。

下一步的想法:

目前只实现了几个代表性的几个指标,还不能给自由配置。未来应该是设计成可以在每个策略可以独立配置自己的K线图表的主图和附图的数量可指标,因为不同的策略的算法是不一样,周期也是可能是不一样的。

下一次,我会把目前功能的实现方法和代码发表出,正在整理,请大家等待。

本地停止单在VNPY系统中的是在本地维护的,符合条件后立即转化为限价委托单而执行的。
CTA策略管理界面CtaManager中,包含有一个停止单监视器StopOrderMonitor,用来显示策略已经发出的停止单的。

StopOrderMonitor包含如下的信息:

    headers = {
        "stop_orderid": {"display": "停止委托号","cell": BaseCell,"update": False,},
        "vt_orderids": {"display": "限价委托号", "cell": BaseCell, "update": True},
        "vt_symbol": {"display": "本地代码", "cell": BaseCell, "update": False},
        "direction": {"display": "方向", "cell": EnumCell, "update": False},
        "offset": {"display": "开平", "cell": EnumCell, "update": False},
        "price": {"display": "价格", "cell": BaseCell, "update": False},
        "volume": {"display": "数量", "cell": BaseCell, "update": False},
        "status": {"display": "状态", "cell": EnumCell, "update": True},
        "lock": {"display": "锁仓", "cell": BaseCell, "update": False},
        "strategy_name": {"display": "策略名", "cell": BaseCell, "update": False},
    }

在实际使用的时候,大家是否有这种感觉,这么多停止单,你可以知道停止单的编号,却不知道它是:

  • 何时发出的,
  • 何时被撤销的,
  • 何时被触发的。
    这对观察策略的行为很重要的信息。虽然编号可以描述停止单的时间顺序,但是不够具体,当有多个停止单没有被触发,你看着它们除了单号和价格外,其他长的都是一样,心中是一头雾水的。

建议还是增加字段。

目前StopOrder的字段

@dataclass
class StopOrder:
    vt_symbol: str
    direction: Direction
    offset: Offset
    price: float
    volume: float
    stop_orderid: str
    strategy_name: str
    lock: bool = False
    vt_orderids: list = field(default_factory=list)
    status: StopOrderStatus = StopOrderStatus.WAITING

缺少下单时间和结束时间

  • start_time:下单时间
  • end_time:结束时间
    因为已经有了状态status字段,所以当状态status是StopOrderStatus.CANCELLED时,end_time就是撤销时间;如果状态status是StopOrderStatus.TRIGGERED时end_time就是触发时间。

CtpGateway仅有的几个交易执行函数

description

CtpGateway继承了BaseGateway,同时包含两个重要的接口:

        self.td_api = CtpTdApi(self)   # 交易接口
        self.md_api = CtpMdApi(self) # 行情接口

另外还有1个向行情服务器订阅行情的订阅函数:

    def subscribe(self, req: SubscribeRequest):
        """"""
        self.md_api.subscribe(req)

4个向交易服务器发送委托请求、取消委托请求、查询账户请求和一个查询持仓请求函数。

    def send_order(self, req: OrderRequest):
        """发送委托请求"""
        if req.type == OrderType.RFQ:
            vt_orderid = self.td_api.send_rfq(req)
        else:
            vt_orderid = self.td_api.send_order(req)
        return vt_orderid

    def cancel_order(self, req: CancelRequest):
        """取消委托请求"""
        self.td_api.cancel_order(req)

    def query_account(self):
        """查询账户请求"""
        self.td_api.query_account()

    def query_position(self):
        """查询持仓请求"""
        self.td_api.query_position()

CtpGateway有账户查询函数,用户持仓函数,可是没有历史委托和历史成交查询函数。

CtpTdApi 提供的主动执行函数:

description

可它仅仅提供了可怜的5个主动执行交易的函数:
send_order()——发委托申请
cancel_order()——取消委托单
send_rfq()
query_account()——查询账户
query_position()——查询持仓

# vnpy.api.generator.ctp_td_source.function.cpp 提供了丰富的接口函数:

int TdApi::reqAuthenticate(const dict &req, int reqid)
int TdApi::reqUserLogin(const dict &req, int reqid)
int TdApi::reqUserLogout(const dict &req, int reqid)
int TdApi::reqUserPasswordUpdate(const dict &req, int reqid)
int TdApi::reqTradingAccountPasswordUpdate(const dict &req, int reqid)
int TdApi::reqUserAuthMethod(const dict &req, int reqid)
int TdApi::reqGenUserCaptcha(const dict &req, int reqid)
int TdApi::reqGenUserText(const dict &req, int reqid)
int TdApi::reqUserLoginWithCaptcha(const dict &req, int reqid)
int TdApi::reqUserLoginWithText(const dict &req, int reqid)
int TdApi::reqUserLoginWithOTP(const dict &req, int reqid)
int TdApi::reqOrderInsert(const dict &req, int reqid)
int TdApi::reqParkedOrderInsert(const dict &req, int reqid)
int TdApi::reqParkedOrderAction(const dict &req, int reqid)
int TdApi::reqOrderAction(const dict &req, int reqid)
int TdApi::reqQueryMaxOrderVolume(const dict &req, int reqid)
int TdApi::reqSettlementInfoConfirm(const dict &req, int reqid)
int TdApi::reqRemoveParkedOrder(const dict &req, int reqid)
int TdApi::reqRemoveParkedOrderAction(const dict &req, int reqid)
int TdApi::reqExecOrderInsert(const dict &req, int reqid)
int TdApi::reqExecOrderAction(const dict &req, int reqid)
int TdApi::reqForQuoteInsert(const dict &req, int reqid)
int TdApi::reqQuoteInsert(const dict &req, int reqid)
int TdApi::reqQuoteAction(const dict &req, int reqid)
int TdApi::reqBatchOrderAction(const dict &req, int reqid)
int TdApi::reqOptionSelfCloseInsert(const dict &req, int reqid)
int TdApi::reqOptionSelfCloseAction(const dict &req, int reqid)
int TdApi::reqCombActionInsert(const dict &req, int reqid)
int TdApi::reqQryOrder(const dict &req, int reqid)
int TdApi::reqQryTrade(const dict &req, int reqid)
int TdApi::reqQryInvestorPosition(const dict &req, int reqid)
int TdApi::reqQryTradingAccount(const dict &req, int reqid)
int TdApi::reqQryInvestor(const dict &req, int reqid)
int TdApi::reqQryTradingCode(const dict &req, int reqid)
int TdApi::reqQryInstrumentMarginRate(const dict &req, int reqid)
int TdApi::reqQryInstrumentCommissionRate(const dict &req, int reqid)
int TdApi::reqQryExchange(const dict &req, int reqid)
int TdApi::reqQryProduct(const dict &req, int reqid)
int TdApi::reqQryInstrument(const dict &req, int reqid)
int TdApi::reqQryDepthMarketData(const dict &req, int reqid)
int TdApi::reqQrySettlementInfo(const dict &req, int reqid)
int TdApi::reqQryTransferBank(const dict &req, int reqid)
int TdApi::reqQryInvestorPositionDetail(const dict &req, int reqid)
int TdApi::reqQryNotice(const dict &req, int reqid)
int TdApi::reqQrySettlementInfoConfirm(const dict &req, int reqid)
int TdApi::reqQryInvestorPositionCombineDetail(const dict &req, int reqid)
int TdApi::reqQryCFMMCTradingAccountKey(const dict &req, int reqid)
int TdApi::reqQryEWarrantOffset(const dict &req, int reqid)
int TdApi::reqQryInvestorProductGroupMargin(const dict &req, int reqid)
int TdApi::reqQryExchangeMarginRate(const dict &req, int reqid)
int TdApi::reqQryExchangeMarginRateAdjust(const dict &req, int reqid)
int TdApi::reqQryExchangeRate(const dict &req, int reqid)
int TdApi::reqQrySecAgentACIDMap(const dict &req, int reqid)
int TdApi::reqQryProductExchRate(const dict &req, int reqid)
int TdApi::reqQryProductGroup(const dict &req, int reqid)
int TdApi::reqQryMMInstrumentCommissionRate(const dict &req, int reqid)
int TdApi::reqQryMMOptionInstrCommRate(const dict &req, int reqid)
int TdApi::reqQryInstrumentOrderCommRate(const dict &req, int reqid)
int TdApi::reqQrySecAgentTradingAccount(const dict &req, int reqid)
int TdApi::reqQrySecAgentCheckMode(const dict &req, int reqid)
int TdApi::reqQrySecAgentTradeInfo(const dict &req, int reqid)
int TdApi::reqQryOptionInstrTradeCost(const dict &req, int reqid)
int TdApi::reqQryOptionInstrCommRate(const dict &req, int reqid)
int TdApi::reqQryExecOrder(const dict &req, int reqid)
int TdApi::reqQryForQuote(const dict &req, int reqid)
int TdApi::reqQryQuote(const dict &req, int reqid)
int TdApi::reqQryOptionSelfClose(const dict &req, int reqid)
int TdApi::reqQryInvestUnit(const dict &req, int reqid)
int TdApi::reqQryCombInstrumentGuard(const dict &req, int reqid)
int TdApi::reqQryCombAction(const dict &req, int reqid)
int TdApi::reqQryTransferSerial(const dict &req, int reqid)
int TdApi::reqQryAccountregister(const dict &req, int reqid)
int TdApi::reqQryContractBank(const dict &req, int reqid)
int TdApi::reqQryParkedOrder(const dict &req, int reqid)
int TdApi::reqQryParkedOrderAction(const dict &req, int reqid)
int TdApi::reqQryTradingNotice(const dict &req, int reqid)
int TdApi::reqQryBrokerTradingParams(const dict &req, int reqid)
int TdApi::reqQryBrokerTradingAlgos(const dict &req, int reqid)
int TdApi::reqQueryCFMMCTradingAccountToken(const dict &req, int reqid)
int TdApi::reqFromBankToFutureByFuture(const dict &req, int reqid)
int TdApi::reqFromFutureToBankByFuture(const dict &req, int reqid)
int TdApi::reqQueryBankAccountMoneyByFuture(const dict &req, int reqid)

其中这两个函数在用户是否可以包含到CtpTdApi接口中?

这些函数中下面两个函数是否可以包含到CtpTdApi接口中,然后封装到CtpGateway中供用户策略中使用。

int TdApi::reqQryOrder(const dict &req, int reqid)
{
    CThostFtdcQryOrderField myreq = CThostFtdcQryOrderField();
    memset(&myreq, 0, sizeof(myreq));
    getString(req, "BrokerID", myreq.BrokerID);
    getString(req, "InvestorID", myreq.InvestorID);
    getString(req, "InstrumentID", myreq.InstrumentID);
    getString(req, "ExchangeID", myreq.ExchangeID);
    getString(req, "OrderSysID", myreq.OrderSysID);
    getString(req, "InsertTimeStart", myreq.InsertTimeStart);   // 开始时间
    getString(req, "InsertTimeEnd", myreq.InsertTimeEnd);     // 结束时间
    getString(req, "InvestUnitID", myreq.InvestUnitID);
    int i = this->api->ReqQryOrder(&myreq, reqid);
    return i;
};

int TdApi::reqQryTrade(const dict &req, int reqid)
{
    CThostFtdcQryTradeField myreq = CThostFtdcQryTradeField();
    memset(&myreq, 0, sizeof(myreq));
    getString(req, "BrokerID", myreq.BrokerID);
    getString(req, "InvestorID", myreq.InvestorID);
    getString(req, "InstrumentID", myreq.InstrumentID);
    getString(req, "ExchangeID", myreq.ExchangeID);
    getString(req, "TradeID", myreq.TradeID);
    getString(req, "TradeTimeStart", myreq.TradeTimeStart);  // 开始时间
    getString(req, "TradeTimeEnd", myreq.TradeTimeEnd);    // 结束时间
    getString(req, "InvestUnitID", myreq.InvestUnitID);
    int i = this->api->ReqQryTrade(&myreq, reqid);
    return i;
};

有必要添加历史委托单和历史成交单查询吗?

无法避免的成交单丢失

交易中,时常会关心上次盈亏和当前算法盈亏状况,这可以通过对开仓以来的所有成交单进行计算得到。
但是因为种种原因,客户端策略可能会丢失部分成交单,造成这种结果的原因很多,如网络故障、电脑宕机、
没有在全交易时段运行策略,都会导致部分成交单不能给被记录。

成交查询可以补齐丢失的成交单

交易所存有完整的成交记录,而且ctp_td_source.function.cpp提供了全面的历史委托查询和历史成交查询函数,
为什么咱们不可以把这些功能多引入一些呢?

有些网关API已经提供了reqQryTrade函数:

vnpy\gateway\da\da_gateway.py(798): self.reqQryTrade(da_req, self.reqid)
vnpy\gateway\tora\td.py(583): err = self._native_api.ReqQryTrade(info, self._get_new_req_id())
vnpy\gateway\uft\uft_gateway.py(489): self.reqQryTrade({}, self.reqid)

在策略中有需要查询或者计算上次完整交易的盈亏数值吗?
陈老师的进阶课中有讲BacktesingEngine是如何计算逐日盯市的,接近需要,
但不知道是否有更加规范的方法,比如从回测引擎或者实盘引擎里就可以查询到?

© 2015-2022 微信 18391752892
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】