该bug已在2.0.6修复,请更新到最新版本
写个run.py脚本来使用vntrader吧(参考example/vntrader下面的vnpy.py)
可以使用csv_loader这个模块来把csv数据载入到数据库中。
在启动cta_strategy和cta_backtester时会默认初始化RQData,如不使用它,可以把相关初始化的代码注释掉
请发一下cmd的报错截图/信息
6.3.13是专门做穿透式测试用的版本,期货公司给的信息,你需要填写 账号、密码、期货公司测试用的行情前置和交易前置地址、授权码、和产品号(产品号是自己申请的)
然后在运行vntrader时候,不要同时import ctp和ctptest,只需要import ctptest和连接ctptest进行测试
在cmd上有什么报错信息吗
看文档,cta_strategy.md, 有相关的介绍
看文档,cta_strategy.md, 有相关的介绍
那大概率是你不知道系统运行的vnpy在哪个目录上。
观察一下cmd上的报错信息,一般能找到vnpy的运行目录
行情前置/交易前置/授权码等字段填错了,会触发4097报错
vnstation相当于vnpy的图形化显示工具。
建议使用脚本模式运行vntrader,如python run.py,这样打开GUI界面同时,也可以查看cmd上相关的信息。
run.py文件现在在github master分支的example文件夹里面,可以直接拷贝然后运行
vn.py框架学习群 666359421
vn.py框架交流群 262656087
做了穿透式测试没?
大概率是交易前置、行情前置、授权码等填错了。要联系一下你开户的期货公司客户经理
1) ctp_gateway有相关的函数,命名不同而已,可以仔细阅读一下源代码
2) vnpy是可以判断仓位的多空,今仓昨仓的,如converter.py 就有仓位调整的细致操作,如上期所模式,中金所平今惩罚模式。
首先需要确保有数据,然后load_data才能载入数据
vnstudio是直接把python3.7和vnpy以及相关依赖库打包的,已经不包含anaconda了。
pyqt5可以先卸载,再安装固定版本,如pip install PyQt5==5.10
附上这两个文件代码
2019-6-14_open_interset_sql_only.py
from enum import Enum
from peewee import Database, MySQLDatabase, PostgresqlDatabase, SqliteDatabase
from vnpy.trader.utility import get_file_path
import logging
# Route the 'peewee' logger to a stream handler at DEBUG level
# (presumably so the SQL issued by this script is echoed to the console).
logger = logging.getLogger('peewee')
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())
class Driver(Enum):
    """Database backends, keyed by the string used in the ``driver`` setting."""

    SQLITE = "sqlite"
    MYSQL = "mysql"
    POSTGRESQL = "postgresql"
    # Recognised as a value, but this SQL-only script has no initialiser for it.
    MONGODB = "mongodb"
def init(driver: Driver, settings: dict):
    """
    Create the database object for the configured SQL driver.

    :param driver: backend to use; SQLITE, MYSQL and POSTGRESQL are supported.
    :param settings: database settings, passed unchanged to the
        driver-specific initialiser.
    :return: the object returned by the driver-specific initialiser.
    :raises NotImplementedError: if ``driver`` has no SQL initialiser
        (e.g. MONGODB).
    """
    init_funcs = {
        Driver.SQLITE: init_sqlite,
        Driver.MYSQL: init_mysql,
        Driver.POSTGRESQL: init_postgresql,
    }
    # Explicit raise instead of ``assert``: asserts are stripped under
    # ``python -O``, which would leave only a bare KeyError here.
    try:
        init_func = init_funcs[driver]
    except KeyError:
        raise NotImplementedError(
            f"no SQL initialiser for database driver: {driver}"
        ) from None
    return init_func(settings)
def init_sqlite(settings: dict):
    """
    Build a ``SqliteDatabase`` for the file named by ``settings["database"]``.

    The name is resolved through ``get_file_path`` and handed to peewee as a
    string path.
    """
    db_file = get_file_path(settings["database"])
    return SqliteDatabase(str(db_file))
def init_mysql(settings: dict):
    """Build a ``MySQLDatabase`` from the connection-related settings only."""
    accepted = ("database", "user", "password", "host", "port")
    # Every other key in ``settings`` is ignored.
    connect_kwargs = {
        name: settings[name] for name in settings if name in accepted
    }
    return MySQLDatabase(**connect_kwargs)
def init_postgresql(settings: dict):
    """Build a ``PostgresqlDatabase`` from the connection-related settings only."""
    accepted = ("database", "user", "password", "host", "port")
    connect_kwargs = {}
    for name, value in settings.items():
        # Keys other than the five connection fields are ignored.
        if name in accepted:
            connect_kwargs[name] = value
    return PostgresqlDatabase(**connect_kwargs)
def fetch_columns(driver: Driver, db: Database, table_name: str):
    """
    List the column names of ``table_name``.

    :param driver: backend ``db`` is connected to; selects the catalogue query.
    :param db: database object used to run the query.
    :param table_name: table to inspect. For SQLite it is interpolated into
        the PRAGMA text, so it must be a trusted identifier.
    :return: list of column names (empty when nothing matches).
    """
    if driver is Driver.SQLITE:
        # The PRAGMA statement does not take bound parameters.
        cursor = db.execute_sql(f"PRAGMA table_info({table_name});")
        # The column name is the second field of each table_info row.
        return [row[1] for row in cursor.fetchall()]

    sql = (
        "SELECT column_name "
        "FROM INFORMATION_SCHEMA.COLUMNS "
        "WHERE table_name = %s"
    )
    if driver is Driver.MYSQL:
        # MySQL's INFORMATION_SCHEMA spans every database on the server.
        # Without this filter a same-named table in another database would
        # contribute its columns and the migration could be wrongly skipped.
        sql += " AND table_schema = DATABASE()"
    cursor = db.execute_sql(sql + ";", (table_name,))
    return [row[0] for row in cursor.fetchall()]
def upgrade(driver: Driver, db: Database):
    """Add an ``open_interest`` column to the bar and tick tables if missing."""
    for table in ("dbbardata", "dbtickdata"):
        existing = fetch_columns(driver, db, table)
        if 'open_interest' in existing:
            continue
        db.execute_sql(f"ALTER TABLE {table} "
                       " ADD COLUMN open_interest FLOAT DEFAULT 0;")
    print("upgrade succeed!")
def down_grade(driver: Driver, db: Database):
    """
    Drop the ``open_interest`` column from the bar and tick tables if present.

    :param driver: backend ``db`` is connected to; SQLITE is rejected.
    :param db: database object used to run the statements.
    :raises NotImplementedError: if ``driver`` is SQLITE.
    """
    # Explicit raise instead of ``assert`` so the guard survives ``python -O``.
    if driver is Driver.SQLITE:
        raise NotImplementedError("sqlite doesn't support drop column")

    for table in ("dbbardata", "dbtickdata"):
        columns = fetch_columns(driver, db, table)
        if 'open_interest' in columns:
            db.execute_sql(f"ALTER TABLE {table} "
                           " DROP COLUMN open_interest;")
    print("down grade succeed!")
def main():
    """Load the ``database.`` settings and run ``upgrade`` on a SQL backend."""
    from vnpy.trader.setting import get_settings

    settings = get_settings("database.")
    driver = Driver(settings["driver"])
    # Guard clause: MongoDB is not handled by this script.
    if driver is Driver.MONGODB:
        raise NotImplementedError()

    database = init(driver, settings=settings)
    upgrade(driver, database)


if __name__ == '__main__':
    main()
2019-6-14_index_sql_only.py
import logging
from enum import Enum
from peewee import Database, MySQLDatabase, PostgresqlDatabase, SqliteDatabase
from vnpy.trader.utility import get_file_path
# Route the 'peewee' logger to a stream handler at DEBUG level
# (presumably so the SQL issued by this script is echoed to the console).
logger = logging.getLogger('peewee')
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())
class Driver(Enum):
    """Database backends, keyed by the string used in the ``driver`` setting."""

    SQLITE = "sqlite"
    MYSQL = "mysql"
    POSTGRESQL = "postgresql"
    # Recognised as a value, but this SQL-only script has no initialiser for it.
    MONGODB = "mongodb"
def init(driver: Driver, settings: dict):
    """
    Create the database object for the configured SQL driver.

    :param driver: backend to use; SQLITE, MYSQL and POSTGRESQL are supported.
    :param settings: database settings, passed unchanged to the
        driver-specific initialiser.
    :return: the object returned by the driver-specific initialiser.
    :raises NotImplementedError: if ``driver`` has no SQL initialiser
        (e.g. MONGODB).
    """
    init_funcs = {
        Driver.SQLITE: init_sqlite,
        Driver.MYSQL: init_mysql,
        Driver.POSTGRESQL: init_postgresql,
    }
    # Explicit raise instead of ``assert``: asserts are stripped under
    # ``python -O``, which would leave only a bare KeyError here.
    try:
        init_func = init_funcs[driver]
    except KeyError:
        raise NotImplementedError(
            f"no SQL initialiser for database driver: {driver}"
        ) from None
    return init_func(settings)
def init_sqlite(settings: dict):
    """
    Build a ``SqliteDatabase`` for the file named by ``settings["database"]``.

    The name is resolved through ``get_file_path`` and handed to peewee as a
    string path.
    """
    db_file = get_file_path(settings["database"])
    return SqliteDatabase(str(db_file))
def init_mysql(settings: dict):
    """Build a ``MySQLDatabase`` from the connection-related settings only."""
    accepted = ("database", "user", "password", "host", "port")
    # Every other key in ``settings`` is ignored.
    connect_kwargs = {
        name: settings[name] for name in settings if name in accepted
    }
    return MySQLDatabase(**connect_kwargs)
def init_postgresql(settings: dict):
    """Build a ``PostgresqlDatabase`` from the connection-related settings only."""
    accepted = ("database", "user", "password", "host", "port")
    connect_kwargs = {}
    for name, value in settings.items():
        # Keys other than the five connection fields are ignored.
        if name in accepted:
            connect_kwargs[name] = value
    return PostgresqlDatabase(**connect_kwargs)
def fetch_indexes(driver: Driver, db: Database, table_name: str):
    """
    List the names of the indexes defined on ``table_name``.

    :param driver: backend ``db`` is connected to; selects the catalogue query.
    :param db: database object used to run the query.
    :param table_name: table to inspect. For SQLite it is interpolated into
        the PRAGMA text, so it must be a trusted identifier.
    :return: list of index names (empty when nothing matches).
    """
    if driver is Driver.SQLITE:
        # The PRAGMA statement does not take bound parameters.
        cursor = db.execute_sql(f"PRAGMA index_list({table_name});")
        # The index name is the second field of each index_list row.
        return [row[1] for row in cursor.fetchall()]

    if driver is Driver.POSTGRESQL:
        sql = (
            "SELECT indexname "
            " FROM pg_indexes"
            " WHERE tablename = %s;"
        )
    else:
        # MySQL's INFORMATION_SCHEMA spans every database on the server.
        # Restrict to the current one so a same-named table elsewhere cannot
        # make an index look as if it already exists here.
        sql = (
            "SELECT DISTINCT INDEX_NAME"
            " FROM INFORMATION_SCHEMA.STATISTICS"
            " WHERE TABLE_NAME = %s"
            " AND TABLE_SCHEMA = DATABASE();"
        )
    cursor = db.execute_sql(sql, (table_name,))
    return [row[0] for row in cursor.fetchall()]
def create_index_if_not_exists(driver: Driver, db: Database, table_name: str, index_name: str,
                               *args):
    """
    Create a UNIQUE index called ``index_name`` on ``table_name`` unless an
    index with that name is already reported by ``fetch_indexes``.

    :param args: column names to index, in order.
    """
    if index_name in fetch_indexes(driver, db, table_name):
        return

    # PostgreSQL gets "" around identifiers because it doesn't support ``;
    # MySQL needs ``, and the remaining backend is given `` as well.
    quote = '"' if driver is Driver.POSTGRESQL else '`'
    keys = ",".join(f"{quote}{key}{quote}" for key in args)
    db.execute_sql(f'CREATE UNIQUE INDEX {quote}{index_name}{quote} '
                   f' ON {quote}{table_name}{quote} ({keys});')
def drop_index_if_exists(driver: Driver, db: Database, table_name: str, index_name: str):
    """Drop ``index_name`` if it exists; doing nothing when it is absent."""
    if driver is not Driver.MYSQL:
        db.execute_sql(f'DROP INDEX IF EXISTS "{index_name}";')
        return

    # MySQL needs DROP ... ON, so the existence check is done by hand here.
    if index_name in fetch_indexes(driver, db, table_name):
        db.execute_sql(f"DROP INDEX `{index_name}` ON `{table_name}`;")
def upgrade(driver: Driver, db: Database):
    """
    Replace the datetime-first unique indexes on the bar and tick tables with
    symbol-first ones.

    For each table the new index is created before the old one is dropped.
    """
    # (table, index to create, its columns, index to drop)
    steps = (
        ('dbbardata',
         'dbbardata_symbol_exchange_interval_datetime',
         ("symbol", "exchange", "interval", "datetime"),
         'dbbardata_datetime_interval_symbol_exchange'),
        ('dbtickdata',
         'dbtickdata_symbol_exchange_datetime',
         ("symbol", "exchange", "datetime"),
         'dbtickdata_datetime_symbol_exchange'),
    )
    for table, new_index, columns, old_index in steps:
        create_index_if_not_exists(driver, db, table, new_index, *columns)
        drop_index_if_exists(driver, db, table, old_index)
    print("upgrade succeed!")
def downgrade(driver: Driver, db: Database):
    """
    Restore the datetime-first unique indexes on the bar and tick tables and
    drop the symbol-first ones.

    For each table the restored index is created before the other is dropped.
    """
    # (table, index to create, its columns, index to drop)
    steps = (
        ('dbbardata',
         'dbbardata_datetime_interval_symbol_exchange',
         ("datetime", "interval", "symbol", "exchange"),
         "dbbardata_symbol_exchange_interval_datetime"),
        ('dbtickdata',
         'dbtickdata_datetime_symbol_exchange',
         ("datetime", "symbol", "exchange"),
         "dbtickdata_symbol_exchange_datetime"),
    )
    for table, restored_index, columns, dropped_index in steps:
        create_index_if_not_exists(driver, db, table, restored_index, *columns)
        drop_index_if_exists(driver, db, table, dropped_index)
    print("downgrade succeed!")
def main():
    """Load the ``database.`` settings and run ``upgrade`` on a SQL backend."""
    from vnpy.trader.setting import get_settings

    settings = get_settings("database.")
    driver = Driver(settings["driver"])
    # Guard clause: MongoDB is not handled by this script.
    if driver is Driver.MONGODB:
        raise NotImplementedError()

    database = init(driver, settings=settings)
    upgrade(driver, database)


if __name__ == '__main__':
    main()
回测使用指数(IF99)能更好地解决换月跳空的问题
安装了什么?vnstudio2.0.4?