data/datasets/tv_dialogue/imsdb.ipynb
Downloads and parses data from https://imsdb.com/ and optionally from other transcript sources. Raw transcripts, where dialogue is denoted by the positioning of blocks of text, are processed and turned into a dialogue text format where speakers are denoted as [person] with brackets.
Note that transcripts with invalid character encoding will be discarded as they would add noise to the tokens.
# uncomment and run below lines to set up if running in colab
# !git clone https://github.com/LAION-AI/Open-Assistant.git
# %cd Open-Assistant/data/datasets/tv_dialogue
# !pip install -r requirements.txt
# global settings
FOLDER = "srt"  # directory where raw parsed transcripts are written (created on demand)
STATUS = "crawled.csv"  # csv tracking every catalog entry and whether its download succeeded
# import required packages
import os
import io
import re
import requests
import time
import warnings
try:
from BeautifulSoup import BeautifulSoup
except ImportError:
from bs4 import BeautifulSoup
from tqdm import tqdm
import numpy as np
import pandas as pd
from typing import Tuple, Optional, Any
class IMSDbCrawler:
    """Crawler for https://imsdb.com/ movie and TV transcripts.

    Builds a catalog of every script listed on the site, downloads individual
    script pages, and converts the whitespace-indented screenplay layout into
    plain dialogue text where each spoken line is prefixed with ``[SPEAKER]``.
    Scripts with broken character encodings are discarded (see ``download``).
    """

    # sent with every request so the site sees a well-identified client
    HEADER = {
        "User-Agent": "Mozilla/5.0 (compatible; TranscriptCrawler/0.2)",
    }
    TIMER = 300  # minimum delay between consecutive HTTP calls, in milliseconds
    WEBSITE = "https://imsdb.com/"

    def __init__(self, folder: Optional[str] = None) -> None:
        """Create the crawler; transcripts are saved into `folder` (created if missing)."""
        self.folder = folder
        if self.folder is not None:
            os.makedirs(self.folder, exist_ok=True)
        self.last_call = 0  # time.time() of the previous request, for rate limiting

    def _get(self, url: str, allow_unicode_errors: bool = True) -> Optional[str]:
        """Rate-limited GET returning the decoded page body.

        Returns None on HTTP 404, or on undecodable content when
        `allow_unicode_errors` is False.
        """
        # TIMER is in milliseconds while time.time() deltas are in seconds, so
        # convert the elapsed time before comparing (the original mixed the
        # units and slept ~TIMER ms even when enough time had already passed).
        wait_ms = max(0.0, self.TIMER - (time.time() - self.last_call) * 1000.0)
        if wait_ms:
            time.sleep(wait_ms / 1000.0)
        # timeout so one stalled server cannot hang the whole crawl
        data = requests.get(url, headers=self.HEADER, timeout=30)
        self.last_call = time.time()
        if data.status_code == 404:
            return None
        try:
            return data.content.decode("utf-8")
        except UnicodeDecodeError:
            try:
                return data.content.decode("ISO-8859-1")  # latin-1
            except UnicodeDecodeError:
                if allow_unicode_errors:
                    return data.content.decode("utf-8", "backslashreplace")
                return None

    def get_catalog(self) -> pd.DataFrame:
        """Scrape the site's index pages into a DataFrame.

        Returns columns (alpha, title, link, status); status starts as NaN,
        meaning "not crawled yet".
        """
        movies = {"alpha": [], "title": [], "link": []}
        # one index page per letter, plus "0" for titles starting with a digit
        alpha = [f"alphabetical/{alpha}" for alpha in ["0"] + [chr(i) for i in range(ord("A"), ord("Z") + 1)]]
        tv = ["TV/South%20Park.html", "TV/Stargate%20SG1.html"]  # "TV/Futurama.html", "TV/Seinfeld.html", "TV/Lost"
        for slug in tqdm(alpha + tv):
            html = self._get(f"https://imsdb.com/{slug}")
            if html is None:
                continue
            dom = BeautifulSoup(html, "html.parser")
            # the second table's first row's last cell holds the title links
            for movie in dom.select("table")[1].select("tr")[0].select("td")[-1].select("a"):
                movies["alpha"].append(slug.split("/")[1].split(".html")[0])
                movies["title"].append(movie.string)
                movies["link"].append(movie["href"])
        movies = pd.DataFrame(movies)
        movies["status"] = np.nan
        return movies

    def download(self, url: str) -> Optional[str]:
        """Follow a movie detail page to its script HTML and return it, or None.

        Uses strict decoding for the script itself so transcripts with invalid
        character encodings are dropped instead of adding noise.
        """
        html = self._get(f"{self.WEBSITE}{url}")
        if html is None:
            return None
        dom = BeautifulSoup(html, "html.parser")
        for a in dom.find("table", {"class": "script-details"}).select("a"):
            if "scripts/" in a["href"] and ".html" in a["href"]:
                script = a["href"] if "http" in a["href"] else f"{self.WEBSITE}{a['href'].lstrip('/')}"
                return self._get(script, allow_unicode_errors=False)
        return None

    def _clean_dom(self, html: str):
        """Narrow the DOM down to the transcript <pre> and strip scripts/links."""
        dom = BeautifulSoup(html, "html.parser")
        if dom.find("div", {"id": "content"}):
            dom = dom.find("div", {"id": "content"})
        if dom.find("td", {"class": "scrtext"}):
            dom = dom.find("td", {"class": "scrtext"})
        if dom.find("pre"):
            dom = dom.find("pre")
        for s in dom.select("script"):
            s.extract()
        for a in dom.select("a"):
            a.extract()
        return dom

    def is_person(self, speaker: str) -> bool:
        """Heuristic: does `speaker` look like a character name rather than stage text?"""
        if len(speaker) <= 1:
            return False
        elif "!" in speaker or "?" in speaker or "..." in speaker:
            return False
        elif speaker[0] == "-" or "--" in speaker:
            return False
        elif speaker[0].isnumeric():
            return False
        elif speaker[0].isalpha() and speaker[0].islower():
            return False
        elif speaker[0] == "(" and speaker[-1] == ")":
            return False
        elif speaker[0] == '"' and speaker[-1] == '"':
            return False
        elif speaker.count('"') % 2 != 0 or speaker.count("(") != speaker.count(")"):
            return False
        elif re.findall(r"\b(FADES?|CUTS? TO|MUSIC)\b", speaker):
            return False
        return True

    def parse(self, html) -> str:
        """Convert screenplay HTML into ``[SPEAKER] line`` dialogue text.

        The screenplay layout encodes meaning in indentation: the minimum
        indent is scene text, deeper blocks are speaker names followed by
        their (less indented) dialogue. Returns "" when the page does not
        look like a dialogue transcript.
        """
        dom = self._clean_dom(html)
        # find the base (minimum) indentation level of the transcript
        minlines = np.inf
        for line in dom.text.splitlines():
            match = re.findall(r"(\s*)(\S.*)(?:\r?\n)*", line)
            if match:
                minlines = min(minlines, len(match[0][0]))
        text = ""
        speaker = ""  # pending candidate speaker name, not yet emitted
        last = minlines
        for line in dom.text.splitlines():
            match = re.findall(r"(\s*)(\S.*)(?:\r?\n)*", line)
            if match:
                n = len(match[0][0])  # indentation depth of this line
                script = match[0][1].strip()
                if script[0] == "[" and script[-1] == "]":
                    # normalize [stage direction] to (stage direction)
                    script = f"({script[1:-1]})"
                if n == minlines:
                    # base indentation: plain scene text; flush any pending speaker
                    if speaker:
                        text += f"{speaker}\r\n{script}\r\n"
                        speaker = ""
                    else:
                        text += f"{script}\r\n"
                    last = n
                else:
                    if n >= last:
                        # same or deeper indent: previous candidate was not a
                        # speaker; a strictly deeper line starts a new candidate
                        if speaker:
                            text += f"{speaker}\r\n"
                            speaker = ""
                        if n > last:
                            speaker = script
                            last = n
                        else:
                            text += f"{script}\r\n"
                    else:
                        # indent decreased: the pending line was a speaker name
                        # and this line is their dialogue
                        if speaker:
                            if not self.is_person(speaker):
                                text += f"{speaker}\r\n{script}\r\n"
                            else:
                                if speaker[-1] == ":":
                                    speaker = speaker[:-1]
                                text += f"[{speaker}] {script}\r\n"
                            speaker = ""
                        else:
                            text += f"{script}\r\n"
                        try:
                            # a bare number (page number) resets the indent tracking;
                            # the original pointlessly rebound `script` here
                            float(script.strip())
                            last = minlines
                        except ValueError:
                            last = n
            else:
                text += "\r\n"
        if speaker:
            text += f"{speaker}\r\n"
        # require at least two separated [SPEAKER] lines, otherwise discard
        if not re.findall(r"\[.+?\] .+?\r\n\r\n\[.+?\] .+?\r\n\r\n", text):
            return ""
        # a first speaker tag that occurs exactly once is presumably a title
        # misdetected as a speaker — unwrap it back into plain text
        first_occurrence = re.findall(r"\[.+?\] ", text)[0]
        if len(re.findall(re.escape(first_occurrence), text)) == 1:
            text = re.sub(re.escape(first_occurrence), f"{first_occurrence[1:-2]}\r\n", text)
        # unescape the HTML "&amp;" entity (the original replace("&", "&") was
        # a no-op — the first argument had lost its "amp;")
        text = text.replace("&amp;", "&")
        text = "\r\n".join(text.splitlines())
        text = re.sub(r"(\r*\n)", "\n", text)
        text = re.sub(r"\n{2,}", "\n\n", text).strip()
        return text

    def _write(self, file: str, content: str) -> None:
        """Write `content` into self.folder (UTF-8, universal newline translation)."""
        path = os.path.join(self.folder, file) if self.folder is not None else file
        with open(path, "w+", newline=None, encoding="utf-8") as f:
            f.write(content)

    def save(self, url: str) -> bool:
        """Download, parse and store one script; return True on success."""
        html = self.download(url)
        if not html:
            return False
        script = self.parse(html)
        if len(script) < 128:
            # too short to be a real transcript (or parse() rejected it)
            return False
        # derive a filesystem-safe name: drop the extension, keep alnum/-/_ only
        name = ".".join(url.split(".")[:-1]) if "." in url[-5:] else url
        name = "".join([c for c in name.replace(" ", "_").replace("/", "-") if c.isalnum() or c in ("-", "_")])
        self._write(f"{name}.txt", script)
        return True
# build the full catalog of scripts listed on the site and persist it to csv,
# so the (very long) crawl in the next cell can be resumed after interruption
ic = IMSDbCrawler(FOLDER)
catalog = ic.get_catalog()
catalog.to_csv(STATUS, index=False)
catalog  # notebook display of the freshly built catalog
# NOTE: this will take really long
# Resume-friendly crawl: every catalog row with a NaN status is attempted,
# marked 1.0 (saved) or 0.0 (failed), and the csv is checkpointed each step.
catalog = pd.read_csv(STATUS)
crawled = catalog.copy()
for index, row in catalog.iterrows():
    # rows with a non-NaN status were already handled in a previous run
    if not pd.isna(row["status"]):
        continue
    t = time.time()
    print(f"{row['alpha']} {row['title']}", end=" ")
    success = ic.save(row["link"])
    if success:
        print("✔️", end=" ")
    else:
        print("❌", end=" ")
    crawled.at[index, "status"] = 1.0 if success else 0.0
    print(f"- {(time.time() - t):.3f}s")
    # checkpoint after every row so progress survives interruption
    crawled.to_csv(STATUS, index=False)
    if pd.notna(crawled["status"]).sum() % 25 == 0:
        print(
            f"▶▶▶ {pd.notna(crawled['status']).sum()} done ({int(crawled['status'].sum())} successful) out of {len(crawled)} ◀◀◀"
        )
print("Done.")