# usse/scrape/funda-scraper/funda_scraper/scrape.py
# File-viewer metadata from the original listing: 359 lines, 13 KiB, Python

"""Main funda scraper module"""
import argparse
import datetime
import json
import multiprocessing as mp
import os
from typing import List, Literal, Optional
import pandas as pd
import requests
from bs4 import BeautifulSoup
from tqdm import tqdm
from tqdm.contrib.concurrent import process_map
from funda_scraper.config.core import config
from funda_scraper.preprocess import clean_list_date, preprocess_data
from funda_scraper.utils import logger
class FundaScraper(object):
    """
    Handles the main scraping function.

    The scraper works in two phases: it first collects listing links from the
    search result pages (``fetch_all_links``) and then scrapes every listing
    page into a dataframe (``scrape_pages``).  ``run`` chains both phases and
    optionally pre-processes and saves the result.
    """

    def __init__(
        self,
        want_to: str = "koop",
        area: str = "",
        page_start: int = 1,
        n_pages: int = 1,
        find_past: bool = False,
        min_price: Optional[int] = None,
        max_price: Optional[int] = None,
        url: str = "",
    ):
        """
        Set up the searching scope.

        :param want_to: 'buy'/'koop'/'b'/'k' or 'rent'/'huur'/'r'/'h'
        :param area: the area to search in; lower-cased, spaces become dashes
        :param page_start: first result page to fetch (clamped to at least 1)
        :param n_pages: number of result pages to fetch (clamped to at least 1)
        :param find_past: if true, search for unavailable (past) listings
        :param min_price: lower bound of the price filter, if any
        :param max_price: upper bound of the price filter, if any
        :param url: if non-empty, used as the search URL instead of the one
            derived from ``want_to`` and ``area``
        """
        # Init attributes
        self.area = self._normalise_area(area)
        self.want_to = want_to
        self.find_past = find_past
        self.page_start = max(page_start, 1)
        self.n_pages = max(n_pages, 1)
        self.page_end = self.page_start + self.n_pages - 1
        self.min_price = min_price
        self.max_price = max_price
        # Instantiate along the way
        self.links: List[str] = []
        self.raw_df = pd.DataFrame()
        self.clean_df = pd.DataFrame()
        self.base_url = config.base_url
        self.url = url
        self.selectors = config.css_selector

    def __repr__(self):
        return (
            f"FundaScraper(area={self.area}, "
            f"want_to={self.want_to}, "
            f"n_pages={self.n_pages}, "
            f"page_start={self.page_start}, "
            f"find_past={self.find_past}, "
            f"min_price={self.min_price}, "
            f"max_price={self.max_price})"
        )

    @property
    def to_buy(self) -> bool:
        """
        Whether to buy or not.

        :raises ValueError: if ``want_to`` is not a recognised buy/rent keyword
        """
        want_to = self.want_to.lower()
        if want_to in ("buy", "koop", "b", "k"):
            return True
        if want_to in ("rent", "huur", "r", "h"):
            return False
        raise ValueError("'want_to' must be either 'buy' or 'rent'.")

    @staticmethod
    def _normalise_area(area: str) -> str:
        """Lower-case the area name and replace spaces with dashes."""
        return area.lower().replace(" ", "-")

    @staticmethod
    def _check_dir() -> None:
        """Make sure the ``data`` directory for output files exists."""
        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs("data", exist_ok=True)

    @staticmethod
    def _get_links_from_one_parent(url: str) -> List[str]:
        """
        Scrape all the available housing items from one Funda search page.

        The links are read from the ``itemListElement`` entries of the first
        ``application/ld+json`` script tag on the page.

        :param url: the search result page to fetch
        :return: the unique listing urls, in order of first appearance
        :raises IndexError: if the page has no ``application/ld+json`` script
            tag; ``fetch_all_links`` relies on this to detect the last page
        """
        response = requests.get(url, headers=config.header)
        soup = BeautifulSoup(response.text, "lxml")
        script_tag = soup.find_all("script", {"type": "application/ld+json"})[0]
        json_data = json.loads(script_tag.contents[0])
        urls = [item["url"] for item in json_data["itemListElement"]]
        # dict.fromkeys de-duplicates while keeping a deterministic order.
        return list(dict.fromkeys(urls))

    def reset(
        self,
        area: Optional[str] = None,
        want_to: Optional[str] = None,
        page_start: Optional[int] = None,
        n_pages: Optional[int] = None,
        find_past: Optional[bool] = None,
        min_price: Optional[int] = None,
        max_price: Optional[int] = None,
    ) -> None:
        """
        Overwrite or initialise the searching scope.

        Only the arguments that are not ``None`` are applied; the values are
        normalised the same way as in ``__init__``.
        """
        if area is not None:
            self.area = self._normalise_area(area)
        if want_to is not None:
            self.want_to = want_to
        if page_start is not None:
            self.page_start = max(page_start, 1)
        if n_pages is not None:
            self.n_pages = max(n_pages, 1)
        if page_start is not None or n_pages is not None:
            # Keep the derived end page in sync with the new range.
            self.page_end = self.page_start + self.n_pages - 1
        if find_past is not None:
            self.find_past = find_past
        if min_price is not None:
            self.min_price = min_price
        if max_price is not None:
            self.max_price = max_price

    def fetch_all_links(
        self, page_start: Optional[int] = None, n_pages: Optional[int] = None
    ) -> None:
        """
        Find all the available links across multiple pages.

        The unique links are stored in ``self.links``.  Fetching stops early
        when a page raises ``IndexError`` (no listing data on that page).

        :param page_start: first page to fetch; defaults to ``self.page_start``
        :param n_pages: number of pages to fetch; defaults to ``self.n_pages``
        """
        page_start = self.page_start if page_start is None else page_start
        n_pages = self.n_pages if n_pages is None else n_pages
        # Track the range actually requested so the summary below stays
        # accurate when the caller overrides the instance defaults.
        self.page_end = page_start + n_pages - 1

        logger.info("*** Phase 1: Fetch all the available links from all pages *** ")
        urls: List[str] = []
        main_url = self._build_main_query_url()
        for i in tqdm(range(page_start, page_start + n_pages)):
            try:
                item_list = self._get_links_from_one_parent(
                    f"{main_url}&search_result={i}"
                )
            except IndexError:
                # Page i carries no listings, so the previous page was the last one.
                self.page_end = i - 1
                logger.info(f"*** The last available page is {self.page_end} ***")
                break
            urls += item_list

        urls = list(dict.fromkeys(urls))
        logger.info(
            f"*** Got all the urls. {len(urls)} houses found from {page_start} to {self.page_end} ***"
        )
        self.links = urls

    def _build_main_query_url(self) -> str:
        """
        Build the search URL (without the page number) for the current scope.

        :return: ``self.url`` if it is set, otherwise a ``/zoeken/koop`` or
            ``/zoeken/huur`` URL for ``self.area``; the availability and price
            filters are appended when configured
        """
        if self.url:  # Override URL if provided.
            main_url = self.url
        else:
            query = "koop" if self.to_buy else "huur"
            main_url = f"{self.base_url}/zoeken/{query}?selected_area=%22{self.area}%22"

        # NOTE(review): the indentation of the original file was lost; the
        # filters below are assumed to apply to an overriding URL too - confirm.
        if self.find_past:
            main_url = f"{main_url}&availability=%22unavailable%22"

        if self.min_price is not None or self.max_price is not None:
            min_price = "" if self.min_price is None else self.min_price
            max_price = "" if self.max_price is None else self.max_price
            main_url = f"{main_url}&price=%22{min_price}-{max_price}%22"
        return main_url

    @staticmethod
    def get_value_from_css(soup: "BeautifulSoup", selector: str) -> str:
        """
        Use CSS selector to find certain features.

        :param soup: the parsed page
        :param selector: the CSS selector to look up
        :return: the text of the first match, or ``"na"`` if nothing matches
        """
        matches = soup.select(selector)
        if len(matches) > 0:
            return matches[0].text
        return "na"

    def scrape_one_link(self, link: str) -> List[str]:
        """
        Scrape all the features from one house item given a link.

        :param link: the url of the listing page
        :return: the link, the cleaned text of every feature and, as the last
            element, a comma-separated string of photo sources
        """
        # Initialize for each page
        response = requests.get(link, headers=config.header)
        soup = BeautifulSoup(response.text, "lxml")

        # Get the value according to respective CSS selectors
        if self.to_buy:
            if self.find_past:
                list_since_selector = self.selectors.date_list
            else:
                list_since_selector = self.selectors.listed_since
        else:
            # Rental listings use hard-coded positional selectors.
            if self.find_past:
                list_since_selector = ".fd-align-items-center:nth-child(9) span"
            else:
                list_since_selector = ".fd-align-items-center:nth-child(7) span"

        selectors = self.selectors
        # NOTE: the listed-since selector is known to vary between pages, so
        # its value may come back as "na".
        feature_selectors = [
            selectors.price,
            selectors.address,
            selectors.descrip,
            list_since_selector,
            selectors.zip_code,
            selectors.size,
            selectors.year,
            selectors.living_area,
            selectors.perceel_area,
            selectors.kind_of_house,
            selectors.building_type,
            selectors.num_of_rooms,
            selectors.num_of_bathrooms,
            selectors.layout,
            selectors.energy_label,
            selectors.insulation,
            selectors.heating,
            selectors.ownership,
            selectors.exteriors,
            selectors.parking,
            selectors.neighborhood_name,
            selectors.date_list,
            selectors.date_sold,
            selectors.term,
            selectors.price_sold,
            selectors.last_ask_price,
        ]
        result = [link]
        result.extend(
            self.get_value_from_css(soup, selector) for selector in feature_selectors
        )
        # Only the part before the first carriage return is kept for this field.
        result.append(
            self.get_value_from_css(soup, selectors.last_ask_price_m2).split("\r")[0]
        )

        # Photo tags without the attribute yield None, which str.join rejects.
        photo_sources = (
            photo.get("data-lazy-srcset") for photo in soup.select(selectors.photo)
        )
        photos_string = ", ".join(source for source in photo_sources if source)

        # Clean up the retried result from one page
        result = [r.replace("\n", "").replace("\r", "").strip() for r in result]
        result.append(photos_string)
        return result

    def scrape_pages(self) -> None:
        """
        Scrape all the content across multiple pages.

        Every link in ``self.links`` is scraped and the rows are collected in
        ``self.raw_df``, whose columns are the keys of the configured CSS
        selectors plus ``city`` and ``log_id``.  The sale-related columns are
        dropped unless past listings were requested.
        """
        logger.info("*** Phase 2: Start scraping from individual links ***")
        columns = list(self.selectors.keys())

        # Scrape pages with multiprocessing to improve efficiency
        # TODO: use asyncio instead
        pools = mp.cpu_count()
        content = process_map(self.scrape_one_link, self.links, max_workers=pools)

        # Build the frame in one go instead of appending row by row.
        df = pd.DataFrame(content, columns=columns)

        df["city"] = df["url"].map(lambda x: x.split("/")[4])
        df["log_id"] = datetime.datetime.now().strftime("%Y%m-%d%H-%M%S")
        if not self.find_past:
            df = df.drop(["term", "price_sold", "date_sold"], axis=1)
        logger.info(f"*** All scraping done: {df.shape[0]} results ***")
        self.raw_df = df

    def _default_csv_path(self) -> str:
        """Build the default output path from the date and the search scope."""
        date = str(datetime.datetime.now().date()).replace("-", "")
        status = "unavailable" if self.find_past else "available"
        want_to = "buy" if self.to_buy else "rent"
        return f"./data/houseprice_{date}_{self.area}_{want_to}_{status}_{len(self.links)}.csv"

    def save_csv(self, df: pd.DataFrame, filepath: Optional[str] = None) -> None:
        """
        Save the result to a .csv file.

        :param df: the dataframe to save
        :param filepath: the target file; if omitted, a name is generated
            under ``./data`` (the directory is created when missing)
        """
        if filepath is None:
            self._check_dir()
            filepath = self._default_csv_path()
        df.to_csv(filepath, index=False)
        logger.info(f"*** File saved: {filepath}. ***")

    def run(
        self, raw_data: bool = False, save: bool = False, filepath: Optional[str] = None
    ) -> pd.DataFrame:
        """
        Scrape all links and all content.

        :param raw_data: if true, the data won't be pre-processed
        :param save: if true, the data will be saved as a csv file
        :param filepath: the name for the file
        :return: the (pre-processed) dataframe from scraping
        """
        self.fetch_all_links()
        self.scrape_pages()

        if raw_data:
            df = self.raw_df
        else:
            logger.info("*** Cleaning data ***")
            df = preprocess_data(df=self.raw_df, is_past=self.find_past)
            # NOTE(review): assumed to sit inside this branch (the original
            # indentation was lost), so clean_df only holds cleaned data - confirm.
            self.clean_df = df

        if save:
            self.save_csv(df, filepath)

        logger.info("*** Done! ***")
        return df
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--area",
type=str,
help="Specify which area you are looking for",
default="amsterdam",
)
parser.add_argument(
"--want_to",
type=str,
help="Specify you want to 'rent' or 'buy'",
default="rent",
)
parser.add_argument(
"--find_past",
type=bool,
help="Indicate whether you want to use hisotrical data or not",
default=False,
)
parser.add_argument(
"--page_start", type=int, help="Specify which page to start scraping", default=1
)
parser.add_argument(
"--n_pages", type=int, help="Specify how many pages to scrape", default=1
)
parser.add_argument(
"--min_price", type=int, help="Specify the min price", default=None
)
parser.add_argument(
"--max_price", type=int, help="Specify the max price", default=None
)
parser.add_argument(
"--raw_data",
type=bool,
help="Indicate whether you want the raw scraping result or preprocessed one",
default=False,
)
parser.add_argument(
"--save",
type=bool,
help="Indicate whether you want to save the data or not",
default=True,
)
args = parser.parse_args()
scraper = FundaScraper(
area=args.area,
want_to=args.want_to,
find_past=args.find_past,
page_start=args.page_start,
n_pages=args.n_pages,
min_price=args.min_price,
max_price=args.max_price,
)
df = scraper.run(raw_data=args.raw_data, save=args.save)
print(df.head())