在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
开源软件名称:khramkov/MQL5-JSON-API
开源软件地址:https://github.com/khramkov/MQL5-JSON-API
开源编程语言:MQL5 99.2%
开源软件介绍:
Metaquotes MQL5 - JSON - API
Development state: stable release version 2.0
Tested on macOS Mojave / Windows 10 in Parallels Desktop container. Tested on Manjaro Linux / Windows 10 in VirtualBox. Working in production on Debian 10 / Wine 4.
Table of Contents
About the Project
This project was developed to work as a server for the Backtrader Python trading framework. It is based on ZeroMQ sockets and uses the JSON format to communicate. It has since grown into an independent project: you can use it with any programming language that has a ZeroMQ binding. The Backtrader Python client is located here: Python Backtrader - Metaquotes MQL5. Thanks to the participation of Gunther Schulz, the project moved to a new level. New features:
In development:
Installation
Documentation
The script uses seven ZeroMQ sockets:
The idea is to send requests via
Check out the available combinations of
Python 3 API class example:
import zmq
class MTraderAPI:
def __init__(self, host=None, debug=False):
    """Connect the client sockets to the MQL5 JSON API server.

    Args:
        host: Server host name or address; a falsy value selects
            'localhost'.
        debug: When true, the helpers that check ``self.debug``
            (_indicator_pull_reply, _push_chart_data) print the messages
            they exchange with the server.

    Raises:
        zmq.ZMQBindError: If creating or connecting any socket raises
            zmq.ZMQError.
    """
    self.HOST = host or 'localhost'
    # Read by _indicator_pull_reply and _push_chart_data. It used to be
    # left unset, so those methods failed with AttributeError.
    self.debug = debug
    self.SYS_PORT = 15555  # REP/REQ port
    self.DATA_PORT = 15556  # PUSH/PULL port
    self.LIVE_PORT = 15557  # PUSH/PULL port
    self.EVENTS_PORT = 15558  # PUSH/PULL port
    self.INDICATOR_DATA_PORT = 15559  # PUSH/PULL port (a PULL socket is opened below)
    self.CHART_DATA_PORT = 15560  # PUSH port
    # ZeroMQ timeouts in seconds; converted to milliseconds for RCVTIMEO
    sys_timeout = 1
    data_timeout = 10
    # initialise ZMQ context
    context = zmq.Context()
    # connect to server sockets
    try:
        self.sys_socket = context.socket(zmq.REQ)
        # set port timeout
        self.sys_socket.RCVTIMEO = sys_timeout * 1000
        self.sys_socket.connect('tcp://{}:{}'.format(self.HOST, self.SYS_PORT))

        self.data_socket = context.socket(zmq.PULL)
        # set port timeout
        self.data_socket.RCVTIMEO = data_timeout * 1000
        self.data_socket.connect('tcp://{}:{}'.format(self.HOST, self.DATA_PORT))

        self.indicator_data_socket = context.socket(zmq.PULL)
        # set port timeout
        self.indicator_data_socket.RCVTIMEO = data_timeout * 1000
        self.indicator_data_socket.connect(
            "tcp://{}:{}".format(self.HOST, self.INDICATOR_DATA_PORT)
        )

        self.chart_data_socket = context.socket(zmq.PUSH)
        # no receive timeout is set on this socket: it is only used to send
        # TODO check if port is listening and error handling
        self.chart_data_socket.connect(
            "tcp://{}:{}".format(self.HOST, self.CHART_DATA_PORT)
        )
    except zmq.ZMQError as err:
        # keep the original ZMQError attached as the cause
        raise zmq.ZMQBindError("Binding ports ERROR") from err
def _send_request(self, data: dict) -> None:
"""Send request to server via ZeroMQ System socket"""
try:
self.sys_socket.send_json(data)
msg = self.sys_socket.recv_string()
# terminal received the request
assert msg == 'OK', 'Something wrong on server side'
except AssertionError as err:
raise zmq.NotDone(err)
except zmq.ZMQError:
raise zmq.NotDone("Sending request ERROR")
def _pull_reply(self):
"""Get reply from server via Data socket with timeout"""
try:
msg = self.data_socket.recv_json()
except zmq.ZMQError:
raise zmq.NotDone('Data socket timeout ERROR')
return msg
def _indicator_pull_reply(self):
"""Get reply from server via Data socket with timeout"""
try:
msg = self.indicator_data_socket.recv_json()
except zmq.ZMQError:
raise zmq.NotDone("Indicator Data socket timeout ERROR")
if self.debug:
print("ZMQ INDICATOR DATA REPLY: ", msg)
return msg
def live_socket(self, context=None):
    """Return a new PULL socket connected to the live-data port.

    Args:
        context: ZMQ context to create the socket in; when falsy, the
            shared ``zmq.Context.instance()`` is used.

    Raises:
        zmq.ZMQBindError: If creating or connecting the socket raises
            zmq.ZMQError.
    """
    try:
        ctx = context if context else zmq.Context.instance()
        live = ctx.socket(zmq.PULL)
        endpoint = 'tcp://{}:{}'.format(self.HOST, self.LIVE_PORT)
        live.connect(endpoint)
    except zmq.ZMQError:
        raise zmq.ZMQBindError("Live port connection ERROR")
    return live
def streaming_socket(self, context=None):
    """Return a new PULL socket connected to the events port.

    Args:
        context: ZMQ context to create the socket in; when falsy, the
            shared ``zmq.Context.instance()`` is used.

    Raises:
        zmq.ZMQBindError: If creating or connecting the socket raises
            zmq.ZMQError.
    """
    try:
        ctx = context if context else zmq.Context.instance()
        events = ctx.socket(zmq.PULL)
        endpoint = 'tcp://{}:{}'.format(self.HOST, self.EVENTS_PORT)
        events.connect(endpoint)
    except zmq.ZMQError:
        raise zmq.ZMQBindError("Data port connection ERROR")
    return events
def _push_chart_data(self, data: dict) -> None:
"""Send message for chart control to server via ZeroMQ chart data socket"""
try:
if self.debug:
print("ZMQ PUSH CHART DATA: ", data, " -> ", data)
self.chart_data_socket.send_json(data)
except zmq.ZMQError:
raise zmq.NotDone("Sending request ERROR")
def construct_and_send(self, **kwargs) -> dict:
    """Build a request from the default template, send it and return the reply.

    Every keyword must be one of the template's field names; fields that are
    not supplied are sent as None.

    Raises:
        KeyError: If a keyword is not a field of the template. Nothing is
            sent in that case.
    """
    # request template: every supported field starts out as None
    request = dict.fromkeys((
        "action",
        "actionType",
        "symbol",
        "chartTF",
        "fromDate",
        "toDate",
        "id",
        "magic",
        "volume",
        "price",
        "stoploss",
        "takeprofit",
        "expiration",
        "deviation",
        "comment",
        "chartId",
        "indicatorChartId",
        "chartIndicatorSubWindow",
        "style",
    ))
    # refuse unknown fields before touching the socket
    if any(name not in request for name in kwargs):
        raise KeyError('Unknown key in **kwargs ERROR')
    request.update(kwargs)
    # hand the request to the server, then wait for its reply
    self._send_request(request)
    return self._pull_reply()
def indicator_construct_and_send(self, **kwargs) -> dict:
    """Build an indicator request from the default template and send it.

    Every keyword must be one of the template's field names; fields that are
    not supplied are sent as None. The reply is read from the indicator data
    socket.

    Raises:
        KeyError: If a keyword is not a field of the template. Nothing is
            sent in that case.
    """
    # request template: every supported field starts out as None
    request = dict.fromkeys((
        "action",
        "actionType",
        "id",
        "symbol",
        "chartTF",
        "fromDate",
        "toDate",
        "name",
        "params",
        "linecount",
    ))
    # refuse unknown fields before touching the socket
    if any(name not in request for name in kwargs):
        raise KeyError("Unknown key in **kwargs ERROR")
    request.update(kwargs)
    # hand the request to the server, then wait for the indicator reply
    self._send_request(request)
    return self._indicator_pull_reply()
def chart_data_construct_and_send(self, **kwargs) -> dict:
"""Construct a request dictionary from default and send it to server"""
# default dictionary
message = {
"action": None,
"actionType": None,
"chartId": None,
"indicatorChartId": None,
"data": None,
}
# update dict values if exist
for key, value in kwargs.items():
if key in message:
message[key] = value
else:
raise KeyError("Unknown key in **kwargs ERROR")
# send dict to server
self._push_chart_data(message)

Usage

All examples will be on Python 3. Let's create an instance of the MetaTrader API class:

api = MTraderAPI()

First of all we should configure the script

print(api.construct_and_send(action="CONFIG", symbol="EURUSD", chartTF="M5"))
print(api.construct_and_send(action="CONFIG", symbol="AUDUSD", chartTF="M1"))
... There is also print(api.construct_and_send(action="CONFIG", symbol="EURUSD", chartTF="TICK"))
print(api.construct_and_send(action="CONFIG", symbol="EURUSD", chartTF="M1")) If you want to stop rep = api.construct_and_send(action="RESET")
print(rep) Get information about the trading account. rep = api.construct_and_send(action="ACCOUNT")
print(rep) Get historical data. There are some issues:
rep = api.construct_and_send(action="HISTORY", actionType="DATA", symbol="EURUSD", chartTF="M5", fromDate=1555555555)
print(rep) History data reply example:
Buy market order.
rep = api.construct_and_send(action="TRADE", actionType="ORDER_TYPE_BUY", symbol="EURUSD", volume=0.1, stoploss=1.1, takeprofit=1.3)
print(rep) Sell limit order. Remember to switch SL/TP depending on BUY/SELL, or you will get
rep = api.construct_and_send(action="TRADE", actionType="ORDER_TYPE_SELL_LIMIT", symbol="EURUSD", volume=0.1, price=1.2, stoploss=1.3, takeprofit=1.1)
print(rep)
All pending orders are set to
rep = api.construct_and_send(action="TRADE", actionType="ORDER_TYPE_SELL_LIMIT", symbol="EURUSD", volume=0.1, price=1.2, expiration=1560782460)
print(rep)

Live data and streaming events

Event handler example for

import zmq
import threading
api = MTraderAPI()
def _t_livedata():
    """Print every message received on the API's live-data socket, forever.

    Raises:
        zmq.NotDone: If receiving raises zmq.ZMQError.
    """
    live = api.live_socket()
    while True:
        try:
            candle = live.recv_json()
        except zmq.ZMQError:
            raise zmq.NotDone("Live data ERROR")
        else:
            print(candle)
def _t_streaming_events():
    """Print the request/reply pair of every message on the events socket, forever.

    Each received message is unpacked into exactly two values, in the order
    the message holds them.

    Raises:
        zmq.NotDone: If receiving raises zmq.ZMQError.
    """
    events = api.streaming_socket()
    while True:
        try:
            transaction = events.recv_json()
            request, reply = transaction.values()
        except zmq.ZMQError:
            raise zmq.NotDone("Streaming data ERROR")
        else:
            print(request)
            print(reply)
t = threading.Thread(target=_t_livedata, daemon=True)
t.start()
t = threading.Thread(target=_t_streaming_events, daemon=True)
t.start()
while True:
pass There are only two variants of
If the terminal has lost connection to the market:
When the terminal reconnects to the market, it sends the last closed candle again. So you should update your historical data. Make the
Streaming MT5 indicator data
Open a chart window and attach an MT5 indicator. Parameters:
print(api.indicator_construct_and_send(action='INDICATOR', actionType='ATTACH', id='4df306ea-e8e6-439b-8004-b86ba4bcc8c3', symbol='EURUSD', chartTF='M1', name='Examples/MACD', params=['12', '26', '9', 'PRICE_CLOSE'], linecount=2))
Stream the calculated result values of a previously attached indicator. Parameters:
print(api.indicator_construct_and_send(action='INDICATOR', actionType='REQUEST', id='4df306ea-e8e6-439b-8004-b86ba4bcc8c3', fromDate=1591993860))
Example of the result:
{'error': False, 'id': '4df306ea-e8e6-439b-8004-b86ba4bcc8c3', 'data': ['0.00008204', '0.00001132']}
The data field holds a list with results of the calculated indicator buffers.
2023-10-27
2022-08-15
2022-08-17
2022-09-23
2022-08-13
请发表评论