Back to Open Assistant

Project Gutenberg Crawler

data/datasets/gutenberg/project_gutenberg_crawler.ipynb

0.0.1 — 10.3 KB
Original Source

Project Gutenberg Crawler

Make sure you read the site's TOS and the notebook's README.md on how to use the crawler.

python
# uncomment and run below lines to set up if running in colab
# !git clone https://github.com/LAION-AI/Open-Assistant.git
# %cd Open-Assistant/notebooks/gutenberg
# !pip install -r requirements.txt
python
# global settings

# Crawl English-language books.
# NOTE: a few hundred catalog entries list multiple languages, e.g. "en; es".
LANG = "en"
# Directory where per-book metadata and body text files are written.
FOLDER = "text"
# Number of compressed parquet files to split the dataset into (use >1 if low on memory).
CHUNKS = 1
# CSV tracking which books were downloaded and whether each succeeded.
STATUS = "crawled.csv"
python
# import required packages
import os
import io
import re
import requests
import time
import warnings

try:
    from BeautifulSoup import BeautifulSoup
except ImportError:
    from bs4 import BeautifulSoup
from tqdm import tqdm

import numpy as np
import pandas as pd

from typing import Tuple, Optional, Any

Code for crawler

python
class GutenbergCrawler:
    """Polite crawler for Project Gutenberg mirrors.

    Downloads the HTML edition of a book from a randomly chosen mirror,
    strips Gutenberg boilerplate, and writes per-book metadata/body text
    files into ``folder``.
    """

    HEADER = {
        "User-Agent": "Mozilla/5.0 (compatible; GutenbergCrawler/0.1)",
    }
    TIMER = 600  # minimum wait between HTTP calls, in milliseconds
    MIRRORS = [
        "http://www.mirrorservice.org/sites/ftp.ibiblio.org/pub/docs/books/gutenberg/",
        "https://www.gutenberg.org/dirs/",
        "http://mirrors.xmission.com/gutenberg/",
    ]  # see https://www.gutenberg.org/MIRRORS.ALL for available mirrors
    # Matches the "** ... copyright ... **" banner marking copyrighted e-texts.
    # Compiled once instead of being spelled out twice in parse().
    COPYRIGHT_RE = re.compile(r"(?i)\*{2,}[^\n]+?(?:please.+?copyright|copyrighted.+?project)[^\n]+?\*{2,}\r?\n")

    def __init__(self, folder: Optional[str] = None) -> None:
        """Create the crawler; files are written into *folder* (created if missing)."""
        self.folder = folder
        if self.folder is not None:
            os.makedirs(self.folder, exist_ok=True)
        self.calls = 0  # number of HTTP requests issued so far
        self.last_call = 0.0  # time.time() of the previous request (0.0 = never)

    def _get(self, url: str) -> Optional[str]:
        """Rate-limited GET returning decoded text, or None on HTTP 404.

        Tries UTF-8, then Latin-1, then UTF-8 with backslash escapes.
        """
        self.calls += 1
        # Throttle: TIMER is in milliseconds, but time.time() deltas are in
        # seconds — convert before comparing (the original mixed the units).
        elapsed_ms = (time.time() - self.last_call) * 1000.0
        wait_ms = max(0.0, self.TIMER - elapsed_ms)
        if wait_ms:
            time.sleep(wait_ms / 1000.0)
        # Timeout prevents a single stalled mirror from hanging the crawl.
        data = requests.get(url, headers=self.HEADER, timeout=60)
        self.last_call = time.time()
        if data.status_code == 404:
            return None
        try:
            return data.content.decode("utf-8")
        except UnicodeDecodeError:
            try:
                return data.content.decode("ISO-8859-1")  # latin-1
            except UnicodeDecodeError:
                return data.content.decode("utf-8", "backslashreplace")

    def catalog(self) -> pd.DataFrame:
        """Return the official Gutenberg catalog restricted to Type == 'Text'.

        Prefers the gzipped feed; falls back to the plain CSV via _get().
        Raises ValueError if neither source is reachable.
        """
        try:
            csv = pd.read_csv("https://www.gutenberg.org/cache/epub/feeds/pg_catalog.csv.gz", sep=",")
        except Exception:
            raw = self._get("https://www.gutenberg.org/cache/epub/feeds/pg_catalog.csv")
            if raw is None:
                raise ValueError("Catalog CSV file does not exist!")
            csv = pd.read_csv(io.StringIO(raw), sep=",")
        return csv.loc[csv["Type"] == "Text"].reset_index(drop=True)

    def search(self, url: str) -> dict:
        """Use catalog() instead! Returns dict with book_id: 'book title' pairs for gutenberg.org pages"""
        assert "/www.gutenberg.org" in url, "The URL must be a page at https://www.gutenberg.org/"
        html = self._get(url)
        if html is None:
            return {}
        dom = BeautifulSoup(html, "html.parser")
        results = {}
        for a in dom.find_all("a"):
            for ebook, title in re.findall(r"<a href=\"/ebooks/(\d+)\">(.+?)</a>", str(a)):
                # collapse any line breaks / runs of whitespace inside the
                # anchor text into single spaces
                results[int(ebook)] = re.sub(r"\s+", " ", title).strip()
        return results

    def download(self, book: int) -> Optional[str]:
        """Fetch the HTML edition of *book* from a random mirror, or None on 404.

        Mirror layout: book 12345 lives under 1/2/3/4/12345/12345-h/12345-h.htm;
        single-digit books live under 0/<id>/.
        """
        book = int(book)
        assert book > 0
        mirror = np.random.choice(self.MIRRORS)
        if book < 10:
            page = f"0/{book}/"
        else:
            # every digit except the last becomes a directory level
            page = "/".join([char for char in str(book)[:-1]]) + f"/{book}/"
        url = f"{mirror}{page}{book}-h/{book}-h.htm"
        return self._get(url)

    def parse(self, book: int, html: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
        """Split downloaded HTML into (metadata, body) plain text.

        Returns (None, None) for missing pages, 404 stubs, copyrighted
        books, and malformed books (more than two boilerplate markers).
        """
        book = int(book)
        assert book > 0
        if html is None:
            return None, None
        dom = BeautifulSoup(html, "html.parser")
        if dom is None or dom.title is None or dom.title.string is None or "404" in dom.title.string:
            return None, None

        # <title> and <pre> elements hold the Gutenberg header / licence text.
        meta = ""
        for pre in dom.select("title, pre"):
            meta += str(pre.get_text()).strip()
            pre.extract()  # remove metadata from dom afterwards
        if self.COPYRIGHT_RE.search(meta):
            warnings.warn(f"Book {book} is copyrighted.")
            return None, None
        for img in dom.select("img"):
            # add image alt attributes as text
            try:
                img.insert(0, img["alt"])
            except KeyError:
                pass
        text = str(dom.get_text()).strip()
        if self.COPYRIGHT_RE.search(text):
            warnings.warn(f"Book {book} is copyrighted.")
            return None, None

        # Split on the "*** ... PROJECT GUTENBERG ... ***" start/end markers.
        s = re.split(r"(?i)\*{2,}[^\n]+?project gutenberg[^\n]+?\*{2,}\s*[\r\n]+", text)  # 49843
        if len(s) > 1:
            if len(s) > 3:
                warnings.warn(f"Book {book} is malformed.")
                return None, None
            meta += s[0]
            return meta, s[1]
        return meta, text

    @staticmethod
    def pretty(text: Optional[str]) -> str:
        """Clean extracted text: drop transcriber's/e-text notes, normalize EOLs."""
        if not text:
            return ""
        # attempt to remove transcriber's notes
        text = re.sub(r"(?i)(?:\[|\b)transcriber[\'’]?s? notes?\s*(?:[^\xa0\n].*?\]?(?:\r?\n){1,2})+", "", text)
        # attempt to remove e-text info
        text = re.sub(
            r"(?i)e-text prepared(?:[^\xa0]\(?.+\)?\r?\n{1,3})+(?:\xa0*\s*note\:\s*(?:.+\s*\r{0,2}\n{1,2}){1,5}\xa0\s+)?",
            "",
            text,
        )
        # standardize line endings and cap consecutive blank lines at two
        text = "\r\n".join(text.splitlines())
        text = re.sub(r"(\r\n){3,}", "\r\n\r\n\r\n", text).strip()
        return text

    def _write(self, file: str, content: str) -> None:
        """Write *content* UTF-8 encoded into *file* inside self.folder."""
        path = os.path.join(self.folder, file) if self.folder is not None else file
        # "w" (not "w+"): the file is only ever written, never read back here
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)

    def save(self, book: int) -> bool:
        """Download + parse *book* and write its files; True if body text was saved."""
        html = self.download(book)
        meta, text = self.parse(book, html)
        if meta:
            self._write(f"{book}_meta.txt", meta)
        if text:
            self._write(f"{book}_text.txt", text)
        return bool(text)

Start crawling

python
# Crawler instance; downloaded metadata/text files land under FOLDER ("text/").
gc = GutenbergCrawler(FOLDER)  # use text/ folder to save files
python
# get the catalog of ebooks (only text types will be returned)
df = gc.catalog()
df = df.loc[df["Language"] == LANG]
assert len(df), "No matching items in catalog!"
df = df.sample(frac=1)  # random shuffle
df.head()
python
# Resume support: reload the status CSV from a previous run if present,
# otherwise start with an empty book/success table.
crawled = pd.read_csv(STATUS) if os.path.exists(STATUS) else pd.DataFrame({"book": [], "success": []})
print(f"{len(crawled)} out of {len(df)} items done.")
python
# NOTE: this will take really long depending on the number of ebooks selected
for index, row in df.iterrows():
    book = row["Text#"]
    if book not in crawled["book"].values:
        t = time.time()
        print(f"#{book} {row['Title']} ({row['Language']})", end=" ")
        if gc.save(book):
            print("✔️", end=" ")
            crawled = crawled.append({"book": book, "success": True}, ignore_index=True)
        else:
            print("❌", end=" ")
            crawled = crawled.append({"book": book, "success": False}, ignore_index=True)
        print(f"- {(time.time() - t):.3f}s")
        crawled.to_csv(STATUS, index=False)
        if len(crawled) % 25 == 0:
            print(f"▶▶▶ {len(crawled)} done ({int(crawled['success'].sum()) } successful) out of {len(df)} ◀◀◀")

print("Done.")

Add the crawled text files into parquet datasets

python
# Reload the final status file, keep successful downloads only, and rename
# the id column so it matches the catalog's "Text#" key for merging.
crawled = pd.read_csv(STATUS)
crawled = crawled.loc[crawled["success"] == True].rename(columns={"book": "Text#"})

gc = GutenbergCrawler(FOLDER)
df = gc.catalog()
df = df[df["Language"] == LANG]

pct = len(crawled) / len(df) * 100.0
print(f"{len(crawled)} out of {len(df)} ({pct:.2f}%) available.")
python
# Deduplicate catalog rows by book id and attach the crawl status via an
# inner join, then drop bookkeeping columns and order by id.
df = df.drop_duplicates(subset=["Text#"]).merge(crawled, on=["Text#"], how="inner")
assert not len(df.loc[df["success"] == False])
del crawled
df = df.drop(columns=["Type", "Language", "success"]).sort_values(by="Text#", ascending=True)
len(df)  # number of items after merging with metadata
python
def read(file: str) -> Optional[str]:
    """Return the UTF-8 contents of *file*, or None if it does not exist.

    EAFP: open directly and catch FileNotFoundError instead of the racy
    os.path.exists() check followed by open().
    """
    try:
        with open(file, "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        return None


def strip(value: Any) -> str:
    """Return *value* as a whitespace-stripped string; "" for falsy or NaN values."""
    if not value or pd.isna(value):
        return ""
    return str(value).strip()


# Pack the crawled text files into CHUNKS parquet file(s).
n = len(df) // CHUNKS  # base chunk size; the last chunk absorbs the remainder
for chunk in range(CHUNKS):
    start = chunk * n
    end = (chunk + 1) * n if chunk < CHUNKS - 1 else len(df)

    updated = {col: [] for col in list(df.columns) + ["Body"]}
    # Iterate the chunk's rows directly instead of re-scanning the whole
    # frame per book (df.loc[df["Text#"] == book] was O(n) per lookup).
    for _, df_row in tqdm(df.iloc[start:end].iterrows(), total=end - start):
        book = df_row["Text#"]
        text = gc.pretty(read(os.path.join(FOLDER, f"{book}_text.txt")))
        if not text:
            continue  # missing or empty after cleaning — skip the book

        updated["Text#"].append(book)
        updated["Issued"].append(pd.to_datetime(df_row["Issued"], format="%Y-%m-%d", errors="coerce"))
        updated["Title"].append(strip(df_row["Title"]))
        updated["Authors"].append(strip(df_row["Authors"]))
        updated["Subjects"].append(strip(df_row["Subjects"]))
        updated["LoCC"].append(strip(df_row["LoCC"]))
        updated["Bookshelves"].append(strip(df_row["Bookshelves"]))
        updated["Body"].append(text)

    updated = pd.DataFrame(updated)
    if CHUNKS == 1:
        updated.to_parquet(f"gutenberg_{LANG}_all.pq", index=False, engine="pyarrow", compression="gzip")
    else:
        updated.to_parquet(
            f"gutenberg_{LANG}_{chunk + 1}_of_{CHUNKS}.pq", index=False, engine="pyarrow", compression="gzip"
        )
    del updated  # free the chunk before building the next one

print("Done.")