"""Synchronise stock data from Yahoo Finance (yfinance) into a MySQL database.

The script first registers the symbols found in ``data/symbols.json`` in the
symbol table (``load_symbols``) and then, for every loadable symbol, refreshes
KPI, dividend, recommendation and price-history data whenever the last update
is older than the configured threshold (``synch_data``).
"""
import os
import sys
import logging
import traceback
from timeit import default_timer as timer
from datetime import datetime, timedelta

import pandas as pd
import numpy as np
import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.orm import sessionmaker
from sqlalchemy.types import VARCHAR
import yfinance as yf

# NOTE(review): secrets are hard-coded in source. They can be overridden via
# environment variables; the literals are kept only as a backward-compatible
# fallback and should be rotated and removed from version control.
API_KEY = os.environ.get('STOCKDASH_FMP_API_KEY', '44ced5e44c50543745b1d89fce8cd93a')
api_key = "?apikey=" + API_KEY
api_kpi_url = "https://financialmodelingprep.com/api/v3/key-metrics/"
api_batch_stock_price_url = "https://financialmodelingprep.com/api/v3/quote/"

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("debug.log"),
        logging.StreamHandler()
    ]
)

Base = declarative_base()

data_dir = 'data'
file_symbol = os.path.join(data_dir, 'symbols.json')

# Table names.
db_symbol = "base_symbol"
db_kpi = "base_kpi"
db_div = "base_dividend"
db_rec = "base_recommendation"
db_his = "base_price_history"

# Minimum age (in days) before a data category is refreshed for a symbol.
sync_freq_kpi = 14
sync_freq_rec = 7
sync_freq_div = 21
sync_freq_his = 1

sym_exclude = []

engine = sqlalchemy.create_engine(
    os.environ.get(
        'STOCKDASH_DB_URL',
        "mysql+pymysql://spcial:GOi1gA01@localhost:3306/app_stockdash?charset=utf8mb4",
    )
)

# One session factory for the whole module instead of one per call.
_Session = sessionmaker(bind=engine)


class Symbols(Base):
    """ORM mapping of the symbol table, used for per-symbol bookkeeping updates."""

    __tablename__ = db_symbol

    index = Column(Integer, primary_key=True)
    symbol = Column(String)
    name = Column(String)
    exchange = Column(String)
    initialized = Column(String)
    # Timestamp of the last successful sync per data category.
    last_updated_kpi = Column(DateTime)
    last_updated_div = Column(DateTime)
    last_updated_rec = Column(DateTime)
    last_updated_his = Column(DateTime)
    # Set to 0 by synch_data when processing a symbol raised an error.
    loadable = Column(Integer)


# Maps a data table name to the Symbols column tracking its last update.
_TIMESTAMP_COLUMNS = {
    db_kpi: Symbols.last_updated_kpi,
    db_rec: Symbols.last_updated_rec,
    db_div: Symbols.last_updated_div,
    db_his: Symbols.last_updated_his,
}


def _update_symbol(symbol, values):
    """Apply ``values`` to all symbol-table rows matching ``symbol`` and commit.

    The session is always closed so the connection goes back to the pool;
    on error the transaction is rolled back and the exception re-raised.
    """
    session = _Session()
    try:
        session.query(Symbols).filter(Symbols.symbol == symbol).update(
            values, synchronize_session=False)
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


def update_timestamp(symbol, schema):
    """Set the "last updated" timestamp of ``symbol`` for one data table to now.

    :param symbol: ticker symbol whose row is updated.
    :param schema: one of ``db_kpi``, ``db_rec``, ``db_div``, ``db_his``;
        any other value changes nothing.
    """
    column = _TIMESTAMP_COLUMNS.get(schema)
    if column is None:
        logging.warning("   <%s> Unknown table name - no timestamp updated for %s", schema, symbol)
        return
    _update_symbol(symbol, {column: datetime.now()})


def update_loadable(symbol, loadable):
    """Set the ``loadable`` flag of ``symbol`` in the symbol table.

    :param symbol: ticker symbol whose row is updated.
    :param loadable: new flag value (``synch_data`` passes 0 after a failure).
    """
    _update_symbol(symbol, {Symbols.loadable: loadable})


def load_to_db(df, table_name):
    """Append ``df`` (including its index) to ``table_name``.

    Infinite values are replaced by NaN before writing.

    :param df: DataFrame to store.
    :param table_name: target table in schema ``app_stockdash``.
    :raises Exception: any error from ``DataFrame.to_sql`` is logged and re-raised.
    """
    start = timer()
    try:
        df = df.replace([np.inf, -np.inf], np.nan)
        df.to_sql(table_name, schema='app_stockdash', con=engine, if_exists='append',
                  dtype={'symbol': VARCHAR(10)})
    except Exception as err:
        logging.warning(" <%s> Error occured when loading data to DB. Error: \n%s", table_name, err)
        raise
    logging.info(" comleted in %f sec! ", timer() - start)


def load_from_db(table_name, where=None, limit=None, orderby=None):
    """Run ``SELECT *`` on ``table_name`` and return the result as a DataFrame.

    The statement is assembled by string formatting, so every argument must
    come from trusted code, never from user input.

    :param table_name: table to query.
    :param where: optional SQL condition (without the ``WHERE`` keyword).
    :param limit: optional maximum number of rows (integer).
    :param orderby: optional ``ORDER BY`` expression.
    :return: the query result, or ``None`` when the query raised a
        ``ProgrammingError`` (e.g. the table does not exist).
    """
    start = timer()
    try:
        clauses = ["SELECT * FROM %s " % table_name]
        if where is not None:
            clauses.append("WHERE %s " % where)
        if orderby is not None:
            clauses.append("ORDER BY %s " % orderby)
        if limit is not None:
            clauses.append("LIMIT %i " % limit)
        df = pd.read_sql_query("".join(clauses), engine)
    except sqlalchemy.exc.ProgrammingError as er:
        logging.warning(" <%s> Error occured when quering data. Return None. Error: \n%s", table_name, er)
        return None
    logging.info(" comleted in %f sec! ", timer() - start)
    return df


def load_symbols():
    """Read the symbol file, keep selected exchanges and store new symbols in the DB.

    Symbols already present in the symbol table are left untouched; only
    symbols that appear in the file but not in the table are inserted.

    :return: DataFrame of all (filtered) symbols from the file.
    """
    logging.info("Loading symbols based on file %s", file_symbol)
    df_symbols = pd.read_json(file_symbol).drop(columns=['price'])
    logging.info("Retrieved %i symbols from file", len(df_symbols))

    df_symbols = df_symbols[df_symbols['exchange'].isin(
        ['Nasdaq Global Select', 'NASDAQ Global Market', 'NASDAQ Capital Market'])]
    logging.info("Using %i symbols after filtering", len(df_symbols))

    df_symbols["initialized"] = datetime.now()
    # Timestamp.min marks "never loaded" so that every category is due.
    df_symbols["last_updated_kpi"] = pd.Timestamp.min
    df_symbols["last_updated_div"] = pd.Timestamp.min
    df_symbols["last_updated_rec"] = pd.Timestamp.min
    df_symbols["last_updated_his"] = pd.Timestamp.min
    df_symbols["loadable"] = True

    df_existing_symbols = load_from_db(db_symbol)
    if df_existing_symbols is not None:
        logging.info("Retrieved %i symbols from DB", len(df_existing_symbols))
        # Only symbols from the file that are missing in the DB are new.
        # (A symmetric difference would also re-insert DB-only symbols.)
        is_new = ~df_symbols['symbol'].isin(df_existing_symbols['symbol'])
        df_diff = df_symbols[is_new].drop_duplicates(subset=['symbol']).set_index('symbol')
        logging.info("Loading %i new symbols into DB...", len(df_diff))
        if len(df_diff) > 0:
            load_to_db(df_diff, db_symbol)
    else:
        logging.info("Could not retrieve any symbols from DB. Expecting table does not exist. Create table...")
        load_to_db(df_symbols, db_symbol)
    return df_symbols


def _is_stale(last_updated, max_age_days):
    """Return True when ``last_updated`` is more than ``max_age_days`` days ago."""
    return last_updated < datetime.today() - timedelta(days=max_age_days)


def _flatten_ticker_info(info):
    """Turn a ticker info dict into single-row column data for a DataFrame.

    List values are joined into one comma-separated string (elements are
    converted with ``str`` first); every value is wrapped in a one-element list.
    """
    return {
        key: [",".join(map(str, val))] if isinstance(val, list) else [val]
        for key, val in info.items()
    }


def _sync_kpi(row, yticker, symbol, final_kpi_columns):
    """Load KPI data for ``symbol`` if the last KPI sync is too old."""
    if not _is_stale(row['last_updated_kpi'], sync_freq_kpi):
        logging.info("   <%s> Data is up-to-date. Nothing to do.", db_kpi)
        return
    logging.info("   <%s> Last Updated above Threshold. Loading new KPI data for symbol into DB %s",
                 db_kpi, symbol)
    kpi = pd.DataFrame.from_dict(_flatten_ticker_info(yticker.info)).set_index('symbol')
    kpi["date"] = datetime.now()
    # Keep only columns that exist in the KPI table.
    kpi = kpi[kpi.columns.intersection(final_kpi_columns)]
    load_to_db(kpi, db_kpi)
    update_timestamp(symbol, db_kpi)


def _sync_dividends(row, yticker, symbol):
    """Load dividend data for ``symbol`` if the last dividend sync is too old."""
    if not _is_stale(row['last_updated_div'], sync_freq_div):
        logging.info("   <%s> Data is up-to-date. Nothing to do.", db_div)
        return
    logging.info("   <%s> Last Updated above Threshold. Loading new DIVIDENDS data for symbol into DB %s",
                 db_div, symbol)
    # NOTE(review): the complete dividend series is appended on every refresh;
    # presumably duplicates are handled downstream - confirm.
    div = yticker.dividends.to_frame().reset_index()
    div.insert(0, 'symbol', symbol)
    load_to_db(div.set_index('symbol'), db_div)
    update_timestamp(symbol, db_div)


def _sync_recommendations(row, yticker, symbol):
    """Load recommendation data for ``symbol`` if the last sync is too old."""
    if not _is_stale(row['last_updated_rec'], sync_freq_rec):
        logging.info("   <%s> Data is up-to-date. Nothing to do.", db_rec)
        return
    logging.info("   <%s> Last Updated above Threshold. Loading new RECOMMENDATIONS data for symbol into DB %s",
                 db_rec, symbol)
    rec = yticker.recommendations
    if rec is not None:
        rec = rec.reset_index()
        rec.insert(0, 'symbol', symbol)
        rec = rec.set_index('symbol').drop_duplicates(subset=['Date', 'Firm'])
        load_to_db(rec, db_rec)
    else:
        logging.info("   <%s> No recommendation data found for %s", db_rec, symbol)
    update_timestamp(symbol, db_rec)


def _sync_price_history(row, yticker, symbol):
    """Load price history for ``symbol`` if the last sync is too old.

    A symbol that was never loaded gets its full history; otherwise only
    prices from the day after the last update onwards are requested.
    """
    last_updated = row['last_updated_his']
    if not _is_stale(last_updated, sync_freq_his):
        logging.info("   <%s> Data is up-to-date. Nothing to do.", db_his)
        return
    if last_updated.date() != pd.Timestamp.min.date():
        delta = (last_updated + timedelta(days=1)).strftime("%Y-%m-%d")
        his = yticker.history(start=delta)
        logging.info("   <%s> Last Updated above Threshold. Loading new PRICE data for symbol into DB %s since %s",
                     db_his, symbol, delta)
    else:
        his = yticker.history(period="max")
        logging.info("   <%s> Never loaded price data. Loading all available price data for symbol into DB %s ",
                     db_his, symbol)
    if his is not None:
        his = his.reset_index()
        his.insert(0, 'symbol', symbol)
        load_to_db(his.set_index('symbol'), db_his)
    else:
        logging.info("   <%s> No price history data found for %s", db_his, symbol)
    update_timestamp(symbol, db_his)


def synch_data():
    """Refresh KPI, dividend, recommendation and price data for all loadable symbols.

    A symbol whose processing raises an exception is logged (with traceback),
    flagged as not loadable and skipped; the loop then continues with the
    next symbol. If the symbol or KPI table cannot be queried, an error is
    logged and nothing is synchronised.
    """
    logging.info("Synching data. Loading available symbols from DB...")
    df_symbols = load_from_db(db_symbol, where='loadable = 1')
    if df_symbols is None:
        logging.error("Could not load symbols from table %s. Nothing to synchronise.", db_symbol)
        return
    num_symbols = len(df_symbols['symbol'])
    logging.info("Loaded %i symbols from DB.", num_symbols)

    df_kpi_sample = load_from_db(db_kpi, limit=1)
    if df_kpi_sample is None:
        logging.error("Could not read columns of table %s. Nothing to synchronise.", db_kpi)
        return
    final_kpi_columns = df_kpi_sample.columns

    for i, (_, row) in enumerate(df_symbols.iterrows(), start=1):
        # Bound before the try block so the error handler can always use it.
        symbol = row['symbol']
        try:
            yticker = yf.Ticker(symbol)
            logging.info("%s/%s Querying data for ticker %s", i, num_symbols, symbol)
            _sync_kpi(row, yticker, symbol, final_kpi_columns)
            _sync_dividends(row, yticker, symbol)
            _sync_recommendations(row, yticker, symbol)
            _sync_price_history(row, yticker, symbol)
        except Exception:
            # format_exc() returns the traceback text; print_exception() returns None.
            logging.warning("%s/%s Error occured - skipping this entry. Errormsg: \n%s",
                            i, num_symbols, traceback.format_exc())
            update_loadable(symbol, 0)


if __name__ == '__main__':
    load_symbols()
    synch_data()