Initial commit
This commit is contained in:
144
bot/core.py
144
bot/core.py
@@ -1,71 +1,113 @@
|
||||
#!/usr/bin/python
|
||||
from . import market_data_crawler, market_data_analyzer, shared_config
|
||||
import time
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
import matplotlib.pyplot as plt
|
||||
import tensorflow as tf
|
||||
import urllib.request, json
|
||||
|
||||
from tornado import gen
|
||||
from tornado.ioloop import IOLoop
|
||||
import tornado.web
|
||||
import json
|
||||
import sys
|
||||
from bot.shared_config import *
|
||||
|
||||
class MainHandler(tornado.web.RequestHandler):
|
||||
@gen.coroutine
|
||||
def post(self):
|
||||
print("POST received from IP {0}".format(self.request.remote_ip))
|
||||
def main():
|
||||
start_time = time.time()
|
||||
|
||||
response = {'error': False, 'msg': "None"}
|
||||
request = json.loads(self.request.body.decode('utf-8'))
|
||||
dump(yellow("Retrieving market data from API"))
|
||||
|
||||
if "token" not in request or request["token"] != "den":
|
||||
response["msg"] = "Wrong token - no access granted"
|
||||
self.write(json.dumps(response))
|
||||
return
|
||||
with urllib.request.urlopen("https://api.kraken.com/0/public/OHLC?pair=ETHUSD&interval=15") as url:
|
||||
data = json.loads(url.read().decode())
|
||||
timestamps = []
|
||||
prices = []
|
||||
volumes = []
|
||||
pricevol = []
|
||||
|
||||
if "command" in request:
|
||||
print("Command received: {0}".format(request["command"]))
|
||||
for set in data["result"]['XETHZUSD'][-601:]:
|
||||
timestamps.append(int(set[0]))
|
||||
prices.append(float(set[4]))
|
||||
volumes.append(float(set[6]))
|
||||
pricevol.append([float(set[4]), float(set[6])])
|
||||
|
||||
if request["command"] == "start_bot":
|
||||
shared_config.run_bot = True
|
||||
elif request["command"] == "stop_bot":
|
||||
shared_config.run_bot = False
|
||||
else:
|
||||
response["msg"] = "Unknown command"
|
||||
dump(green("Retrieved API in {0:.3f}ms sec".format((time.time() - start_time)*100)))
|
||||
dump(yellow("Initialize Tensorflow"))
|
||||
|
||||
self.write(json.dumps(response))
|
||||
f_horizon = 1 # forecast horizon, one period into the future
|
||||
num_periods = 20 # number of periods per vector we are using to predict one period ahead
|
||||
inputs = 2 # number of vectors submitted
|
||||
hidden = 100 # number of neurons we will recursively work through, can be changed to improve accuracy
|
||||
output = 1 # number of output vectors
|
||||
|
||||
@gen.coroutine
|
||||
def delete(self):
|
||||
print("Stopping server...")
|
||||
TS = np.array(pricevol)
|
||||
TSo = np.array(prices)
|
||||
|
||||
response_json = json.dumps({'error': False, 'msg': "Server stopped"})
|
||||
self.write(response_json)
|
||||
x_data = TS[:(len(TS) - (len(TS) % num_periods))]
|
||||
x_batches = x_data.reshape(-1, 20, 2)
|
||||
|
||||
IOLoop.instance().stop()
|
||||
y_data = TSo[1:(len(TSo) - (len(TSo) % num_periods)) + f_horizon]
|
||||
y_batches = y_data.reshape(-1, 20, 1)
|
||||
|
||||
def test_data(forecast, num_periods):
|
||||
test_x_setup = TS[-(num_periods + forecast):]
|
||||
testX = test_x_setup[:num_periods].reshape(-1, 20, 2)
|
||||
testY = TSo[-(num_periods):].reshape(-1, 20, 1)
|
||||
return testX, testY
|
||||
|
||||
class Application(tornado.web.Application):
|
||||
def __init__(self):
|
||||
handlers = [
|
||||
(r"/?", MainHandler)
|
||||
]
|
||||
tornado.web.Application.__init__(self, handlers)
|
||||
X_test, Y_test = test_data(f_horizon, num_periods)
|
||||
|
||||
tf.reset_default_graph() # We didn't have any previous graph objects running, but this would reset the graphs
|
||||
|
||||
@gen.coroutine
|
||||
def run_bot():
|
||||
while True:
|
||||
yield gen.sleep(30)
|
||||
if shared_config.run_bot:
|
||||
market_data_analyzer.calculate_arbitrage_opportunities(['kraken', 'bitfinex', 'binance', 'hitbtc', 'gdax', 'bittrex', 'poloniex'])
|
||||
X = tf.placeholder(tf.float32, [None, num_periods, inputs]) # create variable objects
|
||||
y = tf.placeholder(tf.float32, [None, num_periods, output])
|
||||
|
||||
basic_cell = tf.contrib.rnn.BasicRNNCell(num_units=hidden, activation=tf.nn.relu) # create our RNN object
|
||||
rnn_output, states = tf.nn.dynamic_rnn(basic_cell, X, dtype=tf.float32) # choose dynamic over static
|
||||
|
||||
def main(port):
|
||||
app = Application()
|
||||
app.listen(port)
|
||||
run_bot()
|
||||
IOLoop.instance().start()
|
||||
learning_rate = 0.001 # small learning rate so we don't overshoot the minimum
|
||||
|
||||
stacked_rnn_output = tf.reshape(rnn_output, [-1, hidden]) # change the form into a tensor
|
||||
stacked_outputs = tf.layers.dense(stacked_rnn_output, output) # specify the type of layer (dense)
|
||||
outputs = tf.reshape(stacked_outputs, [-1, num_periods, output]) # shape of results
|
||||
|
||||
loss = tf.reduce_sum(tf.square(outputs - y)) # define the cost function which evaluates the quality of our model
|
||||
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate) # gradient descent method
|
||||
training_op = optimizer.minimize(
|
||||
loss) # train the result of the application of the cost_function
|
||||
|
||||
init = tf.global_variables_initializer() # initialize all the variables
|
||||
|
||||
epochs = 1000 # number of iterations or training cycles, includes both the FeedFoward and Backpropogation
|
||||
|
||||
with tf.Session() as sess:
|
||||
init.run()
|
||||
dump(green("Initialized Tensorflow in {0:.3f}ms sec".format((time.time() - start_time) * 100)))
|
||||
dump(yellow("Start Training"))
|
||||
|
||||
for ep in range(epochs):
|
||||
sess.run(training_op, feed_dict={X: x_batches, y: y_batches})
|
||||
if ep % 100 == 0:
|
||||
mse = loss.eval(feed_dict={X: x_batches, y: y_batches})
|
||||
print(ep, "\tMSE:", mse)
|
||||
|
||||
dump(green("Finished training in {0:.3f}ms sec".format((time.time() - start_time) * 100)))
|
||||
|
||||
dump(yellow("Start Predicting"))
|
||||
y_pred = sess.run(outputs, feed_dict={X: X_test})
|
||||
dump(green("Prediction finished in {0:.3f}ms sec".format((time.time() - start_time) * 100)))
|
||||
|
||||
dump(yellow("Start Plotting and output"))
|
||||
|
||||
actual_series = pd.Series(np.concatenate([np.ravel(X_test)[::2],np.ravel(Y_test)]))
|
||||
actual_prediction = pd.Series(np.concatenate([np.ravel(X_test)[::2],np.ravel(y_pred)]))
|
||||
|
||||
plt.title("Forecast vs Actual", fontsize=14)
|
||||
plt.plot(actual_series, "b-", markersize=10, label="Actual")
|
||||
# plt.plot(pd.Series(np.ravel(Y_test)), "w*", markersize=10)
|
||||
plt.plot(actual_prediction, "r-", markersize=7, label="Forecast")
|
||||
plt.legend(loc="upper left")
|
||||
plt.xlabel("Time Periods")
|
||||
|
||||
dump(green("Finished complete program in {0:.3f}ms sec".format((time.time() - start_time) * 100)))
|
||||
plt.show()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
port = int(sys.argv[1])
|
||||
print("Starting arbitrage bot on port {0}...".format(port))
|
||||
main(port)
|
||||
print("Starting prediction ...")
|
||||
main()
|
||||
|
||||
@@ -1,70 +0,0 @@
|
||||
#!/usr/bin/python
|
||||
from . import market_data_crawler
|
||||
from bot.shared_config import *
|
||||
|
||||
import time
|
||||
import sys
|
||||
import operator
|
||||
import pprint
|
||||
|
||||
|
||||
def calculate_arbitrage_opportunities(exchanges):
|
||||
start_time = time.time()
|
||||
market_data = market_data_crawler.update_market_data_for_symbol_and_exchange(exchanges)
|
||||
sorted_market_data = {}
|
||||
|
||||
for exchange_name, order_books in market_data.items():
|
||||
for order_book in order_books:
|
||||
symbol = order_book['symbol']
|
||||
new_dictionary = {symbol:
|
||||
{exchange_name:
|
||||
{"bids": order_book['bids'][:5],
|
||||
"asks": order_book['asks'][:5],
|
||||
"timestamp": order_book['timestamp']}}}
|
||||
if symbol not in sorted_market_data.keys():
|
||||
sorted_market_data.update(new_dictionary)
|
||||
else:
|
||||
sorted_market_data[symbol].update(new_dictionary[symbol])
|
||||
|
||||
dump(green(str(len(sorted_market_data))), "possible symbols found in total:", ' '.join(sorted_market_data.keys()))
|
||||
|
||||
market_opport = {}
|
||||
for symbol, exchanges in sorted_market_data.items():
|
||||
lowest_ask = None
|
||||
highest_bid = None
|
||||
market_opport.update({symbol: {}})
|
||||
for exchange_name, order_book in exchanges.items():
|
||||
if lowest_ask is None or lowest_ask['value'] < order_book['asks'][0]:
|
||||
lowest_ask = {"exchange_name":exchange_name,
|
||||
"value":order_book['asks'][0],
|
||||
"order_book": order_book['asks'][:3]}
|
||||
|
||||
if highest_bid is None or highest_bid['value'] > order_book['bids'][0]:
|
||||
highest_bid = {"exchange_name": exchange_name,
|
||||
"value": order_book['bids'][0],
|
||||
"order_book": order_book['bids'][:3]}
|
||||
|
||||
spread = float(highest_bid['value'][0]) - float(lowest_ask['value'][0])
|
||||
|
||||
market_opport[symbol].update({"highest_bid": highest_bid,
|
||||
"lowest_ask": lowest_ask,
|
||||
"spread": spread,
|
||||
"spread_perc": round((spread / float(highest_bid['value'][0])) * 100, 2),
|
||||
"symbol": symbol})
|
||||
|
||||
if spread > 0:
|
||||
with open("market_opportunity_found.txt", "a") as file:
|
||||
file.write("\n+n--- Arbitrage oppportunity found! ---\n\n")
|
||||
pprint.pprint(market_opport[symbol], stream=file)
|
||||
|
||||
sorted_list = sorted(market_opport.values(), key=operator.itemgetter("spread_perc"), reverse=True)
|
||||
|
||||
with open("market_analyzation.txt", "w") as file:
|
||||
pprint.pprint(sorted_list, stream=file)
|
||||
|
||||
print("--- Arbitrage oportunities calculated in {0:.3f}ms ---".format((time.time() - start_time)*100))
|
||||
|
||||
return market_opport
|
||||
|
||||
if __name__ == '__main__':
|
||||
calculate_arbitrage_opportunities(sys.argv[1:])
|
||||
@@ -1,159 +0,0 @@
|
||||
#!/usr/bin/python
|
||||
from collections import defaultdict
|
||||
from bot.shared_config import *
|
||||
|
||||
import sys
|
||||
import time
|
||||
import asyncio
|
||||
import ccxt.async as ccxt
|
||||
|
||||
market_data = defaultdict(list)
|
||||
|
||||
|
||||
def update_market_data_for_symbol_and_exchange(exchanges):
|
||||
if len(exchanges) > 1:
|
||||
start_time = time.time()
|
||||
ids = list(exchanges)
|
||||
exchanges = {}
|
||||
|
||||
for id in ids:
|
||||
# instantiate the exchange by id
|
||||
exchange = getattr(ccxt, id)()
|
||||
|
||||
# save it in a dictionary under its id for future use
|
||||
exchanges[id] = exchange
|
||||
|
||||
dump(yellow('Loading'), 'market data for following exchanges:', ' '.join(ids))
|
||||
exchanges = fetch_all_markets(exchanges)
|
||||
|
||||
allSymbols = [symbol for id in ids for symbol in exchanges[id].symbols]
|
||||
|
||||
# get all unique symbols
|
||||
uniqueSymbols = list(set(allSymbols))
|
||||
|
||||
# filter out symbols that are not present on at least two exchanges
|
||||
arbitrableSymbols = sorted([symbol for symbol in uniqueSymbols if allSymbols.count(symbol) > 1])
|
||||
|
||||
# filter out symbols which have a different basecoin
|
||||
arbitrableSymbols = sorted([symbol for symbol in arbitrableSymbols if '/'+basecoin in symbol])
|
||||
|
||||
dump(yellow('Loading'), 'order books for following exchanges:', ' '.join(ids))
|
||||
exchanges = fetch_all_order_books(exchanges, arbitrableSymbols)
|
||||
|
||||
dump(green('Finished!'), 'Responsetime:', red("{:.2f}ms".format((time.time() - start_time) * 100)))
|
||||
|
||||
with open("market_data.txt", "w") as file:
|
||||
for exchange_name, order_books in market_data.items():
|
||||
file.write("\nMarket: {}".format(exchange_name))
|
||||
|
||||
for order_book in order_books:
|
||||
file.write("\n Order Book: {0}".format(order_book))
|
||||
|
||||
return market_data
|
||||
else:
|
||||
dump(red("Invalid number of arguments given"))
|
||||
return None
|
||||
|
||||
|
||||
def fetch_all_order_books(exchanges, arbitrableSymbols):
|
||||
ob_start_time = time.time()
|
||||
|
||||
async def fetch_single_order_books(exchange, arbitrableSymbols):
|
||||
order_books = []
|
||||
available_symbols = (symbol for symbol in arbitrableSymbols if symbol in exchange.symbols)
|
||||
|
||||
for symbol in available_symbols:
|
||||
# basic round-robin proxy scheduler
|
||||
currentProxy = -1
|
||||
maxRetries = len(proxies)
|
||||
|
||||
for numRetries in range(0, maxRetries):
|
||||
# try proxies in round-robin fashion
|
||||
currentProxy = (currentProxy + 1) % len(proxies)
|
||||
|
||||
try: # try to load exchange markets using current proxy
|
||||
|
||||
tmp_order_book = await exchange.fetch_order_book(symbol)
|
||||
tmp_order_book['symbol'] = symbol
|
||||
order_books.append(tmp_order_book)
|
||||
break
|
||||
|
||||
except ccxt.DDoSProtection as e:
|
||||
dump(yellow(type(e).__name__), e.args)
|
||||
await asyncio.sleep(exchange.rateLimit / 500)
|
||||
except ccxt.RequestTimeout as e:
|
||||
dump(yellow(type(e).__name__), e.args)
|
||||
except ccxt.AuthenticationError as e:
|
||||
dump(yellow(type(e).__name__), e.args)
|
||||
except ccxt.ExchangeNotAvailable as e:
|
||||
dump(yellow(type(e).__name__), e.args)
|
||||
except ccxt.ExchangeError as e:
|
||||
dump(yellow(type(e).__name__), e.args)
|
||||
except ccxt.NetworkError as e:
|
||||
dump(yellow(type(e).__name__), e.args)
|
||||
except Exception as e: # reraise all other exceptions
|
||||
raise
|
||||
|
||||
dump(' ', green(exchange.id), 'loaded', green(str(len(order_books))), 'order books in', red("{:.2f}ms".format((time.time() - ob_start_time) * 100)))
|
||||
market_data[exchange.id] = order_books
|
||||
|
||||
async_executor = []
|
||||
for exchange_name, exchange in exchanges.items():
|
||||
# add future to list
|
||||
async_executor.append(asyncio.ensure_future(fetch_single_order_books(exchange, arbitrableSymbols)))
|
||||
|
||||
# wait till all futures in list completed
|
||||
asyncio.get_event_loop().run_until_complete(asyncio.gather(*async_executor))
|
||||
|
||||
return exchanges
|
||||
|
||||
|
||||
def fetch_all_markets(exchanges):
|
||||
start_time_markets = time.time()
|
||||
|
||||
async def fetch_single_market(exchange):
|
||||
# basic round-robin proxy scheduler
|
||||
currentProxy = -1
|
||||
maxRetries = len(proxies)
|
||||
|
||||
for numRetries in range(0, maxRetries):
|
||||
# try proxies in round-robin fashion
|
||||
currentProxy = (currentProxy + 1) % len(proxies)
|
||||
|
||||
try: # try to load exchange markets using current proxy
|
||||
exchange.proxy = proxies[currentProxy]
|
||||
await exchange.load_markets()
|
||||
break
|
||||
|
||||
except ccxt.DDoSProtection as e:
|
||||
dump(yellow(type(e).__name__), e.args)
|
||||
except ccxt.RequestTimeout as e:
|
||||
dump(yellow(type(e).__name__), e.args)
|
||||
except ccxt.AuthenticationError as e:
|
||||
dump(yellow(type(e).__name__), e.args)
|
||||
except ccxt.ExchangeNotAvailable as e:
|
||||
dump(yellow(type(e).__name__), e.args)
|
||||
except ccxt.ExchangeError as e:
|
||||
dump(yellow(type(e).__name__), e.args)
|
||||
except ccxt.NetworkError as e:
|
||||
dump(yellow(type(e).__name__), e.args)
|
||||
except Exception as e: # reraise all other exceptions
|
||||
raise
|
||||
|
||||
dump(' ', green(exchange.id), 'loaded', green(str(len(exchange.symbols))), 'markets')
|
||||
|
||||
async_executor = []
|
||||
for exchange_name, exchange in exchanges.items():
|
||||
# add future to list
|
||||
async_executor.append(asyncio.ensure_future(fetch_single_market(exchange)))
|
||||
|
||||
# wait till all futures in list completed
|
||||
asyncio.get_event_loop().run_until_complete(asyncio.gather(*async_executor))
|
||||
|
||||
dump(green('Loaded all markets!'), 'Responsetime:', red("{:.2f}ms".format((time.time() - start_time_markets) * 100)))
|
||||
|
||||
return exchanges
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
update_market_data_for_symbol_and_exchange(sys.argv[1:])
|
||||
@@ -33,12 +33,3 @@ def underline(s):
|
||||
|
||||
def dump(*args):
|
||||
print(' '.join([str(arg) for arg in args]))
|
||||
|
||||
proxies = [
|
||||
'', # no proxy by default
|
||||
'https://cors-anywhere.herokuapp.com/',
|
||||
]
|
||||
|
||||
basecoin = "ETH"
|
||||
|
||||
run_bot = False
|
||||
Reference in New Issue
Block a user