通用型逐笔成交统计
逐笔成交统计想用通用化,难点在于去限定一次完整开平交易的开始点和结束点,抽象来说就是寻找特殊的断点对所有成交记录进行划分。
断点的选择
 
而在算法状态机控制中,我们可以知道数字0是一个非常有用的评判标准,即我们构建一列数据,让它数值在完全平仓后变成0,就知道真正的平仓时间。
在实践中,累计净持仓恰恰好符合这个标准,我们把多头仓位设为”+”,空头仓位设为“-”,得到如下表的【方向持仓】,对【方向持仓】进行累计得到【净持仓】。
这样,我们基于【净持仓】为0可以得到每次开平交易的结束点。而该结束点为成交记录的断点。

 
使用断点划分成交记录
 
为了简单演示,下面我们只显示【净持仓】(列)为0的成交信息(行),如下表所示,一共发生了5开完整的开平仓交易。每笔交易的结束点对应的交易序号分别为3、5、8、12、20。这5个结束点即为对所有成交信息的断点。

 
之后,我们要引入2个新的概念:
 
- 存量:某一时间点的累计统计量
- 增量:某一时间段内,累计统计量的增加量
 
存量是静态的,可以理解为对累计统计量的信息进行时间切片;而增量是动态的,代表时间切片信息的变化量,所以他们二者的关系如下:
T0时刻存量 + T0->T1增量 = T1时刻存量
 
换句话说,
T0->T1增量 = T1时刻存量 - T0时刻存量
回到逐笔回测统计主题上,增量这个概念,就能代表最新的完整开平仓交易,例如其每笔盈亏,对累计盈亏的影响。
如下图所示,在完成第一笔开平仓交易后,累计盈亏是1000;完成了第二笔完整的开平仓交易,累计盈亏是2000,那么二者的差别,即2000-1000=1000。这增加1000的盈利,就是属于第二笔开平仓交易的。

 
所以,通过对每个断点存量信息的对比,我们就可以得到每笔开平仓成交后的统计量:

 
这些开平仓的统计量可以如下表所示的开平成交量、开平盈亏,也可以是开平仓交易的持仓时间、手续费、滑点以及净盈亏:

 
 
从算法的原理到代码
计算开平交易结果
 
- 生成基础DataFrame信息,包括每笔交易的方向,开平,价格,时间;
- 计算方向持仓,以及有方向持仓累加的净持仓,计算累计持仓存量(成交量的简单相加);
- 计算盈亏存量,当净持仓为0时候,显示每笔开平交易对于存量盈亏的增量;
- 当净持仓为0时候,显示每笔开平交易的持仓时间,成交量,成交额的增量;
- 对DataFrame的行进行处理,剔除出那些净持仓不为0的行数,即剩下的行数都是每笔开平交易的最后一次平仓交易,通过平仓的方向可以判断该完整开平流程,如方向为空,开平为平,那么完整开平交易为多开->空平。
- 计算手续费,滑点以及净盈亏
- 返回新的DataFrame。
import pandas as pd
from datetime import datetime
import matplotlib.pyplot as plt
import numpy as np
pd.set_option('mode.chained_assignment', None)
def calculate_trades_result(trades):
    """
    Deal with trade data
    """
    dt, direction, offset, price, volume = [], [], [], [], []
    for i in trades.values():
        dt.append(i.datetime)
        direction.append(i.direction.value)
        offset.append(i.offset.value)
        price.append(i.price)
        volume.append(i.volume)
    # Generate DataFrame with datetime, direction, offset, price, volume
    df = pd.DataFrame()
    df["direction"] = direction
    df["offset"] = offset
    df["price"] = price
    df["volume"] = volume
    df["current_time"] = dt
    df["last_time"] = df["current_time"].shift(1)
    # Calculate trade amount
    df["amount"] = df["price"] * df["volume"]
    df["acum_amount"] = df["amount"].cumsum()
    # Calculate pos, net pos(with direction), acumluation pos(with direction)
    def calculate_pos(df):
        if df["direction"] == "多":
            result = df["volume"]
        else:
            result = - df["volume"]
        return result
    df["pos"] = df.apply(calculate_pos, axis=1)
    df["net_pos"] = df["pos"].cumsum()
    df["acum_pos"] = df["volume"].cumsum()
    # Calculate trade result, acumulation result
    # ej: trade result(buy->sell) means (new price - old price) * volume
    df["result"] = -1 * df["pos"] * df["price"]
    df["acum_result"] = df["result"].cumsum()
    # Filter column data when net pos comes to zero
    def get_acum_trade_result(df):
        if df["net_pos"] == 0:
            return df["acum_result"]
    df["acum_trade_result"] = df.apply(get_acum_trade_result, axis=1)
    def get_acum_trade_volume(df):
        if df["net_pos"] == 0:
            return df["acum_pos"]
    df["acum_trade_volume"] = df.apply(get_acum_trade_volume, axis=1)   
    def get_acum_trade_duration(df):
        if df["net_pos"] == 0:
            return df["current_time"] - df["last_time"]
    df["acum_trade_duration"] = df.apply(get_acum_trade_duration, axis=1)  
    def get_acum_trade_amount(df):
        if df["net_pos"] == 0:
            return df["acum_amount"]
    df["acum_trade_amount"] = df.apply(get_acum_trade_amount, axis=1) 
    # Select row data with net pos equil to zero     
    df = df.dropna()
    return df
def generate_trade_df(trades, size, rate, slippage, capital):
    """
    Calculate trade result from increment
    """
    df = calculate_trades_result(trades)
    trade_df = pd.DataFrame()
    trade_df["close_direction"] = df["direction"]
    trade_df["close_time"] = df["current_time"]
    trade_df["close_price"] = df["price"]
    trade_df["pnl"] = df["acum_trade_result"] - \
        df["acum_trade_result"].shift(1).fillna(0)
    trade_df["volume"] = df["acum_trade_volume"] - \
        df["acum_trade_volume"].shift(1).fillna(0)
    trade_df["duration"] = df["current_time"] - \
        df["last_time"]
    trade_df["turnover"] = df["acum_trade_amount"] - \
        df["acum_trade_amount"].shift(1).fillna(0)
    trade_df["commission"] = trade_df["turnover"] * rate
    trade_df["slipping"] = trade_df["volume"] * size * slippage
    trade_df["net_pnl"] = trade_df["pnl"] - \
        trade_df["commission"] - trade_df["slipping"]
    result = calculate_base_net_pnl(trade_df, capital)
    return result 
 
汇总生成资金曲线
 
- 基于每笔开平交易的净盈亏,计算累计盈亏;
- 累计盈亏加上用户输入的起始资金即为资金曲线;
- 基于资金曲线计算每笔的每笔开平交易的盈利率,回撤和百分比回撤。
def calculate_base_net_pnl(df, capital):
    """
    Calculate statistic base on net pnl
    """
    df["acum_pnl"] = df["net_pnl"].cumsum()
    df["balance"] = df["acum_pnl"] + capital
    df["return"] = np.log(
        df["balance"] / df["balance"].shift(1)
        ).fillna(0)
    df["highlevel"] = (
        df["balance"].rolling(
            min_periods=1, window=len(df), center=False).max()
    )
    df["drawdown"] = df["balance"] - df["highlevel"]
    df["ddpercent"] = df["drawdown"] / df["highlevel"] * 100
    df.reset_index(drop=True, inplace=True)
    return df 
 
统计整体策略效果
 
- 主要是一些统计指标的计算,如平均滑点,平均手续费,总成交次数,胜率,盈亏比,收益回撤比等等。
- 然后是画图,画出资金曲线图,每笔净盈亏图和净盈亏分布图
def statistics_trade_result(df, capital, show_chart=True):
    """"""
    end_balance = df["balance"].iloc[-1]
    max_drawdown = df["drawdown"].min()
    max_ddpercent = df["ddpercent"].min()
    pnl_medio = df["net_pnl"].mean()
    trade_count = len(df)
    duration_medio = df["duration"].mean().total_seconds()/3600
    commission_medio = df["commission"].mean()
    slipping_medio = df["slipping"].mean()
    win = df[df["net_pnl"] > 0]
    win_amount = win["net_pnl"].sum()
    win_pnl_medio = win["net_pnl"].mean()
    win_duration_medio = win["duration"].mean().total_seconds()/3600
    win_count = len(win)
    loss = df[df["net_pnl"] < 0]
    loss_amount = loss["net_pnl"].sum()
    loss_pnl_medio = loss["net_pnl"].mean()
    loss_duration_medio = loss["duration"].mean().total_seconds()/3600
    loss_count = len(loss)
    winning_rate = win_count / trade_count
    win_loss_pnl_ratio = - win_pnl_medio / loss_pnl_medio
    total_return = (end_balance / capital - 1) * 100
    return_drawdown_ratio = -total_return / max_ddpercent
    output(f"起始资金:\t{capital:,.2f}")
    output(f"结束资金:\t{end_balance:,.2f}")
    output(f"总收益率:\t{total_return:,.2f}%")
    output(f"最大回撤: \t{max_drawdown:,.2f}")
    output(f"百分比最大回撤: {max_ddpercent:,.2f}%")
    output(f"收益回撤比:\t{return_drawdown_ratio:,.2f}")
    output(f"总成交次数:\t{trade_count}")
    output(f"盈利成交次数:\t{win_count}")
    output(f"亏损成交次数:\t{loss_count}")
    output(f"胜率:\t\t{winning_rate:,.2f}")
    output(f"盈亏比:\t\t{win_loss_pnl_ratio:,.2f}")
    output(f"平均每笔盈亏:\t{pnl_medio:,.2f}")
    output(f"平均持仓小时:\t{duration_medio:,.2f}")
    output(f"平均每笔手续费:\t{commission_medio:,.2f}")
    output(f"平均每笔滑点:\t{slipping_medio:,.2f}")
    output(f"总盈利金额:\t{win_amount:,.2f}")
    output(f"盈利交易均值:\t{win_pnl_medio:,.2f}")
    output(f"盈利持仓小时:\t{win_duration_medio:,.2f}")
    output(f"总亏损金额:\t{loss_amount:,.2f}")
    output(f"亏损交易均值:\t{loss_pnl_medio:,.2f}")
    output(f"亏损持仓小时:\t{loss_duration_medio:,.2f}")
    if not show_chart:
        return
    plt.figure(figsize=(10, 12))
    acum_pnl_plot = plt.subplot(3, 1, 1)
    acum_pnl_plot.set_title("Balance Plot")
    df["balance"].plot(legend=True)
    pnl_plot = plt.subplot(3, 1, 2)
    pnl_plot.set_title("Pnl Per Trade")
    df["net_pnl"].plot(legend=True)
    distribution_plot = plt.subplot(3, 1, 3)
    distribution_plot.set_title("Trade Pnl Distribution")
    df["net_pnl"].hist(bins=100)
    plt.show()
def output(msg):
    """
    Output message with datetime.
    """
    print(f"{datetime.now()}\t{msg}") 
 
统计纯多头和纯空头交易
纯多头交易就是只有多开->空平的交易,而纯空头交易就是反过来。
为了筛选出纯多开交易,只要在DataFrame中判断其平仓方向的空的即可;纯空头交易则反过来,平仓方向为多。
def buy2sell(df, capital):
    """
    Generate DataFrame with only trade from buy to sell
    """
    buy2sell = df[df["close_direction"] == "空"]
    result = calculate_base_net_pnl(buy2sell, capital)
    return result
def short2cover(df, capital):
    """
    Generate DataFrame with only trade from short to cover
    """
    short2cover = df[df["close_direction"] == "多"]
    result = calculate_base_net_pnl(short2cover, capital)
    return result 
 
整合所有计算步骤
最后,我们将上文中所有的函数进行整合,封装到单个函数中,用于实现策略回测效果的一键生成:
 
def exhaust_trade_result(
    trades, 
    size: int = 10, 
    rate: float = 0.0, 
    slippage: float = 0.0, 
    capital: int = 1000000,
    show_long_short_condition=True
    ):
    """
    Exhaust all trade result.
    """
    total_trades = generate_trade_df(trades, size, rate, slippage, capital)
    statistics_trade_result(total_trades, capital)
    if not show_long_short_condition:
        return
    long_trades = buy2sell(total_trades, capital)
    short_trades = short2cover(total_trades, capital)
    output("-----------------------")
    output("纯多头交易")
    statistics_trade_result(long_trades, capital)
    output("-----------------------")
    output("纯空头交易")
    statistics_trade_result(short_trades, capital)
 
 
最后附上完整的源代码
 
import pandas as pd
from datetime import datetime
import matplotlib.pyplot as plt
import numpy as np
pd.set_option('mode.chained_assignment', None)
def calculate_trades_result(trades):
    """
    Deal with trade data
    """
    dt, direction, offset, price, volume = [], [], [], [], []
    for i in trades.values():
        dt.append(i.datetime)
        direction.append(i.direction.value)
        offset.append(i.offset.value)
        price.append(i.price)
        volume.append(i.volume)
    # Generate DataFrame with datetime, direction, offset, price, volume
    df = pd.DataFrame()
    df["direction"] = direction
    df["offset"] = offset
    df["price"] = price
    df["volume"] = volume
    df["current_time"] = dt
    df["last_time"] = df["current_time"].shift(1)
    # Calculate trade amount
    df["amount"] = df["price"] * df["volume"]
    df["acum_amount"] = df["amount"].cumsum()
    # Calculate pos, net pos(with direction), acumluation pos(with direction)
    def calculate_pos(df):
        if df["direction"] == "多":
            result = df["volume"]
        else:
            result = - df["volume"]
        return result
    df["pos"] = df.apply(calculate_pos, axis=1)
    df["net_pos"] = df["pos"].cumsum()
    df["acum_pos"] = df["volume"].cumsum()
    # Calculate trade result, acumulation result
    # ej: trade result(buy->sell) means (new price - old price) * volume
    df["result"] = -1 * df["pos"] * df["price"]
    df["acum_result"] = df["result"].cumsum()
    # Filter column data when net pos comes to zero
    def get_acum_trade_result(df):
        if df["net_pos"] == 0:
            return df["acum_result"]
    df["acum_trade_result"] = df.apply(get_acum_trade_result, axis=1)
    def get_acum_trade_volume(df):
        if df["net_pos"] == 0:
            return df["acum_pos"]
    df["acum_trade_volume"] = df.apply(get_acum_trade_volume, axis=1)   
    def get_acum_trade_duration(df):
        if df["net_pos"] == 0:
            return df["current_time"] - df["last_time"]
    df["acum_trade_duration"] = df.apply(get_acum_trade_duration, axis=1)  
    def get_acum_trade_amount(df):
        if df["net_pos"] == 0:
            return df["acum_amount"]
    df["acum_trade_amount"] = df.apply(get_acum_trade_amount, axis=1) 
    # Select row data with net pos equil to zero     
    df = df.dropna()
    return df
def generate_trade_df(trades, size, rate, slippage, capital):
    """
    Calculate trade result from increment
    """
    df = calculate_trades_result(trades)
    trade_df = pd.DataFrame()
    trade_df["close_direction"] = df["direction"]
    trade_df["close_time"] = df["current_time"]
    trade_df["close_price"] = df["price"]
    trade_df["pnl"] = df["acum_trade_result"] - \
        df["acum_trade_result"].shift(1).fillna(0)
    trade_df["volume"] = df["acum_trade_volume"] - \
        df["acum_trade_volume"].shift(1).fillna(0)
    trade_df["duration"] = df["current_time"] - \
        df["last_time"]
    trade_df["turnover"] = df["acum_trade_amount"] - \
        df["acum_trade_amount"].shift(1).fillna(0)
    trade_df["commission"] = trade_df["turnover"] * rate
    trade_df["slipping"] = trade_df["volume"] * size * slippage
    trade_df["net_pnl"] = trade_df["pnl"] - \
        trade_df["commission"] - trade_df["slipping"]
    result = calculate_base_net_pnl(trade_df, capital)
    return result
def calculate_base_net_pnl(df, capital):
    """
    Calculate statistic base on net pnl
    """
    df["acum_pnl"] = df["net_pnl"].cumsum()
    df["balance"] = df["acum_pnl"] + capital
    df["return"] = np.log(
        df["balance"] / df["balance"].shift(1)
        ).fillna(0)
    df["highlevel"] = (
        df["balance"].rolling(
            min_periods=1, window=len(df), center=False).max()
    )
    df["drawdown"] = df["balance"] - df["highlevel"]
    df["ddpercent"] = df["drawdown"] / df["highlevel"] * 100
    df.reset_index(drop=True, inplace=True)
    return df
def buy2sell(df, capital):
    """
    Generate DataFrame with only trade from buy to sell
    """
    buy2sell = df[df["close_direction"] == "空"]
    result = calculate_base_net_pnl(buy2sell, capital)
    return result
def short2cover(df, capital):
    """
    Generate DataFrame with only trade from short to cover
    """
    short2cover = df[df["close_direction"] == "多"]
    result = calculate_base_net_pnl(short2cover, capital)
    return result
def statistics_trade_result(df, capital, show_chart=True):
    """"""
    end_balance = df["balance"].iloc[-1]
    max_drawdown = df["drawdown"].min()
    max_ddpercent = df["ddpercent"].min()
    pnl_medio = df["net_pnl"].mean()
    trade_count = len(df)
    duration_medio = df["duration"].mean().total_seconds()/3600
    commission_medio = df["commission"].mean()
    slipping_medio = df["slipping"].mean()
    win = df[df["net_pnl"] > 0]
    win_amount = win["net_pnl"].sum()
    win_pnl_medio = win["net_pnl"].mean()
    win_duration_medio = win["duration"].mean().total_seconds()/3600
    win_count = len(win)
    loss = df[df["net_pnl"] < 0]
    loss_amount = loss["net_pnl"].sum()
    loss_pnl_medio = loss["net_pnl"].mean()
    loss_duration_medio = loss["duration"].mean().total_seconds()/3600
    loss_count = len(loss)
    winning_rate = win_count / trade_count
    win_loss_pnl_ratio = - win_pnl_medio / loss_pnl_medio
    total_return = (end_balance / capital - 1) * 100
    return_drawdown_ratio = -total_return / max_ddpercent
    output(f"起始资金:\t{capital:,.2f}")
    output(f"结束资金:\t{end_balance:,.2f}")
    output(f"总收益率:\t{total_return:,.2f}%")
    output(f"最大回撤: \t{max_drawdown:,.2f}")
    output(f"百分比最大回撤: {max_ddpercent:,.2f}%")
    output(f"收益回撤比:\t{return_drawdown_ratio:,.2f}")
    output(f"总成交次数:\t{trade_count}")
    output(f"盈利成交次数:\t{win_count}")
    output(f"亏损成交次数:\t{loss_count}")
    output(f"胜率:\t\t{winning_rate:,.2f}")
    output(f"盈亏比:\t\t{win_loss_pnl_ratio:,.2f}")
    output(f"平均每笔盈亏:\t{pnl_medio:,.2f}")
    output(f"平均持仓小时:\t{duration_medio:,.2f}")
    output(f"平均每笔手续费:\t{commission_medio:,.2f}")
    output(f"平均每笔滑点:\t{slipping_medio:,.2f}")
    output(f"总盈利金额:\t{win_amount:,.2f}")
    output(f"盈利交易均值:\t{win_pnl_medio:,.2f}")
    output(f"盈利持仓小时:\t{win_duration_medio:,.2f}")
    output(f"总亏损金额:\t{loss_amount:,.2f}")
    output(f"亏损交易均值:\t{loss_pnl_medio:,.2f}")
    output(f"亏损持仓小时:\t{loss_duration_medio:,.2f}")
    if not show_chart:
        return
    plt.figure(figsize=(10, 12))
    acum_pnl_plot = plt.subplot(3, 1, 1)
    acum_pnl_plot.set_title("Balance Plot")
    df["balance"].plot(legend=True)
    pnl_plot = plt.subplot(3, 1, 2)
    pnl_plot.set_title("Pnl Per Trade")
    df["net_pnl"].plot(legend=True)
    distribution_plot = plt.subplot(3, 1, 3)
    distribution_plot.set_title("Trade Pnl Distribution")
    df["net_pnl"].hist(bins=100)
    plt.show()
def output(msg):
    """
    Output message with datetime.
    """
    print(f"{datetime.now()}\t{msg}")
def exhaust_trade_result(
    trades, 
    size: int = 10, 
    rate: float = 0.0, 
    slippage: float = 0.0, 
    capital: int = 1000000,
    show_long_short_condition=True
    ):
    """
    Exhaust all trade result.
    """
    total_trades = generate_trade_df(trades, size, rate, slippage, capital)
    statistics_trade_result(total_trades, capital)
    if not show_long_short_condition:
        return
    long_trades = buy2sell(total_trades, capital)
    short_trades = short2cover(total_trades, capital)
    output("-----------------------")
    output("纯多头交易")
    statistics_trade_result(long_trades, capital)
    output("-----------------------")
    output("纯空头交易")
    statistics_trade_result(short_trades, capital) 
 
了解更多知识,请关注vn.py社区公众号。
 

