from vnpy_mysql.mysql_database import MysqlDatabase
from datetime import datetime
from typing import List, Tuple, Dict
import numpy as np
import pyqtgraph as pg
import talib
import copy
import logging
from vnpy.trader.ui import create_qapp, QtCore, QtGui, QtWidgets
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.object import BaseData, BarData
from vnpy.chart import ChartWidget, VolumeItem, CandleItem
from vnpy.chart.item import ChartItem
from vnpy.chart.manager import BarManager
from vnpy.chart.base import NORMAL_FONT
from vnpy.usertools.chart_items import SmaItem, BollItem, MacdItem, LineItem, RsiItem
from vnpy_ctabacktester.ui.widget import ConvertBar
# 没有加filename参数时,控制台有输出日志信息,加了filename参数时,控制台不会再输出日志信息
logging.basicConfig(level=logging.DEBUG, # 打印的日志级别
filename='first_test.log', # 日志文件名
filemode='w', # w清空,a追加写入,不会清空
format='%(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s'
# 日志格式
)
# logging.debug('debug级别,最低级别,一般开发人员用来打印一些调试信息')
# logging.info('info级别,正常输出信息,一般用来打印一些正常的操作')
# logging.warning('waring级别,一般用来打印警信息')
# logging.error('error级别,一般用来打印一些错误信息')
# logging.critical('critical级别,一般用来打印一些致命的错误信息')
class CLpenItem(CandleItem):
def __init__(self, manager: BarManager):
"""缠论笔"""
super().__init__(manager)
self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=1)
self.blue_pen: QtGui.QPen = pg.mkPen(color=(100, 100, 255), width=1)
self.red_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=1)
self.green_pen: QtGui.QPen = pg.mkPen(color=(0, 255, 0), width=1)
self.noIncludtionList = []
self.noIncludtionIx = []
self.typingList = []
# 分型不要方向,用type:top 和type:bottom来区分
self.upHighest = 0
self.downLowest = 10000
self.hasStart = False
self.lastTypingIx = 0
def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
""""""
logging.debug("----------------start----------------ix:" + str(ix))
logging.debug("self.hasStart:" + str(self.hasStart))
# Create objects
picture = QtGui.QPicture()
painter = QtGui.QPainter(picture)
painter.setPen(self.white_pen)
# 2023-1-21 09:55:52 版本5
# 破无方向局面
# 处理包含关系
if len(self.typingList) > 0:
if self.typingList[-1]["type"] == "top":
direction = "down"
else:
direction = "up"
else:
direction = ""
self.handle_bars_relationship(direction, ix, -1)
if len(self.noIncludtionList) > 2:
# 够了3根直接开始找分型,
# 找到第一个分型,然后连接0bar和这个分型
self.looking_for_typing()
# 画第一笔
if len(self.typingList) == 2 and self.typingList[-2]["type"] != self.typingList[-1]["type"] and not self.hasStart:
if self.typingList[0]["type"] == "bottom":
start_point = QtCore.QPointF(0, self._manager.get_bar(0).high_price)
elif self.typingList[0]["type"] == "top":
start_point = QtCore.QPointF(0, self._manager.get_bar(0).low_price)
endix = self.typingList[0]["ix"]
endprice = self.typingList[0]["price"]
end_point = QtCore.QPointF(endix, endprice)
if start_point and end_point:
painter.drawLine(start_point, end_point)
self.start = {"ix": endix, "price": endprice}
self.hasStart = True
if len(self.typingList) > 2 and self.typingList[-2]["type"] != self.typingList[-1]["type"]:
startix = self.typingList[0]["ix"]
startprice = self.typingList[0]["price"]
start_point = QtCore.QPointF(startix, startprice)
endix = self.typingList[1]["ix"]
endprice = self.typingList[1]["price"]
end_point = QtCore.QPointF(endix, endprice)
if start_point and end_point:
painter.drawLine(start_point, end_point)
self.start = {"ix": endix, "price": endprice}
del self.typingList[0]
painter.end()
return picture
def looking_for_typing(self) -> dict:
oneh = self.noIncludtionList[-3]["high_price"]
onel = self.noIncludtionList[-3]["low_price"]
twoh = self.noIncludtionList[-2]["high_price"]
twol = self.noIncludtionList[-2]["low_price"]
twoix = self.noIncludtionList[-2]["ix"]
threeh = self.noIncludtionList[-1]["high_price"]
threel = self.noIncludtionList[-1]["low_price"]
index = self.get_index_for_ix(twoix)
if len(self.typingList) > 0:
lastIndex = self.get_index_for_ix(self.typingList[-1]["ix"])
else:
lastIndex = 0
diff = index - lastIndex
logging.debug("typing-index:" + str(index))
logging.debug("typing-diff:" + str(diff))
# 顶分型
self.judege_top_typing(oneh, onel, twoh, twol, twoix, threeh, threel, index, diff)
# 底分型
self.judege_bottom_typing(oneh, onel, twoh, twol, twoix, threeh, threel, index, diff)
logging.debug("self.typingList:" + str(self.typingList))
def judege_top_typing(self, oneh: float, onel: float, twoh: float, twol: float, twoix: float, threeh: float, threel: float, index: int, diff: int) -> None:
# 2种情况要分开,即上一个是底分型和上一个是顶分型,处理方式是不同的
if twoh > oneh and twoh > threeh and twol > onel and twol > threel:
logging.debug("self.upHighest before change:" + str(self.upHighest))
logging.debug("twoh:" + str(twoh))
# 在这里要把最高点留存下来
if twoh > self.upHighest:
self.upHighest = twoh
logging.debug("self.upHighest after change:" + str(self.upHighest))
if diff > 3 and twoh >= self.upHighest and (len(self.typingList) == 0 or len(self.typingList) > 0 and self.typingList[-1]["type"] == "bottom"):
self.typingList.append({"price": twoh, "index": index, "diff": diff, "ix": twoix, "type": "top"})
# 添加顶分型重置底极限
self.downLowest = 10000
self.lastTypingIx = twoix
logging.info("appended a new top first and reset self.upHighest")
elif len(self.typingList) > 0 and self.typingList[-1]["type"] == "top" and twoh >= self.upHighest and twoix != self.lastTypingIx:
self.typingList.append({"price": twoh, "index": index, "diff": diff, "ix": twoix, "type": "top"})
self.downLowest = 10000
self.lastTypingIx = twoix
logging.info("appended a new top again")
if len(self.typingList) > 1:
if self.typingList[-2]["price"] > self.typingList[-1]["price"]:
del self.typingList[-1]
self.lastTypingIx = self.typingList[-1]["ix"]
logging.info("delete the last top typing")
else:
del self.typingList[-2]
logging.info("delete the last but one top typing")
def judege_bottom_typing(self, oneh: float, onel: float, twoh: float, twol: float, twoix: float, threeh: float, threel: float, index: int, diff: int) -> None:
if twol < onel and twol < threel and twoh < oneh and twoh < threeh:
logging.debug("self.downLowest before change:" + str(self.downLowest))
logging.debug("twol:" + str(twol))
if twol < self.downLowest:
self.downLowest = twol
logging.debug("self.downLowest after change:" + str(self.downLowest))
logging.debug("diff-before-judge:" + str(diff))
if len(self.typingList) > 0:
logging.debug("self.typingList[-1]:" + str(self.typingList[-1]))
if diff > 3 and twol <= self.downLowest and (len(self.typingList) == 0 or len(self.typingList) > 0 and self.typingList[-1]["type"] == "top"):
self.typingList.append({"price": twol, "index": index, "diff": diff, "ix": twoix, "type": "bottom"})
self.upHighest = 0
self.lastTypingIx = twoix
logging.info("appended a new bottom first and reset self.upHighest")
elif len(self.typingList) > 0 and self.typingList[-1]["type"] == "bottom" and twol <= self.downLowest and twoix != self.lastTypingIx:
self.typingList.append({"price": twol, "index": index, "diff": diff, "ix": twoix, "type": "bottom"})
self.upHighest = 0
self.lastTypingIx = twoix
logging.info("appended-a-new-bottom-again")
logging.debug("self.typingList-after-append:" + str(self.typingList))
if len(self.typingList) > 1:
if self.typingList[-2]["price"] < self.typingList[-1]["price"]:
del self.typingList[-1]
self.lastTypingIx = self.typingList[-1]["ix"]
logging.info("delete the last bottom typing")
else:
del self.typingList[-2]
logging.info("delete the last but one bottom typing")
def get_index_for_ix(self, ix: int) -> int:
index = 0
for x in range(len(self.noIncludtionIx)):
if ix == self.noIncludtionIx[x]:
index = x
return index
def handle_bars_relationship(self, direction: str, nix: int, index: int) -> None:
"""这个函数的作用只要处理包含关系,然后存入无包含关系列表就行了"""
if self.noIncludtionList:
lix = self.noIncludtionList[index]["ix"]
lh = self.noIncludtionList[index]["high_price"]
ll = self.noIncludtionList[index]["low_price"]
else:
lix = 0
lh = self._manager.get_bar(0).high_price
ll = self._manager.get_bar(0).low_price
n = self._manager.get_bar(nix)
nh = n.high_price
nl = n.low_price
if direction == "":
# 上一根k线包含当前K线
if nh <= lh and nl >= ll:
if nix == 0:
self.noIncludtionList.append({"ix": nix, "high_price": nh, "low_price": nl})
self.noIncludtionIx.append(nix)
pass
# 当前K线包含上一根K线
# 这里必须递归
elif nh >= lh and nl <= ll:
# 删除前先处理最后一个的ix是否在分型列表里
for x in range(len(self.typingList)):
if self.noIncludtionList[-1]["ix"] == self.typingList[x]["ix"]:
del self.typingList[x]
if self.noIncludtionList:
self.noIncludtionList.pop()
if self.noIncludtionIx:
self.noIncludtionIx.pop()
self.handle_bars_relationship(direction, nix, index)
else:
self.noIncludtionList.append({"ix": nix, "high_price": nh, "low_price": nl})
self.noIncludtionIx.append(nix)
if direction == "up":
# 有包含关系就得按照方向来处理包含关系
# 上一根k线包含当前K线
if nh <= lh and nl >= ll:
pass
# 当前K线包含上一根K线
elif nh >= lh and nl <= ll:
# 删除前先处理最后一个的ix是否在分型列表里
for x in range(len(self.typingList)):
if self.noIncludtionList[-1]["ix"] == self.typingList[x]["ix"] and self.typingList[x]["price"] < self.upHighest:
del self.typingList[x]
if self.noIncludtionList:
self.noIncludtionList.pop()
if self.noIncludtionIx:
self.noIncludtionIx.pop()
self.handle_bars_relationship(direction, nix, index)
else:
self.noIncludtionList.append({"ix": nix, "high_price": nh, "low_price": nl})
self.noIncludtionIx.append(nix)
elif direction == "down":
# 上一根k线包含当前K线
if nh <= lh and nl >= ll:
pass
# 当前K线包含上一根K线
elif nh >= lh and nl <= ll:
# 删除前先处理最后一个的ix是否在分型列表里
for x in range(len(self.typingList)):
if self.noIncludtionList[-1]["ix"] == self.typingList[x]["ix"] and self.typingList[x]["price"] > self.downLowest:
del self.typingList[x]
if self.noIncludtionList:
self.noIncludtionList.pop()
if self.noIncludtionIx:
self.noIncludtionIx.pop()
self.handle_bars_relationship(direction, nix, index)
else:
self.noIncludtionList.append({"ix": nix, "high_price": nh, "low_price": nl})
self.noIncludtionIx.append(nix)
if len(self.noIncludtionList) > 10:
del self.noIncludtionList[0]
if len(self.noIncludtionIx) > 100:
del self.noIncludtionIx[0]
logging.debug("self.noIncludtionList:" + str(self.noIncludtionList))
logging.debug("self.noIncludtionIx:" + str(self.noIncludtionIx))
def get_info_text(self, ix: int) -> str:
"""
Get information text to show by cursor.
"""
bar: BarData = self._manager.get_bar(ix)
if bar:
words: list = [
"ix:",
str(ix)
]
text: str = "\n".join(words)
else:
text: str = ""
return text
class NewChartWidget(ChartWidget):
""""""
MIN_BAR_COUNT = 1000
def __init__(self, parent: QtWidgets.QWidget = None):
""""""
super().__init__(parent)
self.last_price_line: pg.InfiniteLine = None
def add_last_price_line(self):
""""""
plot = list(self._plots.values())[0]
color = (255, 255, 255)
self.last_price_line = pg.InfiniteLine(
angle=0,
movable=False,
label="{value:.1f}",
pen=pg.mkPen(color, width=1),
labelOpts={
"color": color,
"position": 1,
"anchors": [(1, 1), (1, 1)]
}
)
self.last_price_line.label.setFont(NORMAL_FONT)
plot.addItem(self.last_price_line)
def update_history(self, history: List[BarData]) -> None:
"""
Update a list of bar data.
"""
self._manager.update_history(history)
for item in self._items.values():
item.update_history(history)
self._update_plot_limits()
self.move_to_right()
self.update_last_price_line(history[-1])
def update_bar(self, bar: BarData) -> None:
"""
Update single bar data.
"""
self._manager.update_bar(bar)
for item in self._items.values():
item.update_bar(bar)
self._update_plot_limits()
if self._right_ix >= (self._manager.get_count() - self._bar_count / 2):
self.move_to_right()
self.update_last_price_line(bar)
def update_last_price_line(self, bar: BarData) -> None:
""""""
if self.last_price_line:
self.last_price_line.setValue(bar.close_price)
if __name__ == "__main__":
app = create_qapp()
# bars = database_manager.load_bar_data(
# "IF888",
# Exchange.CFFEX,
# interval=Interval.MINUTE,
# start=datetime(2019, 7, 1),
# end=datetime(2019, 7, 17)
# )
symbol = "rb2305"
exchange = Exchange.SHFE
interval = Interval.MINUTE
start = datetime(2022, 10, 1)
end = datetime(2022, 11, 15)
dynamic = False # 是否动态演示
n = 10000 # 缓冲K线根数#10000
bars = MysqlDatabase.load_bar_data(
self=None,
symbol=symbol,
exchange=exchange,
interval=interval,
start=start,
end=end
)
widget = NewChartWidget()
widget.setWindowTitle(f"K线图表——{symbol}.{exchange.value},{interval},{start}-{end}")
widget.add_plot("candle", hide_x_axis=True)
# widget.add_plot("volume", maximum_height=150)
# widget.add_plot("rsi", maximum_height=150)
widget.add_plot("macd", maximum_height=150)
widget.add_item(CandleItem, "candle", "candle")
# widget.add_item(VolumeItem, "volume", "volume")
# widget.add_item(LineItem, "line", "candle")
# widget.add_item(SmaItem, "sma", "candle")
widget.add_item(CLpenItem, "clpen", "candle")
# widget.add_item(RsiItem, "rsi", "rsi")
widget.add_item(MacdItem, "macd", "macd")
widget.add_last_price_line()
widget.add_cursor()
if dynamic:
history = bars[:n] # 先取得最早的n根bar作为历史
new_data = bars[n:] # 其它留着演示
else:
history = bars[-n:] # 先取得最新的n根bar作为历史
new_data = [] # 演示的为空
newhistory = ConvertBar(history, 15)
widget.update_history(newhistory)
def update_bar():
if new_data:
bar = new_data.pop(0)
widget.update_bar(bar)
timer = QtCore.QTimer()
timer.timeout.connect(update_bar)
if dynamic:
timer.start(100)
widget.show()
app.exec()
版本:3.5
时间:2023-1-3 10:34:16
报错:packages\vnpy_ctabacktester\ui\widget.py", line 67, in ConvertBar
newbars[i] = BarData(
IndexError: list assignment index out of range
解决:67行缩进不对。本来,if len(bars)>show_minute*i:这个判断只是为向下取整后如果长度短了一个,就给他加上。但无论是不是加上,后面的代码,newbars=[x for x in range(i)]都应该被执行。
songguannan wrote:
版本:3.5
时间:2022-12-31 15:25:42
帖名:为K线图表添砖加瓦——MACD
链接:https://www.vnpy.com/forum/topic/3776-wei-kxian-tu-biao-tian-zhuan-jia-wa-macd
报错:ImportError: cannot import name 'database_manager' from 'vnpy.trader.database'
解决方法:
3.5已经没有database_manager类,更名为BaseDatabase,而这个类是一个抽象类,不能直接调用,想要调用只能调用实现它的子类,也就是说用了什么数据库就要调用对应数据库的实现,比如mysql,那么引入就要改成 from vnpy_mysql.mysql_database import MysqlDatabase,调用改成
bars = MysqlDatabase.load_bar_data(
报错:missing 1 required positional argument: 'self'
解决:参数要添加self=None
例如:bars = MysqlDatabase.load_bar_data(
self=None,
symbol=symbol,
exchange=exchange,
interval=interval,
start=start,
end=end
)
版本:3.5
时间:2022-12-31 15:25:42
帖名:为K线图表添砖加瓦——MACD
链接:https://www.vnpy.com/forum/topic/3776-wei-kxian-tu-biao-tian-zhuan-jia-wa-macd
报错:ImportError: cannot import name 'database_manager' from 'vnpy.trader.database'
解决方法:
3.5已经没有database_manager类,更名为BaseDatabase,而这个类是一个抽象类,不能直接调用,想要调用只能调用实现它的子类,也就是说用了什么数据库就要调用对应数据库的实现,比如mysql,那么引入就要改成 from vnpy_mysql.mysql_database import MysqlDatabase,调用改成
bars = MysqlDatabase.load_bar_data(
版本:3.5
时间:2022-12-30 17:23:04
报错:peewee.OperationalError: (1071, 'Specified key was too long; max key length is 1000 bytes')
分析:可能是已经存在的sqlite数据往mysql里面转的时候某些索引超长了
解决方法:备份文件夹 C:\Users\当前用户.vntrader后,将文件夹 C:\Users\当前用户.vntrader删除,恢复文件connect_ctp.json和vt_setting.json再启动
版本:3.5
时间:2022-12-30 17:00:04
报错:peewee.OperationalError: (1049, "Unknown database 'vnpy'")
解决方法:要先手动创建数据库
版本:3.5
时间:2022-12-30 16:47:32
报错:peewee.OperationalError: (1045, "Access denied for user 'root'@'localhost' (using password: YES)") 2022-12-30 16:35:54 VeighNa Trader进程终止
解决方法:
1.打开文件 C:\Users\当前用户.vntrader\vt_setting.json
2.找到字段"database.password"修改成正确密码即可
扩展:因配置参数错误导致的VeighNa Trader进程无法启动均可按此方法解决