milter wrote:
21:00-23:30(4根小时K线) 为何这是4根K线呢?21-22一根,22-23一根,23-23:30一根,这不是3根吗
可能你用的是vnpy自带的BarGenerator产生的1小时K线造成的,它在实盘的时候就会有这个缺点。
如果你交易的是这国内期货,20:55~21:00是集合竞价阶段,在20:59时客户端会收到一个集合竞价tick,所以:
如果不明白为什么,请往前看1楼的帖子,细细地想下。
也可以参考本人的另外帖子 K线种类总结
怎么删除不了我这个跟帖?
(贴子标题不小心打错字了,无法修改 ,抱歉!!!)
首先用VSCode创建新文档,把它保存为扩展名为md的文档,这就表示它是一个markdown文档了。
说明:
- 虽然vn.py官网支持markdown语法,但是支持的还不够全面(也许是咱不熟悉),我编写的演示文档有许多地方还是不可在vn.py官网,所以我把不可以直接被显示的地方贴在显示效果之前,您可以把这些代码复制到VSCode中,在markdown相关插件齐全的情况下,是可以成功显示的。
- 例如后面Tek数学公式可mermaid绘图语法的演示,分成实现代码和显示效果,您可以复制到VSCode中,参考示例的写法,修改成符合自己要求的各种图。
无序列表
标题的写法:Markdown标题由 ‘#’ 开始:
# —— 一级标题
## —— 二级标题
### —— 三级标题
#### —— 四级标题
##### —— 五级标题
###### —— 六级标题
其中侧边栏上半部分显示的是当前工作区文件夹下的文件,包括 Markdown 文件和素材。下半部分可以展开「Outline」视图,即大纲,可以根据当前正在编辑的 Markdown 文件显示其各级标题的大纲目录。
这里是引用演示:
> 1st reference
Markdown 文件和素材。下半部分可以展开「Outline」视图,即大纲,可以根据当前正在编辑的 Markdown 文件显示其各级标题的大纲目录。
>> 2nd reference
Markdown 文件和素材。下半部分可以展开「Outline」视图,即大纲,可以根据当前正在编辑的 Markdown 文件显示其各级标题的大纲目录。
>>> third level reference
Markdown 文件和素材。下半部分可以展开「Outline」视图,即大纲,可以根据当前正在编辑的 Markdown 文件显示其各级标题的大纲目录。
显示效果:
1st reference
Markdown 文件和素材。下半部分可以展开「Outline」视图,即大纲,可以根据当前正在编辑的 Markdown 文件显示其各级标题的大纲目录。2nd reference
Markdown 文件和素材。下半部分可以展开「Outline」视图,即大纲,可以根据当前正在编辑的 Markdown 文件显示其各级标题的大纲目录。third level reference
Markdown 文件和素材。下半部分可以展开「Outline」视图,即大纲,可以根据当前正在编辑的 Markdown 文件显示其各级标题的大纲目录。
下面是python代码示例:
python
import time
# Quick,count to ten!for i in range( 10):
# ( but not *too* quick)time.sleep(0.5)
for i in range(1,100):
print(i)
这是行内的代码:x = this.count
,它是嵌入到文字中的。
无序列表
有序列表
字体修饰(加重、斜体、删除)
加重显示 值得一提的是斜体字,
VS Code 的 Markdown 预览默认渲染的是当前你正在编辑的文档,不过如果你希望将某个文档的预览渲染锁定不动,可以通过「Markdown: Toggle Preview Locking」调出一个锁定某个文档的预览界面。
···
InLine math equations:$\omega = d\phi / dt$. Display math should get its own line like so:
$$I = \int \rho R^{2} dV$$
$$ 2+3 = 5 $$
$x+y=z^2$
$ax+by=cz$
$2x+3y=4z$
···
显示:
用代码插入图片示例 !

显示:
剪贴板图片插入示例 !
- 先找到自己需要的界面,将图形复制到剪贴板
- 在VSCode中编辑的md文档需要插入图片的位置,点击鼠标右键,选择“Markdown Paste”或者Ctrl+Alt+V快捷键,进入下面的图片命名环节,可以默认,也可以另外输入新名称。
- 完成上面的两步后,md文档就被自动添加了类似下面的语句:

,当然它和手工输入是同样的显示效果。
```mermaid
sequenceDiagram
participant Alice
participant Bob
Alice->>John: Hello John,how are you?
loop Healthcheck
John->>John: Fight against hypochondria
end
Note right of John: Rational thoughts<br/>prevail...
John-->>Alice: Great!
John->>Bob: How about you?
Bob-->>John: Jolly good!|
John-->Alice:我爱你!
```
显示:
```mermaid
sequenceDiagram
Note right of A: 倒霉, 碰到B了
A->B: Hello B, how are you ?
note left of B: 倒霉,碰到A了
B-->A: Fine, thx, and you?
note over A,B: 快点溜,太麻烦了
A->>B: I'm fine too.
note left of B: 快点打发了A
B-->>A: Great!
note right of A: 溜之大吉
A-xB: Wait a moment
loop Look B every minute
A->>B: look B, go?
B->>A: let me go?
end
B--xA: I'm off, byte
note right of A: 太好了, 他走了
```
```mermaid
sequenceDiagram
Alice->>Bob: Hello Bob, how are you?
alt is sick
Bob->>Alice:not so good :(
else is well
Bob->>Alice:good
end
opt Extra response
Bob->>Alice:Thanks for asking
end
```
```mermaid
sequenceDiagram
# 通过设定参与者(participant)的顺序控制展示顺序
participant B
participant A
Note right of A: 倒霉, 碰到B了
A->B: Hello B, how are you ?
note left of B: 倒霉,碰到A了
B-->A: Fine, thx, and you?
note over A,B:快点溜,太麻烦了。。。
A->>B: I'm fine too.
note left of B: 快点打发了A
B-->>A: Great!
note right of A: 溜之大吉
A-xB: Wait a moment
loop Look B every minute
A->>B: look B, go?
B->>A: let me go?
end
B--xA: I'm off, byte
note right of A: 太好了, 他走了
```
```mermaid
sequenceDiagram
# 通过设定参与者(participants)的顺序控制展示模块顺序
participant Alice
participant Bob
participant John
Alice->John:Hello John,how are you?
loop Healthcheck
John->John:Fight against hypochondria
end
Note right of John:Rational thoughts <br/>prevail... John-->Alice:Great!
John->Bob: How about you?
Bob-->John: good!
```
```mermaid
gantt
dateFormat YYYY-MM-DD
section S1
T1: 2014-01-01, 9d
section S2
T2: 2014-01-11, 9d
section S3
T3: 2014-01-02, 9d
```
```mermaid
gantt
dateFormat YYYY-MM-DD
title Adding GANTT diagram functionality to mermaid
section A section
Completed task :done, des1, 2014-01-06,2014-01-08
Active task :active, des2, 2014-01-09, 3d
Future task : des3, after des2, 5d
Future task2 : des4, after des3, 5d
section Critical tasks
Completed task in the critical line :crit, done, 2014-01-06,24h
Implement parser and jison :crit, done, after des1, 2d
Create tests for parser :crit, active, 3d
Future task in critical line :crit, 5d
Create tests for renderer :2d
Add to mermaid :1d
section Documentation
Describe gantt syntax :active, a1, after des1, 3d
Add gantt diagram to demo page :after a1 , 20h
Add another diagram to demo page :doc1, after a1 , 48h
section Last section
Describe gantt syntax :after doc1, 3d
Add gantt diagram to demo page : 20h
Add another diagram to demo page : 48h
```
```mermaid
classDiagram
Class01 <|-- AveryLongClass : Cool
Class03 *-- Class04
Class05 o-- Class06
Class07 .. Class08
Class09 --> C2 : Where am i?
Class09 --* C3
Class09 --|> Class07
Class07 : equals()
Class07 : Object[] elementData
Class01 : size()
Class01 : int chimp
Class01 : int gorilla
Class08 <--> C2: Cool label
```
```mermaid
pie
"Dogs" : 386
"Cats" : 85
"Rats" : 15
```
```mermaid
erDiagram
CUSTOMER ||--o{ ORDER : places
ORDER ||--|{ LINE-ITEM : contains
CUSTOMER }|..|{ DELIVERY-ADDRESS : uses
```
```mermaid
journey
title My working day
section Go to work
Make tea: 5: Me
Go upstairs: 3: Me
Do work: 1: Me, Cat
section Go home
Go downstairs: 5: Me
Sit down: 5: Me
```
本文给您介绍了这些文档元素:
- 各级标题(1~6级)
- 文本修饰(字体、颜色、加粗、斜体、删除线...)
- 无序列表
- 有序列表
- 程序代码(行内和独立)
- 引用
- LaTex数学表达式(行内和独立)
- 表格(包括对齐方式)
- 图片(手工插入和自动插入)
- 超链接
- 各种Mermaid图
- 序列图
- 流程图
- 甘特图
- 类图
- 饼图
- 实体
- 旅游图
掌握了上述的Markdown语法,您可以轻松编写自己的Markdown文档了!
学习爱好者 wrote:
请问在交易时段有新的K线生成的时候会自动画图吗,还是需要手动画图呢
答复:
天涯地角 wrote:
我开通了徽商期货的通道,然后按照大神的修改,获取合约的保证金和合约交易的手续费率,RM205和MA205这两个合约能获取到保证金,但是不能获取到手续费是多少,返回的手续费都是0,查询别的合约是正常的,问了徽商技术客服,他们说后台设置都没有问题。他们给我发的CTP文件这两个文件thostmduserapi_se.lib和thosttraderapi_se.lib和vnpy封装的ctp的这两个libthosttraderapi_se.so,libthostmduserapi_se.so不一样。都是6.5.1的版本,像这种情况是什么原因导致的了?有什么办法解决吗?帮忙解答下,谢谢!
答复:
可以找徽商期货要一个快期客户端,版本和他们交易服务器使用的CTP版本一致。然后使用你的账户登录快期,获取一下你感兴趣合约保证金和手续费率。
可能有两种结果:
1、如果快期也不能获取到保证金是多少,手续费也都是0,那么说明徽商期货的CTP的交易接口有问题!
2、如果快期可以获取到保证金和手续费,并且也是你现在享受的费率,说明你所使用的vnpy的CTP的交易接口版本和徽商期货的不一致。
overview.start = min(bars[0].datetime, overview.start)
overview.end = max(bars[-1].datetime, overview.end)
它会把DbBarData中相同symbol、exchange和interval的所有bar的生成一个数据总揽overview,
并且overview的起止时间分别为这些bar最早时间戳和最晚时间戳——无论这些bars是由几个时间段构成的。
也就是说,如果您曾经下载过过螺纹rb2201.SHFE的日线数据:
这三次下载后,rb2201.SHFE的日线数据的BarOverview的起止时间:2021-1-16~2021-12-22,而实际上数据库中是不存在2021-5-17~2021-11-30,除非我们查询明细,否则我们不知道它们还缺少哪些日期的K线数据!
我觉得对于下载的时间段重叠的可以合并成为一个BarOverview,目前vnpy在这点上的处理就很好。
而对于两个没有任何重叠时间段的两段bars是不应该合并的,因为这会引起误导!仔细地研究下数据管理模块的界面,就会明白我的意思了。
在jupyter notebook中,我编写了下面的一段下载合约K线数据的代码:
from datetime import datetime,date,timedelta
from vnpy.trader.utility import extract_vt_symbol
from vnpy.trader.constant import Exchange,Interval
from vnpy.trader.object import HistoryRequest
from vnpy.trader.database import get_database
from vnpy.trader.datafeed import get_datafeed
df = get_datafeed()
end = datetime.now()
start = end.replace(month=11)
start,end
req = HistoryRequest(symbol='rb2201',exchange=Exchange.SHFE,start=start,end=end,interval=Interval.DAILY)
# 语句1
bars = df.query_bar_history(req)
# 语句2
db.save_bar_data(bars)
# 语句3
for bar in bars:
print(bar)
执行上面代码,当执行到语句3的时候,提示:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-11-f72a667896f4> in <module>
1 for bar in bars:
----> 2 print(bar)
D:\ProgramFiles\vnstudio\lib\dataclasses.py in __repr__(self)
AttributeError: 'BarData' object has no attribute 'gateway_name'
如果把语句2注释掉,执行语句3的时候有没有任何问题,可以把从米筐接口读取的rb2201.SHFE的bar全部打印出来。
由于本人使用的是mysql数据库,所以需要对vnpy_mysql目录下的mysql_database.py进行修改,使用其他数据库的用户其实也有同样的问题。解决方法也是一样的,不再每个都说。
方法:
打开vnpy_mysql\mysql_database.py,在文件头部添加下面的语句:
import copy # hxxjava
对下面两个函数进行如下的修改,save_bar_data()和save_tick_data()都有同样的问题,修改之处语句在函数中注明了,错误原因是在遍历保存bars和ticks的过程中,因为数据库的要求,改变了每个bar和tick的 __dict__ 字典,保存动作做完了以后,列表bars和列表ticks的每个bar和tick已经被改得面目全非了,里面的gateway_name已经没有了。
修改过的代码后面有 # hxxjava change注释标记。
修改后的代码如下:
def save_bar_data(self, bars: List[BarData]) -> bool:
"""保存K线数据"""
# 读取主键参数
bar = bars[0]
symbol = bar.symbol
exchange = bar.exchange
interval = bar.interval
# 将BarData数据转换为字典,并调整时区
data = []
for bar in bars:
bar.datetime = convert_tz(bar.datetime)
d = copy.copy(bar.__dict__) # hxxjava change
d["exchange"] = d["exchange"].value
d["interval"] = d["interval"].value
d.pop("gateway_name")
d.pop("vt_symbol")
data.append(d)
# 使用upsert操作将数据更新到数据库中
with self.db.atomic():
for c in chunked(data, 50):
DbBarData.insert_many(c).on_conflict_replace().execute()
# 更新K线汇总数据
overview: DbBarOverview = DbBarOverview.get_or_none(
DbBarOverview.symbol == symbol,
DbBarOverview.exchange == exchange.value,
DbBarOverview.interval == interval.value,
)
if not overview:
overview = DbBarOverview()
overview.symbol = symbol
overview.exchange = exchange.value
overview.interval = interval.value
overview.start = bars[0].datetime
overview.end = bars[-1].datetime
overview.count = len(bars)
else:
overview.start = min(bars[0].datetime, overview.start)
overview.end = max(bars[-1].datetime, overview.end)
s: ModelSelect = DbBarData.select().where(
(DbBarData.symbol == symbol)
& (DbBarData.exchange == exchange.value)
& (DbBarData.interval == interval.value)
)
overview.count = s.count()
overview.save()
return True
def save_tick_data(self, ticks: List[TickData]) -> bool:
"""保存TICK数据"""
# 将TickData数据转换为字典,并调整时区
data = []
for tick in ticks:
tick.datetime = convert_tz(tick.datetime)
d = copy.copy(tick.__dict__) # hxxjava change
d["exchange"] = d["exchange"].value
d.pop("gateway_name")
d.pop("vt_symbol")
data.append(d)
# 使用upsert操作将数据更新到数据库中
with self.db.atomic():
for c in chunked(data, 50):
DbTickData.insert_many(c).on_conflict_replace().execute()
return True
duke wrote:
大神,增加函数定义后,继续报错
然后我直接用instrument = tick.symbol,跑通了,所以搞不懂left_alphas(tick.symbol)这个语句的作用是什么?求解惑
left_alphas()是把提取合约的品种名称,如rb2201的品种是rb,TA201的品种是TA。
因为交易状态是按照品种发布的,而你订阅的tick中包含的是合约名称,所以要用
left_alphas()提取出品种名称,方便查找其当前的交易状态。
duke wrote:
还有一个问题,symbol,exchange = extract_vt_symbol(vt_symbol),这个 extract_vt_symbol函数在哪里定义?
这个在vnpy\trader\utility.py中,
引用方法:from vnpy.trader.utility import extract_vt_symbol
另外:遇到这种东西,应该是先搜索下吧,这个问题太基础了。
def left_alphas(instr:str):
""" get lefe alphas of a string """
ret_str = ''
for s in instr:
if s.isalpha():
ret_str += s
else:
break
return ret_str
你快成图表专家了!
RiskandPortfolio.pdf 文件下载下来后提示是坏损的,无法打开。
jingsiaosing wrote:
TradeStatusManager的定义是放在vnpy/trader/utility.py中的吗
去我以往的帖子中找,我有分享TradeStatusManager
from pathlib import Path
import sys
import pytz
from datetime import datetime
from time import sleep, time
from vnpy.event.engine import EventEngine
from typing import Callable, Dict, List
import threading # hxxjava debug
from vnpy.trader.constant import (
Direction,
HedgeType,
Offset,
Exchange,
OrderType,
Product,
Status,
OptionType,
InstrumentStatus,
StatusEnterReason,
IdCardType,
ProductClass,
)
from vnpy.trader.gateway import BaseGateway,Function
from vnpy.trader.object import (
TickData,
OrderData,
TradeData,
PositionData,
AccountData,
ContractData,
StatusData, # hxxjava add
MarginData, # hxxjava add
MarginRateAdjustData, # hxxjava add
InstrumentMarginData, # hxxjava add
CommissionData, # hxxjava add
OrderCommRateData, # hxxjava add
BrokerTradingParamsData, # hxxjava add
InvestorData, # hxxjava add
ProductData, # hxxjava add
MarginRequest, # hxxjava add
MarginRateAdjustRequest, # hxxjava add
CommissionRequest, # hxxjava add
OrderCommRateRequest, # hxxjava add
BrokerTradingParamsRequest, # hxxjava add
ProductRequst, # hxxjava add
GatewayData, # hxxjava add
OrderRequest,
CancelRequest,
SubscribeRequest,
)
from vnpy.trader.utility import get_folder_path
from vnpy.trader.event import EVENT_TIMER
from threading import Timer # hxxjava add
from ..api import (
MdApi,
TdApi,
THOST_FTDC_OAS_Submitted,
THOST_FTDC_OAS_Accepted,
THOST_FTDC_OAS_Rejected,
THOST_FTDC_OST_NoTradeQueueing,
THOST_FTDC_OST_PartTradedQueueing,
THOST_FTDC_OST_AllTraded,
THOST_FTDC_OST_Canceled,
THOST_FTDC_D_Buy,
THOST_FTDC_D_Sell,
THOST_FTDC_PD_Long,
THOST_FTDC_PD_Short,
THOST_FTDC_OPT_LimitPrice,
THOST_FTDC_OPT_AnyPrice,
THOST_FTDC_OF_Open,
THOST_FTDC_OFEN_Close,
THOST_FTDC_OFEN_CloseYesterday,
THOST_FTDC_OFEN_CloseToday,
THOST_FTDC_PC_Futures,
THOST_FTDC_PC_Options,
THOST_FTDC_PC_SpotOption,
THOST_FTDC_PC_Combination,
THOST_FTDC_CP_CallOptions,
THOST_FTDC_CP_PutOptions,
THOST_FTDC_HF_Speculation,
THOST_FTDC_CC_Immediately,
THOST_FTDC_FCC_NotForceClose,
THOST_FTDC_TC_GFD,
THOST_FTDC_VC_$,
THOST_FTDC_TC_IOC,
THOST_FTDC_VC_CV,
THOST_FTDC_AF_Delete
)
# 委托状态映射
STATUS_CTP2VT: Dict[str, Status] = {
THOST_FTDC_OAS_Submitted: Status.SUBMITTING,
THOST_FTDC_OAS_Accepted: Status.SUBMITTING,
THOST_FTDC_OAS_Rejected: Status.REJECTED,
THOST_FTDC_OST_NoTradeQueueing: Status.NOTTRADED,
THOST_FTDC_OST_PartTradedQueueing: Status.PARTTRADED,
THOST_FTDC_OST_AllTraded: Status.ALLTRADED,
THOST_FTDC_OST_Canceled: Status.CANCELLED
}
# 多空方向映射
DIRECTION_VT2CTP: Dict[Direction, str] = {
Direction.LONG: THOST_FTDC_D_Buy,
Direction.SHORT: THOST_FTDC_D_Sell
}
DIRECTION_CTP2VT: Dict[str, Direction] = {v: k for k, v in DIRECTION_VT2CTP.items()}
DIRECTION_CTP2VT[THOST_FTDC_PD_Long] = Direction.LONG
DIRECTION_CTP2VT[THOST_FTDC_PD_Short] = Direction.SHORT
# 委托类型映射
ORDERTYPE_VT2CTP: Dict[OrderType, str] = {
OrderType.LIMIT: THOST_FTDC_OPT_LimitPrice,
OrderType.MARKET: THOST_FTDC_OPT_AnyPrice
}
ORDERTYPE_CTP2VT: Dict[str, OrderType] = {v: k for k, v in ORDERTYPE_VT2CTP.items()}
# 开平方向映射
OFFSET_VT2CTP: Dict[Offset, str] = {
Offset.OPEN: THOST_FTDC_OF_Open,
Offset.CLOSE: THOST_FTDC_OFEN_Close,
Offset.CLOSETODAY: THOST_FTDC_OFEN_CloseToday,
Offset.CLOSEYESTERDAY: THOST_FTDC_OFEN_CloseYesterday,
}
OFFSET_CTP2VT: Dict[str, Offset] = {v: k for k, v in OFFSET_VT2CTP.items()}
# 交易所映射
EXCHANGE_CTP2VT: Dict[str, Exchange] = {
"CFFEX": Exchange.CFFEX,
"SHFE": Exchange.SHFE,
"CZCE": Exchange.CZCE,
"DCE": Exchange.DCE,
"INE": Exchange.INE
}
# 产品类型映射
PRODUCT_CTP2VT: Dict[str, Product] = {
THOST_FTDC_PC_Futures: Product.FUTURES,
THOST_FTDC_PC_Options: Product.OPTION,
THOST_FTDC_PC_SpotOption: Product.OPTION,
THOST_FTDC_PC_Combination: Product.SPREAD
}
# 期权类型映射
OPTIONTYPE_CTP2VT: Dict[str, OptionType] = {
THOST_FTDC_CP_CallOptions: OptionType.CALL,
THOST_FTDC_CP_PutOptions: OptionType.PUT
}
# 品种状态进入原因映射 hxxjava debug
INSTRUMENTSTATUS_CTP2VT: Dict[str, InstrumentStatus] = {
"0": InstrumentStatus.BEFORE_TRADING,
"1": InstrumentStatus.NO_TRADING,
"2": InstrumentStatus.CONTINOUS,
"3": InstrumentStatus.AUCTION_ORDERING,
"4": InstrumentStatus.AUCTION_BALANCE,
"5": InstrumentStatus.AUCTION_MATCH,
"6": InstrumentStatus.CLOSE,
"7": InstrumentStatus.CLOSE
}
# 品种状态进入原因映射 hxxjava debug
ENTERREASON_CTP2VT: Dict[str, StatusEnterReason] = {
"1": StatusEnterReason.AUTOMATIC,
"2": StatusEnterReason.MANUAL,
"3": StatusEnterReason.FUSE
}
# 其他常量
MAX_FLOAT = sys.float_info.max # 浮点数极限值
CHINA_TZ = pytz.timezone("Asia/Shanghai") # 中国时区
# 合约数据全局缓存字典
symbol_contract_map: Dict[str, ContractData] = {}
class CtpGateway(BaseGateway):
"""
vn.py用于对接期货CTP柜台的交易接口。
"""
default_setting: Dict[str, str] = {
"用户名": "",
"密码": "",
"经纪商代码": "",
"交易服务器": "",
"行情服务器": "",
"产品名称": "",
"授权编码": ""
}
exchanges: List[str] = list(EXCHANGE_CTP2VT.values())
def __init__(self, event_engine: EventEngine, gateway_name: str = "CTP") -> None:
"""构造函数"""
super().__init__(event_engine, gateway_name)
self.td_api: "CtpTdApi" = CtpTdApi(self)
self.md_api: "CtpMdApi" = CtpMdApi(self)
def connect(self, setting: dict) -> None:
"""连接交易接口"""
userid: str = setting["用户名"]
password: str = setting["密码"]
brokerid: str = setting["经纪商代码"]
td_address: str = setting["交易服务器"]
md_address: str = setting["行情服务器"]
appid: str = setting["产品名称"]
auth_code: str = setting["授权编码"]
if (
(not td_address.startswith("tcp://"))
and (not td_address.startswith("ssl://"))
):
td_address = "tcp://" + td_address
if (
(not md_address.startswith("tcp://"))
and (not md_address.startswith("ssl://"))
):
md_address = "tcp://" + md_address
self.td_api.connect(td_address, userid, password, brokerid, auth_code, appid)
self.md_api.connect(md_address, userid, password, brokerid)
self.init_query()
def subscribe(self, req: SubscribeRequest) -> None:
"""订阅行情"""
self.md_api.subscribe(req)
def send_order(self, req: OrderRequest) -> str:
"""委托下单"""
# 期权询价
if req.type == OrderType.RFQ:
vt_orderid: str = self.td_api.send_rfq(req)
# 其他委托
else:
vt_orderid: str = self.td_api.send_order(req)
return vt_orderid
def cancel_order(self, req: CancelRequest) -> None:
"""委托撤单"""
self.td_api.cancel_order(req)
def query_account(self) -> None:
"""查询资金"""
self.td_api.query_account()
def query_position(self) -> None:
"""查询持仓"""
self.td_api.query_position()
def queryInvestUnit(self): # hxxjava add
"""查询投资单元"""
self.td_api.queryInvestUnit()
def query_broker_trading_params(self,req:BrokerTradingParamsRequest): # hxxjava add
""" 查询经纪公司交易参数 """
self.td_api.query_broker_trading_params(req)
def query_investor(self) -> None:
"""
查询投资者
"""
self.td_api.query_investor()
def query_product(self,req:ProductRequst) -> None:
""" 查询产品信息 """
self.td_api.query_product(req)
def query_commission(self,req:CommissionRequest) -> None: # hxxjava add
"""查询手续费数据"""
self.td_api.query_commission(req)
def query_order_commission(self,req:OrderCommRateRequest): # hxxjava add
"""查询报单手续费数据"""
self.td_api.query_order_commission(req)
def query_margin_ratio(self,req:MarginRequest): # hxxjava add
"""查询保证金率数据"""
self.td_api.query_margin_ratio(req)
def query_exchange_margin_rate(self,req:MarginRequest): # hxxjava add
"""查询交易所保证金率数据"""
self.td_api.query_exchange_margin_rate(req)
def query_exchange_margin_rate_adjust(self,req:MarginRateAdjustRequest): # hxxjava add
"""查询交易所保证金率调整数据"""
self.td_api.query_exchange_margin_rate_adjust(req)
def close(self):
""""""
self.td_api.close()
self.md_api.close()
def write_error(self, msg: str, error: dict) -> None:
"""输出错误信息日志"""
error_id: int = error["ErrorID"]
error_msg: str = error["ErrorMsg"]
msg = f"{msg},代码:{error_id},信息:{error_msg}"
self.write_log(msg)
def process_timer_event(self, event) -> None:
"""定时事件处理"""
self.count += 1
if self.count < 2:
return
self.count = 0
if self.td_api.connect_status:
print(
"交易所保证金:",len(self.td_api.ex_margin_rate_data),
"合约状态:",len(self.td_api.status_data),
"合约信息:",len(symbol_contract_map.keys()),
"委托单数:",len(self.td_api.order_data),
"成交单数:",len(self.td_api.trade_data),
"报单+查询:",(len(self.td_api.order_functions),len(self.td_api.query_functions))
)
self.md_api.update_date()
def init_query(self) -> None:
"""初始化查询任务"""
self.count: int = 0
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
class CtpMdApi(MdApi):
""""""
def __init__(self, gateway: CtpGateway) -> None:
"""构造函数"""
super().__init__()
self.gateway: CtpGateway = gateway
self.gateway_name: str = gateway.gateway_name
self.reqid: int = 0
self.connect_status: bool = False
self.login_status: bool = False
self.subscribed: List[str] = set()
self.userid: str = ""
self.password: str = ""
self.brokerid: str = ""
self.current_date: str = datetime.now().strftime("%Y%m%d")
def onFrontConnected(self) -> None:
"""服务器连接成功回报"""
self.gateway.on_connect(GatewayData(name="CTP",type='MD')) # hxxjava add
self.gateway.write_log("行情服务器连接成功")
self.login()
def onFrontDisconnected(self, reason: int) -> None:
"""服务器连接断开回报"""
self.login_status = False
self.gateway.on_disconnect(GatewayData(name="CTP",type='MD',reason=reason)) # hxxjava add
self.gateway.write_log(f"行情服务器连接断开,原因{reason}")
def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""用户登录请求回报"""
if not error["ErrorID"]:
self.login_status = True
self.gateway.write_log("行情服务器登录成功")
for symbol in self.subscribed:
self.subscribeMarketData(symbol)
else:
self.gateway.write_error("行情服务器登录失败", error)
def onRspError(self, error: dict, reqid: int, last: bool) -> None:
"""请求报错回报"""
self.gateway.write_error("行情接口报错", error)
def onRspSubMarketData(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""订阅行情回报"""
if not error or not error["ErrorID"]:
return
self.gateway.write_error("行情订阅失败", error)
def onRtnDepthMarketData(self, data: dict) -> None:
"""行情数据推送"""
# 过滤没有时间戳的异常行情数据
if not data["UpdateTime"]:
return
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map.get(symbol, None)
if not contract:
return
timestamp: str = f"{self.current_date} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}"
dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
dt = CHINA_TZ.localize(dt)
tick: TickData = TickData(
symbol=symbol,
exchange=contract.exchange,
datetime=dt,
name=contract.name,
volume=data["Volume"],
open_interest=data["OpenInterest"],
last_price=adjust_price(data["LastPrice"]),
limit_up=data["UpperLimitPrice"],
limit_down=data["LowerLimitPrice"],
open_price=adjust_price(data["OpenPrice"]),
high_price=adjust_price(data["HighestPrice"]),
low_price=adjust_price(data["LowestPrice"]),
pre_close=adjust_price(data["PreClosePrice"]),
bid_price_1=adjust_price(data["BidPrice1"]),
ask_price_1=adjust_price(data["AskPrice1"]),
bid_volume_1=data["BidVolume1"],
ask_volume_1=data["AskVolume1"],
gateway_name=self.gateway_name
)
if data["BidVolume2"] or data["AskVolume2"]:
tick.bid_price_2 = adjust_price(data["BidPrice2"])
tick.bid_price_3 = adjust_price(data["BidPrice3"])
tick.bid_price_4 = adjust_price(data["BidPrice4"])
tick.bid_price_5 = adjust_price(data["BidPrice5"])
tick.ask_price_2 = adjust_price(data["AskPrice2"])
tick.ask_price_3 = adjust_price(data["AskPrice3"])
tick.ask_price_4 = adjust_price(data["AskPrice4"])
tick.ask_price_5 = adjust_price(data["AskPrice5"])
tick.bid_volume_2 = data["BidVolume2"]
tick.bid_volume_3 = data["BidVolume3"]
tick.bid_volume_4 = data["BidVolume4"]
tick.bid_volume_5 = data["BidVolume5"]
tick.ask_volume_2 = data["AskVolume2"]
tick.ask_volume_3 = data["AskVolume3"]
tick.ask_volume_4 = data["AskVolume4"]
tick.ask_volume_5 = data["AskVolume5"]
self.gateway.on_tick(tick)
def connect(self, address: str, userid: str, password: str, brokerid: int) -> None:
"""连接服务器"""
self.userid = userid
self.password = password
self.brokerid = brokerid
# 禁止重复发起连接,会导致异常崩溃
if not self.connect_status:
path: Path = get_folder_path(self.gateway_name.lower())
self.createFtdcMdApi((str(path) + "\\Md").encode("GBK"))
self.registerFront(address)
self.init()
self.connect_status = True
def login(self) -> None:
"""用户登录"""
ctp_req: dict = {
"UserID": self.userid,
"Password": self.password,
"BrokerID": self.brokerid
}
self.reqid += 1
self.reqUserLogin(ctp_req, self.reqid)
def subscribe(self, req: SubscribeRequest) -> None:
"""订阅行情"""
if self.login_status:
self.subscribeMarketData(req.symbol)
self.subscribed.add(req.symbol)
def close(self) -> None:
"""关闭连接"""
if self.connect_status:
self.exit()
def update_date(self) -> None:
"""更新当前日期"""
self.current_date = datetime.now().strftime("%Y%m%d")
class CtpTdApi(TdApi):
""""""
def __init__(self, gateway: CtpGateway) -> None:
"""构造函数"""
super().__init__()
# hxxjava start
self.order_functions:List[Function] = []
self.query_functions:List[Function] = []
self.inited = False
self.debug = True
self.td_lock = threading.Lock()
# hxxjava end
self.gateway: CtpGateway = gateway
self.gateway_name: str = gateway.gateway_name
self.reqid: int = 0
self.order_ref: int = 0
self.connect_status: bool = False
self.login_status: bool = False
self.auth_status: bool = False
self.login_failed: bool = False
self.contract_inited: bool = False
self.userid: str = ""
self.password: str = ""
self.brokerid: str = ""
self.auth_code: str = ""
self.appid: str = ""
self.frontid: int = 0
self.sessionid: int = 0
# hxxjava start
self.constract_data: List[dict] = []
self.status_data: List[dict] = []
self.ex_margin_rate_data: List[dict] = []
# hxxjava end
self.order_data: List[dict] = []
self.trade_data: List[dict] = []
self.positions: Dict[str, PositionData] = {}
self.sysid_orderid_map: Dict[str, str] = {}
self.init_order_query_functions()
def init_order_query_functions(self) -> None:
"""初始化查询任务"""
self.order_functions.clear()
self.query_functions.clear()
self.query_functions.append(Function(func=self.query_investor,once=True))
self.query_functions.append(Function(func=self.query_broker_trading_params,param={'req':BrokerTradingParamsRequest()},once=True))
self.query_functions.append(Function(func=self.query_account,maxcount=3))
self.query_functions.append(Function(func=self.query_position,maxcount=3))
self.exec_order_cmds()
self.exec_query_cmds()
def exec_order_cmds(self): # hxxjava add
""" 执行报单命令 """
if self.inited and self.order_functions:
# 报单命令列表中有内容
order_func:Function = self.order_functions.pop(0)
ret_val = order_func.exec()
if ret_val == 0:
# 报单命令执行成功
if not order_func.once:
self.order_functions.append(order_func)
else:
# 报单命令执行失败,添加到命令列表的头部
self.order_functions.insert(0,order_func)
self.order_timer = Timer(1.0/6, self.exec_order_cmds)
self.order_timer.start()
def exec_query_cmds(self): # hxxjava add
""" 执行查询命令 """
if self.inited and self.query_functions:
# 查询命令列表中有内容
func:Function = self.query_functions.pop(0)
ret_val = func.exec()
# print(f"{func.to_str()} exec()={ret_val}")
if ret_val == 0:
# 查询命令执行成功
if not func.once:
# 周期性查询命令,添加到命令列表的尾部
self.query_functions.append(func)
elif ret_val == -5:
# 循环计数未达到的,添加到命令列表的尾部
self.query_functions.append(func)
elif ret_val in [-2,-3]:
# 查询命令执行失败,添加到命令列表的头部
self.query_functions.insert(0,func)
self.query_timer = Timer(1.0, self.exec_query_cmds)
self.query_timer.start()
def execute_func(self,func:Callable,req:Dict,prompt:str="") -> int:
"""
统一的查询执行函数,
# 返回值:
# 0:成功;-1:网络连接失败
# -2:未处理请求超过许可数;-3:每秒发送请求数超过许可数
"""
self.td_lock.acquire()
self.reqid += 1
result = func(req,self.reqid)
self.td_lock.release()
if prompt and self.debug:
print(f"{prompt} excuted at {datetime.now()},result={result}")
return result
def query_investor(self):
"""" 请求查询投资者 """
req = {"BrokerID":self.brokerid,"InvestorID":self.userid}
if self.execute_func(func=self.reqQryInvestor,req=req,prompt="请求查询投资者") in [-2,-3]:
# 违反查询流控,添加到查询函数队列的尾部
self.query_functions.append(Function(func=self.query_investor,once=True))
def onRspQryInvestor(self, data: dict, error: dict, reqid: int, last: bool): # -> None:
"""
请求查询投资者响应,当执行ReqQryInvestor后,该方法被调用。
"""
# print(f"onRspQryInvestor data={data},error={error},reqid={reqid},last={last}")
if data:
investor = InvestorData(
id = data['InvestorID'],
name = data['InvestorName'],
broker = data['BrokerID'],
group = data['InvestorGroupID'],
identifiedCardType = IdCardType(data['IdentifiedCardType']),
identifiedCardNo = data['IdentifiedCardNo'],
is_active = data['IsActive'],
telephone = data['Telephone'],
address = data['Address'],
open_date = data['OpenDate'],
mobile = data['Mobile'],
commission_model = data['CommModelID'],
margin_model = data['MarginModelID'],
gateway_name=self.gateway_name
)
self.gateway.on_query_investor(investor)
def queryInvestUnit(self): # hxxjava add
"""
# 查询投资单元
"""
if not self.connect_status:
return
req = {}
req["BrokerID"]=self.brokerid
req["InvestorID"]=self.userid
req["InvestUnitID"]=""
if self.execute_func(func=self.reqQryInvestUnit,req=req,prompt="查询投资单元") in [-2,-3]:
# 违反查询流控,添加到查询函数队列的尾部
self.query_functions.append(Function(func=self.queryInvestUnit,once=True))
def onRspQryInvestUnit(self, data: dict, error: dict, reqid: int, last: bool): # -> None:
"""
# 查询投资单元应答
"""
print(f"onRspQryInvestUnit data={data} error={error}")
def onRspQryInstrumentCommissionRate(self, data: dict, error: dict, reqid: int, last: bool): # hxxjava add
"""
查询合约手续费率
"""
if data:
commission = CommissionData(
symbol = data['InstrumentID'],
exchange = data["ExchangeID"], # EXCHANGE_CTP2VT[data["ExchangeID"]]
open_ratio_bymoney=data['OpenRatioByMoney'],
open_ratio_byvolume=data['OpenRatioByVolume'],
close_ratio_bymoney=data['CloseRatioByMoney'],
close_ratio_byvolume=data['CloseRatioByVolume'],
close_today_ratio_bymoney=data['CloseTodayRatioByMoney'],
close_today_ratio_byvolume=data['CloseTodayRatioByVolume'],
investor_range = data['InvestorRange'],
biz_type = data['BizType'],
invest_unit_id = data['InvestUnitID'],
gateway_name=self.gateway_name
)
# print(f"onRspQryInstrumentCommissionRate = {data}")
self.gateway.on_commission(commission)
def onRspQryInstrumentOrderCommRate(self, data: dict, error: dict, reqid: int, last: bool): # hxxjava add
"""
查询报单手续费
"""
if data:
commission = OrderCommRateData(
symbol = data['InstrumentID'],
exchange = data["ExchangeID"], # EXCHANGE_CTP2VT[data["ExchangeID"]]
investor_range = data['InvestorRange'],
invest_unit_id = data['InvestUnitID'],
hedge_flag = data['HedgeFlag'],
order_comm_byvolume=data['OrderCommByVolume'],
order_action_comm_byvolume=data['OrderActionCommByVolume'],
gateway_name=self.gateway_name
)
self.gateway.on_order_commission(commission)
def onRspQryInstrumentMarginRate(self, data: dict, error: dict, reqid: int, last: bool): # hxxjava add
"""
查询合约保证金率
"""
if data:
margin_rate = InstrumentMarginData(
symbol = data['InstrumentID'],
exchange = data["ExchangeID"], # EXCHANGE_CTP2VT[data["ExchangeID"]]
long_margin_rate = data["LongMarginRatioByMoney"],
long_margin_perlot = data["LongMarginRatioByVolume"],
short_margin_rate = data["ShortMarginRatioByMoney"],
short_margin_perlot = data["ShortMarginRatioByVolume"],
is_ralative=data['IsRelative'],
investor_id=data['InvestorID'],
invest_unit_id=data['InvestUnitID'],
gateway_name=self.gateway_name
)
self.gateway.on_instrument_margin_rate(margin_rate)
def onRspQryExchangeMarginRate(self, data: dict, error: dict, reqid: int, last: bool): # hxxjava add
"""
收到查询交易所保证金率结果
"""
if not self.contract_inited:
# 合约数据还没有初始化
if error:
print(f"查询交易所保证金率 错误={error}")
if data:
self.ex_margin_rate_data.append(data)
if last:
self.gateway.write_log(f"开始查询合约信息")
if self.execute_func(func=self.reqQryInstrument,req={},prompt="查询合约信息") in [-2,-3]:
self.timeContract = Timer(interval=1.0,function = self.onRspQryExchangeMarginRate,
kwargs={'data':None,'error':None,'reqid':reqid,'last':True})
self.timeContract.start()
return
elif error:
pass
elif data:
margin_rate = self.extractExchangeMarginRate(data)
self.gateway.on_exchange_margin_rate(margin_rate)
# print(f"查询交易所保证金率结果 :data={data} error={error}")
def extractExchangeMarginRate(self,data:dict): # hxxjava add
""" 提取交易所保证金率并且发送给网关 """
return MarginData(
symbol = data['InstrumentID'],
exchange = data["ExchangeID"], # 总是为空 EXCHANGE_CTP2VT[data["ExchangeID"]],
hedge_flag = HedgeType(data["HedgeFlag"]),
long_margin_rate = data["LongMarginRatioByMoney"],
long_margin_perlot = data["LongMarginRatioByVolume"],
short_margin_rate = data["ShortMarginRatioByMoney"],
short_margin_perlot = data["ShortMarginRatioByVolume"],
gateway_name=self.gateway_name
)
def onRspQryExchangeMarginRateAdjust(self, data: dict, error: dict, reqid: int, last: bool): # hxxjava add
"""
收到查询交易所保证金率结果
"""
if error:
pass
if data:
adjust = MarginRateAdjustData(
symbol = data["InstrumentID"],
hedge_flag = HedgeType(data["HedgeFlag"]),
LongMarginRatioByMoney = data["LongMarginRatioByMoney"],
LongMarginRatioByVolume = data["LongMarginRatioByVolume"],
ShortMarginRatioByMoney = data["ShortMarginRatioByMoney"],
ShortMarginRatioByVolume = data["ShortMarginRatioByVolume"],
ExchLongMarginRatioByMoney = data["ExchLongMarginRatioByMoney"],
ExchLongMarginRatioByVolume = data["ExchLongMarginRatioByVolume"],
ExchShortMarginRatioByMoney = data["ExchShortMarginRatioByMoney"],
ExchShortMarginRatioByVolume = data["ExchShortMarginRatioByVolume"],
NoLongMarginRatioByMoney = data["NoLongMarginRatioByMoney"],
NoLongMarginRatioByVolume = data["NoLongMarginRatioByVolume"],
NoShortMarginRatioByMoney = data["NoShortMarginRatioByMoney"],
NoShortMarginRatioByVolume = data["NoShortMarginRatioByVolume"],
gateway_name=self.gateway_name
)
# 并且发送给网关
self.gateway.on_exchange_margin_rate_adjust(adjust)
def onRspQryBrokerTradingParams(self, data: dict, error: dict, reqid: int, last: bool): # hxxjava add
""" 请求查询经纪公司交易参数响应 """
from vnpy.trader.constant import HedgeType,InstrumentStatus,MarginPriceType,AlgorithmType,IncludeCloseProfitType,Currency,OptionRoyaltyPriceType # hxxjava add
if data:
# print(f"onRspQryBrokerTradingParams : {data}")
broker_trading_param = BrokerTradingParamsData(
margin_price_type = MarginPriceType(data["MarginPriceType"]),
algorithm = AlgorithmType(data["Algorithm"]),
avail_include_close_profit = IncludeCloseProfitType(data["AvailIncludeCloseProfit"]),
currency = Currency(data["CurrencyID"]) if data["CurrencyID"] else "",
option_royalty_price_type = OptionRoyaltyPriceType(data["OptionRoyaltyPriceType"]),
account = data["AccountID"],
gateway_name=self.gateway_name
)
self.gateway.on_broker_trading_params(broker_trading_param)
def onRspQryProduct(self, data: dict, error: dict, reqid: int, last: bool): # hxxjava add
""" 请求查询产品响应 """
print(f"请求查询产品响应 :data={data} error={error}")
if data:
product = ProductData(
id = data["ProductID"],
name = data["ProductName"],
exchange = Exchange(data["ExchangeID"]),
product_class = data["ProductClass"],
size = data["VolumeMultiple"],
price_tick = data["PriceTick"],
MaxMarketOrderVolume = data["MaxMarketOrderVolume"],
MinMarketOrderVolume = data["MinMarketOrderVolume"],
MaxLimitOrderVolume = data["MaxLimitOrderVolume"],
MinLimitOrderVolume = data["MinLimitOrderVolume"],
PositionType = data["PositionType"],
PositionDateType = data["PositionDateType"],
TradeCurrencyID = data["TradeCurrencyID"],
CloseDealType = data["CloseDealType"],
MortgageFundUseRange = data["MortgageFundUseRange"],
ExchangeProductID = data["ExchangeProductID"],
UnderlyingMultiple = data["UnderlyingMultiple"],
gateway_name=self.gateway_name
)
self.gateway.on_query_product(product)
def query_broker_trading_params(self,req:BrokerTradingParamsRequest): # hxxjava add
""" 请求查询经纪公司交易参数 """
if not self.connect_status:
return
req_dict = {}
req_dict["BrokerID"] = self.brokerid
req_dict["InvestorID"] = self.userid
req_dict["CurrencyID"] = req.CurrencyID
req_dict["AccountID"] = req.AccountID
if self.execute_func(func=self.reqQryBrokerTradingParams,req=req_dict,prompt="查询经纪公司交易参数") in [-2,-3]:
# 违反查询流控,添加到查询函数队列的尾部
self.query_functions.append(Function(func=self.query_broker_trading_params,once=True,param={'req':req}))
def query_product(self,req:ProductRequst) -> None:
""" 请求查询产品 """
if not self.connect_status:
return
req_dict = {}
req_dict["ProductID"] = req.product
req_dict["ProductClass"] = req.product_class.value
req_dict["ExchangeID"] = "" if not req.exchange else req.exchange.value
if self.execute_func(func=self.reqQryProduct,req=req_dict,prompt="请求查询产品") in [-2,-3]:
# 违反查询流控,添加到查询函数队列的尾部
self.query_functions.append(Function(func=self.query_product,once=True,param={'req':req}))
def query_commission(self,req:CommissionRequest): # hxxjava add
""" 查询手续费率 """
if not self.connect_status:
return
commission_req = {}
commission_req['BrokerID'] = self.brokerid
commission_req['InvestorID'] = self.userid
commission_req['InvestorUnitID'] = self.userid #hxxjava debug char[31]
commission_req['InstrumentID'] = req.symbol
commission_req['ExchangeID'] = req.exchange.value if req.exchange else ""
if self.execute_func(func=self.reqQryInstrumentCommissionRate,req=commission_req,prompt="查询手续费率") in [-2,-3]:
# 违反查询流控,添加到查询函数队列的尾部
self.query_functions.append(Function(func=self.query_commission,once=True,param={'req':req}))
def query_order_commission(self,req:OrderCommRateRequest): # hxxjava add
""" 查询报单手续费 """
if not self.connect_status:
return
req_dict = {}
req_dict['BrokerID'] = self.brokerid
req_dict['InvestorID'] = self.userid
req_dict['InstrumentID'] = req.symbol
if self.execute_func(func=self.reqQryInstrumentOrderCommRate,req=req_dict,prompt="查询报单手续费") in [-2,-3]:
# 违反查询流控,添加到查询函数队列的尾部
self.query_functions.append(Function(func=self.query_commission,once=True,param={'req':req}))
def query_margin_ratio(self,req:MarginRequest): # hxxjava add
""" 合约保证金率查询 """
if not self.connect_status:
return
ctp_req = {}
ctp_req['BrokerID'] = self.brokerid
ctp_req['InvestorID'] = self.userid
ctp_req['HedgeFlag'] = req.hedge_type.value
ctp_req['ExchangeID'] = req.exchange.value if req.exchange else ""
ctp_req['InvestUnitID'] = self.userid # hxxjava debug char[81]
ctp_req['InstrumentID'] = req.symbol
if self.execute_func(func=self.reqQryInstrumentMarginRate,req=ctp_req,prompt="合约保证金率查询") in [-2,-3]:
# 违反查询流控,添加到查询函数队列的尾部
self.query_functions.append(Function(func=self.query_margin_ratio,once=True,param={'req':req}))
def query_exchange_margin_rate(self,req:MarginRequest): # hxxjava add
""" 交易所保证金率查询 """
if not self.connect_status:
return
out_req = {}
out_req['BrokerID'] = self.brokerid
out_req['InstrumentID'] = req.symbol
out_req['HedgeFlag'] = req.hedge_type.value
out_req['ExchangeID'] = req.exchange.value if req.exchange else ""
if self.execute_func(func=self.reqQryExchangeMarginRate,req=out_req,prompt="交易所保证金率查询") in [-2,-3]:
# 违反查询流控,添加到查询函数队列的尾部
self.query_functions.append(Function(func=self.query_exchange_margin_rate,once=True,param={'req':req}))
def query_exchange_margin_rate_adjust(self,req:MarginRateAdjustRequest): # hxxjava add
""" 保证金率调整查询 """
if not self.connect_status:
return
out_req = {}
out_req['BrokerID'] = self.brokerid
out_req['InstrumentID'] = req.symbol
out_req['HedgeFlag'] = req.hedge_type.value
if self.execute_func(func=self.reqQryExchangeMarginRateAdjust,req=out_req,prompt="交易所保证金率调整查询") in [-2,-3]:
# 违反查询流控,添加到查询函数队列的尾部
self.query_functions.append(Function(func=self.query_exchange_margin_rate_adjust,once=True,param={'req':req}))
def onFrontConnected(self):
""""""
self.gateway.on_connect(GatewayData(name="CTP",type='TD')) # hxxjava add
self.gateway.write_log("交易服务器连接成功")
if self.auth_code:
self.authenticate()
else:
self.login()
def onFrontDisconnected(self, reason: int) -> None:
""" 服务器连接断开回报 """
self.login_status = False
self.gateway.on_disconnect(GatewayData(name="CTP",type='TD',reason=reason)) # hxxjava add
self.gateway.write_log(f"交易服务器连接断开,原因{reason}")
def onRspAuthenticate(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""用户授权验证回报"""
if not error['ErrorID']:
self.auth_status = True
self.gateway.write_log("交易服务器授权验证成功")
self.login()
else:
self.gateway.write_error("交易服务器授权验证失败", error)
def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""用户登录请求回报"""
if not error["ErrorID"]:
self.frontid = data["FrontID"]
self.sessionid = data["SessionID"]
self.login_status = True
self.gateway.write_log("交易服务器登录成功")
# 自动确认结算单
ctp_req: dict = {
"BrokerID": self.brokerid,
"InvestorID": self.userid
}
self.reqid += 1
self.reqSettlementInfoConfirm(ctp_req, self.reqid)
else:
self.login_failed = True
self.gateway.write_error("交易服务器登录失败", error)
def onRspOrderInsert(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""委托下单失败回报"""
order_ref: str = data["OrderRef"]
orderid: str = f"{self.frontid}_{self.sessionid}_{order_ref}"
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map[symbol]
order: OrderData = OrderData(
symbol=symbol,
exchange=contract.exchange,
orderid=orderid,
direction=DIRECTION_CTP2VT[data["Direction"]],
offset=OFFSET_CTP2VT.get(data["CombOffsetFlag"], Offset.NONE),
price=data["LimitPrice"],
volume=float(data["VolumeTotalOriginal"]), # hxxjava change
status=Status.REJECTED,
gateway_name=self.gateway_name
)
self.gateway.on_order(order)
self.gateway.write_error("交易委托失败", error)
def onRspOrderAction(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""委托撤单失败回报"""
self.gateway.write_error("交易撤单失败", error)
def onRspSettlementInfoConfirm(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""确认结算单回报"""
self.gateway.write_log("结算信息确认成功")
# # 由于流控,单次查询可能失败,通过while循环持续尝试,直到成功发出请求
# while True:
# self.reqid += 1
# n: int = self.reqQryInstrument({}, self.reqid)
# if not n:
# break
# else:
# sleep(1)
# hxxjava change start
self.gateway.write_log(f"查询交易所保证金")
self.query_exchange_margin_rate(MarginRequest(symbol="",exchange=None))
# hxxjava change end
def onRspQryInvestorPosition(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""持仓查询回报"""
if not data:
return
# print(f"持仓查询回报:{data}")
# 必须已经收到了合约信息后才能处理
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map.get(symbol, None)
if contract:
# 获取之前缓存的持仓数据缓存
key: str = f"{data['InstrumentID'], data['PosiDirection']}"
position: PositionData = self.positions.get(key, None)
if not position:
position = PositionData(
symbol=data["InstrumentID"],
exchange=contract.exchange,
direction=DIRECTION_CTP2VT[data["PosiDirection"]],
gateway_name=self.gateway_name
)
self.positions[key] = position
# 对于上期所昨仓需要特殊处理
if position.exchange in [Exchange.SHFE, Exchange.INE]:
if data["YdPosition"] and not data["TodayPosition"]:
position.yd_volume = data["Position"]
# 对于其他交易所昨仓的计算
else:
position.yd_volume = data["Position"] - data["TodayPosition"]
# 获取合约的乘数信息
size: int = contract.size
# 计算之前已有仓位的持仓总成本
cost: float = position.price * position.volume * size
# 累加更新持仓数量和盈亏
position.volume += data["Position"]
position.pnl += data["PositionProfit"]
# 计算更新后的持仓总成本和均价
if position.volume and size:
cost += data["PositionCost"]
position.price = cost / (position.volume * size)
# 更新仓位冻结数量
if position.direction == Direction.LONG:
position.frozen += data["ShortFrozen"]
else:
position.frozen += data["LongFrozen"]
if last:
for position in self.positions.values():
self.gateway.on_position(position)
self.positions.clear()
def onRspQryTradingAccount(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""资金查询回报"""
if "AccountID" not in data:
return
account: AccountData = AccountData(
accountid=data["AccountID"],
balance=data["Balance"],
frozen=data["FrozenMargin"] + data["FrozenCash"] + data["FrozenCommission"],
gateway_name=self.gateway_name
)
account.available = data["Available"]
self.gateway.on_account(account)
def onRspQryInstrument(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""合约查询回报"""
product: Product = PRODUCT_CTP2VT.get(data["ProductClass"], None)
if product:
contract: ContractData = ContractData(
symbol=data["InstrumentID"],
exchange=EXCHANGE_CTP2VT[data["ExchangeID"]],
name=data["InstrumentName"],
product=product,
size=data["VolumeMultiple"],
pricetick=data["PriceTick"],
# hxxjava add start
max_market_order_volume=data["MaxMarketOrderVolume"],
min_market_order_volume=data["MinMarketOrderVolume"],
max_limit_order_volume=data["MaxLimitOrderVolume"],
min_limit_order_volume=data["MinLimitOrderVolume"],
open_date=data["OpenDate"],
expire_date=data["ExpireDate"],
is_trading=data["IsTrading"],
long_margin_ratio=data["LongMarginRatio"],
short_margin_ratio=data["ShortMarginRatio"],
# hxxjava add end
gateway_name=self.gateway_name
)
# 期权相关
if contract.product == Product.OPTION:
# 移除郑商所期权产品名称带有的C/P后缀
if contract.exchange == Exchange.CZCE:
contract.option_portfolio = data["ProductID"][:-1]
else:
contract.option_portfolio = data["ProductID"]
contract.option_underlying = data["UnderlyingInstrID"]
contract.option_type = OPTIONTYPE_CTP2VT.get(data["OptionsType"], None)
contract.option_strike = data["StrikePrice"]
contract.option_index = str(data["StrikePrice"])
contract.option_expiry = datetime.strptime(data["ExpireDate"], "%Y%m%d")
# self.gateway.on_contract(contract)
symbol_contract_map[contract.symbol] = contract
if last:
# 更新所有合约信息
self.gateway.write_log("更新所有合约信息...")
self.gateway.on_contract_end(symbol_contract_map.values())
self.contract_inited = True
self.gateway.write_log("合约信息查询成功")
self.gateway.write_log(f"收到{len(symbol_contract_map)}条合约信息")
self.gateway.write_log(f"提取{len(self.ex_margin_rate_data)}条交易所保证金率信息")
if self.ex_margin_rate_data:
EMRs = []
for data in self.ex_margin_rate_data:
EMRs.append(self.extractExchangeMarginRate(data))
self.gateway.on_exchange_margin_rate_end(EMRs)
self.ex_margin_rate_data.clear()
self.gateway.write_log(f"提取{len(self.status_data)}条状态信息")
if self.status_data:
statuses = []
for data in self.status_data:
statuses.append(self.extractInstrumentStatus(data))
self.gateway.on_status_end(statuses)
self.status_data.clear()
self.gateway.write_log(f"提取{len(self.order_data)}条委托单信息")
for data in self.order_data:
self.onRtnOrder(data)
self.order_data.clear()
self.gateway.write_log(f"提取{len(self.trade_data)}条成交单信息")
for data in self.trade_data:
self.onRtnTrade(data)
self.trade_data.clear()
self.inited = True
def onRtnInstrumentStatus(self,data:dict):
"""
当接收到合约品种状态信息 # hxxjava debug
"""
if not self.contract_inited:
self.status_data.append(data)
return
status = self.extractInstrumentStatus(data)
self.gateway.on_status(status)
def extractInstrumentStatus(self,data:dict): # hxxjava add
""" 提取合约品种状态信息 """
return StatusData(
symbol = data["InstrumentID"],
exchange = EXCHANGE_CTP2VT[data["ExchangeID"]],
settlement_group_id = data["SettlementGroupID"],
instrument_status = INSTRUMENTSTATUS_CTP2VT[data["InstrumentStatus"]],
trading_segment_sn = data["TradingSegmentSN"],
enter_time = data["EnterTime"],
enter_reason = ENTERREASON_CTP2VT[data["EnterReason"]],
exchange_inst_id = data["ExchangeInstID"],
gateway_name=self.gateway_name
)
def onRtnOrder(self, data: dict) -> None:
""" 委托更新推送 """
if not self.contract_inited:
self.order_data.append(data)
return
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map[symbol]
frontid: int = data["FrontID"]
sessionid: int = data["SessionID"]
order_ref: str = data["OrderRef"]
orderid: str = f"{frontid}_{sessionid}_{order_ref}"
timestamp: str = f"{data['InsertDate']} {data['InsertTime']}"
dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S")
dt = CHINA_TZ.localize(dt)
dt1 = datetime.strptime(f"{data['TradingDay']}", "%Y%m%d") # hxxjava add
tradingday = dt1.date() # hxxjava add
order = OrderData(
symbol=symbol,
exchange=contract.exchange,
orderid=orderid,
type=ORDERTYPE_CTP2VT[data["OrderPriceType"]],
direction=DIRECTION_CTP2VT[data["Direction"]],
offset=OFFSET_CTP2VT[data["CombOffsetFlag"]],
price=data["LimitPrice"],
volume=float(data["VolumeTotalOriginal"]), # hxxjava change
traded=float(data["VolumeTraded"]), # hxxjava change
status=STATUS_CTP2VT[data["OrderStatus"]],
datetime=dt,
tradingday=tradingday, # hxxjava add
gateway_name=self.gateway_name
)
self.gateway.on_order(order)
self.sysid_orderid_map[data["OrderSysID"]] = orderid
def onRtnTrade(self, data: dict) -> None:
""" 成交数据推送 """
if not self.contract_inited:
self.trade_data.append(data)
return
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map[symbol]
orderid: str = self.sysid_orderid_map[data["OrderSysID"]]
timestamp: str = f"{data['TradeDate']} {data['TradeTime']}"
dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S")
dt = CHINA_TZ.localize(dt)
datestr = f"{data['TradingDay']}" # hxxjava add
dt1 = datetime.strptime(datestr, "%Y%m%d") # hxxjava add
tradingday = dt1.date() # hxxjava add
trade = TradeData(
symbol=symbol,
exchange=contract.exchange,
orderid=orderid,
tradeid=data["TradeID"],
direction=DIRECTION_CTP2VT[data["Direction"]],
offset=OFFSET_CTP2VT[data["OffsetFlag"]],
price=data["Price"],
volume=data["Volume"],
datetime=dt,
tradingday=tradingday, # hxxjava add
gateway_name=self.gateway_name
)
self.gateway.on_trade(trade)
def onRspForQuoteInsert(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""询价请求回报"""
if not error["ErrorID"]:
symbol: str = data["InstrumentID"]
msg: str = f"{symbol}询价请求发送成功"
self.gateway.write_log(msg)
else:
self.gateway.write_error("询价请求发送失败", error)
def connect(
self,
address: str,
userid: str,
password: str,
brokerid: int,
auth_code: str,
appid: str
) -> None:
"""连接服务器"""
self.userid = userid
self.password = password
self.brokerid = brokerid
self.auth_code = auth_code
self.appid = appid
if not self.connect_status:
path: Path = get_folder_path(self.gateway_name.lower())
self.createFtdcTraderApi((str(path) + "\\Td").encode("GBK"))
self.subscribePrivateTopic(0)
self.subscribePublicTopic(0)
self.registerFront(address)
self.init()
self.connect_status = True
else:
self.authenticate()
def authenticate(self) -> None:
"""发起授权验证"""
ctp_req: dict = {
"UserID": self.userid,
"BrokerID": self.brokerid,
"AuthCode": self.auth_code,
"AppID": self.appid
}
self.reqid += 1
self.reqAuthenticate(ctp_req, self.reqid)
def login(self) -> None:
"""用户登录"""
if self.login_failed:
return
ctp_req: dict = {
"UserID": self.userid,
"Password": self.password,
"BrokerID": self.brokerid,
"AppID": self.appid
}
self.reqid += 1
self.reqUserLogin(ctp_req, self.reqid)
def send_order(self, req: OrderRequest) -> str:
"""委托下单"""
if req.offset not in OFFSET_VT2CTP:
self.gateway.write_log("请选择开平方向")
return ""
if req.type not in ORDERTYPE_VT2CTP:
self.gateway.write_log(f"当前接口不支持该类型的委托{req.type.value}")
return ""
self.order_ref += 1
ctp_req: dict = {
"InstrumentID": req.symbol,
"ExchangeID": req.exchange.value,
"LimitPrice": req.price,
"VolumeTotalOriginal": int(req.volume),
"OrderPriceType": ORDERTYPE_VT2CTP.get(req.type, ""),
"Direction": DIRECTION_VT2CTP.get(req.direction, ""),
"CombOffsetFlag": OFFSET_VT2CTP.get(req.offset, ""),
"OrderRef": str(self.order_ref),
"InvestorID": self.userid,
"UserID": self.userid,
"BrokerID": self.brokerid,
"CombHedgeFlag": THOST_FTDC_HF_Speculation,
"ContingentCondition": THOST_FTDC_CC_Immediately,
"ForceCloseReason": THOST_FTDC_FCC_NotForceClose,
"IsAutoSuspend": 0,
"TimeCondition": THOST_FTDC_TC_GFD,
"VolumeCondition": THOST_FTDC_VC_$,
"MinVolume": 1
}
if req.type == OrderType.FAK:
ctp_req["OrderPriceType"] = THOST_FTDC_OPT_LimitPrice
ctp_req["TimeCondition"] = THOST_FTDC_TC_IOC
ctp_req["VolumeCondition"] = THOST_FTDC_VC_$
elif req.type == OrderType.FOK:
ctp_req["OrderPriceType"] = THOST_FTDC_OPT_LimitPrice
ctp_req["TimeCondition"] = THOST_FTDC_TC_IOC
ctp_req["VolumeCondition"] = THOST_FTDC_VC_CV
self.reqid += 1
ret_val = self.reqOrderInsert(ctp_req, self.reqid)
if ret_val == 0: # hxxjava change
# 报单成功
orderid: str = f"{self.frontid}_{self.sessionid}_{self.order_ref}"
order: OrderData = req.create_order_data(orderid, self.gateway_name)
self.gateway.on_order(order)
return order.vt_orderid
if ret_val in [-2,-3]:
# 违反报单流控,添加到报单函数队列的尾部
self.order_functions.append(Function(func=self.send_order,once=True,param={'req':req}))
return ""
def cancel_order(self, req: CancelRequest) -> None:
"""委托撤单"""
frontid, sessionid, order_ref = req.orderid.split("_")
ctp_req: dict = {
"InstrumentID": req.symbol,
"ExchangeID": req.exchange.value,
"OrderRef": order_ref,
"FrontID": int(frontid),
"SessionID": int(sessionid),
"ActionFlag": THOST_FTDC_AF_Delete,
"BrokerID": self.brokerid,
"InvestorID": self.userid
}
self.reqid += 1
if self.reqOrderAction(ctp_req, self.reqid) in [-2,-3]:
# 违反报单流控,添加到报单函数队列的尾部
self.order_functions.append(Function(func=self.cancel_order,once=True,param={'req':req}))
def send_rfq(self, req: OrderRequest) -> str:
"""询价请求"""
self.order_ref += 1
ctp_req: dict = {
"InstrumentID": req.symbol,
"ExchangeID": req.exchange.value,
"ForQuoteRef": str(self.order_ref),
"BrokerID": self.brokerid,
"InvestorID": self.userid
}
self.reqid += 1
ret_val = self.reqForQuoteInsert(ctp_req, self.reqid)
if ret_val == 0: # hxxjava change
# 成功执行了询价报单
orderid: str = f"{self.frontid}_{self.sessionid}_{self.order_ref}"
vt_orderid: str = f"{self.gateway_name}.{orderid}"
return vt_orderid
if ret_val in [-2,-3]:
# 违反报单流控,添加到报单函数队列的尾部
self.order_functions.append(Function(func=self.send_rfq,once=True,param={'req':req}))
return ""
def query_account(self) -> int:
"""查询资金"""
if not self.connect_status:
return -4
# self.reqid += 1
# self.reqQryTradingAccount({}, self.reqid)
return self.execute_func(func=self.reqQryTradingAccount,req={},prompt="查询资金账户")
def query_position(self) -> int:
"""查询持仓"""
if not self.connect_status:
return -4
if not symbol_contract_map:
return -5
ctp_req: dict = {
"BrokerID": self.brokerid,
"InvestorID": self.userid
}
# self.reqid += 1
# self.reqQryInvestorPosition(ctp_req, self.reqid)
return self.execute_func(func=self.reqQryInvestorPosition,req=ctp_req,prompt="查询投资者持仓")
def close(self) -> None:
"""关闭连接"""
if self.connect_status:
self.exit()
def adjust_price(price: float) -> float:
"""将异常的浮点数最大值(MAX_FLOAT)数据调整为0"""
if price == MAX_FLOAT:
price = 0
return price
该文件中的LocalOrderManager保持不动,其它内容修改如下:
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Callable
from copy import copy
from vnpy.event import Event, EventEngine
from .event import (
EVENT_ORIGIN_TICK,
EVENT_ORDER,
EVENT_TRADE,
EVENT_POSITION,
EVENT_ACCOUNT,
EVENT_CONTRACT,
EVENT_STATUS, # hxxjava debug
EVENT_EXCHANGE_MARGIN_RATE, # hxxjava debug
EVENT_EXCHANGE_MARGIN_RATE_ADJUST, # hxxjava debug
EVENT_INSTRUMENT_MARGIN_RATE, # hxxjava add
EVENT_COMMISSION, # hxxjava add
EVENT_ORDER_COMMISSION, # hxxjava add
EVENT_EXCHANGE_MARGIN_RATE_END, # hxxjava debug
EVENT_CONTRACT_END, # hxxjava debug
EVENT_STATUS_END, # hxxjava debug
EVENT_BROKER_TRADING_PARAMS, # hxxjava add
EVENT_QUERY_INVESTOR, # hxxjava add
EVENT_QUERY_PRODUCT, # hxxjava add
EVENT_CONNECT, # hxxjava add
EVENT_DISCONNECT, # hxxjava add
EVENT_LOG,
EVENT_QUOTE,
)
from .object import (
TickData,
OrderData,
TradeData,
PositionData,
AccountData,
ContractData,
MarginRequest, # hxxjava debug 2021-9-16
MarginRateAdjustRequest, # hxxjava debug 2021-10-13
CommissionRequest, # hxxjava debug 2021-9-16
OrderCommRateRequest, # hxxjava debug 2021-9-16
BrokerTradingParamsRequest, # hxxjava debug 2021-9-22
ProductRequst, # hxxjava debug
StatusData, # hxxjava debug
MarginData, # hxxjava debug
MarginRateAdjustData, # hxxjava add
InstrumentMarginData, # hxxjava add
CommissionData, # hxxjava add
OrderCommRateData, # hxxjava add
BrokerTradingParamsData, # hxxjava add
InvestorData, # hxxjava add
ProductData, # hxxjava add
GatewayData, # hxxjava add
LogData,
QuoteData,
OrderRequest,
CancelRequest,
SubscribeRequest,
HistoryRequest,
QuoteRequest,
Exchange,
BarData
)
from enum import Enum
class Function(): # hxxjava add
"""
Function and it's parameters description.
"""
func: Callable = None
param: dict = {}
once: bool = False
maxcount:int = 1
def __init__(
self,
func:Callable,
param:Dict={},
once: bool=False,
maxcount:int = 1
) -> None:
""" """
self.func = func
self.param = param
self.once = once
self.maxcount = maxcount
self.count = 0
def exec(self):
""" """
self.count += 1
if not self.once:
# 如果时循环执行的
if self.count >= self.maxcount:
# 超循环计数执行一次
self.count = 0
return self.func(**self.param)
else:
return -5
else:
# 单次执行的
return self.func(**self.param)
def to_str(self):
return f"Function(func:{self.func},param:{self.param},once:{self.once},maxcount:{self.maxcount},count:{self.count})"
class BaseGateway(ABC):
"""
Abstract gateway class for creating gateways connection
to different trading systems.
# How to implement a gateway:
---
## Basics
A gateway should satisfies:
* this class should be thread-safe:
* all methods should be thread-safe
* no mutable shared properties between objects.
* all methods should be non-blocked
* satisfies all requirements written in docstring for every method and callbacks.
* automatically reconnect if connection lost.
---
## methods must implements:
all @abstractmethod
---
## callbacks must response manually:
* on_tick
* on_trade
* on_order
* on_position
* on_account
* on_contract
All the XxxData passed to callback should be constant, which means that
the object should not be modified after passing to on_xxxx.
So if you use a cache to store reference of data, use copy.copy to create a new object
before passing that data into on_xxxx
"""
# Fields required in setting dict for connect function.
default_setting: Dict[str, Any] = {}
# Exchanges supported in the gateway.
exchanges: List[Exchange] = []
def __init__(self, event_engine: EventEngine, gateway_name: str):
""""""
self.event_engine: EventEngine = event_engine
self.gateway_name: str = gateway_name
def on_event(self, type: str, data: Any = None) -> None:
"""
General event push.
"""
event = Event(type, data)
self.event_engine.put(event)
def on_tick(self, tick: TickData) -> None:
"""
Tick event push.
Tick event of a specific vt_symbol is also pushed.
"""
# self.on_event(EVENT_TICK, tick)
# self.on_event(EVENT_TICK + tick.vt_symbol, tick)
self.on_event(EVENT_ORIGIN_TICK, tick)
def on_trade(self, trade: TradeData) -> None:
"""
Trade event push.
Trade event of a specific vt_symbol is also pushed.
"""
self.on_event(EVENT_TRADE, trade)
self.on_event(EVENT_TRADE + trade.vt_symbol, trade)
def on_order(self, order: OrderData) -> None:
"""
Order event push.
Order event of a specific vt_orderid is also pushed.
"""
self.on_event(EVENT_ORDER, order)
self.on_event(EVENT_ORDER + order.vt_orderid, order)
def on_position(self, position: PositionData) -> None:
"""
Position event push.
Position event of a specific vt_symbol is also pushed.
"""
self.on_event(EVENT_POSITION, position)
self.on_event(EVENT_POSITION + position.vt_symbol, position)
def on_account(self, account: AccountData) -> None:
"""
Account event push.
Account event of a specific vt_accountid is also pushed.
"""
self.on_event(EVENT_ACCOUNT, account)
self.on_event(EVENT_ACCOUNT + account.vt_accountid, account)
def on_quote(self, quote: QuoteData) -> None:
"""
Quote event push.
Quote event of a specific vt_symbol is also pushed.
"""
self.on_event(EVENT_QUOTE, quote)
self.on_event(EVENT_QUOTE + quote.vt_symbol, quote)
def on_log(self, log: LogData) -> None:
"""
Log event push.
"""
self.on_event(EVENT_LOG, log)
def on_contract(self, contract: ContractData) -> None:
"""
Contract event push.
"""
self.on_event(EVENT_CONTRACT, contract)
def on_contract_end(self, contracts: List[ContractData]) -> None:
"""
Contract event push.
"""
self.on_event(EVENT_CONTRACT_END, contracts)
def on_status(self, status: StatusData) -> None: # hxxjava debug
"""
Instrument Status event push.
"""
self.on_event(EVENT_STATUS, status)
self.on_event(EVENT_STATUS + status.vt_symbol, status)
def on_exchange_margin_rate(self, margin: MarginData) -> None: # hxxjava add
"""
Exchange margin event push.
"""
self.on_event(EVENT_EXCHANGE_MARGIN_RATE,margin)
def on_exchange_margin_rate_adjust(self, adjust: MarginRateAdjustData) -> None: # hxxjava add
"""
Exchange margin event push.
"""
self.on_event(EVENT_EXCHANGE_MARGIN_RATE_ADJUST,adjust)
def on_instrument_margin_rate(self, margin: InstrumentMarginData) -> None: # hxxjava add
"""
Margin event push.
"""
self.on_event(EVENT_INSTRUMENT_MARGIN_RATE,margin)
def on_commission(self, commission: CommissionData) -> None: # hxxjava add
"""
Commission event push.
"""
self.on_event(EVENT_COMMISSION, commission)
def on_order_commission(self, commission: OrderCommRateData) -> None: # hxxjava add
"""
Commission event push.
"""
self.on_event(EVENT_ORDER_COMMISSION, commission)
def on_status_end(self, stats: List[StatusData]) -> None: # hxxjava debug
"""
Instrument Status event push.
"""
self.on_event(EVENT_STATUS_END, stats)
def on_exchange_margin_rate_end(self, EMRs: List[MarginData]) -> None: # hxxjava add
"""
Exchange margin event push.
"""
self.on_event(EVENT_EXCHANGE_MARGIN_RATE_END,EMRs)
def on_broker_trading_params(self, broker_trading_params: BrokerTradingParamsData) -> None: # hxxjava add
"""
Broker trading param event push.
"""
self.on_event(EVENT_BROKER_TRADING_PARAMS, broker_trading_params)
def on_query_investor(self, investor: InvestorData) -> None: # hxxjava add
"""
Broker trading param event push.
"""
self.on_event(EVENT_QUERY_INVESTOR, investor)
def on_query_product(self,product:ProductData)->None: # hxxjava add
"""
Query product event push.
"""
self.on_event(EVENT_QUERY_PRODUCT,product)
def on_connect(self,gateway:GatewayData) -> None: # hxxjava add
"""
gateway connect enent
"""
self.on_event(EVENT_CONNECT, gateway)
def on_disconnect(self,gateway:GatewayData) -> None: # hxxjava add
"""
gateway disconnect enent
"""
self.on_event(EVENT_DISCONNECT, gateway)
def write_log(self, msg: str) -> None:
"""
Write a log event from gateway.
"""
log = LogData(msg=msg, gateway_name=self.gateway_name)
self.on_log(log)
@abstractmethod
def connect(self, setting: dict) -> None:
"""
Start gateway connection.
to implement this method, you must:
* connect to server if necessary
* log connected if all necessary connection is established
* do the following query and response corresponding on_xxxx and write_log
* contracts : on_contract
* account asset : on_account
* account holding: on_position
* orders of account: on_order
* trades of account: on_trade
* if any of query above is failed, write log.
future plan:
response callback/change status instead of write_log
"""
pass
@abstractmethod
def close(self) -> None:
"""
Close gateway connection.
"""
pass
@abstractmethod
def subscribe(self, req: SubscribeRequest) -> None:
"""
Subscribe tick data update.
"""
pass
@abstractmethod
def send_order(self, req: OrderRequest) -> str:
"""
Send a new order to server.
implementation should finish the tasks blow:
* create an OrderData from req using OrderRequest.create_order_data
* assign a unique(gateway instance scope) id to OrderData.orderid
* send request to server
* if request is sent, OrderData.status should be set to Status.SUBMITTING
* if request is failed to sent, OrderData.status should be set to Status.REJECTED
* response on_order:
* return vt_orderid
:return str vt_orderid for created OrderData
"""
pass
@abstractmethod
def cancel_order(self, req: CancelRequest) -> None:
"""
Cancel an existing order.
implementation should finish the tasks blow:
* send request to server
"""
pass
def send_quote(self, req: QuoteRequest) -> str:
"""
Send a new two-sided quote to server.
implementation should finish the tasks blow:
* create an QuoteData from req using QuoteRequest.create_quote_data
* assign a unique(gateway instance scope) id to QuoteData.quoteid
* send request to server
* if request is sent, QuoteData.status should be set to Status.SUBMITTING
* if request is failed to sent, QuoteData.status should be set to Status.REJECTED
* response on_quote:
* return vt_quoteid
:return str vt_quoteid for created QuoteData
"""
return ""
def cancel_quote(self, req: CancelRequest) -> None:
"""
Cancel an existing quote.
implementation should finish the tasks blow:
* send request to server
"""
pass
@abstractmethod
def query_account(self) -> None:
"""
Query account balance.
"""
pass
@abstractmethod
def query_position(self) -> None:
"""
Query holding positions.
"""
pass
@abstractmethod
def query_exchange_margin_rate(self,req:MarginRequest) -> None:
"""
Query contract's margin fee or ratio. # hxxjava add 2021-9-16
"""
pass
@abstractmethod
def query_exchange_margin_rate_adjust(self,req:MarginRateAdjustRequest) -> None:
"""
Query instrument's margin rate adjust. # hxxjava add 2021-10-13
"""
pass
@abstractmethod
def query_margin_ratio(self,req:MarginRequest) -> None:
"""
Query contract's margin fee or ratio. # hxxjava add 2021-9-16
"""
pass
@abstractmethod
def query_commission(self,req:CommissionRequest) -> None:
"""
Query contract's commission fee or ratio. # hxxjava add 2021-9-16
"""
pass
@abstractmethod
def query_order_commission(self,req:OrderCommRateRequest) -> None:
"""
Query contract's commission fee or ratio. # hxxjava add 2021-9-16
"""
pass
@abstractmethod
def query_broker_trading_params(self,req:BrokerTradingParamsRequest) -> None:
"""
Query user's broker trading params. # hxxjava add 2021-9-22
"""
pass
@abstractmethod
def query_investor(self) -> None:
"""
Query investor . # hxxjava add 2021-10-12
"""
pass
@abstractmethod
def query_product(self,req:ProductRequst) -> None:
"""
Query product . # hxxjava add 2021-10-12
"""
pass
def query_history(self, req: HistoryRequest) -> List[BarData]:
"""
Query bar history data.
"""
pass
def get_default_setting(self) -> Dict[str, Any]:
"""
Return default setting dict.
"""
return self.default_setting
"""
Event type string used in VN Trader.
"""
from vnpy.event import EVENT_TIMER # noqa
EVENT_ORIGIN_TICK = "eOriginTick." # hxxjava debug
EVENT_AUCTION_TICK = "eAuctionTick." # hxxjava debug
EVENT_TICK = "eTick."
EVENT_TRADE = "eTrade."
EVENT_ORDER = "eOrder."
EVENT_POSITION = "ePosition."
EVENT_ACCOUNT = "eAccount."
EVENT_QUOTE = "eQuote."
EVENT_CONTRACT = "eContract."
EVENT_STATUS = "eStatus." # hxxjava debug
EVENT_LOG = "eLog"
EVENT_CONNECT = "eConnected" # hxxjava add
EVENT_DISCONNECT = "eDisconnected" # hxxjava add
EVENT_INSTRUMENT_MARGIN_RATE = "eInstrumentMarginRate." # hxxjava add
EVENT_EXCHANGE_MARGIN_RATE = "eExchaneMarginRate." # hxxjava add
EVENT_EXCHANGE_MARGIN_RATE_ADJUST = "eExchaneMarginRateAdjust." # hxxjava add
EVENT_COMMISSION = "eCommission." # hxxjava add
EVENT_ORDER_COMMISSION = "eOrderCommission." # hxxjava add
EVENT_CONTRACT_END = "eContractEnd." # hxxjava debug
EVENT_STATUS_END = "eStatusEnd." # hxxjava debug
EVENT_EXCHANGE_MARGIN_RATE_END = "eExchaneMarginRateEnd." # hxxjava add
EVENT_BROKER_TRADING_PARAMS = "eBrokerTradingParams." # hxxjava add
EVENT_QUERY_INVESTOR = "eQueryInvestor." # hxxjava add
EVENT_QUERY_PRODUCT = "eQueryProduct." # hxxjava add
"""
Basic data structure used for general trading function in VN Trader.
"""
from dataclasses import dataclass
from datetime import datetime,date # hxxjava add date
from logging import INFO
from .constant import AlgorithmType, Direction, Exchange, Interval, Offset, Status, Product, OptionType, OrderType
from .constant import HedgeType,InstrumentStatus,MarginPriceType,AlgorithmType,IncludeCloseProfitType,Currency,OptionRoyaltyPriceType,IdCardType,ProductClass # hxxjava add
ACTIVE_STATUSES = set([Status.SUBMITTING, Status.NOTTRADED, Status.PARTTRADED])
@dataclass
class BaseData:
"""
Any data object needs a gateway_name as source
and should inherit base data.
"""
gateway_name: str
@dataclass
class TickData(BaseData):
"""
Tick data contains information about:
* last trade in market
* orderbook snapshot
* intraday market statistics.
"""
symbol: str
exchange: Exchange
datetime: datetime
name: str = ""
volume: float = 0
turnover: float = 0
open_interest: float = 0
last_price: float = 0
last_volume: float = 0
limit_up: float = 0
limit_down: float = 0
open_price: float = 0
high_price: float = 0
low_price: float = 0
pre_close: float = 0
bid_price_1: float = 0
bid_price_2: float = 0
bid_price_3: float = 0
bid_price_4: float = 0
bid_price_5: float = 0
ask_price_1: float = 0
ask_price_2: float = 0
ask_price_3: float = 0
ask_price_4: float = 0
ask_price_5: float = 0
bid_volume_1: float = 0
bid_volume_2: float = 0
bid_volume_3: float = 0
bid_volume_4: float = 0
bid_volume_5: float = 0
ask_volume_1: float = 0
ask_volume_2: float = 0
ask_volume_3: float = 0
ask_volume_4: float = 0
ask_volume_5: float = 0
localtime: datetime = None
def __post_init__(self):
""""""
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
@dataclass
class BarData(BaseData):
"""
Candlestick bar data of a certain trading period.
"""
symbol: str
exchange: Exchange
datetime: datetime
interval: Interval = None
volume: float = 0
turnover: float = 0
open_interest: float = 0
open_price: float = 0
high_price: float = 0
low_price: float = 0
close_price: float = 0
def __post_init__(self):
""""""
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
@dataclass
class OrderData(BaseData):
"""
Order data contains information for tracking lastest status
of a specific order.
"""
symbol: str
exchange: Exchange
orderid: str
type: OrderType = OrderType.LIMIT
direction: Direction = None
offset: Offset = Offset.NONE
price: float = 0.0 # hxxjava change
volume: float = 0.0 # hxxjava change
traded: float = 0.0 # hxxjava change
status: Status = Status.SUBMITTING
datetime: datetime = None
tradingday:date = None # hxxjava add
reference:str = "" # hxxjava add
def __post_init__(self):
""""""
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
self.vt_orderid = f"{self.gateway_name}.{self.orderid}"
def is_active(self) -> bool:
"""
Check if the order is active.
"""
return self.status in ACTIVE_STATUSES
def create_cancel_request(self) -> "CancelRequest":
"""
Create cancel request object from order.
"""
req = CancelRequest(
orderid=self.orderid, symbol=self.symbol, exchange=self.exchange
)
return req
@dataclass
class TradeData(BaseData):
"""
Trade data contains information of a fill of an order. One order
can have several trade fills.
"""
symbol: str
exchange: Exchange
orderid: str
tradeid: str
direction: Direction = None
offset: Offset = Offset.NONE
price: float = 0.0 # hxxjava change
volume: float = 0.0 # hxxjava change
datetime: datetime = None
tradingday:date = None # hxxjava add
def __post_init__(self):
""""""
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
self.vt_orderid = f"{self.gateway_name}.{self.orderid}"
self.vt_tradeid = f"{self.gateway_name}.{self.tradeid}"
@dataclass
class PositionData(BaseData):
"""
Positon data is used for tracking each individual position holding.
"""
symbol: str
exchange: Exchange
direction: Direction
volume: float = 0
frozen: float = 0
price: float = 0
pnl: float = 0
yd_volume: float = 0
def __post_init__(self):
""""""
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
self.vt_positionid = f"{self.vt_symbol}.{self.direction.value}"
@dataclass
class AccountData(BaseData):
"""
Account data contains information about balance, frozen and
available.
"""
accountid: str
balance: float = 0
frozen: float = 0
def __post_init__(self):
""""""
self.available = self.balance - self.frozen
self.vt_accountid = f"{self.gateway_name}.{self.accountid}"
@dataclass
class LogData(BaseData):
"""
Log data is used for recording log messages on GUI or in log files.
"""
msg: str
level: int = INFO
def __post_init__(self):
""""""
self.time = datetime.now()
@dataclass
class ContractData(BaseData):
"""
Contract data contains basic information about each contract traded.
"""
symbol: str
exchange: Exchange
name: str
product: Product
size: float
pricetick: float
min_volume: float = 1 # minimum trading volume of the contract
stop_supported: bool = False # whether server supports stop order
net_position: bool = False # whether gateway uses net position volume
history_data: bool = False # whether gateway provides bar history data
option_strike: float = 0
option_underlying: str = "" # vt_symbol of underlying contract
option_type: OptionType = None
option_expiry: datetime = None
option_portfolio: str = ""
option_index: str = "" # for identifying options with same strike price
# hxxjava add start
max_market_order_volume: int = 0
min_market_order_volume: int = 0
max_limit_order_volume: int = 0
min_limit_order_volume: int = 0
open_date : str = ""
expire_date : str = ""
is_trading : bool = False
long_margin_ratio:float = 0
short_margin_ratio:float = 0
# hxxjava add end
def __post_init__(self):
""""""
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
def left_alphas(instr:str):
""" get lefe alphas of a string """
ret_str = ''
for s in instr:
if s.isalpha():
ret_str += s
else:
break
return ret_str
@dataclass
class StatusData(BaseData):
"""
hxxjava debug
"""
symbol:str
exchange : Exchange
settlement_group_id : str = ""
instrument_status : InstrumentStatus = None
trading_segment_sn : int = None
enter_time : str = ""
enter_reason : str = ""
exchange_inst_id : str = ""
def __post_init__(self):
""" """
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
def belongs_to(self,vt_symbol:str):
symbol,exchange_str = vt_symbol.split(".")
instrument = left_alphas(symbol).upper()
return (self.symbol.upper() == instrument) and (self.exchange.value == exchange_str)
@dataclass
class MarginData(BaseData): # hxxjava add
"""
Exchange margin rate data for the instrument.
"""
symbol: str
exchange: str
hedge_flag:HedgeType = HedgeType.SEPCULATION
long_margin_rate:float = 0.0
long_margin_perlot:float = 0.0
short_margin_rate:float = 0.0
short_margin_perlot:float = 0.0
def __post_init__(self):
""" """
self.vt_symbol = f"{self.symbol}.{self.exchange}"
@dataclass
class MarginRateAdjustData(BaseData): # hxxjava add
"""
Exchange margin rate adjust data for the instrument.
"""
symbol: str = ""
hedge_flag:HedgeType = HedgeType.SEPCULATION
# 跟随保证金(率)
LongMarginRatioByMoney : float = 0.0
LongMarginRatioByVolume : float = 0.0
ShortMarginRatioByMoney : float = 0.0
ShortMarginRatioByVolume : float = 0.0
# 交易所保证金(率)
ExchLongMarginRatioByMoney : float = 0.0
ExchLongMarginRatioByVolume : float = 0.0
ExchShortMarginRatioByMoney : float = 0.0
ExchShortMarginRatioByVolume : float = 0.0
# 不跟随保证金(率)
NoLongMarginRatioByMoney : float = 0.0
NoLongMarginRatioByVolume : float = 0.0
NoShortMarginRatioByMoney : float = 0.0
NoShortMarginRatioByVolume : float = 0.0
@dataclass
class InstrumentMarginData(BaseData): # hxxjava add
"""
Instrument margin rate data for the contract.
"""
symbol: str = ""
exchange: str = ""
hedge_flag:HedgeType = HedgeType.SEPCULATION
long_margin_rate:float = 0.0
long_margin_perlot:float = 0.0
short_margin_rate:float = 0.0
short_margin_perlot:float = 0.0
is_ralative:bool = False
investor_range:str = ""
investor_id:str=""
invest_unit_id:str=""
def __post_init__(self):
""""""
self.vt_symbol = f"{self.symbol}.{self.exchange}"
@dataclass
class CommissionData(BaseData): # hxxjava add
"""
Commssion rate data for the instrument.
"""
symbol: str
exchange: str = ""
open_ratio_bymoney:float = 0.0
open_ratio_byvolume:float = 0.0
close_ratio_bymoney:float = 0.0
close_ratio_byvolume:float = 0.0
close_today_ratio_bymoney:float=0.0
close_today_ratio_byvolume:float=0.0
investor_range:str = ""
biz_type: str = ""
invest_unit_id: str = ""
def __post_init__(self):
""""""
self.vt_symbol = f"{self.symbol}.{self.exchange}"
@dataclass
class OrderCommRateData(BaseData): # hxxjava add
"""
Ordering commission rate data for the contract .
"""
symbol:str
exchange:str
investor_range:str
invest_unit_id:str
hedge_flag:HedgeType = HedgeType.SEPCULATION
order_comm_byvolume:float = 0.0
order_action_comm_byvolume:float = 0.0
def __post_init__(self):
""""""
self.vt_symbol = f"{self.symbol}.{self.exchange}"
@dataclass
class BrokerTradingParamsData(BaseData): # hxxjava add
"""
Broker trading params data.
"""
margin_price_type:MarginPriceType
algorithm:AlgorithmType
avail_include_close_profit:IncludeCloseProfitType
currency:Currency
option_royalty_price_type:OptionRoyaltyPriceType
account:str
@dataclass
class InvestorData(BaseData): # hxxjava add
"""
Investor information data.
"""
id:str
name:str
broker:str
group:str
identifiedCardType:IdCardType
identifiedCardNo:str
is_active:bool
telephone:str
address:str
open_date:date
mobile:str
commission_model:str
margin_model:str
@dataclass
class ProductData(BaseData): # hxxjava add
"""
Product data.
"""
id:str
name:str
exchange:Exchange
product_class:ProductClass
size:float = 0.0
price_tick:float = 0.0
MaxMarketOrderVolume:float = 0.0
MinMarketOrderVolume:float = 0.0
MaxLimitOrderVolume:float = 0.0
MinLimitOrderVolume:float = 0.0
PositionType:str = ""
PositionDateType:str = ""
CloseDealType:str = ""
TradeCurrencyID:str = ""
MortgageFundUseRange:str = ""
ExchangeProductID:str = ""
UnderlyingMultiple:float = 0.0
@dataclass
class GatewayData(): # hxxjava add
"""
Gateway data
"""
name:str = ""
type:str = "" # 'TD'/'MD'
reason:int = 0
@dataclass
class QuoteData(BaseData):
"""
Quote data contains information for tracking lastest status
of a specific quote.
"""
symbol: str
exchange: Exchange
quoteid: str
bid_price: float = 0.0
bid_volume: int = 0
ask_price: float = 0.0
ask_volume: int = 0
bid_offset: Offset = Offset.NONE
ask_offset: Offset = Offset.NONE
status: Status = Status.SUBMITTING
datetime: datetime = None
reference: str = ""
def __post_init__(self):
""""""
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
self.vt_quoteid = f"{self.gateway_name}.{self.quoteid}"
def is_active(self) -> bool:
"""
Check if the quote is active.
"""
return self.status in ACTIVE_STATUSES
def create_cancel_request(self) -> "CancelRequest":
"""
Create cancel request object from quote.
"""
req = CancelRequest(
orderid=self.quoteid, symbol=self.symbol, exchange=self.exchange
)
return req
@dataclass
class SubscribeRequest:
"""
Request sending to specific gateway for subscribing tick data update.
"""
symbol: str
exchange: Exchange
def __post_init__(self):
""""""
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
@dataclass
class OrderRequest:
"""
Request sending to specific gateway for creating a new order.
"""
symbol: str
exchange: Exchange
direction: Direction
type: OrderType
volume: float
price: float = 0
offset: Offset = Offset.NONE
reference: str = ""
def __post_init__(self):
""""""
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
def create_order_data(self, orderid: str, gateway_name: str) -> OrderData:
"""
Create order data from request.
"""
order = OrderData(
symbol=self.symbol,
exchange=self.exchange,
orderid=orderid,
type=self.type,
direction=self.direction,
offset=self.offset,
price=self.price,
volume=self.volume,
reference=self.reference,
gateway_name=gateway_name,
)
return order
@dataclass
class MarginRequest: # hxxjava add
"""
Request sending to specific margin rate for a contract.
"""
symbol: str
exchange: Exchange = None
hedge_type:HedgeType = HedgeType.SEPCULATION
@dataclass
class MarginRateAdjustRequest: # hxxjava add
"""
Request sending to specific margin rate adjust for an instrument.
"""
symbol: str
hedge_type:HedgeType = HedgeType.SEPCULATION
@dataclass
class CommissionRequest: # hxxjava add
"""
Request sending to specific commission for a contract.
"""
symbol: str = ""
exchange: Exchange = None
invest_unit:str = ""
@dataclass
class OrderCommRateRequest: # hxxjava add
"""
Request sending order commission rate for an instrument.
"""
symbol: str = ""
@dataclass
class BrokerTradingParamsRequest: # hxxjava add
""" Broker trading parameters request. """
CurrencyID:str = "CNY" # default is "CNY"
AccountID:str = "" # acount id
@dataclass
class ProductRequst: # hxxjava add
product:str
product_class:ProductClass
exchange:Exchange = None
@dataclass
class CancelRequest:
"""
Request sending to specific gateway for canceling an existing order.
"""
orderid: str
symbol: str
exchange: Exchange
def __post_init__(self):
""""""
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
@dataclass
class HistoryRequest:
"""
Request sending to specific gateway for querying history data.
"""
symbol: str
exchange: Exchange
start: datetime
end: datetime = None
interval: Interval = None
def __post_init__(self):
""""""
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
@dataclass
class QuoteRequest:
"""
Request sending to specific gateway for creating a new quote.
"""
symbol: str
exchange: Exchange
bid_price: float
bid_volume: int
ask_price: float
ask_volume: int
bid_offset: Offset = Offset.NONE
ask_offset: Offset = Offset.NONE
reference: str = ""
def __post_init__(self):
""""""
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
def create_quote_data(self, quoteid: str, gateway_name: str) -> QuoteData:
"""
Create quote data from request.
"""
quote = QuoteData(
symbol=self.symbol,
exchange=self.exchange,
quoteid=quoteid,
bid_price=self.bid_price,
bid_volume=self.bid_volume,
ask_price=self.ask_price,
ask_volume=self.ask_volume,
bid_offset=self.bid_offset,
ask_offset=self.ask_offset,
reference=self.reference,
gateway_name=gateway_name,
)
return quote
vnpytrader wrote:
大佬,StatusEnterReason这个数据类型如何定义的,找遍全网,都没有找到对应的定义,望解答啊,感谢!
class StatusEnterReason(Enum):
"""
品种进入交易状态原因类型
"""
AUTOMATIC = "自动切换"
MANUAL = "手动切换"
FUSE = "熔断"
详细见 报单流控、查询流控和会话数控制,这里不再赘述,有兴趣的读者可以去看看。
让我们看看下面的ctp_gateway的td_api.send_order()代码:
def send_order(self, req: OrderRequest) -> str:
"""委托下单"""
# .... 这里省略
self.reqid += 1
self.reqOrderInsert(ctp_req, self.reqid) # 这里有问题
orderid: str = f"{self.frontid}_{self.sessionid}_{self.order_ref}"
order: OrderData = req.create_order_data(orderid, self.gateway_name)
self.gateway.on_order(order)
return order.vt_orderid
上面代码中self.reqOrderInsert(ctp_req, self.reqid)就没有考虑报单流控,它有4个返回值:
0,代表成功。
-1,表示网络连接失败;
-2,表示未处理请求超过许可数;
-3,表示每秒发送请求数超过许可数。
其中两个是有关流控的,如果是-2,-3我们是可以采用类似等待、重试、队列等方法的,可是都没有。
这导致的问题是如果是返回了-2,-3,那么,报单是没有成功的,上层app还以为已经成功了,也不会采用什么其它逻辑来补救。
CTP交易接口一定的报单和查询必须符合CTP流控规则,默认FTD报文流控如果是每秒6次,报单和查询都会形成FTD报文,查询流控为1秒1次,报单流控最大是每秒6次,假如当前秒已经有了1次查询,那么报单的流控就只有5次了。
如果报单超越了当前的流控,那么报单坑定是失败的,交易服务器是不会接受报单请求的,无论你是开仓还是平仓,CTP交易接口根本没有成交的可能!
这样就造成我们以为发出去的符合条件成交条件委托是成功的,可是却是石沉大海,没有了下文。
实际的危害可能是:机会来了无法入场,灾难来临无法逃离 !
改造的主要思路是:只要没有违反流控,直接快速执行报单请求和查询请求,如遇违反流控,让CTP网关缓冲报单请求和查询请求,延后执行。
本次分享的ctp_gateway比当前vnpy系统的ctp_gateway具备更为丰富的接口,其中包括:
为了提高接口推送消息的能力,在ctp_gateway登录交易接口时,先连续不停地接收并解码合约状态、交易所保证金和合约信息这三个数量巨大的推送数据,然后分别一次性推送这些数据到系统的消息引擎中。这些消息接收端应该一次性地对这些消息进行处理。
class HedgeType(Enum):
"""
投机/套保/备兑类型 hxxjava add
"""
SEPCULATION = '1' #"投机"
ARBITRAGE = '2' #"套利"
HEDGE = '3' #"套保"
MARKETMAKER = '5' #"做市商"
SPECHEDGE = '6' # 第一腿投机第二腿套保 大商所专用
HEDGESPEC = '7' # 第一腿套保第二腿投机 大商所专用
class InstrumentStatus(Enum):
"""
合约交易状态类型 hxxjava debug
"""
BEFORE_TRADING = "开盘前"
NO_TRADING = "非交易"
CONTINOUS = "连续交易"
AUCTION_ORDERING = "集合竞价报单"
AUCTION_BALANCE = "集合竞价价格平衡"
AUCTION_MATCH = "集合竞价撮合"
CLOSE = "收盘"
# 有效交易状态
VALID_TRADE_STATUSES = [
InstrumentStatus.CONTINOUS,
InstrumentStatus.AUCTION_ORDERING,
InstrumentStatus.AUCTION_BALANCE,
InstrumentStatus.AUCTION_MATCH
]
# 集合竞价交易状态
AUCTION_STATUS = [
InstrumentStatus.AUCTION_ORDERING,
InstrumentStatus.AUCTION_BALANCE,
InstrumentStatus.AUCTION_MATCH
]
class StatusEnterReason(Enum):
"""
品种进入交易状态原因类型 hxxjava debug
"""
AUTOMATIC = "自动切换"
MANUAL = "手动切换"
FUSE = "熔断"
class MarginPriceType(Enum):
"""
保证金价格类型
"""
# 昨结算价
PRE_SETTLEMENT_PRICE = '1'
# 最新价
SETTLEMENT_PRICE = '2'
# 成交均价
$ERAGE_PRICE = '3'
# 开仓价
OPEN_PRICE = '4'
class AlgorithmType(Enum):
"""
盈亏算法类型
"""
# 浮盈浮亏都计算
ALL = '1'
# 浮盈不计,浮亏计
ONLY_LOST ='2'
# 浮盈计,浮亏不计
ONLY_GAIN = '3'
# 浮盈浮亏都不计算
NONE = '4'
class IncludeCloseProfitType(Enum):
"""
是否包含平仓盈利类型
"""
# 包含平仓盈利
INCLUDE = '0'
# 不包含平仓盈利
NOT_INCLUDE = '2'
class OptionRoyaltyPriceType(Enum):
"""
期权权利金价格类型类型
"""
# 昨结算价
PRE_SETTLEMENT_PRICE = '1'
# 开仓价
OPEN_PRICE = '4'
# 最新价与昨结算价较大值
MAX_PRE_SETTLEMENT_PRICE = '5'
class IdCardType(Enum):
""" $类型 """
# 组织机构代码
EID = '0'
# 中国公民$
IDCard = '1'
# $
OfficerIDCard = '2'
# $
PoliceIDCard = '3'
# $
SoldierIDCard = '4'
# 户口簿
HouseholdRegister = '5'
# $
Passport = '6'
# 台胞证
TaiwanCompatriotIDCard = '7'
# 回乡证
HomeComingCard = '8'
# 营业执照号
LicenseNo = '9'
# 税务登记号/当地纳税ID
TaxNo = 'A'
# 港澳居民来往内地通行证
HMMainlandTravelPermit = 'B'
# 台湾居民来往大陆通行证
TwMainlandTravelPermit = 'C'
# $
DrivingLicense = 'D'
# 当地社保ID
SocialID = 'F'
# 当地$
LocalID = 'G'
# 商业登记证
BusinessRegistration = 'H'
# 港澳永久性居民$
HKMCIDCard = 'I'
# 人行开户许可证
AccountsPermits = 'J'
# 外国人永久居留证
FrgPrmtRdCard = 'K'
# 资管产品备案函
CptMngPrdLetter = 'L'
# 统一社会信用代码
UniformSocialCreditCode = 'N'
# 机构成立证明文件
CorporationCertNo = 'O'
# 其他$
OtherCard = 'x'
class ProductClass(Enum):
# 期货
FUTURES = '1'
# 期货期权
OPTIONS = '2'
# 组合
COMBINATION = '3'
# 即期
SPOT = '4'
# 期转现
EFP = '5'
# 现货期权
SPOT_OPTION = '6'
# TAS合约
TAS = '7'
# 金属指数
MI = 'I'
因为本文的内容比较长,一共分为5个帖子,其他内容在下方帖子中 ... ...
jingsiaosing wrote:
对于交易非常不活跃的合约 这个方法是有问题的,比如说没有集合竞价tick。如果只交易活跃合约可以不用care
主要的问题还不在这里,主要是如果不做处理,集合竞价的tick是非常有可能产生交易信号的,
导致策略提前连续竞价时段就报单了,而此时交易接口在这段时间是无法接接受报单的,导致
策略丧失开仓或者平仓的机会。如果经过我的处理,就不会出现上述的问题。
另外:对于交易非常不活跃的合约 这个方法也是没有问题的,比如说没有集合竞价tick,自然
在新的连续竞价状态到来的时候,因为不存在集合竞价tick,所有也不需要进行时间转换,所以
也是没有问题的。