def MinuteGap(StartTime: time, EndTime: time) -> int:
def nMinuteLater(point: time, m: int) -> time:
两个函数中 2460中间的乘号丢失,应是 24 * 60
"""
period.py
除年、季外,支持月、周、日、分钟的任意周期K线的合成
本模块位于vnpy\trader\目录下,并且要求在同目录下object.py中的TickData、BarData中增加成员 tradingday: datetime
"""
from datetime import time, datetime, timedelta
from operator import le
from vnpy.trader.object import BarData, TickData
from enum import Enum
"""
字典key是期货品种的全小写代码,value是一个元组,分别表示:区分大小写的期货品种代码,品种中文名称,所属交易所,倍率,单位跳价,交易时段编号
其中的交易时段编号,为G_TRADESEG元组的元素编号,编号从0开始
"""
G_PRODUCT: dict = {
"hw": ("HW", "强麦", "CZCE", 20, 1, 0),
"pm": ("PM", "普麦", "CZCE", 50, 1, 0),
"cf": ("CF", "棉花", "CZCE", 5, 5, 1),
"cy": ("CY", "棉纱", "CZCE", 5, 5, 1),
"sr": ("SR", "白糖", "CZCE", 10, 1, 1),
"ta": ("TA", "PTA", "CZCE", 5, 2, 1),
"oi": ("OI", "菜籽油", "CZCE", 10, 2, 1),
"ri": ("RI", "早籼稻", "CZCE", 20, 1, 0),
"ma": ("MA", "甲醇", "CZCE", 10, 1, 1),
"fg": ("FG", "玻璃", "CZCE", 20, 1, 1),
"rs": ("RS", "油菜籽", "CZCE", 10, 1, 0),
"rm": ("RM", "菜粕", "CZCE", 10, 1, 1),
"zc": ("ZC", "动力煤", "CZCE", 100, 0.2, 1),
"jr": ("JR", "粳稻", "CZCE", 20, 1, 0),
"lr": ("LR", "晚籼稻", "CZCE", 20, 1, 0),
"sf": ("SF", "硅铁", "CZCE", 5, 2, 0),
"sm": ("SM", "锰硅", "CZCE", 5, 2, 0),
"ap": ("AP", "苹果", "CZCE", 10, 1, 0),
"cj": ("CJ", "红枣", "CZCE", 5, 5, 0),
"pk": ("PK", "花生", "CZCE", 5, 2, 0),
"ur": ("UR", "尿素", "CZCE", 20, 1, 0),
"sa": ("SA", "纯碱", "CZCE", 20, 1, 1),
"c": ("c", "玉米", "DCE", 10, 1, 1),
"cs": ("cs", "淀粉", "DCE", 10, 1, 1),
"a": ("a", "大豆一号", "DCE", 10, 1, 1),
"b": ("b", "大豆二号", "DCE", 10, 1, 1),
"m": ("m", "豆粕", "DCE", 10, 1, 1),
"y": ("y", "豆油", "DCE", 10, 2, 1),
"p": ("p", "棕榈油", "DCE", 10, 2, 1),
"lh": ("lh", "生猪", "DCE", 16, 5, 0),
"jd": ("jd", "鸡蛋", "DCE", 10, 1, 0),
"bb": ("bb", "胶合板", "DCE", 500, 0.05, 0),
"fb": ("fb", "纤维板", "DCE", 500, 0.05, 0),
"l": ("l", "塑料", "DCE", 5, 1, 1),
"v": ("v", "PVC", "DCE", 5, 1, 1),
"pp": ("pp", "聚丙烯", "DCE", 5, 1, 1),
"eg": ("eg", "乙二醇", "DCE", 10, 1, 1),
"eb": ("eb", "苯乙烯", "DCE", 5, 1, 1),
"pg": ("pg", "LPG", "DCE", 20, 1, 1),
"j": ("j", "焦炭", "DCE", 100, 0.5, 1),
"jm": ("jm", "焦煤", "DCE", 60, 0.5, 1),
"i": ("i", "铁矿石", "DCE", 100, 0.5, 1),
"rr": ("rr", "粳米", "DCE", 10, 1, 1),
"cu": ("cu", "铜", "SHFE", 5, 10, 3),
"al": ("al", "铝", "SHFE", 5, 5, 3),
"zn": ("zn", "锌", "SHFE", 5, 5, 3),
"pb": ("pb", "铅", "SHFE", 5, 5, 3),
"ni": ("ni", "镍", "SHFE", 1, 10, 3),
"sn": ("sn", "锡", "SHFE", 1, 10, 3),
"ss": ("ss", "不锈钢", "SHFE", 5, 5, 3),
"au": ("au", "黄金", "SHFE", 1000, 0.05, 4),
"ag": ("ag", "白银", "SHFE", 15, 1, 4),
"rb": ("rb", "螺纹钢", "SHFE", 10, 1, 1),
"wr": ("wr", "线材", "SHFE", 10, 1, 0),
"hc": ("hc", "热卷", "SHFE", 10, 1, 1),
"fu": ("fu", "燃油", "SHFE", 10, 1, 1),
"bu": ("bu", "沥青", "SHFE", 10, 2, 1),
"ru": ("ru", "橡胶", "SHFE", 10, 5, 1),
"sp": ("sp", "纸浆", "SHFE", 10, 2, 1),
"if": ("IF", "沪深300", "CFFEX", 300, 0.2, 5),
"ih": ("IH", "上证50", "CFFEX", 300, 0.2, 5),
"ic": ("IC", "中证500", "CFFEX", 200, 0.2, 5),
"ts": ("TS", "二债", "CFFEX", 10000, 0.005, 6),
"tf": ("TF", "五年国债", "CFFEX", 10000, 0.005, 6),
"t": ("T", "十年国债", "CFFEX", 10000, 0.005, 6),
"sc": ("sc", "原油", "INE", 1000, 0.1, 4),
"lu": ("lu", "低硫燃油", "INE", 10, 1, 1),
"nr": ("nr", "20号胶", "INE", 10, 5, 1)
}
"""
交易时段
"""
G_TRADESEG: tuple = (((time(9, 0), time(10, 15)), (time(10, 30), time(11, 30)), (time(13, 30), time(15, 0))),
# 期货日盘交易时段
((time(21, 0), time(23, 0)), (time(9, 0), time(10, 15)),
(time(10, 30), time(11, 30)), (time(13, 30), time(15, 0))),
# 期货夜盘交易时段
((time(21, 0), time(23, 30)), (time(9, 0), time(10, 15)),
(time(10, 30), time(11, 30)), (time(13, 30), time(15, 0))),
# 期货夜盘交易时段,曾经使用过,目前国内已经没有交易所采用
((time(21, 0), time(1, 0)), (time(9, 0), time(10, 15)),
(time(10, 30), time(11, 30)), (time(13, 30), time(15, 0))),
# 有色金属交易时段
((time(21, 0), time(2, 30)), (time(9, 0), time(10, 15)),
(time(10, 30), time(11, 30)), (time(13, 30), time(15, 0))),
# 贵金属交易时段
((time(9, 30), time(11, 30)), (time(13, 0), time(15, 0))),
# 股票、股指交易时段
((time(9, 15), time(11, 30)), (time(13, 0), time(15, 15))),
# 国债期货交易时段
((time(0, 0), time(0, 0)),)
# 24小时连续交易,目前国内没有
)
def MinuteGap(StartTime: time, EndTime: time) -> int:
"""计算两个时间相隔的分钟数,如果开始时间大于结束时间,则表示隔夜了,如果开始时间等于结束时间则表示相差一整天"""
tmp: int = EndTime.hour
gap: int = 0
if EndTime < StartTime:
tmp = EndTime.hour + 24
if EndTime == StartTime:
gap = 2460
else:
gap = (tmp60+EndTime.minute) - (StartTime.hour*60+StartTime.minute)
return gap
def nMinuteLater(point: time, m: int) -> time:
"""从给定的时间point开始之后的m分钟,取值必须为正整数,返回time。如果超过24:00则自动接续到凌晨"""
totalmin: int = point.hour60+point.minute
m = m % (2460)
totalmin += m
if totalmin >= 2460:
totalmin -= 2460
return time(int(totalmin/60), totalmin % 60)
def GetTimeGapSerial(tradeseg: tuple, gap: int) -> list:
"""给定交易时段以及K线的分钟周期,返回该周期下的时间分段序列。只适用自定义分钟周期
tradeseg为G_TRADESEG中的元素,其中gap为分钟周期,因此不得大于等于全天交易的总分钟时长
如GetTimeGapSerial(G_TRADESEG[1], 60)则返回如下的60分钟K线的分段时间序列:
[21:00:00 , 22:00:00],
[22:00:00 , 23:00:00],
[09:00:00 , 10:00:00],
[10:00:00 , 11:15:00],
[11:15:00 , 14:15:00],
[14:15:00 , 15:00:00]]
"""
totalminute: int = 0
tgs: list = []
for t in tradeseg:
totalminute += MinuteGap(t[0], t[1]) # 计算交易总时长分钟数
if gap >= totalminute:
return tgs # K线分钟周期大于等于当日交易总时长,直接返回
gapremain: int = gap
for t in tradeseg:
subseg: int = MinuteGap(t[0], t[1])
if gapremain == gap:
tgs.append([t[0], t[1]])
if gapremain > subseg:
gapremain -= subseg
tgs[-1][1] = t[1]
elif gapremain == subseg:
gapremain = gap
tgs[-1][1] = t[1]
else:
postime: time = nMinuteLater(t[0], gapremain)
tgs[-1][1] = postime
tgs.append([postime, t[1]])
subsegremain: int = subseg-gapremain
gapremain = gap
while subsegremain > 0:
if gapremain <= subsegremain:
postime = nMinuteLater(tgs[-1][0], gapremain)
tgs[-1][1] = postime
if gapremain < subsegremain:
tgs.append([postime, t[1]])
subsegremain -= gapremain
gapremain = gap
else:
gapremain -= subsegremain
subsegremain = 0
return tgs
def isSameWeek(d1: datetime, d2: datetime) -> bool:
"""判断两个日期是否在同一周内"""
return (d1-timedelta(d1.weekday())).date() == (d2-timedelta(d2.weekday())).date()
def isSameMonth(d1: datetime, d2: datetime) -> bool:
"""判断两个日期是否在同一个月内"""
return (d1.year == d2.year) and (d1.month == d2.month)
def isInSameTimeSeg(timeseg: list, dt: datetime, tradeseg: tuple = ()) -> bool:
"""
判断给定的时间dt是否属于timeseg时段内
任意分钟K线的分段,可能会隔夜横跨两个日期,因此判断一个时间是否属于当前K线时间段内需要专门处理
"""
ret: bool = False
isrightborder: bool = False
for seg in tradeseg:
if seg[1] == dt.time():
isrightborder = True # 为了将各交易时间分段的最后一个tick数据正确归入K线
break
if timeseg[1] > timeseg[0]:
if isrightborder:
if dt.time() >= timeseg[0] and dt.time() <= timeseg[1]:
ret = True
else:
if dt.time() >= timeseg[0] and dt.time() < timeseg[1]:
ret = True
else:
if isrightborder:
if (dt.time() >= timeseg[0] and dt.time() >= timeseg[1]) or\
(dt.time() <= timeseg[0] and dt.time() <= timeseg[1]):
ret = True
else:
if (dt.time() >= timeseg[0] and dt.time() > timeseg[1]) or\
(dt.time() <= timeseg[0] and dt.time() < timeseg[1]):
ret = True
return ret
def GetSeg(tradesplitseg: list, dt: datetime) -> list:
"""给定时间分段序列以及时间点,返回该时间点所属的时段,用于自定义分钟周期的当前时段确定"""
for timeseg in tradesplitseg:
if isInSameTimeSeg(timeseg, dt):
return timeseg
return []
def isSameSeason(d1: datetime, d2: datetime) -> bool:
"""判断两个日期是否在同一季节内"""
rt: bool = False
if d1.year == d2.year:
if d1.month >= 1 and d1.month <= 3 and d2.month >= 1 and d2.month <= 3:
rt = True
elif d1.month >= 4 and d1.month <= 6 and d2.month >= 4 and d2.month <= 6:
rt = True
elif d1.month >= 7 and d1.month <= 9 and d2.month >= 7 and d2.month <= 9:
rt = True
elif d1.month >= 10 and d1.month <= 12 and d2.month >= 10 and d2.month <= 12:
rt = True
return rt
class Level(Enum):
"""
K线合并的周期级别:年、季、月、周、日、分钟
"""
YEAR = "year"
SEASON = "season"
MONTH = "month"
WEEK = "week"
DAY = "day"
MINUTE = "minute"
class MergedBarData:
level: Level
num: int
count: int = 1
bar: BarData
startdatetime: datetime
enddatetime: datetime
class MergeBar():
"""K线合并"""
def __init__(self, vt_product: str, level: Level, num: int = 1) -> None:
"""
参数vt_product为期货品种代码
参数level为要合并的K线的周期基数,参数num为level基数下周期数量,
如level=Level.MINUTE, num=12,表示合成12分钟线
如level=Level.MINUTE, num=60,表示合成60分钟线,即1小时线
如level=Level.DAY, num=10,表示合成10日线
其中 月、周、日、分钟支持任意数量的K线合并;年、季则只支持num=1的K线合并
"""
self.vt_product: str = vt_product.lower()
self.productinfo: tuple = ()
self.tradeseg: tuple = ()
self.tradesplitseg: list = []
self.bars: list(MergedBarData) = []
self.level: Level = level
self.num: int = num
if self.vt_product in G_PRODUCT:
self.productinfo = G_PRODUCT[self.vt_product]
self.tradeseg = G_TRADESEG[self.productinfo[5]]
self.tradesplitseg = GetTimeGapSerial(self.tradeseg, self.num)
else:
self.tradesplitseg = GetTimeGapSerial(
G_TRADESEG[1], self.num) # 默认夜盘
def MergeFromTick(self, tick: TickData):
"""实盘行情推送时调用,或者有tick历史数据的回测也可以调用"""
bar: BarData = BarData()
bar.gateway_name = tick.gateway_name
bar.extra = tick.extra
bar.symbol = tick.symbol
bar.exchange = tick.exchange
bar.datetime = tick.datetime
bar.tradingday = tick.tradingday
# bar.interval
bar.volume = tick.volume
bar.turnover = tick.turnover
bar.open_interest = tick.open_interest
bar.open_price = tick.last_price
bar.high_price = tick.last_price
bar.low_price = tick.last_price
bar.close_price = tick.last_price
self.MergeFromBar(bar)
def MergeFromBar(self, bar: BarData):
"""没有tick数据时调用,如历史数据回测,K线必须是1分钟的K线,否则无法合成任意分钟的K线"""
if self.level == Level.YEAR:
self._MergeYear(bar)
elif self.level == Level.SEASON:
self._MergeSeason(bar)
elif self.level == Level.MONTH:
self._MergeMonth(bar)
elif self.level == Level.WEEK:
self._MergeWeek(bar)
elif self.level == Level.DAY:
self._MergeDay(bar)
elif self.level == Level.MINUTE:
self._MergeMinute(bar)
def MergeFromDayBar(self, bar: BarData):
if self.level == Level.YEAR:
self._MergeYear(bar)
elif self.level == Level.SEASON:
self._MergeSeason(bar)
elif self.level == Level.MONTH:
self._MergeMonth(bar)
elif self.level == Level.WEEK:
self._MergeWeek(bar)
elif self.level == Level.DAY:
self._MergeDay(bar)
def _SuckIn(self, suckbar: BarData, foodbar: BarData) -> BarData:
suckbar.high_price = max(suckbar.high_price, foodbar.high_price)
suckbar.low_price = min(suckbar.low_price, foodbar.low_price)
suckbar.close_price = foodbar.close_price
suckbar.tradingday = foodbar.tradingday
suckbar.open_interest = foodbar.open_interest
suckbar.turnover += foodbar.turnover
suckbar.volume += foodbar.volume
return suckbar
def _SuckNew(self, foodbar: BarData) -> MergedBarData:
mergebar = MergedBarData()
mergebar.level = self.level
mergebar.num = self.num
mergebar.count = 1
mergebar.bar = foodbar
return mergebar
def _MergeYear(self, bar: BarData):
"""由于只支持1年的K线合并,因此不对self.num做判断"""
# self.num = 1
new: bool = True
if len(self.bars) > 0:
if self.bars[-1].startdatetime.year == bar.tradingday.year:
# 同一年
self.bars[-1].enddatetime = bar.tradingday
self.bars[-1].bar = self._SuckIn(self.bars[-1].bar, bar)
new = False
if new:
mergebar = self._SuckNew(bar)
mergebar.startdatetime = bar.tradingday
mergebar.enddatetime = bar.tradingday
self.bars.append(mergebar)
def _MergeSeason(self, bar: BarData):
"""由于只支持1季的K线合并,因此不对self.num做判断"""
# self.num = 1
new: bool = True
if len(self.bars) > 0:
if isSameSeason(self.bars[-1].startdatetime, bar.tradingday):
# 同一季
self.bars[-1].enddatetime = bar.tradingday
self.bars[-1].bar = self._SuckIn(self.bars[-1].bar, bar)
new = False
if new:
mergebar = self._SuckNew(bar)
mergebar.startdatetime = bar.tradingday
mergebar.enddatetime = bar.tradingday
self.bars.append(mergebar)
def _MergeMonth(self, bar: BarData):
new: bool = True
if len(self.bars) > 0:
if self.bars[-1].startdatetime.year == bar.tradingday.year:
if (self.num == self.bars[-1].count) and\
(self.bars[-1].enddatetime.month == bar.tradingday.month):
self.bars[-1].enddatetime = bar.tradingday
self.bars[-1].bar = self._SuckIn(self.bars[-1].bar, bar)
new = False
if (self.num > self.bars[-1].count):
if self.bars[-1].enddatetime.month != bar.tradingday.month:
self.bars[-1].count += 1
self.bars[-1].enddatetime = bar.tradingday
self.bars[-1].bar = self._SuckIn(self.bars[-1].bar, bar)
new = False
if new:
mergebar = self._SuckNew(bar)
mergebar.startdatetime = bar.tradingday
mergebar.enddatetime = bar.tradingday
self.bars.append(mergebar)
def _MergeWeek(self, bar: BarData):
new: bool = True
if len(self.bars) > 0:
if self.bars[-1].startdatetime.year == bar.tradingday.year:
if (self.num == self.bars[-1].count) and\
isSameWeek(self.bars[-1].enddatetime, bar.tradingday):
self.bars[-1].enddatetime = bar.tradingday
self.bars[-1].bar = self._SuckIn(self.bars[-1].bar, bar)
new = False
if (self.num > self.bars[-1].count):
if not isSameWeek(self.bars[-1].enddatetime, bar.tradingday):
self.bars[-1].count += 1
self.bars[-1].enddatetime = bar.tradingday
self.bars[-1].bar = self._SuckIn(self.bars[-1].bar, bar)
new = False
if new:
mergebar = self._SuckNew(bar)
mergebar.startdatetime = bar.tradingday
mergebar.enddatetime = bar.tradingday
self.bars.append(mergebar)
def _MergeDay(self, bar: BarData):
new: bool = True
if len(self.bars) > 0:
if self.bars[-1].startdatetime.year == bar.tradingday.year:
if (self.num == self.bars[-1].count) and\
(self.bars[-1].enddatetime.date() == bar.tradingday.date()):
self.bars[-1].enddatetime = bar.tradingday
self.bars[-1].bar = self._SuckIn(self.bars[-1].bar, bar)
new = False
if (self.num > self.bars[-1].count):
if self.bars[-1].enddatetime.date() != bar.tradingday.date():
self.bars[-1].count += 1
self.bars[-1].enddatetime = bar.tradingday
self.bars[-1].bar = self._SuckIn(self.bars[-1].bar, bar)
new = False
if new:
mergebar = self._SuckNew(bar)
mergebar.startdatetime = bar.tradingday
mergebar.enddatetime = bar.tradingday
self.bars.append(mergebar)
def _MergeMinute(self, bar: BarData):
new: bool = True
if len(self.bars) > 0:
if self.bars[-1].bar.tradingday.date() == bar.tradingday.date():
if isInSameTimeSeg([self.bars[-1].startdatetime.time(),
self.bars[-1].enddatetime.time()],
bar.datetime,
self.tradeseg):
self.bars[-1].enddatetime = self.bars[-1].enddatetime.replace(
year=bar.datetime.year,
month=bar.datetime.month,
day=bar.datetime.day)
self.bars[-1].bar = self._SuckIn(self.bars[-1].bar, bar)
new = False
if new:
ts = GetSeg(self.tradesplitseg, bar.datetime)
if ts:
mergebar = self._SuckNew(bar)
mergebar.startdatetime = bar.datetime
mergebar.enddatetime = bar.datetime
mergebar.startdatetime = mergebar.startdatetime.replace(
hour=ts[0].hour, minute=ts[0].minute)
mergebar.enddatetime = mergebar.enddatetime.replace(
hour=ts[1].hour, minute=ts[1].minute)
self.bars.append(mergebar)
由于夜盘的存在,交易日就成了非常重要的信息,CTP行情信息中包含了TradingDay信息的,我自己修改了代码,建议官方合入