diff --git a/bot/market_data_crawler.py b/bot/market_data_crawler.py index 789cad2..03481d2 100644 --- a/bot/market_data_crawler.py +++ b/bot/market_data_crawler.py @@ -1,141 +1,202 @@ #!/usr/bin/python -import tornado.escape -import tornado.httpclient -import tornado.httpclient from tornado import gen -import time from collections import defaultdict +import sys +import time +import asyncio +import ccxt.async as ccxt + market_data = defaultdict(list) -@gen.coroutine -def update_market_data_for_basecoin(basecoin): - market_requests = [] +def style(s, style): + return style + s + '\033[0m' - @gen.coroutine - def call_market_data(url, response_handler): - start_time5 = time.time() - http_client = tornado.httpclient.AsyncHTTPClient() - response = yield http_client.fetch(url) - print("Data received for url {0}. Responsetime: {1:.2f}ms".format(url[0:15], (time.time() - start_time5)*100)) - return response_handler(response) - """ - Bittrex - """ - def handle_response_bittrex(response): - if response.error: - print("Error: %s" % response.error) - return False +def green(s): + return style(s, '\033[92m') - else: - start_time1 = time.time() - response_data = tornado.escape.json_decode(response.body) - for market in response_data["result"]: - base, target = market["MarketName"].split("-") - if base == basecoin: - market_data[target].append({"Bittrex": market["Last"]}) - elif target == basecoin: - market_data[base].append({"Bittrex": market["Last"]}) +def blue(s): + return style(s, '\033[94m') - print("Handling bitr finished in {:.3f}ms".format((time.time() - start_time1)*100)) - return True - market_requests.append( - call_market_data( - "https://bittrex.com/api/v1.1/public/getmarketsummaries", - handle_response_bittrex)) +def yellow(s): + return style(s, '\033[93m') - """ - Poloniex - """ - def handle_response_poloniex(response): - if response.error: - print("Error: %s" % response.error) - return False - else: - start_time2 = time.time() - response_data = tornado.escape.json_decode(response.body) +def red(s): + return style(s, '\033[91m') - for market in response_data: - base, target = market.split("_") - if base == basecoin: - market_data[target].append({"Poloniex": response_data[market]["last"]}) - elif target == basecoin: - market_data[base].append({"Poloniex": response_data[market]["last"]}) - print("Handling pol finished in {:.3f}ms".format((time.time() - start_time2)*100)) - return True +def pink(s): + return style(s, '\033[95m') - market_requests.append( - call_market_data( - "https://poloniex.com/public?command=returnTicker", - handle_response_poloniex)) - """ - Kraken - """ - def handle_response_kraken(response): - if response.error: - print("Error: %s" % response.error) - return False +def bold(s): + return style(s, '\033[1m') - else: - start_time3 = time.time() - response_data = tornado.escape.json_decode(response.body) - import re - for market in response_data["result"]: - base = "ETH" - target = re.findall('XBT|EOS|GNO|ETC|ICN|REP|MLN', market, re.DOTALL) +def underline(s): + return style(s, '\033[4m') - if target[0] == "XBT": - target[0] = "BTC" - if base == basecoin: - market_data[target[0]].append({"Kraken": response_data["result"][market]["c"][0]}) +def dump(*args): + print(' '.join([str(arg) for arg in args])) - print("Handling kra finished in {:.3f}ms".format((time.time() - start_time3)*100)) - return True +proxies = [ + '', # no proxy by default + 'https://cors-anywhere.herokuapp.com/', - market_requests.append( - call_market_data( - "https://api.kraken.com/0/public/Ticker?pair=ETHXBT,EOSETH,GNOETH,ETCETH,ICNETH,REPETH,MLNETH", - handle_response_kraken)) +] - """ - Bitfinex - """ - def handle_response_bitfinex(response): - if response.error: - print("Error: %s" % response.error) - return False - else: - start_time4 = time.time() - response_data = tornado.escape.json_decode(response.body) - import re +def update_market_data_for_symbol_and_exchange(allowed_symbols, exchanges): + if len(exchanges) > 1: + start_time = time.time() + ids = list(exchanges) + exchanges = {} - for market in response_data: - base = "ETH" - target = re.findall('BTC|IOT|EOS|SAN|OMG|QTM|AVT|ETP|NEO|BCH', market[0], re.DOTALL) + dump(yellow('Loading'), 'market data for following exchanges:', ' '.join(ids)) - if base == basecoin: - market_data[target[0]].append({"Bitfinex": market[7]}) + for id in ids: + # instantiate the exchange by id + exchange = getattr(ccxt, id)() - print("Handling bitf finished in {:.3f}ms".format((time.time() - start_time4)*100)) - return True + # save it in a dictionary under its id for future use + exchanges[id] = exchange - market_requests.append( - call_market_data( - "https://api.bitfinex.com/v2/tickers?symbols=tETHBTC,tIOTETH,tEOSETH,tSANETH,tOMGETH,tQTMETH,tAVTETH,tETPETH,tNEOETH,tBCHETH", - handle_response_bitfinex)) + exchanges = fetch_all_markets(exchanges) - print("--- Retrieve market data now ---") - start_time = time.time() - response_dict = yield market_requests - print("--- Marked data updated in {0:.3f}s. Responses: {1} ---".format(time.time() - start_time, response_dict)) + allSymbols = [symbol for id in ids for symbol in exchanges[id].symbols] + + # get all unique symbols + uniqueSymbols = list(set(allSymbols)) + + # filter out symbols that are not present on at least two exchanges + arbitrableSymbols = sorted([symbol for symbol in uniqueSymbols if allSymbols.count(symbol) > 1]) + + exchanges = fetch_all_order_books(exchanges, arbitrableSymbols) + + dump(green('Finished!'), 'Responsetime:', red("{:.2f}ms".format((time.time() - start_time) * 100))) + + with open("market_data.txt", "w") as file: + for key, value in market_data.items(): + file.write("\nMarket: {}".format(key)) + + for order_book in value: + file.write("\n Order Book: {0}".format(order_book)) + + return market_data + else: + dump(red("Invalid number of arguments given")) + return None + + +def fetch_all_order_books(exchanges, arbitrableSymbols): + ob_start_time = time.time() + + async def fetch_single_order_books(exchange, arbitrableSymbols): + dump(yellow('Retrieving'), 'order books from exchange', yellow(exchange.id)) + + order_books = [] + available_symbols = (symbol for symbol in arbitrableSymbols if symbol in exchange.symbols) + + for symbol in available_symbols: + # basic round-robin proxy scheduler + currentProxy = -1 + maxRetries = len(proxies) + + for numRetries in range(0, maxRetries): + # try proxies in round-robin fashion + currentProxy = (currentProxy + 1) % len(proxies) + + try: # try to load exchange markets using current proxy + + tmp_order_book = await exchange.fetch_order_book(symbol) + tmp_order_book['symbol'] = symbol + order_books.append(tmp_order_book) + break + + except ccxt.DDoSProtection as e: + dump(yellow(type(e).__name__), e.args) + await asyncio.sleep(exchange.rateLimit / 500) + except ccxt.RequestTimeout as e: + dump(yellow(type(e).__name__), e.args) + except ccxt.AuthenticationError as e: + dump(yellow(type(e).__name__), e.args) + except ccxt.ExchangeNotAvailable as e: + dump(yellow(type(e).__name__), e.args) + except ccxt.ExchangeError as e: + dump(yellow(type(e).__name__), e.args) + except ccxt.NetworkError as e: + dump(yellow(type(e).__name__), e.args) + except Exception as e: # reraise all other exceptions + raise + + dump("Order book for", yellow(str(exchange.id)), "retrieved in", red("{:.2f}ms".format((time.time() - ob_start_time) * 100))) + market_data[exchange.id] = order_books + + async_executor = [] + for key, value in exchanges.items(): + # add future to list + async_executor.append(asyncio.ensure_future(fetch_single_order_books(exchanges[key], arbitrableSymbols))) + + # wait till all futures in list completed + asyncio.get_event_loop().run_until_complete(asyncio.gather(*async_executor)) + + return exchanges + + +def fetch_all_markets(exchanges): + start_time_markets = time.time() + + async def fetch_single_market(exchange): + # basic round-robin proxy scheduler + currentProxy = -1 + maxRetries = len(proxies) + + for numRetries in range(0, maxRetries): + # try proxies in round-robin fashion + currentProxy = (currentProxy + 1) % len(proxies) + + try: # try to load exchange markets using current proxy + + exchange.proxy = proxies[currentProxy] + await exchange.load_markets() + break + + except ccxt.DDoSProtection as e: + dump(yellow(type(e).__name__), e.args) + except ccxt.RequestTimeout as e: + dump(yellow(type(e).__name__), e.args) + except ccxt.AuthenticationError as e: + dump(yellow(type(e).__name__), e.args) + except ccxt.ExchangeNotAvailable as e: + dump(yellow(type(e).__name__), e.args) + except ccxt.ExchangeError as e: + dump(yellow(type(e).__name__), e.args) + except ccxt.NetworkError as e: + dump(yellow(type(e).__name__), e.args) + except Exception as e: # reraise all other exceptions + raise + + dump(green(exchange.id), 'loaded', green(str(len(exchange.symbols))), 'markets') + + async_executor = [] + for key, value in exchanges.items(): + # add future to list + async_executor.append(asyncio.ensure_future(fetch_single_market(exchanges[key]))) + + # wait till all futures in list completed + asyncio.get_event_loop().run_until_complete(asyncio.gather(*async_executor)) + + dump(green('Loaded all markets!'), 'Responsetime:', red("{:.2f}ms".format((time.time() - start_time_markets) * 100))) + + return exchanges + + +if __name__ == '__main__': + update_market_data_for_symbol_and_exchange(None, sys.argv[1:]) diff --git a/requirements.txt b/requirements.txt index 7f56a93..4ed9441 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ nose sphinx -tornado \ No newline at end of file +tornado +ccxt \ No newline at end of file