Modified data crawler to use ccxt library

This commit is contained in:
Dennis Thiessen
2017-10-18 14:18:06 +02:00
parent 8adbe96dca
commit e6b677ee0e
2 changed files with 171 additions and 109 deletions

View File

@@ -1,141 +1,202 @@
#!/usr/bin/python #!/usr/bin/python
import tornado.escape
import tornado.httpclient
import tornado.httpclient
from tornado import gen from tornado import gen
import time
from collections import defaultdict from collections import defaultdict
import sys
import time
import asyncio
import ccxt.async as ccxt
market_data = defaultdict(list) market_data = defaultdict(list)
@gen.coroutine def style(s, style):
def update_market_data_for_basecoin(basecoin): return style + s + '\033[0m'
market_requests = []
@gen.coroutine
def call_market_data(url, response_handler):
start_time5 = time.time()
http_client = tornado.httpclient.AsyncHTTPClient()
response = yield http_client.fetch(url)
print("Data received for url {0}. Responsetime: {1:.2f}ms".format(url[0:15], (time.time() - start_time5)*100))
return response_handler(response)
""" def green(s):
Bittrex return style(s, '\033[92m')
"""
def handle_response_bittrex(response):
if response.error:
print("Error: %s" % response.error)
return False
else:
start_time1 = time.time()
response_data = tornado.escape.json_decode(response.body)
for market in response_data["result"]: def blue(s):
base, target = market["MarketName"].split("-") return style(s, '\033[94m')
if base == basecoin:
market_data[target].append({"Bittrex": market["Last"]})
elif target == basecoin:
market_data[base].append({"Bittrex": market["Last"]})
print("Handling bitr finished in {:.3f}ms".format((time.time() - start_time1)*100))
return True
market_requests.append( def yellow(s):
call_market_data( return style(s, '\033[93m')
"https://bittrex.com/api/v1.1/public/getmarketsummaries",
handle_response_bittrex))
"""
Poloniex
"""
def handle_response_poloniex(response):
if response.error:
print("Error: %s" % response.error)
return False
else: def red(s):
start_time2 = time.time() return style(s, '\033[91m')
response_data = tornado.escape.json_decode(response.body)
for market in response_data:
base, target = market.split("_")
if base == basecoin:
market_data[target].append({"Poloniex": response_data[market]["last"]})
elif target == basecoin:
market_data[base].append({"Poloniex": response_data[market]["last"]})
print("Handling pol finished in {:.3f}ms".format((time.time() - start_time2)*100)) def pink(s):
return True return style(s, '\033[95m')
market_requests.append(
call_market_data(
"https://poloniex.com/public?command=returnTicker",
handle_response_poloniex))
""" def bold(s):
Kraken return style(s, '\033[1m')
"""
def handle_response_kraken(response):
if response.error:
print("Error: %s" % response.error)
return False
else:
start_time3 = time.time()
response_data = tornado.escape.json_decode(response.body)
import re
for market in response_data["result"]: def underline(s):
base = "ETH" return style(s, '\033[4m')
target = re.findall('XBT|EOS|GNO|ETC|ICN|REP|MLN', market, re.DOTALL)
if target[0] == "XBT":
target[0] = "BTC"
if base == basecoin: def dump(*args):
market_data[target[0]].append({"Kraken": response_data["result"][market]["c"][0]}) print(' '.join([str(arg) for arg in args]))
print("Handling kra finished in {:.3f}ms".format((time.time() - start_time3)*100)) proxies = [
return True '', # no proxy by default
'https://cors-anywhere.herokuapp.com/',
market_requests.append( ]
call_market_data(
"https://api.kraken.com/0/public/Ticker?pair=ETHXBT,EOSETH,GNOETH,ETCETH,ICNETH,REPETH,MLNETH",
handle_response_kraken))
"""
Bitfinex
"""
def handle_response_bitfinex(response):
if response.error:
print("Error: %s" % response.error)
return False
else: def update_market_data_for_symbol_and_exchange(allowed_symbols, exchanges):
start_time4 = time.time() if len(exchanges) > 1:
response_data = tornado.escape.json_decode(response.body)
import re
for market in response_data:
base = "ETH"
target = re.findall('BTC|IOT|EOS|SAN|OMG|QTM|AVT|ETP|NEO|BCH', market[0], re.DOTALL)
if base == basecoin:
market_data[target[0]].append({"Bitfinex": market[7]})
print("Handling bitf finished in {:.3f}ms".format((time.time() - start_time4)*100))
return True
market_requests.append(
call_market_data(
"https://api.bitfinex.com/v2/tickers?symbols=tETHBTC,tIOTETH,tEOSETH,tSANETH,tOMGETH,tQTMETH,tAVTETH,tETPETH,tNEOETH,tBCHETH",
handle_response_bitfinex))
print("--- Retrieve market data now ---")
start_time = time.time() start_time = time.time()
response_dict = yield market_requests ids = list(exchanges)
print("--- Marked data updated in {0:.3f}s. Responses: {1} ---".format(time.time() - start_time, response_dict)) exchanges = {}
dump(yellow('Loading'), 'market data for following exchanges:', ' '.join(ids))
for id in ids:
# instantiate the exchange by id
exchange = getattr(ccxt, id)()
# save it in a dictionary under its id for future use
exchanges[id] = exchange
exchanges = fetch_all_markets(exchanges)
allSymbols = [symbol for id in ids for symbol in exchanges[id].symbols]
# get all unique symbols
uniqueSymbols = list(set(allSymbols))
# filter out symbols that are not present on at least two exchanges
arbitrableSymbols = sorted([symbol for symbol in uniqueSymbols if allSymbols.count(symbol) > 1])
exchanges = fetch_all_order_books(exchanges, arbitrableSymbols)
dump(green('Finished!'), 'Responsetime:', red("{:.2f}ms".format((time.time() - start_time) * 100)))
with open("market_data.txt", "w") as file:
for key, value in market_data.items():
file.write("\nMarket: {}".format(key))
for order_book in value:
file.write("\n Order Book: {0}".format(order_book))
return market_data
else:
dump(red("Invalid number of arguments given"))
return None
def fetch_all_order_books(exchanges, arbitrableSymbols):
ob_start_time = time.time()
async def fetch_single_order_books(exchange, arbitrableSymbols):
dump(yellow('Retrieving'), 'order books from exchange', yellow(exchange.id))
order_books = []
available_symbols = (symbol for symbol in arbitrableSymbols if symbol in exchange.symbols)
for symbol in available_symbols:
# basic round-robin proxy scheduler
currentProxy = -1
maxRetries = len(proxies)
for numRetries in range(0, maxRetries):
# try proxies in round-robin fashion
currentProxy = (currentProxy + 1) % len(proxies)
try: # try to load exchange markets using current proxy
tmp_order_book = await exchange.fetch_order_book(symbol)
tmp_order_book['symbol'] = symbol
order_books.append(tmp_order_book)
break
except ccxt.DDoSProtection as e:
dump(yellow(type(e).__name__), e.args)
await asyncio.sleep(exchange.rateLimit / 500)
except ccxt.RequestTimeout as e:
dump(yellow(type(e).__name__), e.args)
except ccxt.AuthenticationError as e:
dump(yellow(type(e).__name__), e.args)
except ccxt.ExchangeNotAvailable as e:
dump(yellow(type(e).__name__), e.args)
except ccxt.ExchangeError as e:
dump(yellow(type(e).__name__), e.args)
except ccxt.NetworkError as e:
dump(yellow(type(e).__name__), e.args)
except Exception as e: # reraise all other exceptions
raise
dump("Order book for", yellow(str(exchange.id)), "retrieved in", red("{:.2f}ms".format((time.time() - ob_start_time) * 100)))
market_data[exchange.id] = order_books
async_executor = []
for key, value in exchanges.items():
# add future to list
async_executor.append(asyncio.ensure_future(fetch_single_order_books(exchanges[key], arbitrableSymbols)))
# wait till all futures in list completed
asyncio.get_event_loop().run_until_complete(asyncio.gather(*async_executor))
return exchanges
def fetch_all_markets(exchanges):
start_time_markets = time.time()
async def fetch_single_market(exchange):
# basic round-robin proxy scheduler
currentProxy = -1
maxRetries = len(proxies)
for numRetries in range(0, maxRetries):
# try proxies in round-robin fashion
currentProxy = (currentProxy + 1) % len(proxies)
try: # try to load exchange markets using current proxy
exchange.proxy = proxies[currentProxy]
await exchange.load_markets()
break
except ccxt.DDoSProtection as e:
dump(yellow(type(e).__name__), e.args)
except ccxt.RequestTimeout as e:
dump(yellow(type(e).__name__), e.args)
except ccxt.AuthenticationError as e:
dump(yellow(type(e).__name__), e.args)
except ccxt.ExchangeNotAvailable as e:
dump(yellow(type(e).__name__), e.args)
except ccxt.ExchangeError as e:
dump(yellow(type(e).__name__), e.args)
except ccxt.NetworkError as e:
dump(yellow(type(e).__name__), e.args)
except Exception as e: # reraise all other exceptions
raise
dump(green(exchange.id), 'loaded', green(str(len(exchange.symbols))), 'markets')
async_executor = []
for key, value in exchanges.items():
# add future to list
async_executor.append(asyncio.ensure_future(fetch_single_market(exchanges[key])))
# wait till all futures in list completed
asyncio.get_event_loop().run_until_complete(asyncio.gather(*async_executor))
dump(green('Loaded all markets!'), 'Responsetime:', red("{:.2f}ms".format((time.time() - start_time_markets) * 100)))
return exchanges
if __name__ == '__main__':
update_market_data_for_symbol_and_exchange(None, sys.argv[1:])

View File

@@ -1,3 +1,4 @@
nose nose
sphinx sphinx
tornado tornado
ccxt