data/datasets/gutenberg/project_gutenberg_crawler.ipynb
Make sure you read the site's Terms of Service and this notebook's README.md before using the crawler.
# uncomment and run the lines below to set up the environment if running in Colab
# !git clone https://github.com/LAION-AI/Open-Assistant.git
# %cd Open-Assistant/notebooks/gutenberg
# !pip install -r requirements.txt
# global settings
LANG = (
"en" # crawl english language books, NOTE: there are a few houndred books with multiple languages such as 'en; es'
)
FOLDER = "text" # save metadata and body of text to this folder
CHUNKS = 1 # optionally divide the dataset into this many compressed parquet files if you have limited memory
STATUS = "crawled.csv" # save the list of downloaded files and their status into this csv
# import required packages
import os
import io
import re
import requests
import time
import warnings
try:
from BeautifulSoup import BeautifulSoup
except ImportError:
from bs4 import BeautifulSoup
from tqdm import tqdm
import numpy as np
import pandas as pd
from typing import Tuple, Optional, Any
class GutenbergCrawler:
HEADER = {
"User-Agent": "Mozilla/5.0 (compatible; GutenbergCrawler/0.1)",
}
    TIMER = 600 # minimum wait in milliseconds between requests
MIRRORS = [
"http://www.mirrorservice.org/sites/ftp.ibiblio.org/pub/docs/books/gutenberg/",
"https://www.gutenberg.org/dirs/",
"http://mirrors.xmission.com/gutenberg/",
] # see https://www.gutenberg.org/MIRRORS.ALL for available mirrors
def __init__(self, folder: Optional[str] = None) -> None:
self.folder = folder
if self.folder is not None:
os.makedirs(self.folder, exist_ok=True)
self.calls = 0
self.last_call = 0
    def _get(self, url: str) -> Optional[str]:
self.calls += 1
diff = max(0.0, self.TIMER - (time.time() - self.last_call))
if diff:
time.sleep(diff / 1000.0)
data = requests.get(url, headers=self.HEADER)
self.last_call = time.time()
if data.status_code == 404:
return None
try:
return data.content.decode("utf-8")
except UnicodeDecodeError:
try:
return data.content.decode("ISO-8859-1") # latin-1
except UnicodeDecodeError:
return data.content.decode("utf-8", "backslashreplace")
def catalog(self) -> pd.DataFrame:
try:
csv = pd.read_csv("https://www.gutenberg.org/cache/epub/feeds/pg_catalog.csv.gz", sep=",")
except Exception:
raw = self._get("https://www.gutenberg.org/cache/epub/feeds/pg_catalog.csv")
if raw is None:
raise ValueError("Catalog CSV file does not exist!")
csv = pd.read_csv(io.StringIO(raw), sep=",")
return csv.loc[csv["Type"] == "Text"].reset_index(drop=True)
def search(self, url: str) -> dict:
"""Use catalog() instead! Returns dict with book_id: 'book title' pairs for gutenberg.org pages"""
assert "/www.gutenberg.org" in url, "The URL must be a page at https://www.gutenberg.org/"
html = self._get(url)
if html is None:
return {}
dom = BeautifulSoup(html, "html.parser")
results = {}
for a in dom.find_all("a"):
for elem in re.findall(r"<a href=\"/ebooks/(\d+)\">(.+?)</a>", str(a)):
ebook, title = elem
                results[int(ebook)] = title.replace("\r\n", " ")  # collapse line breaks in multi-line titles
return results
def download(self, book: int) -> Optional[str]:
book = int(book)
assert book > 0
mirror = np.random.choice(self.MIRRORS)
if book < 10:
page = f"0/{book}/"
else:
page = "/".join([char for char in str(book)[:-1]]) + f"/{book}/"
url = f"{mirror}{page}{book}-h/{book}-h.htm"
return self._get(url)
    def parse(self, book: int, html: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
book = int(book)
assert book > 0
if html is None:
return None, None
dom = BeautifulSoup(html, "html.parser")
if dom is None or dom.title is None or dom.title.string is None or "404" in dom.title.string:
return None, None
meta = ""
for pre in dom.select("title, pre"):
meta += str(pre.get_text()).strip()
# remove metadata from dom afterwards
pre.extract()
if re.findall(r"(?i)\*{2,}[^\n]+?(?:please.+?copyright|copyrighted.+?project)[^\n]+?\*{2,}\r?\n", meta):
warnings.warn(f"Book {book} is copyrighted.")
return None, None
for img in dom.select("img"):
# add image alt attributes as text
try:
img.insert(0, img["alt"])
except KeyError:
pass
text = str(dom.get_text()).strip()
if re.findall(r"(?i)\*{2,}[^\n]+?(?:please.+?copyright|copyrighted.+?project)[^\n]+?\*{2,}\r?\n", text):
warnings.warn(f"Book {book} is copyrighted.")
return None, None
s = re.split(r"(?i)\*{2,}[^\n]+?project gutenberg[^\n]+?\*{2,}\s*[\r\n]+", text) # 49843
if len(s) > 1:
if len(s) > 3:
warnings.warn(f"Book {book} is malformed.")
return None, None
meta += s[0]
return meta, s[1]
return meta, text
@staticmethod
def pretty(text: Optional[str]) -> str:
if not text:
return ""
# attempt to remove transcriber's notes
text = re.sub(r"(?i)(?:\[|\b)transcriber[\'’]?s? notes?\s*(?:[^\xa0\n].*?\]?(?:\r?\n){1,2})+", "", text)
# attempt to remove e-text info
text = re.sub(
r"(?i)e-text prepared(?:[^\xa0]\(?.+\)?\r?\n{1,3})+(?:\xa0*\s*note\:\s*(?:.+\s*\r{0,2}\n{1,2}){1,5}\xa0\s+)?",
"",
text,
)
# standardize line endings
text = "\r\n".join(text.splitlines())
text = re.sub(r"(\r\n){3,}", "\r\n\r\n\r\n", text).strip()
return text
def _write(self, file: str, content: str) -> None:
path = os.path.join(self.folder, file) if self.folder is not None else file
with open(path, "w+", encoding="utf-8") as f:
f.write(content)
def save(self, book: int) -> bool:
html = self.download(book)
meta, text = self.parse(book, html)
if meta:
self._write(f"{book}_meta.txt", meta)
if text:
self._write(f"{book}_text.txt", text)
return bool(text)
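As a small illustration of the mirror directory layout that download() relies on, the path for an ebook is built from the digits of its ID with the last digit dropped; the ID below is only an example.
# illustrative only: show how download() builds the mirror path for an example book ID
example_id = 1342  # example ID; any ebook number >= 10 follows this pattern, IDs < 10 live under "0/"
print("/".join(str(example_id)[:-1]) + f"/{example_id}/{example_id}-h/{example_id}-h.htm")
# -> 1/3/4/1342/1342-h/1342-h.htm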
gc = GutenbergCrawler(FOLDER) # use text/ folder to save files
# get the catalog of ebooks (only text types will be returned)
df = gc.catalog()
df = df.loc[df["Language"] == LANG]
assert len(df), "No matching items in catalog!"
df = df.sample(frac=1) # random shuffle
df.head()
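If you only want a subset of the catalog, you could filter it further at this point, for instance by subject; the filter below is purely illustrative and left commented out.
# optional, illustrative: narrow the catalog to a single subject before crawling
# df = df.loc[df["Subjects"].str.contains("Fiction", case=False, na=False)]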
if os.path.exists(STATUS):
crawled = pd.read_csv(STATUS)
else:
    crawled = pd.DataFrame({"book": pd.Series(dtype="int64"), "success": pd.Series(dtype="bool")})
print(f"{len(crawled)} out of {len(df)} items done.")
# NOTE: this can take a very long time depending on the number of ebooks selected
for index, row in df.iterrows():
book = row["Text#"]
if book not in crawled["book"].values:
t = time.time()
print(f"#{book} {row['Title']} ({row['Language']})", end=" ")
if gc.save(book):
print("✔️", end=" ")
            crawled = pd.concat([crawled, pd.DataFrame([{"book": book, "success": True}])], ignore_index=True)
else:
print("❌", end=" ")
            crawled = pd.concat([crawled, pd.DataFrame([{"book": book, "success": False}])], ignore_index=True)
print(f"- {(time.time() - t):.3f}s")
crawled.to_csv(STATUS, index=False)
if len(crawled) % 25 == 0:
print(f"▶▶▶ {len(crawled)} done ({int(crawled['success'].sum()) } successful) out of {len(df)} ◀◀◀")
print("Done.")
crawled = pd.read_csv(STATUS)
crawled = crawled.loc[crawled["success"] == True]
crawled.rename(columns={"book": "Text#"}, inplace=True)
gc = GutenbergCrawler(FOLDER)
df = gc.catalog()
df = df.loc[df["Language"] == LANG]
print(f"{len(crawled)} out of {len(df)} ({len(crawled) / len(df) * 100.:.2f}%) available.")
df.drop_duplicates(subset=["Text#"], inplace=True)
df = pd.merge(df, crawled, on=["Text#"], how="inner")
assert not len(df.loc[df["success"] == False])
del crawled
df.drop(columns=["Type", "Language", "success"], inplace=True)
df.sort_values(by="Text#", ascending=True, inplace=True)
len(df) # number of items after merging with metadata
def read(file: str) -> Optional[str]:
result = None
if os.path.exists(file):
with open(file, "r", encoding="utf-8") as f:
result = f.read()
return result
def strip(value: Any) -> str:
return str(value).strip() if value and pd.notna(value) else ""
for chunk in range(CHUNKS):
n = len(df) // CHUNKS
start, end = chunk * n, (chunk + 1) * n if chunk < CHUNKS - 1 else len(df)
updated = {col: [] for col in list(df.columns) + ["Body"]}
books = df["Text#"].values[start:end]
for book in tqdm(books):
text = read(os.path.join(FOLDER, f"{book}_text.txt"))
text = gc.pretty(text)
if not text:
continue
df_row = df.loc[df["Text#"] == book]
updated["Text#"].append(book)
updated["Issued"].append(pd.to_datetime(df_row["Issued"].values[0], format="%Y-%m-%d", errors="coerce"))
updated["Title"].append(strip(df_row["Title"].values[0]))
updated["Authors"].append(strip(df_row["Authors"].values[0]))
updated["Subjects"].append(strip(df_row["Subjects"].values[0]))
updated["LoCC"].append(strip(df_row["LoCC"].values[0]))
updated["Bookshelves"].append(strip(df_row["Bookshelves"].values[0]))
updated["Body"].append(text)
updated = pd.DataFrame(updated)
if CHUNKS == 1:
updated.to_parquet(f"gutenberg_{LANG}_all.pq", index=False, engine="pyarrow", compression="gzip")
else:
updated.to_parquet(
f"gutenberg_{LANG}_{chunk + 1}_of_{CHUNKS}.pq", index=False, engine="pyarrow", compression="gzip"
)
del updated
print("Done.")