Files
ImoBot/main.py
Dennis Thiessen 2ace4c3044 Initial commit
2018-09-12 21:33:44 +02:00

339 lines
14 KiB
Python

#!/usr/bin/python3
# -*- coding: utf-8 -*-
import os
import statistics
import sys
import logging
import codecs
import time
import xlsxwriter
import datetime
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import NoSuchElementException, ElementNotVisibleException, StaleElementReferenceException
# Map-search URLs on en.aruodas.lt, one per area. Each URL is paired with a
# display name in `pages` below and opened via driver.get() when scraping.
# NOTE(review): FSelectedArea presumably encodes the outline of the selected
# map region - confirm before editing these by hand.
href_1 = 'https://en.aruodas.lt/nt_zemelapis/?obj=1&on_map=1&type=map&FSelectedArea=u99znh05tds4%2Cu99znqdmts7u%2Cu99znx4p99ee%2Cu99znvstves2%2Cu99znenb3dhq%2Cu99zne2djsev%2Cu99znkhh1th2%2Cu99zjgkyjwh3%2Cu99zjge8t8h4%2Cu99zjggyvwec%2Cu99znh0y3x5b#zoom:13;center:(54.698048468880216,25.254161356320424)'
href_2 = 'https://en.aruodas.lt/nt_zemelapis/?obj=1&on_map=1&type=map&FSelectedArea=u99znv59jpvx%2Cu99znu91thv0%2Cu99zndzq3ptt%2Cu99znfcyjnt1%2Cu99zp56n9jtn%2Cu99zp5n3ejmw%2Cu99zp71dg4t3%2Cu99zp7mwvnv7%2Cu99zp7tvt5tp%2Cu99zpk5um5me%2Cu99zph7t10tp%2Cu99znvp055j4%2Cu99znv58ejt2#zoom:14;center:(54.68989490798361,25.272123999999963)'
href_3 = 'https://en.aruodas.lt/nt_zemelapis/?obj=1&on_map=1&type=map&FSelectedArea=u99zp75y38fj%2Cu99zp7p5d91g%2Cu99zpe1mqt1s%2Cu99zpe499scs%2Cu99zpdfdwwcu%2Cu99zpddj8t1f%2Cu99zpd6nkt9r%2Cu99zpd1rqe1x%2Cu99zpd4368cz%2Cu99zp9uzfd9d%2Cu99zp9usnd64%2Cu99zp9ejdwcy%2Cu99zp90wud6p%2Cu99zp3pkref4%2Cu99zp3hq1e3s%2Cu99zp38xkw3d%2Cu99zp4nvqd3g%2Cu99zp4wdh96p%2Cu99zp4yvsxcz%2Cu99zp5r8399e%2Cu99zp760ddct%2Cu99zp7hj2t1u#zoom:14;center:(54.68006792227714,25.287418499999944)'
href_4 = 'https://en.aruodas.lt/nt_zemelapis/?obj=1&on_map=1&type=map&FSelectedArea=u99zpdtuqntv%2Cu99zpdt9d4jb%2Cu99zpdd531t6%2Cu99zpd6jdjm3%2Cu99zpd1rtpt9%2Cu99zpd41fnt7%2Cu99zpdj9tjm1%2Cu99zpdnz9pt1%2Cu99zpdrc1jvb%2Cu99zpfhd4pvw%2Cu99zpfpsm0tk%2Cu9dp042mk5vp%2Cu99zpfxvq4m0%2Cu99zpfy904tj%2Cu99zpghgqpte%2Cu99zpg5xf4m1%2Cu99zpgdcd0jh%2Cu99zpgdhs1jr%2Cu99zpg95h5j2%2Cu99zpdyjypvd%2Cu99zpdtg3njr#zoom:15;center:(54.68360773485153,25.302744000000075)'
# Paths below are relative; the code joins them with get_script_path() at use.
# Excel report written by scrape_pages() and attached to the result mail.
filename_output = os.path.join('output', 'ImoBot_output.xlsx')
# Append-mode log file configured in the script entry point.
history_filepath = os.path.join('output', 'history.log')
# chromedriver binary used when os.name == 'nt' (Windows).
driver_win_filepath = os.path.join('driver', 'chromedriver.exe')
# chromedriver binary used on every other platform; despite the "nt" in the
# name, scrape_pages() selects this one when os.name is NOT 'nt'.
driver_nt_filepath = os.path.join('driver', 'chromedriver')
# Recipients of the result mail sent at the end of scrape_pages().
mailto = ['dennis.thiessen@riskahead.de', 'kevin.gruendel@riskahead.de', 'florianbergel@yahoo.de']
# [display name, search URL] pairs; scrape_pages() processes them in order.
pages = [['Zverynas', href_1], ['Gedimino Prospektas', href_2], ['Old City', href_3], ['Uzupis', href_4]]
# Reduced single-recipient / single-area settings, kept commented out
# (presumably for test runs - uncomment to override the values above).
#mailto = ['dennis.thiessen@riskahead.de']
#pages = [['Zverynas', href_1]]
class Area:
    """Per-area summary: a name, its search URL and one value per category.

    The four category attributes default to ``None``; the caller in this file
    stores the number of listings found for each category in them.
    """

    def __init__(self, name, href, flats_for_rent=None, flats_for_sale=None,
                 houses_for_sale=None, plots_for_sale=None):
        # Identification of the area.
        self.name, self.href = name, href
        # Rental category.
        self.flats_for_rent = flats_for_rent
        # Sale categories.
        self.flats_for_sale, self.houses_for_sale, self.plots_for_sale = (
            flats_for_sale,
            houses_for_sale,
            plots_for_sale,
        )
class Property:
    """A single listing scraped from the results sidebar.

    Attributes:
        href: Link to the listing.
        addr: Address text of the listing.
        desc: Description text of the listing.
        price: Raw text of the listing's price element. The parsers below
            expect ``"<total> €<one char><price per m²> €..."``, e.g.
            ``"85 000 €\\n1 000 €/m²"``.

    NOTE(review): in the reviewed copy the separator passed to ``str.split``
    was the empty string, which always raises ``ValueError``. The euro sign is
    assumed to be the intended separator (it matches the ``[0]`` / ``[1][1:]``
    indexing and the "€" suffix used when prices are logged) - confirm against
    the live listing text.
    """

    # Currency marker that separates the total price from the per-m² price.
    _CURRENCY = '€'

    def __init__(self, href, addr, desc, price):
        self.href = href
        self.addr = addr
        self.desc = desc
        self.price = price

    def _price_fields(self):
        """Split the raw price text into its parts around the currency marker.

        Spaces (thousands separators) are removed and decimal commas become
        dots so that each numeric part can be handed to ``float``.
        """
        normalized = self.price.replace(" ", "").replace(",", ".")
        return normalized.split(self._CURRENCY)

    def get_price(self):
        """Return the total price as a float.

        Raises:
            ValueError: if the text before the first currency marker is not a
                number.
        """
        return float(self._price_fields()[0])

    def get_price_m2(self):
        """Return the price per square metre as a float.

        Raises:
            IndexError: if the price text contains no currency marker.
            ValueError: if the per-m² part is not a number.
        """
        # [1:] drops the single character that follows the first currency
        # marker (a line break in the expected format).
        return float(self._price_fields()[1][1:])

    def get_m2(self):
        """Return the floor area, derived as total price / price per m²."""
        return float(self.get_price() / self.get_price_m2())

    def get_roi(self, avg_rent_m2):
        """Return the yearly rent-to-price ratio for this listing.

        Args:
            avg_rent_m2: Monthly rent per square metre used as the reference.

        Returns:
            ``avg_rent_m2 * area * 12 / total price`` as a float.
        """
        return float(avg_rent_m2 * self.get_m2() * 12 / self.get_price())
class PropertyCollection:
    """A list of listings together with price statistics.

    The four statistics are computed once, when the collection is created,
    from each item's ``get_price()`` / ``get_price_m2()``. Every statistic is
    ``0`` for an empty collection.
    """

    def __init__(self, properties):
        self.properties = properties
        self.average_price = self.calc_average_price()
        self.average_price_m2 = self.calc_average_price_m2()
        self.median_price = self.calc_median_price()
        self.median_price_m2 = self.calc_median_price_m2()

    def calc_average_price(self):
        """Return the arithmetic mean of the total prices (0 if empty)."""
        if len(self.properties) == 0:
            return 0
        total = sum([item.get_price() for item in self.properties])
        return total / len(self.properties)

    def calc_average_price_m2(self):
        """Return the arithmetic mean of the per-m² prices (0 if empty)."""
        if len(self.properties) == 0:
            return 0
        total = sum([item.get_price_m2() for item in self.properties])
        return total / len(self.properties)

    def calc_median_price(self):
        """Return the median of the total prices (0 if empty)."""
        if len(self.properties) == 0:
            return 0
        prices = [item.get_price() for item in self.properties]
        return statistics.median(prices)

    def calc_median_price_m2(self):
        """Return the median of the per-m² prices (0 if empty)."""
        if len(self.properties) == 0:
            return 0
        prices_m2 = [item.get_price_m2() for item in self.properties]
        return statistics.median(prices_m2)

    def sort_by_roi(self, price_m2):
        """Reorder the listings by descending ``get_roi(price_m2)``.

        A new sorted list replaces ``self.properties``; an empty collection is
        left untouched.
        """
        if len(self.properties) == 0:
            return

        def roi_of(item):
            return item.get_roi(price_m2)

        self.properties = sorted(self.properties, key=roi_of, reverse=True)

    def count(self):
        """Return the number of listings in the collection."""
        return len(self.properties)
def get_script_path():
    """Return the directory that contains the started script.

    The location is taken from ``sys.argv[0]`` after resolving it with
    ``os.path.realpath``.
    """
    script_file = os.path.realpath(sys.argv[0])
    return os.path.dirname(script_file)
def send_mail(send_from, send_to, subject, text, files=None, server="mail.riskahead.de"):
    """Send a plain-text mail with optional attachments over SMTP-over-SSL.

    Args:
        send_from: Sender address, used for the ``From`` header and envelope.
        send_to: List of recipient addresses.
        subject: Subject line.
        text: Plain-text body.
        files: Optional iterable of file paths, relative to the script
            directory, to attach.
        server: Host name of the SMTP server.

    Raises:
        AssertionError: if ``send_to`` is not a list.
        OSError: if an attachment cannot be read or the server is unreachable.
        smtplib.SMTPException: if login or delivery fails.
    """
    assert isinstance(send_to, list)

    msg = MIMEMultipart()
    msg['From'] = send_from
    msg['To'] = ", ".join(send_to)
    msg['Subject'] = subject
    msg.attach(MIMEText(text))

    for f in files or []:
        attachment_name = os.path.basename(f)
        with open(os.path.join(get_script_path(), f), "rb") as fil:
            part = MIMEApplication(fil.read(), Name=attachment_name)
        # The file is closed at this point; only the in-memory part is used.
        part['Content-Disposition'] = 'attachment; filename="%s"' % attachment_name
        msg.attach(part)

    # SECURITY NOTE(review): the account name and password are hard-coded in
    # the source. They are left unchanged here to preserve behaviour, but
    # should be moved to configuration outside the repository and rotated.
    #
    # The context manager guarantees the connection is shut down (QUIT, then
    # close) even when login() or sendmail() raises; previously the socket
    # leaked on any failure.
    with smtplib.SMTP_SSL(server) as smtp:
        smtp.login('support@riskahead.de', "405risksupport")
        smtp.sendmail(send_from, send_to, msg.as_string())
def switch_page(driver, page):
    """Select another entry in the search form's object-type list.

    Leaves the results iframe, opens the ``searchFormField_obj`` control,
    clicks the list item whose label text contains ``page`` and finally
    re-enters the ``sideListIframe`` frame.

    Args:
        driver: Selenium WebDriver currently showing the map search page.
        page: Label text of the entry to select, e.g. ``'Houses for sale'``.
    """
    # The search form lives in the top-level document, not in the iframe.
    driver.switch_to.default_content()

    selector = driver.find_element_by_id('searchFormField_obj')
    selector.click()

    option_list = driver.find_element_by_id('options_obj')
    # NOTE(review): an XPath starting with "//" is evaluated against the whole
    # document, not only below `option_list` - confirm this is intended.
    entry = option_list.find_element_by_xpath("//li/label[contains(text(),'" + page + "')]/..")
    entry.click()

    # Back into the frame that holds the result list.
    driver.switch_to.frame("sideListIframe")
def scrape_page(driver):
    """Collect all listings of the current result list, following pagination.

    Reads every ``result-item-v3`` element of the current page into a
    ``Property``. If exactly one "Next page" link is found, it is clicked and
    the following page is scraped recursively.

    Args:
        driver: Selenium WebDriver already switched into the results frame.

    Returns:
        List of ``Property`` objects in page order.
    """
    items = [
        Property(
            card.find_element_by_class_name('object-image-link').get_attribute('href'),
            card.find_element_by_class_name('item-address-v3').text,
            card.find_element_by_class_name('item-description-v3').text,
            card.find_element_by_class_name('item-price-main-v3').text,
        )
        for card in driver.find_elements_by_class_name('result-item-v3')
    ]

    try:
        pager = driver.find_element_by_class_name('sidebar-pagination')
        # NOTE(review): the leading "//" makes this search the whole document
        # rather than only the pagination element.
        next_links = pager.find_elements_by_xpath("//a[contains(text(), 'Next page')]")
        if len(next_links) == 1:
            next_links[0].click()
            # NOTE(review): the recursive call sits inside the try block, so a
            # NoSuchElementException raised while reading a later page is also
            # reported as "no pagination found" and that page's items are lost.
            items.extend(scrape_page(driver))
    except NoSuchElementException:
        logger.debug("no pagination found")

    return items
def scrape_pages(pages):
    """Scrape every configured area, write the Excel report and mail it.

    Starts a headless Chrome, processes each entry of ``pages`` with
    ``scrape_and_write_page``, saves the workbook to ``filename_output`` and
    sends it to the ``mailto`` recipients.

    Args:
        pages: Sequence of ``[display name, search URL]`` entries.
    """
    logger.info('Start ImoBot...')
    start = time.time()

    script_dir = get_script_path()
    driver_relpath = driver_win_filepath if os.name == 'nt' else driver_nt_filepath
    chrome_driver = os.path.join(script_dir, driver_relpath)

    chrome_options = Options()
    chrome_options.add_argument("--headless")
    driver = webdriver.Chrome(chrome_options=chrome_options, executable_path=chrome_driver)

    areas = []
    try:
        workbook = xlsxwriter.Workbook(os.path.join(script_dir, filename_output))
        worksheet = workbook.add_worksheet()

        now = datetime.datetime.now()
        timestamp = now.strftime("%Y-%m-%d %H:%M")
        worksheet.write('A2', timestamp)

        row = 1
        for page in pages:
            # page[0] is the display name, page[1] the search URL.
            row, area = scrape_and_write_page(driver, worksheet, page[1], page[0], row)
            areas.append(area)

        workbook.close()
        end = time.time()
    finally:
        # Always shut the browser down; previously the headless Chrome and
        # chromedriver processes were left running after every run, including
        # runs that failed half-way through.
        driver.quit()

    subject = "ImoBot-Weekly-Results"
    text = "ImoBot-Weekly-Results\n\n" + "Last run: {}\n".format(timestamp)
    text += "".join(
        "{}: (Flats: {}, Houses: {}, Plots: {})\n".format(
            area.name, area.flats_for_sale, area.houses_for_sale, area.plots_for_sale)
        for area in areas
    )

    send_mail('webmaster@riskahead.de',
              mailto,
              subject,
              text,
              [filename_output])
    logger.info('Finished in {:.0f} sec'.format(end - start))
def scrape_and_write_page(driver, worksheet, href, name, row):
    """Scrape one area and append its four category sections to the sheet.

    Args:
        driver: Selenium WebDriver used for scraping.
        worksheet: Worksheet the results are written to.
        href: Search URL of the area.
        name: Display name of the area, written to column A.
        row: Last row used so far.

    Returns:
        Tuple ``(row, area)``: the row reached after writing and an ``Area``
        holding the number of listings per category.
    """
    driver.get(href)
    for_sale_flats, for_sale_houses, for_sale_plots, rentals = get_prices_from_site(driver)

    row += 2
    worksheet.write('A' + str(row), name)

    # Sections are written in this order; the rental collection is always
    # passed as the reference for the yield columns.
    sections = (
        ("Flats for rent", rentals),
        ("Flats for sale", for_sale_flats),
        ("Houses for sale", for_sale_houses),
        ("Plots for sale", for_sale_plots),
    )
    counts = []
    for title, collection in sections:
        row, amount = write_to_ws(rentals, title, collection, worksheet, row)
        counts.append(amount)

    area = Area(
        name,
        href,
        flats_for_rent=counts[0],
        flats_for_sale=counts[1],
        houses_for_sale=counts[2],
        plots_for_sale=counts[3],
    )
    return row, area
def write_to_ws(flats_for_rent, name, properties, worksheet, row):
    """Write one category section (summary plus details) to the worksheet.

    The section starts two rows below ``row`` with the title in column B,
    followed by amount, average price and average m² price in columns C/D.
    For ``'Flats for rent'`` the two median values follow; for every other
    category a header line and up to ten listings with their yields are
    written.

    Args:
        flats_for_rent: Rental collection whose average / median m² price is
            the reference for the yield columns.
        name: Section title; ``'Flats for rent'`` selects the median layout.
        properties: Collection to report.
        worksheet: Target worksheet.
        row: Last row used so far.

    Returns:
        Tuple ``(row, count)``: the row reached and the number of listings.
    """
    def put(column, line, value):
        worksheet.write(column + str(line), value)

    title_row = row + 2
    put('B', title_row, name)

    summary = (
        ('Amount', properties.count()),
        ('AVG Price', properties.average_price),
        ('AVG m² Price', properties.average_price_m2),
    )
    for offset, (label, value) in enumerate(summary, start=1):
        put('C', title_row + offset, label)
        put('D', title_row + offset, value)

    # One empty row separates the summary from what follows.
    row = title_row + len(summary) + 2

    if name == 'Flats for rent':
        put('C', row, 'MED Price')
        put('D', row, properties.median_price)
        row += 1
        put('C', row, 'MED m² Price')
        put('D', row, properties.median_price_m2)
        return row, properties.count()

    headers = ('Address', 'GRY (AVG)', 'GRY (MED)', 'Total Price', 'Price per m²', 'Description', 'Link')
    for column, header in zip('CDEFGHI', headers):
        put(column, row, header)
    row += 1

    # Only the first ten listings are detailed.
    for prop in properties.properties[:10]:
        roi_median = prop.get_roi(flats_for_rent.median_price_m2)
        roi_average = prop.get_roi(flats_for_rent.average_price_m2)
        logger.info("ROI (Median): {:3.2%}, ROI (Average): {:3.2%}, Price: {}€, Address: {}, URL: {}".format(
            roi_median, roi_average, prop.price, prop.addr, prop.href))
        put('C', row, prop.addr)
        put('D', row, roi_average)
        put('E', row, roi_median)
        put('F', row, prop.get_price())
        put('G', row, prop.get_price_m2())
        put('H', row, prop.desc)
        put('I', row, prop.href)
        row += 1

    return row, properties.count()
def _scrape_category(driver, label, menu_entry=None, price_spec="8.2f", m2_spec="5.2f"):
    """Scrape one listing category and log its summary figures.

    Args:
        driver: Selenium WebDriver inside the results frame.
        label: Lower-case category name used in the log messages.
        menu_entry: Label to select via ``switch_page`` first; ``None`` keeps
            the category that is currently displayed.
        price_spec: Format spec for the logged average price.
        m2_spec: Format spec for the logged average / median m² price.

    Returns:
        ``PropertyCollection`` with the scraped listings.
    """
    logger.info("Start scraping {}...".format(label))
    if menu_entry is not None:
        switch_page(driver, menu_entry)
        time.sleep(3)  # Avoid race condition by giving chromedriver enough time to load the page
    found = PropertyCollection(scrape_page(driver))
    logger.info("Found {} {}".format(found.count(), label))
    logger.info("Average price: {:{spec}}".format(found.average_price, spec=price_spec))
    logger.info("Average m2 price: {:{spec}}".format(found.average_price_m2, spec=m2_spec))
    logger.info("Median m2 price: {:{spec}}".format(found.median_price_m2, spec=m2_spec))
    return found


def get_prices_from_site(driver):
    """Scrape all four categories of the currently loaded area.

    Clicks the ``close-button`` element, enters the ``sideListIframe`` frame
    and scrapes flats for sale, houses for sale, plots for sale and flats for
    rent, in that order. The three sale collections are then sorted by yield
    relative to the average rental price per m².

    Args:
        driver: Selenium WebDriver that has just loaded an area's search URL.

    Returns:
        Tuple ``(flats_for_sale, houses_for_sale, plots_for_sale,
        flats_for_rent)`` of ``PropertyCollection`` objects.
    """
    try:
        driver.find_element_by_class_name('close-button').click()
    except ElementNotVisibleException:
        # The button exists but is not shown; nothing to dismiss.
        pass
    driver.switch_to.frame("sideListIframe")

    # The first category is scraped as displayed, without switching.
    sale_flats = _scrape_category(driver, "flats for sale")
    sale_houses = _scrape_category(driver, "houses for sale", menu_entry='Houses for sale')
    sale_plots = _scrape_category(driver, "plots for sale", menu_entry='Plots for sale')
    rentals = _scrape_category(driver, "flats for rent", menu_entry='Flats for rent',
                               price_spec="4.2f", m2_spec="3.2f")

    for collection in (sale_flats, sale_houses, sale_plots):
        collection.sort_by_roi(rentals.average_price_m2)

    return sale_flats, sale_houses, sale_plots, rentals
# Bound at module level so the functions in this file can log even when the
# module is imported rather than executed; previously `logger` only existed
# when run as a script and any import-time use raised NameError. Handlers and
# levels are still configured only in the script entry point below.
logger = logging.getLogger('imobot_logger')

if __name__ == '__main__':
    assert (len(sys.argv) == 1), "Wrong number of arguments given"
    # NOTE(review): setting this at runtime presumably only affects child
    # processes; the running interpreter's stdout is re-wrapped explicitly
    # further down.
    os.environ["PYTHONIOENCODING"] = "utf-8"

    logger.setLevel(logging.DEBUG)

    # File handler: full DEBUG history, appended on every run.
    log_path = os.path.join(get_script_path(), history_filepath)
    # Create the log directory if it is missing so FileHandler cannot fail on
    # a fresh checkout.
    os.makedirs(os.path.dirname(log_path), exist_ok=True)
    fh = logging.FileHandler(log_path, "a", encoding='utf-8')
    fh.setLevel(logging.DEBUG)

    # Console handler: INFO and above, written as UTF-8 regardless of the
    # terminal's default encoding.
    sys.stdout = codecs.getwriter("utf-8")(sys.stdout.detach())
    ch = logging.StreamHandler(sys.stdout)
    ch.setLevel(logging.INFO)

    formatter = logging.Formatter('%(asctime)s - %(message)s')
    fh.setFormatter(formatter)
    ch.setFormatter(formatter)
    logger.addHandler(fh)
    logger.addHandler(ch)

    scrape_pages(pages)