class SpreadData:
""""""
def __init__(
self,
name: str,
legs: List[LegData],
price_multipliers: Dict[str, int],
trading_multipliers: Dict[str, int],
active_symbol: str,
inverse_contracts: Dict[str, bool],
min_volume: float
):
""""""
self.name: str = name
self.legs: Dict[str, LegData] = {}
self.active_leg: LegData = None
self.passive_legs: List[LegData] = []
self.min_volume: float = min_volume
self.pricetick: float = 0
# For calculating spread price
self.price_multipliers: Dict[str, int] = price_multipliers
# For calculating spread pos and sending orders
self.trading_multipliers: Dict[str, int] = trading_multipliers
self.inverse_contracts: Dict[str, bool] = inverse_contracts
self.price_formula: str = ""
self.trading_formula: str = ""
for leg in legs:
self.legs[leg.vt_symbol] = leg
if leg.vt_symbol == active_symbol:
self.active_leg = leg
else:
self.passive_legs.append(leg)
price_multiplier = self.price_multipliers[leg.vt_symbol]
if price_multiplier > 0:
self.price_formula += f"+{price_multiplier}*{leg.vt_symbol}"
else:
self.price_formula += f"{price_multiplier}*{leg.vt_symbol}"
trading_multiplier = self.trading_multipliers[leg.vt_symbol]
if trading_multiplier > 0:
self.trading_formula += f"+{trading_multiplier}*{leg.vt_symbol}"
else:
self.trading_formula += f"{trading_multiplier}*{leg.vt_symbol}"
if not self.pricetick:
self.pricetick = leg.pricetick
else:
self.pricetick = min(self.pricetick, leg.pricetick)
# Spread data
self.bid_price: float = 0
self.ask_price: float = 0
self.bid_volume: float = 0
self.ask_volume: float = 0
self.net_pos: float = 0
self.datetime: datetime = None
def calculate_price(self):
""""""
self.clear_price()
# Go through all legs to calculate price
for n, leg in enumerate(self.legs.values()):
# Filter not all leg price data has been received
if not leg.bid_volume or not leg.ask_volume:
self.clear_price()
return
# Calculate price
price_multiplier = self.price_multipliers[leg.vt_symbol]
if price_multiplier > 0:
self.bid_price += leg.bid_price * price_multiplier
self.ask_price += leg.ask_price * price_multiplier
else:
self.bid_price += leg.ask_price * price_multiplier
self.ask_price += leg.bid_price * price_multiplier
# Round price to pricetick
if self.pricetick:
self.bid_price = round_to(self.bid_price, self.pricetick)
self.ask_price = round_to(self.ask_price, self.pricetick)
# Calculate volume
trading_multiplier = self.trading_multipliers[leg.vt_symbol]
inverse_contract = self.inverse_contracts[leg.vt_symbol]
if not inverse_contract:
leg_bid_volume = leg.bid_volume
leg_ask_volume = leg.ask_volume
else:
leg_bid_volume = calculate_inverse_volume(
leg.bid_volume, leg.bid_price, leg.size)
leg_ask_volume = calculate_inverse_volume(
leg.ask_volume, leg.ask_price, leg.size)
if trading_multiplier > 0:
adjusted_bid_volume = floor_to(
leg_bid_volume / trading_multiplier,
self.min_volume
)
adjusted_ask_volume = floor_to(
leg_ask_volume / trading_multiplier,
self.min_volume
)
else:
adjusted_bid_volume = floor_to(
leg_ask_volume / abs(trading_multiplier),
self.min_volume
)
adjusted_ask_volume = floor_to(
leg_bid_volume / abs(trading_multiplier),
self.min_volume
)
# For the first leg, just initialize
if not n:
self.bid_volume = adjusted_bid_volume
self.ask_volume = adjusted_ask_volume
# For following legs, use min value of each leg quoting volume
else:
self.bid_volume = min(self.bid_volume, adjusted_bid_volume)
self.ask_volume = min(self.ask_volume, adjusted_ask_volume)
# Update calculate time
self.datetime = datetime.now(LOCAL_TZ)
def calculate_pos(self):
""""""
long_pos = 0
short_pos = 0
for n, leg in enumerate(self.legs.values()):
leg_long_pos = 0
leg_short_pos = 0
trading_multiplier = self.trading_multipliers[leg.vt_symbol]
if not trading_multiplier:
continue
inverse_contract = self.inverse_contracts[leg.vt_symbol]
if not inverse_contract:
net_pos = leg.net_pos
else:
net_pos = calculate_inverse_volume(
leg.net_pos, leg.net_pos_price, leg.size)
adjusted_net_pos = net_pos / trading_multiplier
if adjusted_net_pos > 0:
adjusted_net_pos = floor_to(adjusted_net_pos, self.min_volume)
leg_long_pos = adjusted_net_pos
else:
adjusted_net_pos = ceil_to(adjusted_net_pos, self.min_volume)
leg_short_pos = abs(adjusted_net_pos)
if not n:
long_pos = leg_long_pos
short_pos = leg_short_pos
else:
long_pos = min(long_pos, leg_long_pos)
short_pos = min(short_pos, leg_short_pos)
if long_pos > 0:
self.net_pos = long_pos
else:
self.net_pos = -short_pos
def clear_price(self):
""""""
self.bid_price = 0
self.ask_price = 0
self.bid_volume = 0
self.ask_volume = 0
def calculate_leg_volume(self, vt_symbol: str, spread_volume: float) -> float:
""""""
leg = self.legs[vt_symbol]
trading_multiplier = self.trading_multipliers[leg.vt_symbol]
leg_volume = spread_volume * trading_multiplier
return leg_volume
def calculate_spread_volume(self, vt_symbol: str, leg_volume: float) -> float:
""""""
leg = self.legs[vt_symbol]
trading_multiplier = self.trading_multipliers[leg.vt_symbol]
spread_volume = leg_volume / trading_multiplier
if spread_volume > 0:
spread_volume = floor_to(spread_volume, self.min_volume)
else:
spread_volume = ceil_to(spread_volume, self.min_volume)
return spread_volume
def to_tick(self):
""""""
tick = TickData(
symbol=self.name,
exchange=Exchange.LOCAL,
datetime=self.datetime,
name=self.name,
last_price=(self.bid_price + self.ask_price) / 2,
bid_price_1=self.bid_price,
ask_price_1=self.ask_price,
bid_volume_1=self.bid_volume,
ask_volume_1=self.ask_volume,
gateway_name="SPREAD"
)
return tick
def is_inverse(self, vt_symbol: str) -> bool:
""""""
inverse_contract = self.inverse_contracts[vt_symbol]
return inverse_contract
def get_leg_size(self, vt_symbol: str) -> float:
""""""
leg = self.legs[vt_symbol]
return leg.size
class LegData:
""""""
def __init__(self, vt_symbol: str):
""""""
self.vt_symbol: str = vt_symbol
# Price and position data
self.bid_price: float = 0
self.ask_price: float = 0
self.bid_volume: float = 0
self.ask_volume: float = 0
self.long_pos: float = 0
self.short_pos: float = 0
self.net_pos: float = 0
self.last_price: float = 0
self.net_pos_price: float = 0 # Average entry price of net position
# Tick data buf
self.tick: TickData = None
# Contract data
self.size: float = 0
self.net_position: bool = False
self.min_volume: float = 0
self.pricetick: float = 0
def update_contract(self, contract: ContractData):
""""""
self.size = contract.size
self.net_position = contract.net_position
self.min_volume = contract.min_volume
self.pricetick = contract.pricetick
def update_tick(self, tick: TickData):
""""""
self.bid_price = tick.bid_price_1
self.ask_price = tick.ask_price_1
self.bid_volume = tick.bid_volume_1
self.ask_volume = tick.ask_volume_1
self.last_price = tick.last_price
self.tick = tick
def update_position(self, position: PositionData):
""""""
if position.direction == Direction.NET:
self.net_pos = position.volume
self.net_pos_price = position.price
else:
if position.direction == Direction.LONG:
self.long_pos = position.volume
else:
self.short_pos = position.volume
self.net_pos = self.long_pos - self.short_pos
def update_trade(self, trade: TradeData):
""""""
# Only update net pos for contract with net position mode
if self.net_position:
trade_cost = trade.volume * trade.price
old_cost = self.net_pos * self.net_pos_price
if trade.direction == Direction.LONG:
new_pos = self.net_pos + trade.volume
if self.net_pos >= 0:
new_cost = old_cost + trade_cost
self.net_pos_price = new_cost / new_pos
else:
# If all previous short position closed
if not new_pos:
self.net_pos_price = 0
# If only part short position closed
elif new_pos > 0:
self.net_pos_price = trade.price
else:
new_pos = self.net_pos - trade.volume
if self.net_pos <= 0:
new_cost = old_cost - trade_cost
self.net_pos_price = new_cost / new_pos
else:
# If all previous long position closed
if not new_pos:
self.net_pos_price = 0
# If only part long position closed
elif new_pos < 0:
self.net_pos_price = trade.price
self.net_pos = new_pos
else:
if trade.direction == Direction.LONG:
if trade.offset == Offset.OPEN:
self.long_pos += trade.volume
else:
self.short_pos -= trade.volume
else:
if trade.offset == Offset.OPEN:
self.short_pos += trade.volume
else:
self.long_pos -= trade.volume
self.net_pos = self.long_pos - self.short_pos
从上面的代码可以知道,SpreadData中包含若干个腿,它的tick数据应该是有各腿的tick合成的,可是我们看SpreadData的to_tck()的代码,看不是这样的!
def to_tick(self):
""""""
tick = TickData(
symbol=self.name,
exchange=Exchange.LOCAL,
datetime=self.datetime,
name=self.name,
last_price=(self.bid_price + self.ask_price) / 2,
bid_price_1=self.bid_price,
ask_price_1=self.ask_price,
bid_volume_1=self.bid_volume,
ask_volume_1=self.ask_volume,
gateway_name="SPREAD"
)
return tick
假如价差(SpreadData)的实例S中包含两腿(LegData)L1和L2,L1、L2的价格乘数分别为1和-1,那么:
在任意时刻,当L1得到了最新tick1,L2得到最新tick2,
L1的
L1.last_price=tick1.last_price
L1.bid_price_1=tick1.bid_price_1
L1.ask_price_1=tick1.ask_price_1
L2的部分数据
L2.last_price=tick2.last_price
L2.bid_price_1=tick2.bid_price_1
L2.ask_price_1=tick2.ask_price_1
那么经过价差S的calculate_price()的计算后,
S.last_price=L1.last_price-L2.last_price
S.bid_price_1=L1.bid_price_1-L2.bid_price_1
S.ask_price_1=L1.ask_price_1-L2.ask_price_1
问题来了:如果腿L1已经得到了有效数据,而腿L2还没有得到有效数据,那么价差S的价格将是无效的!
if not leg.bid_volume or not leg.ask_volume:
self.clear_price()
return
这里的条件意思是说如果价差的某个腿中的数据是无意义的,那么就清空价差的所有价格,那么此时SpreadData的to_tick()得到的tick就不是一个有效的tick数据!
只要价差的多个腿中有一个腿的数据没有使用实际的tick更新过,就会发生这种情况!
全部是0,因为clear_price()的代码如下:
def clear_price(self):
""""""
self.bid_price = 0
self.ask_price = 0
self.bid_volume = 0
self.ask_volume = 0
我们知道价差的数据计算和更新是有SpreadDataEngine维护的,下面是SpreadDataEngine的process_tick_event():
def process_tick_event(self, event: Event) -> None:
""""""
tick = event.data
leg = self.legs.get(tick.vt_symbol, None)
if not leg:
return
leg.update_tick(tick)
for spread in self.symbol_spread_map[tick.vt_symbol]:
spread.calculate_price() # 这里并没有对价差的价格计算是否有效的判断
self.put_data_event(spread)
def put_data_event(self, spread: SpreadData) -> None:
""""""
event = Event(EVENT_SPREAD_DATA, spread)
self.event_engine.put(event)
这里并没有对价差的价格计算是否有效的判断,就直接向价差发送了EVENT_SPREAD_DATA消息,这看引起价差和价差策略通过推送接口on_spread_data()得到错误的价差数据!!!
def calculate_price(self)->bool: # hxxjava change
""""""
self.clear_price()
# Go through all legs to calculate price
for n, leg in enumerate(self.legs.values()):
# Filter not all leg price data has been received
if not leg.bid_volume or not leg.ask_volume:
self.clear_price()
return False # hxxjava add
# Calculate price
price_multiplier = self.price_multipliers[leg.vt_symbol]
if price_multiplier > 0:
self.bid_price += leg.bid_price * price_multiplier
self.ask_price += leg.ask_price * price_multiplier
else:
self.bid_price += leg.ask_price * price_multiplier
self.ask_price += leg.bid_price * price_multiplier
# Round price to pricetick
if self.pricetick:
self.bid_price = round_to(self.bid_price, self.pricetick)
self.ask_price = round_to(self.ask_price, self.pricetick)
# Calculate volume
trading_multiplier = self.trading_multipliers[leg.vt_symbol]
inverse_contract = self.inverse_contracts[leg.vt_symbol]
if not inverse_contract:
leg_bid_volume = leg.bid_volume
leg_ask_volume = leg.ask_volume
else:
leg_bid_volume = calculate_inverse_volume(
leg.bid_volume, leg.bid_price, leg.size)
leg_ask_volume = calculate_inverse_volume(
leg.ask_volume, leg.ask_price, leg.size)
if trading_multiplier > 0:
adjusted_bid_volume = floor_to(
leg_bid_volume / trading_multiplier,
self.min_volume
)
adjusted_ask_volume = floor_to(
leg_ask_volume / trading_multiplier,
self.min_volume
)
else:
adjusted_bid_volume = floor_to(
leg_ask_volume / abs(trading_multiplier),
self.min_volume
)
adjusted_ask_volume = floor_to(
leg_bid_volume / abs(trading_multiplier),
self.min_volume
)
# For the first leg, just initialize
if not n:
self.bid_volume = adjusted_bid_volume
self.ask_volume = adjusted_ask_volume
# For following legs, use min value of each leg quoting volume
else:
self.bid_volume = min(self.bid_volume, adjusted_bid_volume)
self.ask_volume = min(self.ask_volume, adjusted_ask_volume)
# Update calculate time
self.datetime = datetime.now(LOCAL_TZ)
return True # hxxjava add
class AdvancedSpreadData(SpreadData):
def calculate_price(self)->bool: # hxxjava change
""""""
self.clear_price()
# Go through all legs to calculate price
bid_data = {}
ask_data = {}
volume_inited = False
for variable, leg in self.variable_legs.items():
# Filter not all leg price data has been received
if not leg.bid_volume or not leg.ask_volume:
self.clear_price()
return False # hxxjava change
# Generate price dict for calculating spread bid/ask
variable_direction = self.variable_directions[variable]
if variable_direction > 0:
bid_data[variable] = leg.bid_price
ask_data[variable] = leg.ask_price
else:
bid_data[variable] = leg.ask_price
ask_data[variable] = leg.bid_price
# Calculate volume
trading_multiplier = self.trading_multipliers[leg.vt_symbol]
if not trading_multiplier:
continue
inverse_contract = self.inverse_contracts[leg.vt_symbol]
if not inverse_contract:
leg_bid_volume = leg.bid_volume
leg_ask_volume = leg.ask_volume
else:
leg_bid_volume = calculate_inverse_volume(
leg.bid_volume, leg.bid_price, leg.size)
leg_ask_volume = calculate_inverse_volume(
leg.ask_volume, leg.ask_price, leg.size)
if trading_multiplier > 0:
adjusted_bid_volume = floor_to(
leg_bid_volume / trading_multiplier,
self.min_volume
)
adjusted_ask_volume = floor_to(
leg_ask_volume / trading_multiplier,
self.min_volume
)
else:
adjusted_bid_volume = floor_to(
leg_ask_volume / abs(trading_multiplier),
self.min_volume
)
adjusted_ask_volume = floor_to(
leg_bid_volume / abs(trading_multiplier),
self.min_volume
)
# For the first leg, just initialize
if not volume_inited:
self.bid_volume = adjusted_bid_volume
self.ask_volume = adjusted_ask_volume
volume_inited = True
# For following legs, use min value of each leg quoting volume
else:
self.bid_volume = min(self.bid_volume, adjusted_bid_volume)
self.ask_volume = min(self.ask_volume, adjusted_ask_volume)
# Calculate spread price
self.bid_price = self.parse_formula(self.price_code, bid_data)
self.ask_price = self.parse_formula(self.price_code, ask_data)
# Round price to pricetick
if self.pricetick:
self.bid_price = round_to(self.bid_price, self.pricetick)
self.ask_price = round_to(self.ask_price, self.pricetick)
# Update calculate time
self.datetime = datetime.now(LOCAL_TZ)
return True # hxxjava add
def process_tick_event(self, event: Event) -> None:
""""""
tick = event.data
leg = self.legs.get(tick.vt_symbol, None)
if not leg:
return
leg.update_tick(tick)
for spread in self.symbol_spread_map[tick.vt_symbol]:
if spread.calculate_price(): # hxxjava change
self.put_data_event(spread)
如果不做上述改动会,可能会出现策略在开盘的时间,由于价差没有收齐所有腿的tick,
导致价差的 lastest_price等数据为0,可是仍然被推价差数据,进而产生用错误的价差tick。
错误的价差tick会引发错误的价差交易信号,并且以错误的价格进行价差的开仓和平仓!!!
本人在实际的价差策略交易中已经发生过上述的错误!
这个价差策略的大致意思是:
说明:错误的地方我已知注释了。
from vnpy.trader.utility import BarGenerator, ArrayManager
from vnpy.app.spread_trading import (
SpreadStrategyTemplate,
SpreadAlgoTemplate,
SpreadData,
OrderData,
TradeData,
TickData,
BarData
)
class StatisticalArbitrageStrategy(SpreadStrategyTemplate):
""""""
author = "用Python的交易员"
boll_window = 20
boll_dev = 2
max_pos = 10
payup = 10
interval = 5
spread_pos = 0.0
boll_up = 0.0
boll_down = 0.0
boll_mid = 0.0
parameters = [
"boll_window",
"boll_dev",
"max_pos",
"payup",
"interval"
]
variables = [
"spread_pos",
"boll_up",
"boll_down",
"boll_mid"
]
def __init__(
self,
strategy_engine,
strategy_name: str,
spread: SpreadData,
setting: dict
):
""""""
super().__init__(
strategy_engine, strategy_name, spread, setting
)
self.bg = BarGenerator(self.on_spread_bar)
self.am = ArrayManager()
def on_init(self):
"""
Callback when strategy is inited.
"""
self.write_log("策略初始化")
self.load_bar(10)
def on_start(self):
"""
Callback when strategy is started.
"""
self.write_log("策略启动")
def on_stop(self):
"""
Callback when strategy is stopped.
"""
self.write_log("策略停止")
self.put_event()
def on_spread_data(self):
"""
Callback when spread price is updated.
"""
tick = self.get_spread_tick()
self.on_spread_tick(tick)
def on_spread_tick(self, tick: TickData):
"""
Callback when new spread tick data is generated.
"""
self.bg.update_tick(tick) # 这里有兼容性错误,BarGenerator处理不了最新价为0的tick
def on_spread_bar(self, bar: BarData):
"""
Callback when spread bar data is generated.
"""
self.stop_all_algos()
self.am.update_bar(bar)
if not self.am.inited:
return
self.boll_mid = self.am.sma(self.boll_window)
self.boll_up, self.boll_down = self.am.boll(
self.boll_window, self.boll_dev)
if not self.spread_pos:
if bar.close_price >= self.boll_up:
self.start_short_algo(
bar.close_price - 10,
self.max_pos,
payup=self.payup,
interval=self.interval
)
elif bar.close_price <= self.boll_down:
self.start_long_algo(
bar.close_price + 10,
self.max_pos,
payup=self.payup,
interval=self.interval
)
elif self.spread_pos < 0:
if bar.close_price <= self.boll_mid:
self.start_long_algo(
bar.close_price + 10,
abs(self.spread_pos),
payup=self.payup,
interval=self.interval
)
else:
if bar.close_price >= self.boll_mid:
self.start_short_algo(
bar.close_price - 10,
abs(self.spread_pos),
payup=self.payup,
interval=self.interval
)
self.put_event()
def on_spread_pos(self):
"""
Callback when spread position is updated.
"""
self.spread_pos = self.get_spread_pos()
self.put_event()
def on_spread_algo(self, algo: SpreadAlgoTemplate):
"""
Callback when algo status is updated.
"""
pass
def on_order(self, order: OrderData):
"""
Callback when order status is updated.
"""
pass
def on_trade(self, trade: TradeData):
"""
Callback when new trade data is received.
"""
pass
def stop_open_algos(self):
""""""
if self.buy_algoid:
self.stop_algo(self.buy_algoid)
if self.short_algoid:
self.stop_algo(self.short_algoid)
def stop_close_algos(self):
""""""
if self.sell_algoid: # self.sell_algoid没有定义
self.stop_algo(self.sell_algoid)
if self.cover_algoid: # self.cover_algoid没有定义
self.stop_algo(self.cover_algoid)
下面是BarGenerator的update_tick()函数,错误的地方我已知注释了:
def update_tick(self, tick: TickData) -> None:
"""
Update new tick data into generator.
"""
new_minute = False
# Filter tick data with 0 last price
if not tick.last_price: # 这个过滤条件有点想当然了
return
# Filter tick data with older timestamp
if self.last_tick and tick.datetime < self.last_tick.datetime:
return
if not self.bar:
new_minute = True
elif (
(self.bar.datetime.minute != tick.datetime.minute)
or (self.bar.datetime.hour != tick.datetime.hour)
):
self.bar.datetime = self.bar.datetime.replace(
second=0, microsecond=0
)
self.on_bar(self.bar)
new_minute = True
if new_minute:
self.bar = BarData(
symbol=tick.symbol,
exchange=tick.exchange,
interval=Interval.MINUTE,
datetime=tick.datetime,
gateway_name=tick.gateway_name,
open_price=tick.last_price,
high_price=tick.last_price,
low_price=tick.last_price,
close_price=tick.last_price,
open_interest=tick.open_interest
)
else:
self.bar.high_price = max(self.bar.high_price, tick.last_price)
if tick.high_price > self.last_tick.high_price:
self.bar.high_price = max(self.bar.high_price, tick.high_price)
self.bar.low_price = min(self.bar.low_price, tick.last_price)
if tick.low_price < self.last_tick.low_price:
self.bar.low_price = min(self.bar.low_price, tick.low_price)
self.bar.close_price = tick.last_price
self.bar.open_interest = tick.open_interest
self.bar.datetime = tick.datetime
if self.last_tick:
volume_change = tick.volume - self.last_tick.volume
self.bar.volume += max(volume_change, 0)
self.last_tick = tick
把我标识的错误的过滤条件改成下面的代码,把它注释掉:
# if not tick.last_price: # 这个过滤条件有点想当然了
# return
修改之处见代码中的注释:
from vnpy.trader.utility import BarGenerator, ArrayManager
from vnpy.app.spread_trading import (
SpreadStrategyTemplate,
SpreadAlgoTemplate,
SpreadData,
OrderData,
TradeData,
TickData,
BarData
)
class StatisticalArbitrageStrategy(SpreadStrategyTemplate):
""""""
author = "用Python的交易员"
boll_window = 20
boll_dev = 2
max_pos = 10
payup = 10
interval = 5
spread_pos = 0.0
boll_up = 0.0
boll_down = 0.0
boll_mid = 0.0
parameters = [
"boll_window",
"boll_dev",
"max_pos",
"payup",
"interval"
]
variables = [
"spread_pos",
"boll_up",
"boll_down",
"boll_mid"
]
def __init__(
self,
strategy_engine,
strategy_name: str,
spread: SpreadData,
setting: dict
):
""""""
super().__init__(
strategy_engine, strategy_name, spread, setting
)
self.bg = BarGenerator(self.on_spread_bar)
self.am = ArrayManager()
self.buy_algoid:str = "" # hxxjava add
self.short_algoid:str = "" # hxxjava add
self.sell_algoid = "" # hxxjava add
self.cover_algoid = "" # hxxjava add
def on_init(self):
"""
Callback when strategy is inited.
"""
self.write_log("策略初始化")
self.load_bar(10)
def on_start(self):
"""
Callback when strategy is started.
"""
self.write_log("策略启动")
def on_stop(self):
"""
Callback when strategy is stopped.
"""
self.write_log("策略停止")
self.put_event()
def on_spread_data(self):
"""
Callback when spread price is updated.
"""
tick = self.get_spread_tick()
self.on_spread_tick(tick)
def on_spread_tick(self, tick: TickData):
"""
Callback when new spread tick data is generated.
"""
self.bg.update_tick(tick)
def on_spread_bar(self, bar: BarData):
"""
Callback when spread bar data is generated.
"""
self.stop_all_algos()
self.am.update_bar(bar)
if not self.am.inited:
return
self.boll_mid = self.am.sma(self.boll_window)
self.boll_up, self.boll_down = self.am.boll(
self.boll_window, self.boll_dev)
if not self.spread_pos:
if bar.close_price >= self.boll_up:
self.buy_algoid = self.start_short_algo( # hxxjava change
bar.close_price - 10,
self.max_pos,
payup=self.payup,
interval=self.interval
)
elif bar.close_price <= self.boll_down:
self.short_algoid = self.start_long_algo( # hxxjava change
bar.close_price + 10,
self.max_pos,
payup=self.payup,
interval=self.interval
)
elif self.spread_pos < 0:
if bar.close_price <= self.boll_mid:
self.sell_algoid = self.start_long_algo( # hxxjava change
bar.close_price + 10,
abs(self.spread_pos),
payup=self.payup,
interval=self.interval
)
else:
if bar.close_price >= self.boll_mid:
self.cover_algoid = self.start_short_algo( # hxxjava change
bar.close_price - 10,
abs(self.spread_pos),
payup=self.payup,
interval=self.interval
)
self.put_event()
def on_spread_pos(self):
"""
Callback when spread position is updated.
"""
self.spread_pos = self.get_spread_pos()
self.put_event()
def on_spread_algo(self, algo: SpreadAlgoTemplate):
"""
Callback when algo status is updated.
"""
if not algo.is_active(): # hxxjava add
if self.buy_algoid == algo.algoid:
self.buy_algoid = ""
elif self.sell_algoid == algo.algoid:
self.sell_algoid = ""
elif self.short_algoid == algo.algoid:
self.short_algoid = ""
else:
self.cover_algoid = ""
def on_order(self, order: OrderData):
"""
Callback when order status is updated.
"""
pass
def on_trade(self, trade: TradeData):
"""
Callback when new trade data is received.
"""
pass
def stop_open_algos(self):
""""""
if self.buy_algoid:
self.stop_algo(self.buy_algoid)
if self.short_algoid:
self.stop_algo(self.short_algoid)
def stop_close_algos(self):
""""""
if self.sell_algoid:
self.stop_algo(self.sell_algoid)
if self.cover_algoid:
self.stop_algo(self.cover_algoid)
嗯,原来是有这一层考虑,谢谢回复!
您答非所问了。
我的意思是既然每腿的委托和成交都不推送给价差策略,为什么还要on_order()和on_trade()这两个推送接口?
def on_order(self, order: OrderData):
"""
Callback when order status is updated.
"""
print(f"{self.spread_name} {order}")
def on_trade(self, trade: TradeData):
"""
Callback when new trade data is received.
"""
print(f"{self.spread_name} {trade}")
roger wrote:
- 我只想看到实时的价差 不需要实盘下单
- 交易量改0..01是因为公式中参数是小数,不改小没法输入小数
- 即使不改交易量,输入整数类型公式,依然存在相同问题 ,如图
先研究跨期吧,这种3腿的价差,你的xA+yB+zC,x,y,z的选取决定了交易量v的设置,
如果单腿x,y,z的交易数量必须为整数,要保证xv,yv,zv同时为整数(无所谓正负),
可能你找到的v是你无法承受的下单量,你的资金可能压根就不够开仓价差一次的,搞了半天白玩。
输:
A-B
或者
A*3-B*2
之类的.
另外,你的最小交易量输错了,应该是1,au2110这类单边合约交易不了0.001手
CTA策略的交易标的是一个具体的单边合约。假如我们运行两个CTA策略A和B实例,它们交易的合约都是C。同时运行A和B,那么我们可以发现A和B可以独立地统计各自的持仓,也就是说它们的pos可能是不一样的,不会相互干扰。
而价差交易策略的交易标的是价差。假如我们运行两个价差交易策略SA和SB实例,它们交易的价差都是S1。同时运行SA和SB,SA和SB也应该可以独立地统计各自的持仓,也就是说它们的spread_pos也应该是不一样的,不应该相互干扰。
然而,我们可以发现SA对S1开仓成功后,SB并没有开仓过,可是我们发现SB的spread_pos已经变成和SA的spread_pos相同的数量!
有点迷糊,细想一下,是啊,谁让你交易了相同的价差标的呢?
不行就改,咱们按照价差S1的设置再创建一个价差S2,但是给它取一个不同的名称,区别一下!
接下来吧价差交易策略SB的标的该出S2,再次运行价差交易策略SB。
奇怪的现象发生了:SB并没有开仓过,可是SB的spread_pos仍然变成和SA的spread_pos相同的数量!
查看一下委托单:
其中"来源"一栏中的内容为 “SpreadTrading_价差名称”,就是这里过于简单,导致委托单只关联了价差,而没有关联价差交易策略名称,
所以在价差交易的SpreadTradeEngine引擎无法按照价差策略来推送类似委托单order,成交单trade和价差持仓信息等。
价差交易策略一旦发出委托,调用了SpreadStrategyTemplate的start_long_algo()或者start_short_algo(),而这两个函数最终调用了SpreadStrategyEngine的start_algo()
def start_algo(
self,
direction: Direction,
price: float,
volume: float,
payup: int,
interval: int,
lock: bool,
offset: Offset
) -> str:
""""""
if not self.trading:
return ""
algoid: str = self.strategy_engine.start_algo(
self,
self.spread_name, # 这里只有价差名称,没有传递策略名称
direction,
offset,
price,
volume,
payup,
interval,
lock
)
self.algoids.add(algoid)
return algoid
这里只讨论原则性问题:
当然,这样的改动是大了些,可是已经存在目前的问题,修改是必须的!
例如下面的DemoStrategy价差策略的代码:
class DemoStrategy(SpreadStrategyTemplate):
"""
利用BOLL通道进行套利的一种价差交易
"""
author = "hxxjava"
bar_window = 5 # K线周期
boll_window = 26 # BOLL参数1
boll_dev = 2 # BOLL参数2
target_pos = 1 # 开仓数量
payup = 10
interval = 5
spread_pos = 0.0
boll_mid = None
boll_up = None
boll_down = None
sk_algoid:str = ""
bp_algoid:str = ""
bk_algoid:str = ""
sp_algoid:str = ""
parameters = [
"bar_window",
"boll_window",
"boll_dev",
"target_pos",
"payup",
"interval"
]
variables = [
"spread_pos",
"boll_mid",
"boll_up",
"boll_down",
"sk_algoid",
"bp_algoid",
"bk_algoid",
"sp_algoid"
]
def __init__(
self,
strategy_engine,
strategy_name: str,
spread: SpreadData,
setting: dict
):
""""""
super().__init__(
strategy_engine, strategy_name, spread, setting
)
self.bg = BarGenerator(self.on_spread_bar,self.bar_window,self.on_xmin_spread_bar)
self.am = ArrayManager()
def on_init(self):
"""
Callback when strategy is inited.
"""
self.write_log("策略初始化")
self.load_bar(days=10,callback=self.on_spread_bar)
def on_spread_bar(self,bar:BarData):
"""
Callback when 1 min spread bar data is generated.
"""
print(f"on_spread_bar bar={bar}") # 看看价差策略的bar长的是什么样子
self.bg.update_bar(bar)
。。。其他省略
进去查找,原来问题出在这里:
DemoStrategy调用的load_bar()是从SpreadStrateyTemplate继承的,而SpreadStrateyTemplate是load_bar()又调用了strategy_engine的load_bar()。
strategy_engine的load_bar()的代码如下:
def load_bar(
self, spread: SpreadData, days: int, interval: Interval, callback: Callable
):
""""""
end = datetime.now()
start = end - timedelta(days)
bars = load_bar_data(spread, interval, start, end)
for bar in bars:
callback(bar)
load_bar_data()的代码:
@lru_cache(maxsize=999)
def load_bar_data(
spread: SpreadData,
interval: Interval,
start: datetime,
end: datetime,
pricetick: float = 0
):
""""""
# Load bar data of each spread leg
leg_bars: Dict[str, Dict] = {}
for vt_symbol in spread.legs.keys():
symbol, exchange = extract_vt_symbol(vt_symbol)
bar_data: List[BarData] = database_manager.load_bar_data(
symbol, exchange, interval, start, end
)
bars: Dict[datetime, BarData] = {bar.datetime: bar for bar in bar_data}
leg_bars[vt_symbol] = bars
# Calculate spread bar data
spread_bars: List[BarData] = []
for dt in bars.keys():
spread_price = 0
spread_value = 0
spread_available = True
for leg in spread.legs.values():
leg_bar = leg_bars[leg.vt_symbol].get(dt, None)
if leg_bar:
price_multiplier = spread.price_multipliers[leg.vt_symbol]
spread_price += price_multiplier * leg_bar.close_price
spread_value += abs(price_multiplier) * leg_bar.close_price
else:
spread_available = False
if spread_available:
if pricetick:
spread_price = round_to(spread_price, pricetick)
spread_bar = BarData(
symbol=spread.name,
exchange=exchange.LOCAL,
datetime=dt,
interval=interval,
open_price=spread_price,
high_price=spread_price,
low_price=spread_price,
close_price=spread_price,
gateway_name="SPREAD",
)
spread_bar.value = spread_value
spread_bars.append(spread_bar)
return spread_bars
原来load_bar_data()中只考虑了从本地数据库加载1分钟价差K线数据(当然是用价差组合中的多个合约的1分钟K线数据合成的)。
而我因为没有事先下载过rb2109.SHFE和rb2201.SHFE的合约的1分钟K线数据,所以加载不到这10天中的任何1分钟价差K线数据!
就算加载不到1分钟价差K线的原因已经找到,可是这样的设计仍然是有问题的:
鉴于以上的分析,把app\spread_trading\base.py做如下修改:
# hxxjava debug spread_trading
def query_bar_from_rq(
symbol: str, exchange: Exchange, interval: Interval, start: datetime, end: datetime
):
"""
Query bar data from RQData.
"""
from vnpy.trader.rqdata import rqdata_client
from vnpy.trader.object import HistoryRequest
if not rqdata_client.inited:
rqdata_client.init()
req = HistoryRequest(
symbol=symbol,
exchange=exchange,
interval=interval,
start=start,
end=end
)
data = rqdata_client.query_history(req)
return data
@lru_cache(maxsize=999)
def load_bar_data(
spread: SpreadData,
interval: Interval,
start: datetime,
end: datetime,
pricetick: float = 0
):
""""""
# Load bar data of each spread leg
leg_bars: Dict[str, Dict] = {}
for vt_symbol in spread.legs.keys():
symbol, exchange = extract_vt_symbol(vt_symbol)
# hxxjava debug spread_trading
bar_data = query_bar_from_rq(
symbol=symbol, exchange=exchange,
interval=interval,start=start,end=end
)
# if bar_data:
# print(f"load {symbol}.{exchange} {interval} bar_data, len of = {len(bar_data)}")
if not bar_data:
bar_data: List[BarData] = database_manager.load_bar_data(
symbol, exchange, interval, start, end
)
bars: Dict[datetime, BarData] = {bar.datetime: bar for bar in bar_data}
leg_bars[vt_symbol] = bars
# Calculate spread bar data
spread_bars: List[BarData] = []
for dt in bars.keys():
spread_price = 0
spread_value = 0
spread_available = True
for leg in spread.legs.values():
leg_bar = leg_bars[leg.vt_symbol].get(dt, None)
if leg_bar:
price_multiplier = spread.price_multipliers[leg.vt_symbol]
spread_price += price_multiplier * leg_bar.close_price
spread_value += abs(price_multiplier) * leg_bar.close_price
else:
spread_available = False
if spread_available:
if pricetick:
spread_price = round_to(spread_price, pricetick)
spread_bar = BarData(
symbol=spread.name,
exchange=exchange.LOCAL,
datetime=dt,
interval=interval,
open_price=spread_price,
high_price=spread_price,
low_price=spread_price,
close_price=spread_price,
gateway_name="SPREAD",
)
spread_bar.value = spread_value
spread_bars.append(spread_bar)
return spread_bars
看看 load_bar()从本地数据库加载的1分钟价差K线数据,如下所示:
你会发现其的成交量,volume=0,在使用过程从必须加以注意!
也就是说,需要成交量参与计算的指标是不可利用价差K线序列来计算的!
bar=BarData(gateway_name='SPREAD', symbol='rb2110&rb2201', exchange=<Exchange.LOCAL: 'LOCAL'>, datetime=datetime.datetime(2021, 6, 17, 11, 15, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), interval=<Interval.MINUTE: '1m'>, volume=0, open_interest=0, open_price=109.0, high_price=109.0, low_price=109.0, close_price=109.0)
bar=BarData(gateway_name='SPREAD', symbol='rb2110&rb2201', exchange=<Exchange.LOCAL: 'LOCAL'>, datetime=datetime.datetime(2021, 6, 17, 11, 16, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), interval=<Interval.MINUTE: '1m'>, volume=0, open_interest=0, open_price=108.0, high_price=108.0, low_price=108.0, close_price=108.0)
bar=BarData(gateway_name='SPREAD', symbol='rb2110&rb2201', exchange=<Exchange.LOCAL: 'LOCAL'>, datetime=datetime.datetime(2021, 6, 17, 11, 17, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), interval=<Interval.MINUTE: '1m'>, volume=0, open_interest=0, open_price=106.0, high_price=106.0, low_price=106.0, close_price=106.0)
建议在图中:
另外图中的⑧是创建错误的价差组合,软件运行删除,可是删除了之后还必须重新启动了之后才会消失,是否可以考虑直接有软件清除释放该价差组合?
这个组合策略实质上是价差交易。
我理解其本意是选择两个相关的合约进行价差交易,它有两个腿:leg1和leg2,将其leg1和leg2按照leg1_ratio和leg2_ratio设定的比例进行配比做出价差K线。
该组合的开仓信号为:
价差K线突破BOLL上轨或者上轨时,卖出leg1,买入leg2
价差K线突破BOLL上轨或者下轨时,买入leg1,卖出leg2
该组合的平仓信号为:
leg1有持多仓并且价差K线在BOLL中轨之上时,分别平仓leg1和leg2
leg1有持空仓并且价差K线在BOLL中轨之下时,分别平仓leg1和leg2
当leg1_ratio和leg2_ratio相等的时候,一起都没有毛病。
当leg1_ratio和leg2_ratio不相等的时候,就有问题了。你会发现它们开仓的数量总是一样多,这是不对的。
比如leg1为i2109.DCE, leg2为rb2110.DCE。
我们知道i2109的合约乘数是100,atr=2.7,而rb2109的合约乘数是10,atr=7.7,
leg2_ratio/leg1_ratio =(2.7100)/(7.710) ≈ 7/2
那么我们应该配比leg1_ratio=2和leg2_ratio=7,这意味着i2109.DCE开仓2手,就需要反向开仓rb2110.DCE 开仓7手。
因为价差的计算是这样的:
self.current_spread = (
leg1_bar.close_price * self.leg1_ratio - leg2_bar.close_price * self.leg2_ratio
)
而实际开仓却是(1手,-1手)或(-1手,1手)的组合,与价差计算不符合。
代码如下,修改部分见注释:
from typing import List, Dict
from datetime import datetime
import numpy as np
from vnpy.app.portfolio_strategy import StrategyTemplate, StrategyEngine
from vnpy.trader.utility import BarGenerator
from vnpy.trader.object import TickData, BarData
class PairTradingStrategy(StrategyTemplate):
""""""
author = "用Python的交易员"
price_add = 5
boll_window = 20
boll_dev = 2
# fixed_size = 1 # 没有使用,去掉
leg1_ratio = 1
leg2_ratio = 1
leg1_symbol = ""
leg2_symbol = ""
current_spread = 0.0
boll_mid = 0.0
boll_down = 0.0
boll_up = 0.0
parameters = [
"price_add",
"boll_window",
"boll_dev",
# "fixed_size", # 没有使用,去掉
"leg1_ratio",
"leg2_ratio",
]
variables = [
"leg1_symbol",
"leg2_symbol",
"current_spread",
"boll_mid",
"boll_down",
"boll_up",
]
def __init__(
self,
strategy_engine: StrategyEngine,
strategy_name: str,
vt_symbols: List[str],
setting: dict
):
""""""
super().__init__(strategy_engine, strategy_name, vt_symbols, setting)
self.bgs: Dict[str, BarGenerator] = {}
self.targets: Dict[str, int] = {}
self.last_tick_time: datetime = None
self.spread_count: int = 0
self.spread_data: np.array = np.zeros(100)
# Obtain contract info
self.leg1_symbol, self.leg2_symbol = vt_symbols
def on_bar(bar: BarData):
""""""
pass
for vt_symbol in self.vt_symbols:
self.targets[vt_symbol] = 0
self.bgs[vt_symbol] = BarGenerator(on_bar)
def on_init(self):
"""
Callback when strategy is inited.
"""
self.write_log("策略初始化")
self.load_bars(1)
def on_start(self):
"""
Callback when strategy is started.
"""
self.write_log("策略启动")
def on_stop(self):
"""
Callback when strategy is stopped.
"""
self.write_log("策略停止")
def on_tick(self, tick: TickData):
"""
Callback of new tick data update.
"""
if (
self.last_tick_time
and self.last_tick_time.minute != tick.datetime.minute
):
bars = {}
for vt_symbol, bg in self.bgs.items():
bars[vt_symbol] = bg.generate()
self.on_bars(bars)
bg: BarGenerator = self.bgs[tick.vt_symbol]
bg.update_tick(tick)
self.last_tick_time = tick.datetime
def on_bars(self, bars: Dict[str, BarData]):
""""""
self.cancel_all()
# Return if one leg data is missing
if self.leg1_symbol not in bars or self.leg2_symbol not in bars:
return
# Calculate current spread
leg1_bar = bars[self.leg1_symbol]
leg2_bar = bars[self.leg2_symbol]
# Filter time only run every 5 minutes
if (leg1_bar.datetime.minute + 1) % 5:
return
self.current_spread = (
leg1_bar.close_price * self.leg1_ratio - leg2_bar.close_price * self.leg2_ratio
)
# Update to spread array
self.spread_data[:-1] = self.spread_data[1:]
self.spread_data[-1] = self.current_spread
self.spread_count += 1
if self.spread_count <= self.boll_window:
return
# Calculate boll value
buf: np.array = self.spread_data[-self.boll_window:]
std = buf.std()
self.boll_mid = buf.mean()
self.boll_up = self.boll_mid + self.boll_dev * std
self.boll_down = self.boll_mid - self.boll_dev * std
# Calculate new target position
leg1_pos = self.get_pos(self.leg1_symbol)
if not leg1_pos:
if self.current_spread >= self.boll_up:
self.targets[self.leg1_symbol] = -1*self.leg1_ratio # hxxjava add *self.leg1_ratio
self.targets[self.leg2_symbol] = 1*self.leg2_ratio # hxxjava add *self.leg2_ratio
elif self.current_spread <= self.boll_down:
self.targets[self.leg1_symbol] = 1*self.leg1_ratio # hxxjava add *self.leg1_ratio
self.targets[self.leg2_symbol] = -1*self.leg2_ratio # hxxjava add *self.leg2_ratio
elif leg1_pos > 0:
if self.current_spread >= self.boll_mid:
self.targets[self.leg1_symbol] = 0
self.targets[self.leg2_symbol] = 0
else:
if self.current_spread <= self.boll_mid:
self.targets[self.leg1_symbol] = 0
self.targets[self.leg2_symbol] = 0
# Execute orders
for vt_symbol in self.vt_symbols:
target_pos = self.targets[vt_symbol]
current_pos = self.get_pos(vt_symbol)
pos_diff = target_pos - current_pos
volume = abs(pos_diff)
bar = bars[vt_symbol]
if pos_diff > 0:
price = bar.close_price + self.price_add
if current_pos < 0:
self.cover(vt_symbol, price, volume)
else:
self.buy(vt_symbol, price, volume)
elif pos_diff < 0:
price = bar.close_price - self.price_add
if current_pos > 0:
self.sell(vt_symbol, price, volume)
else:
self.short(vt_symbol, price, volume)
self.put_event()
当leg1与leg2正相关时,leg1_ratio和leg2_ratio同为正整数;
当leg1与leg2负相关时,leg1_ratio为正整数和leg2_ratio同为负整数。
战略性聊天 wrote:
如题请问如何调用呢?谢谢
class NewArrayManager(ArrayManager):
def __init__(self, size: int = 100):
""""""
super().__init__(size)
def VWAP(self, n:int = 20, array: bool = True) -> Union[Tuple[float, float, float], np.ndarray]:
"""
定义VWAP指标
"""
close = self.close
vol = self.volume
vwap, vwap10, vwap20 = bnlib.VWAP(close, vol, n, array)
if array:
return vwap, vwap10, vwap20
else:
return vwap[-1], vwap10[-1], vwap20[-1]
self.am = NewArrayMannager()
vwap, vwap10, vwap20 = self.am.VWAP(n=20)
那么 vwap, vwap10, vwap20 就分别表示20周期的vwap,vwap10表示10周期的vwap平均值,vwap20表示20周期的vwap平均值。
OnRtnInstrumentStatus
合约交易状态通知,主动推送。公有流回报。
各交易所的合约状态变化详见合约状态变化说明。
◇ 1.函数原型virtual void OnRtnInstrumentStatus(CThostFtdcInstrumentStatusField *pInstrumentStatus) {};
struct CThostFtdcInstrumentStatusField
{
///交易所代码
TThostFtdcExchangeIDType ExchangeID;
///保留的无效字段
TThostFtdcOldExchangeInstIDType reserve1;
///结算组代码
TThostFtdcSettlementGroupIDType SettlementGroupID;
///保留的无效字段
TThostFtdcOldInstrumentIDType reserve2;
///合约交易状态
TThostFtdcInstrumentStatusType InstrumentStatus;
///交易阶段编号
TThostFtdcTradingSegmentSNType TradingSegmentSN;
///进入本状态时间
TThostFtdcTimeType EnterTime;
///进入本状态原因
TThostFtdcInstStatusEnterReasonType EnterReason;
///合约在交易所的代码
TThostFtdcExchangeInstIDType ExchangeInstID;
///合约代码
TThostFtdcInstrumentIDType InstrumentID;
};
EnterTime:只有郑商所的时间戳是CTP的本地时间,其他交易所的是交易所时间
class CtaTemplate(ABC):
""""""
author = ""
parameters = []
variables = []
def __init__(
self,
cta_engine: Any,
strategy_name: str,
vt_symbol: str,
setting: dict,
):
""""""
self.cta_engine = cta_engine
self.strategy_name = strategy_name
self.vt_symbol = vt_symbol
self.inited = False
self.trading = False # 策略是否处于交易状态
self.pos = 0
... ... 其他略去
你会发现self.trading除了初始化时赋值为False之外,并没有在CtaTemplate的任何其他地方被修改赋值。
我们的CTA策略都是从CtaTemplate模版扩展而来。当我们的CTA策略被实例化为运行策略时,self.trading就被缓冲到以CTA策略实例名称为文件名的json文件中。之后随着CTA策略被初始化、启动、停止,self.trading的状态在False和True之间做相应的变化,并调用put_event()函数写入json文件,调用sync_data()函数从json文件中读出。这两个函数中都是调用了self.cta_engine的功能。
def put_event(self):
"""
Put an strategy data event for ui update.
"""
if self.inited:
self.cta_engine.put_strategy_event(self)
def sync_data(self):
"""
Sync strategy variables value into disk storage.
"""
if self.trading:
self.cta_engine.sync_strategy_data(self)
def put_strategy_event(self, strategy: CtaTemplate):
"""
Put an event to update strategy status.
"""
data = strategy.get_data() # 策略的变量,包含的trading
event = Event(EVENT_CTA_STRATEGY, data)
self.event_engine.put(event) # 发送消息EVENT_CTA_STRATEGY给订阅者,通知策略变量变化了
def sync_strategy_data(self, strategy: CtaTemplate):
"""
Sync strategy data into json file.
"""
data = strategy.get_variables()
# 下面的两句把inited和trading都剔除了
data.pop("inited") # Strategy status (inited, trading) should not be synced.
data.pop("trading")
self.strategy_data[strategy.strategy_name] = data
save_json(self.data_filename, self.strategy_data) # 把其他策略变量写入json文件
由上面的代码分析发现,策略的trading并没有考虑交易合约是否可以处在交易状态。
策略的trading是会影响到策略的委托行为的,下面是CtaTemplate的send_order()函数:
def send_order(
self,
direction: Direction,
offset: Offset,
price: float,
volume: float,
stop: bool = False,
lock: bool = False,
net: bool = False
):
"""
Send a new order.
"""
if self.trading: # 只要self.trading==True就可以委托
vt_orderids = self.cta_engine.send_order(
self, direction, offset, price, volume, stop, lock, net
)
return vt_orderids
else:
return []
实际使用中会发现策略可能会在休市或者非交易时间,因为收到的错误数据而误动作,导致出现委托无法得到回应或者是拒单。
其原因也是因为作为委托条件的self.trading并没有考虑委托时刻,交易合约是否可以处在交易状态!
合约品种交易状态推送接口函数是onRtnInstrumentStatus()。
具体方法我已经在再谈集合竞价 里详细地讨论过了,文章很长,希望有耐心看完,这里就不再赘述。
注:其他网关接口也应该类似。
/////////////////////////////////////////////////////////////////////////
///TFtdcInstrumentStatusType是一个合约交易状态类型
/////////////////////////////////////////////////////////////////////////
///开盘前
#define THOST_FTDC_IS_BeforeTrading '0'
///非交易
#define THOST_FTDC_IS_NoTrading '1'
///连续交易
#define THOST_FTDC_IS_Continous '2'
///集合竞价报单
#define THOST_FTDC_IS_AuctionOrdering '3'
///集合竞价价格平衡
#define THOST_FTDC_IS_AuctionBalance '4'
///集合竞价撮合
#define THOST_FTDC_IS_AuctionMatch '5'
///收盘
#define THOST_FTDC_IS_Closed '6'
typedef char TThostFtdcInstrumentStatusType;
只有在交易合约在“连续交易”和 “集合竞价报单”这两种状态下,同时交易者也启动了策略(trading==True)情况下,策略才可以执行委托。
也就是说 :委托执行条件 = 合约交易状态 + self.trading。
待续...
战略性聊天 wrote:
self.fast_ma0=fast_ma[-1]
请问-1是指最新的不断变化价格的均线还是当前时间点上一根的均线,谢谢
是根据已经完成的K线序列计算而得到的均线序列的最后一个,不是正在变化的临时K线不参与计算的。
def on_tick(self, tick: TickData):
"""
Callback of new tick data update.
"""
tick_time = tick.datetime.time()
time1 = time(20,55,0)
time2 = time(21,0,1)
if time1 <= tick_time <= time2:
print(f"【集合竞价数据 tick_time={tick_time} tick={tick}】")
if ("IF" in tick.vt_symbol) and ("CFFEX" in tick.vt_symbol):
time1 = time(9,25,0)
time2 = time(9,30,1)
if time1 <= tick_time <= time2:
print(f"【集合竞价数据 tick_time={tick_time} tick={tick}】")
super().on_tick(tick)
【集合竞价tick数据
tick_time=09:29:00.500000
tick=TickData(
gateway_name='CTP',
symbol='IF2107',
exchange=<Exchange.CFFEX: 'CFFEX'>,
datetime=datetime.datetime(2021, 6, 9, 9, 29, 0, 500000, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>),
name='沪深300指数2107',
volume=7,
open_interest=22276.0,
last_price=5183.8,
last_volume=0,
limit_up=5698.200000000001,
limit_down=4662.2,
open_price=5183.8,
high_price=5183.8,
low_price=5183.8,
pre_close=5184.0,
bid_price_1=5180.2,
bid_price_2=0,
bid_price_3=0,
bid_price_4=0,
bid_price_5=0,
ask_price_1=5184.0,
ask_price_2=0,
ask_price_3=0,
ask_price_4=0,
ask_price_5=0,
bid_volume_1=4,
bid_volume_2=0,
bid_volume_3=0,
bid_volume_4=0,
bid_volume_5=0,
ask_volume_1=3,
ask_volume_2=0,
ask_volume_3=0,
ask_volume_4=0,
ask_volume_5=0)】
tick_time=09:30:00.500000
tick=TickData(
gateway_name='CTP',
symbol='IF2107',
exchange=<Exchange.CFFEX: 'CFFEX'>,
datetime=datetime.datetime(2021, 6, 9, 9, 30, 0, 500000, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>),
name='沪深300指数2107',
volume=10,
open_interest=22274.0,
last_price=5180.2,
last_volume=0,
limit_up=5698.200000000001,
limit_down=4662.2,
open_price=5183.8,
high_price=5183.8,
low_price=5180.2,
pre_close=5184.0,
bid_price_1=5180.2,
bid_price_2=0,
bid_price_3=0,
bid_price_4=0,
bid_price_5=0,
ask_price_1=5180.4,
ask_price_2=0,
ask_price_3=0,
ask_price_4=0,
ask_price_5=0,
bid_volume_1=2,
bid_volume_2=0,
bid_volume_3=0,
bid_volume_4=0,
bid_volume_5=0,
ask_volume_1=1,
ask_volume_2=0,
ask_volume_3=0,
ask_volume_4=0,
ask_volume_5=0)】
tick_time=09:30:01
tick=TickData(
gateway_name='CTP',
symbol='IF2107',
exchange=<Exchange.CFFEX: 'CFFEX'>,
datetime=datetime.datetime(2021, 6, 9, 9, 30, 1, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>),
name='沪深300 指数2107',
volume=13,
open_interest=22272.0,
last_price=5180.2,
last_volume=0,
limit_up=5698.200000000001,
limit_down=4662.2,
open_price=5183.8,
high_price=5183.8,
low_price=5180.2,
pre_close=5184.0,
bid_price_1=5177.4,
bid_price_2=0,
bid_price_3=0,
bid_price_4=0,
bid_price_5=0,
ask_price_1=5179.2,
ask_price_2=0,
ask_price_3=0,
ask_price_4=0,
ask_price_5=0,
bid_volume_1=5,
bid_volume_2=0,
bid_volume_3=0,
bid_volume_4=0,
bid_volume_5=0,
ask_volume_1=1,
ask_volume_2=0,
ask_volume_3=0,
ask_volume_4=0,
ask_volume_5=0)】
以白云机场为例,它的集合竞价是每个交易日的14:56-14:59,但是它这三分钟却只形成1根1分钟K线!
下面是白云机场的1分钟K线图:
由上图可知:
深交所的股票在集合竞价阶段1分钟K线就是3分钟时长,其它时段的又是1分钟;
上交所的股票在集合竞价阶段是没有1分钟K线的,它发生在9:25-9:29,却被合并到开市后的第一根1分钟K线中,也就是说这根K线代表的交易时长为5分钟。
国内期货日盘合约的集合竞价阶段发生在上午开市前5分钟的前4分钟内,却被合并到开市后的第一根1分钟K线中,也就是说这根K线代表的交易时长为5分钟。
国内期货夜盘合约的集合竞价阶段发生在前一交易日的夜间20:55-21:00的前4分钟内,却被合并到开市后的第一根1分钟K线中,也就是说这根K线代表的交易时长为5分钟。
这个问题我之前已经有文章讨论过了,需要修改是肯定的!
位于vnpy_ctp\api\include\ctp\ThostFtdcUserApiDataType.h
/////////////////////////////////////////////////////////////////////////
///TFtdcInstrumentStatusType是一个合约交易状态类型
/////////////////////////////////////////////////////////////////////////
///开盘前
#define THOST_FTDC_IS_BeforeTrading '0'
///非交易
#define THOST_FTDC_IS_NoTrading '1'
///连续交易
#define THOST_FTDC_IS_Continous '2'
///集合竞价报单
#define THOST_FTDC_IS_AuctionOrdering '3'
///集合竞价价格平衡
#define THOST_FTDC_IS_AuctionBalance '4'
///集合竞价撮合
#define THOST_FTDC_IS_AuctionMatch '5'
///收盘
#define THOST_FTDC_IS_Closed '6'
typedef char TThostFtdcInstrumentStatusType;
/////////////////////////////////////////////////////////////////////////
///TFtdcInstStatusEnterReasonType是一个品种进入交易状态原因类型
/////////////////////////////////////////////////////////////////////////
///自动切换
#define THOST_FTDC_IER_Automatic '1'
///手动切换
#define THOST_FTDC_IER_Manual '2'
///熔断
#define THOST_FTDC_IER_Fuse '3'
typedef char TThostFtdcInstStatusEnterReasonType;
class InstrumentStatus(Enum):
"""
合约交易状态类型 hxxjava debug
"""
BEFORE_TRADING = "开盘前"
NO_TRADING = "非交易"
CONTINOUS = "连续交易"
AUCTION_ORDERING = "集合竞价报单"
AUCTION_BALANCE = "集合竞价价格平衡"
AUCTION_MATCH = "集合竞价撮合"
CLOSE = "收盘"
@dataclass
class StatusData(BaseData):
"""
合约状态 hxxjava debug
"""
# 合约代码
symbol:str
# 交易所代码
exchange : Exchange
# 结算组代码
settlement_group_id : str = ""
# 合约交易状态
instrument_status : InstrumentStatus = None
# 交易阶段编号
trading_segment_sn : int = None
# 进入本状态时间
enter_time : str = ""
# 进入本状态原因
enter_reason : str = ""
# 合约在交易所的代码
exchange_inst_id : str = ""
def __post_init__(self):
""" """
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
EVENT_STATUS = "eStatus" # hxxjava debug
def on_status(self, status: StatusData) -> None: # hxxjava debug
"""
Instrument Status event push.
"""
self.on_event(EVENT_STATUS, status)
def onRtnInstrumentStatus(self,data:dict):
"""
当接收到合约状态信息 # hxxjava debug
"""
if data:
status = StatusData(
symbol = data["InstrumentID"],
exchange = EXCHANGE_CTP2VT[data["ExchangeID"]],
settlement_group_id = data["SettlementGroupID"],
instrument_status = data["InstrumentStatus"],
trading_segment_sn = data["TradingSegmentSN"],
enter_time = data["EnterTime"],
enter_reason = data["EnterReason"],
exchange_inst_id = data["ExchangeInstID"],
gateway_name=self.gateway_name
)
# print(f"status = {status}")
self.gateway.on_status(status)
status=StatusData(gateway_name='CTP', symbol='nr', exchange=<Exchange.INE: 'INE'>, settlement_group_id='00000001', instrument_status='1', trading_segment_sn=4, enter_time='19:58:37', enter_reason='1', exchange_inst_id='nr')
status=StatusData(gateway_name='CTP', symbol='lu', exchange=<Exchange.INE: 'INE'>, settlement_group_id='00000001', instrument_status='1', trading_segment_sn=4, enter_time='19:58:37', enter_reason='1', exchange_inst_id='lu')
status=StatusData(gateway_name='CTP', symbol='sc', exchange=<Exchange.INE: 'INE'>, settlement_group_id='00000001', instrument_status='1', trading_segment_sn=4, enter_time='19:58:37', enter_reason='1', exchange_inst_id='sc')
... ... 相同略去 内容太多,包括了这个CTP接口中所有合约品种的合约状态信息,其他省略了
合约状态信息是有交易服务器推送到客户端的,其中包含如下的合约状态:
BEFORE_TRADING = "开盘前"
NO_TRADING = "非交易"
CONTINOUS = "连续交易"
AUCTION_ORDERING = "集合竞价报单"
AUCTION_BALANCE = "集合竞价价格平衡"
AUCTION_MATCH = "集合竞价撮合"
CLOSE = "收盘"
并且还有进入的时间和原因,这些信息正是我们解决BarGenerator在处理集合竞价时段的tick数据错误问题所需要的!
在vnpy\trader\engine.py文件中,为OmsEngine做如下的修改:
class OmsEngine(BaseEngine):
"""
Provides order management system function for VN Trader.
"""
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super(OmsEngine, self).__init__(main_engine, event_engine, "oms")
self.ticks: Dict[str, TickData] = {}
self.orders: Dict[str, OrderData] = {}
self.trades: Dict[str, TradeData] = {}
self.positions: Dict[str, PositionData] = {}
self.accounts: Dict[str, AccountData] = {}
self.contracts: Dict[str, ContractData] = {}
self.statuses: Dict[str, StatusData] = {} # hxxjava debug
self.active_orders: Dict[str, OrderData] = {}
self.add_function()
self.register_event()
def add_function(self) -> None:
"""Add query function to main engine."""
self.main_engine.get_tick = self.get_tick
self.main_engine.get_order = self.get_order
self.main_engine.get_trade = self.get_trade
self.main_engine.get_position = self.get_position
self.main_engine.get_account = self.get_account
self.main_engine.get_contract = self.get_contract
self.main_engine.get_all_ticks = self.get_all_ticks
self.main_engine.get_all_orders = self.get_all_orders
self.main_engine.get_all_trades = self.get_all_trades
self.main_engine.get_all_positions = self.get_all_positions
self.main_engine.get_all_accounts = self.get_all_accounts
self.main_engine.get_all_contracts = self.get_all_contracts
self.main_engine.get_all_statuses = self.get_all_statuses # hxxjava debug
self.main_engine.get_all_active_orders = self.get_all_active_orders
def register_event(self) -> None:
""""""
self.event_engine.register(EVENT_TICK, self.process_tick_event)
self.event_engine.register(EVENT_ORDER, self.process_order_event)
self.event_engine.register(EVENT_TRADE, self.process_trade_event)
self.event_engine.register(EVENT_POSITION, self.process_position_event)
self.event_engine.register(EVENT_ACCOUNT, self.process_account_event)
self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)
self.event_engine.register(EVENT_STATUS, self.process_status_event) # hxxjava debug
... ... 相同略去
def process_status_event(self, event: Event) -> None: # hxxjava debug
""""""
status = event.data
# print(f"got a status = {status}")
self.statuses[status.vt_symbol] = status
... ... 相同略去
def get_all_statuses(self) -> List[StatusData]: # hxxjava debug
"""
Get all status data.
"""
return list(self.statuses.values())
... ... 相同略去
注意:
这个步骤的目的:
把CTP接口接收到的所有合约品种的状态信息保存到self.statuses字典中。
为系统的主引擎main_engine提供访问所有合约品种的状态信息函数get_all_statuses()
修改app\cta_strategy\engine.py文件中的CtaEngine,下面只给出主要的代码修改部分:
class CtaEngine(BaseEngine):
""""""
... ... 相同略去
def register_event(self):
""""""
self.event_engine.register(EVENT_TICK, self.process_tick_event)
self.event_engine.register(EVENT_ORDER, self.process_order_event)
self.event_engine.register(EVENT_TRADE, self.process_trade_event)
self.event_engine.register(EVENT_POSITION, self.process_position_event)
self.event_engine.register(EVENT_STATUS,self.process_status_event) # hxxjava debug
... ... 相同略去
def process_status_event(self,event:Event): # hxxjava debug
""" 分发合约某种状态信息到相应的策略 """
status:StatusData = event.data
for vt_symbol in self.symbol_strategy_map.keys():
symbol,exchange = extract_vt_symbol(vt_symbol)
instrument = left_alphas(symbol)
if (status.vt_symbol.upper() in [symbol.upper(),instrument.upper()]) and (status.exchange == exchange):
# 分发合约某种状态信息到相应的所有策略中
strategies = self.symbol_strategy_map[vt_symbol]
for strategy in strategies:
self.call_strategy_func(strategy, strategy.on_status, status)
... ... 相同略去
在app\cta_strategy\template.py文件中,为cta_template模版增加下面的接口:
@virtual
def on_status(self, status: StatusData): # hxxjava debug
"""
Callback of status data
"""
pass
简单举例如下:
def on_status(self, status: StatusData): # hxxjava debug
print(f"strategy {self.strategy_name} got a status event {status}")
到这里,您可以让你的CTA策略在感知到策略正在交易的合约交易状态变化了。
具体如何使用合约交易状态信息, 这么做取决于您的需求了。举例如下:
在这个文件里:
vnpy_ctp\api\include\ctp\ThostFtdcUserApiStruct.h
定义如下:
///深度行情
struct CThostFtdcDepthMarketDataField
{
///交易日
TThostFtdcDateType TradingDay;
///保留的无效字段
TThostFtdcOldInstrumentIDType reserve1;
///交易所代码
TThostFtdcExchangeIDType ExchangeID;
///保留的无效字段
TThostFtdcOldExchangeInstIDType reserve2;
///最新价
TThostFtdcPriceType LastPrice;
///上次结算价
TThostFtdcPriceType PreSettlementPrice;
///昨收盘
TThostFtdcPriceType PreClosePrice;
///昨持仓量
TThostFtdcLargeVolumeType PreOpenInterest;
///今开盘
TThostFtdcPriceType OpenPrice;
///最高价
TThostFtdcPriceType HighestPrice;
///最低价
TThostFtdcPriceType LowestPrice;
///数量
TThostFtdcVolumeType Volume;
///成交金额
TThostFtdcMoneyType Turnover;
///持仓量
TThostFtdcLargeVolumeType OpenInterest;
///今收盘
TThostFtdcPriceType ClosePrice;
///本次结算价
TThostFtdcPriceType SettlementPrice;
///涨停板价
TThostFtdcPriceType UpperLimitPrice;
///跌停板价
TThostFtdcPriceType LowerLimitPrice;
///昨虚实度
TThostFtdcRatioType PreDelta;
///今虚实度
TThostFtdcRatioType CurrDelta;
///最后修改时间
TThostFtdcTimeType UpdateTime;
///最后修改毫秒
TThostFtdcMillisecType UpdateMillisec;
///申买价一
TThostFtdcPriceType BidPrice1;
///申买量一
TThostFtdcVolumeType BidVolume1;
///申卖价一
TThostFtdcPriceType AskPrice1;
///申卖量一
TThostFtdcVolumeType AskVolume1;
///申买价二
TThostFtdcPriceType BidPrice2;
///申买量二
TThostFtdcVolumeType BidVolume2;
///申卖价二
TThostFtdcPriceType AskPrice2;
///申卖量二
TThostFtdcVolumeType AskVolume2;
///申买价三
TThostFtdcPriceType BidPrice3;
///申买量三
TThostFtdcVolumeType BidVolume3;
///申卖价三
TThostFtdcPriceType AskPrice3;
///申卖量三
TThostFtdcVolumeType AskVolume3;
///申买价四
TThostFtdcPriceType BidPrice4;
///申买量四
TThostFtdcVolumeType BidVolume4;
///申卖价四
TThostFtdcPriceType AskPrice4;
///申卖量四
TThostFtdcVolumeType AskVolume4;
///申买价五
TThostFtdcPriceType BidPrice5;
///申买量五
TThostFtdcVolumeType BidVolume5;
///申卖价五
TThostFtdcPriceType AskPrice5;
///申卖量五
TThostFtdcVolumeType AskVolume5;
///当日均价
TThostFtdcPriceType AveragePrice;
///业务日期
TThostFtdcDateType ActionDay;
///合约代码
TThostFtdcInstrumentIDType InstrumentID;
///合约在交易所的代码
TThostFtdcExchangeInstIDType ExchangeInstID;
};
linhertz wrote:
请问在币安也有同样的问题吗?是不是可以把 Int去掉就解决了这个问题。不过去掉Int之后,那么其他的策略,例如股指期货策略是不是会受影响?对于新手来讲怎么解决最好?
谢谢
不可以。因为去掉int虽然可以在PaperAccount模拟时能够下单,但是一旦实盘交易是无法开仓比合约资料中规定的最小手数的。