Back to Open Assistant

Crawler for The Internet Movie Script Database (IMSDb)

data/datasets/tv_dialogue/imsdb.ipynb

0.0.19.4 KB
Original Source

Crawler for The Internet Movie Script Database (IMSDb)

Downloads and parses data from https://imsdb.com/ and optionally from other transcript sources. Raw transcripts, where dialogue is denoted by the positioning of blocks of text, are processed and turned into a dialogue text format where speakers are denoted as [person] in brackets.

Note that transcripts with invalid character encoding will be discarded as they would add noise to the tokens.

python
# uncomment and run below lines to set up if running in colab
# !git clone https://github.com/LAION-AI/Open-Assistant.git
# %cd Open-Assistant/data/datasets/tv_dialogue
# !pip install -r requirements.txt
python
# global settings

FOLDER = "srt"  # directory where parsed transcripts are written (created on demand by the crawler)
STATUS = "crawled.csv"  # CSV tracking every catalog entry and whether its download succeeded
python
# import required packages
import os
import io
import re
import requests
import time
import warnings

try:
    from BeautifulSoup import BeautifulSoup
except ImportError:
    from bs4 import BeautifulSoup
from tqdm import tqdm

import numpy as np
import pandas as pd

from typing import Tuple, Optional, Any
python
class IMSDbCrawler:
    """Crawler for The Internet Movie Script Database (https://imsdb.com/).

    Downloads raw script pages, parses the indentation-based layout into a
    dialogue format where speakers are emitted as "[PERSON] line", and saves
    the result as .txt files.
    """

    HEADER = {
        "User-Agent": "Mozilla/5.0 (compatible; TranscriptCrawler/0.2)",
    }
    TIMER = 300  # minimum delay between HTTP requests, in milliseconds
    TIMEOUT = 30  # abort an HTTP request after this many seconds
    WEBSITE = "https://imsdb.com/"

    def __init__(self, folder: Optional[str] = None) -> None:
        """Create a crawler that writes transcripts into `folder`.

        The folder is created if missing; when None, files go to the CWD.
        """
        self.folder = folder
        if self.folder is not None:
            os.makedirs(self.folder, exist_ok=True)
        self.last_call = 0.0  # time.time() of the previous HTTP request

    def _get(self, url: str, allow_unicode_errors: bool = True) -> Optional[str]:
        """Rate-limited GET returning the decoded page body.

        Returns None on HTTP 404, or on undecodable content when
        `allow_unicode_errors` is False.
        """
        # BUGFIX: the elapsed time is in seconds while TIMER is in
        # milliseconds; convert before subtracting so we do not over-sleep.
        diff_ms = max(0.0, self.TIMER - (time.time() - self.last_call) * 1000.0)
        if diff_ms:
            time.sleep(diff_ms / 1000.0)
        # a timeout prevents a single stuck request from hanging the crawl
        data = requests.get(url, headers=self.HEADER, timeout=self.TIMEOUT)
        self.last_call = time.time()
        if data.status_code == 404:
            return None

        try:
            return data.content.decode("utf-8")
        except UnicodeDecodeError:
            try:
                return data.content.decode("ISO-8859-1")  # latin-1
            except UnicodeDecodeError:
                if allow_unicode_errors:
                    return data.content.decode("utf-8", "backslashreplace")
                else:
                    return None

    def get_catalog(self) -> pd.DataFrame:
        """Build a DataFrame of every title listed on the site.

        Columns: alpha (index page it came from), title, link, and a
        status column initialized to NaN (= not yet crawled).
        """
        movies = {"alpha": [], "title": [], "link": []}
        alpha = [f"alphabetical/{letter}" for letter in ["0"] + [chr(i) for i in range(ord("A"), ord("Z") + 1)]]
        tv = ["TV/South%20Park.html", "TV/Stargate%20SG1.html"]  # "TV/Futurama.html", "TV/Seinfeld.html", "TV/Lost"
        for slug in tqdm(alpha + tv):
            html = self._get(f"{self.WEBSITE}{slug}")  # use the shared base URL for consistency
            if html is None:
                continue
            dom = BeautifulSoup(html, "html.parser")
            tables = dom.select("table")
            if len(tables) < 2:
                continue  # unexpected page layout; skip rather than crash
            for movie in tables[1].select("tr")[0].select("td")[-1].select("a"):
                movies["alpha"].append(slug.split("/")[1].split(".html")[0])
                movies["title"].append(movie.string)
                movies["link"].append(movie["href"])
        movies = pd.DataFrame(movies)
        movies["status"] = np.nan
        return movies

    def download(self, url: str) -> Optional[str]:
        """Resolve a movie page to its script page and fetch the raw HTML.

        Returns None when the page, the details table, or a usable script
        link is missing, or when the script does not decode cleanly
        (invalid encodings are discarded to avoid token noise).
        """
        html = self._get(f"{self.WEBSITE}{url}")
        if html is None:
            return None
        dom = BeautifulSoup(html, "html.parser")
        details = dom.find("table", {"class": "script-details"})
        if details is None:
            return None  # BUGFIX: previously crashed with AttributeError here
        for a in details.select("a"):
            if "scripts/" in a["href"] and ".html" in a["href"]:
                script = a["href"] if "http" in a["href"] else f"{self.WEBSITE}{a['href'].lstrip('/')}"
                return self._get(script, allow_unicode_errors=False)
        return None

    def _clean_dom(self, html: str):
        """Narrow the DOM down to the transcript text and strip scripts/links."""
        dom = BeautifulSoup(html, "html.parser")
        if dom.find("div", {"id": "content"}):
            dom = dom.find("div", {"id": "content"})
        if dom.find("td", {"class": "scrtext"}):
            dom = dom.find("td", {"class": "scrtext"})
        if dom.find("pre"):
            dom = dom.find("pre")
        for s in dom.select("script"):
            s.extract()
        for a in dom.select("a"):
            a.extract()
        return dom

    def is_person(self, speaker: str) -> bool:
        """Heuristically decide whether a candidate line is a speaker name.

        Rejects punctuation-heavy lines, lowercase starts, numbers,
        parenthesized/quoted text and common stage-direction keywords.
        """
        if len(speaker) <= 1:
            return False
        elif "!" in speaker or "?" in speaker or "..." in speaker:
            return False
        elif speaker[0] == "-" or "--" in speaker:
            return False
        elif speaker[0].isnumeric():
            return False
        elif speaker[0].isalpha() and speaker[0].islower():
            return False
        elif speaker[0] == "(" and speaker[-1] == ")":
            return False
        elif speaker[0] == '"' and speaker[-1] == '"':
            return False
        elif speaker.count('"') % 2 != 0 or speaker.count("(") != speaker.count(")"):
            return False
        elif re.findall(r"\b(FADES?|CUTS? TO|MUSIC)\b", speaker):
            return False
        return True

    def parse(self, html: str) -> str:
        """Convert raw script HTML into dialogue text.

        Speakers are inferred from indentation: a line indented deeper than
        its predecessor is a candidate speaker for the following, shallower
        speech block. Returns "" when the page does not contain at least two
        separated dialogue exchanges (i.e. it is not a usable transcript).
        """
        dom = self._clean_dom(html)

        # minimum indentation across non-blank lines = base script margin
        minlines = np.inf
        for line in dom.text.splitlines():
            match = re.findall(r"(\s*)(\S.*)(?:\r?\n)*", line)
            if match:
                minlines = min(minlines, len(match[0][0]))

        text = ""
        speaker = ""
        last = minlines
        for line in dom.text.splitlines():
            match = re.findall(r"(\s*)(\S.*)(?:\r?\n)*", line)
            if match:
                n = len(match[0][0])  # indentation of this line
                script = match[0][1].strip()
                # normalize [stage directions] to (stage directions), so
                # brackets stay reserved for speaker tags
                if script[0] == "[" and script[-1] == "]":
                    script = f"({script[1:-1]})"
                if n == minlines:
                    # back at the base margin: plain narration / scene text
                    if speaker:
                        text += f"{speaker}\r\n{script}\r\n"
                        speaker = ""
                    else:
                        text += f"{script}\r\n"
                    last = n
                else:
                    if n >= last:
                        if speaker:
                            # pending candidate was not followed by deeper text
                            text += f"{speaker}\r\n"
                            speaker = ""
                        if n > last:
                            speaker = script  # deeper indent: candidate speaker
                            last = n
                        else:
                            text += f"{script}\r\n"
                    else:
                        # indentation decreased: pending candidate (if any) is
                        # treated as the speaker of this speech block
                        if speaker:
                            if not self.is_person(speaker):
                                text += f"{speaker}\r\n{script}\r\n"
                            else:
                                if speaker[-1] == ":":
                                    speaker = speaker[:-1]
                                text += f"[{speaker}] {script}\r\n"
                            speaker = ""
                        else:
                            text += f"{script}\r\n"
                    try:
                        # bare numbers (page numbers) reset indent tracking;
                        # the converted value itself is not needed
                        float(script.strip())
                        last = minlines
                    except ValueError:
                        last = n
            else:
                text += "\r\n"
        if speaker:
            text += f"{speaker}\r\n"
        # require at least two separated dialogue exchanges, else discard
        if not re.findall(r"\[.+?\] .+?\r\n\r\n\[.+?\] .+?\r\n\r\n", text):
            return ""
        # a speaker tag appearing exactly once is likely a parsing artifact;
        # demote it back to a plain line
        first_occurrence = re.findall(r"\[.+?\] ", text)[0]
        if len(re.findall(re.escape(first_occurrence), text)) == 1:
            text = re.sub(re.escape(first_occurrence), f"{first_occurrence[1:-2]}\r\n", text)

        text = text.replace("&amp;", "&")
        text = "\r\n".join(text.splitlines())
        text = re.sub(r"(\r*\n)", "\n", text)
        text = re.sub(r"\n{2,}", "\n\n", text).strip()

        return text

    def _write(self, file: str, content: str) -> None:
        """Write `content` to `file` inside self.folder (or CWD when unset)."""
        path = os.path.join(self.folder, file) if self.folder is not None else file
        with open(path, "w+", newline=None, encoding="utf-8") as f:
            f.write(content)

    def save(self, url: str) -> bool:
        """Download, parse and store one script; return True on success."""
        html = self.download(url)
        if not html:
            return False
        script = self.parse(html)
        if len(script) < 128:  # too short to be a real transcript
            return False

        # derive a filesystem-safe name from the movie link
        name = ".".join(url.split(".")[:-1]) if "." in url[-5:] else url
        name = "".join([c for c in name.replace(" ", "_").replace("/", "-") if c.isalnum() or c in ("-", "_")])
        self._write(f"{name}.txt", script)
        return True
python
# instantiate the crawler; transcripts will be written into FOLDER
ic = IMSDbCrawler(FOLDER)
python
# build the full catalog of titles and persist it so the crawl can be
# resumed later (the "status" column tracks per-title progress)
catalog = ic.get_catalog()
catalog.to_csv(STATUS, index=False)
catalog
python
# NOTE: this will take really long
# Resumable crawl: rows whose status is already set are skipped, and the
# status CSV is rewritten after every attempt.
catalog = pd.read_csv(STATUS)
crawled = catalog.copy()
for index, row in catalog.iterrows():
    if not pd.isna(row["status"]):
        continue  # already attempted in a previous run
    start = time.time()
    print(f"{row['alpha']} {row['title']}", end=" ")
    ok = ic.save(row["link"])
    print("✔️" if ok else "❌", end=" ")
    crawled.at[index, "status"] = 1.0 if ok else 0.0
    print(f"- {(time.time() - start):.3f}s")
    crawled.to_csv(STATUS, index=False)  # checkpoint progress
    done = pd.notna(crawled["status"]).sum()
    if done % 25 == 0:
        print(f"▶▶▶ {done} done ({int(crawled['status'].sum())} successful) out of {len(crawled)} ◀◀◀")
print("Done.")