tutorials/minhash_dedupe/minhash_dedupe_common_crawl.ipynb
In this notebook we will run MinHash deduplication over text extracted from HTML documents in the Common Crawl dataset. The Common Crawl corpus contains petabytes of data, with its oldest entries dating back to 2008. Each crawl includes raw web page data, metadata extracts, and text extracts. Deduplication is a helpful top-of-funnel strategy for improving dataset quality and is commonly used to improve generalization in LLM training, RAG, and search.
When implemented as a pipeline, this workload can process 100,000 web pages in under 4 minutes on a MacBook Air (M2) with 8 GB of RAM. That includes preprocessing pages into HTML blocks, MinHash, LSH banding, connected components, and the final dedupe!
# of documents loaded: 100000
# of text rows before: 4944669
# of text rows after: 1855814
% of text rows kept: 37.53%
Overall Time: 222.21s
# Install additional dependencies
# For Google Colab or standalone environments:
%pip install 'daft[aws,pandas]' selectolax scipy matplotlib igraph ipywidgets
# For local development with uv (temporary installation):
# uv pip install 'daft[aws,pandas]' selectolax scipy matplotlib igraph ipywidgets
# Pipeline
NUM_ROWS = 500
index_col = "block_id"
content_col = "block"
# Minhash
K = 64 # Number of Permutations
SEED = 42 # Seed for the hash function
NGRAM_SIZE = 5 # Size of the n-grams
LSH_THRESHOLD = 0.7 # Jaccard Similarity Threshold for LSH
We will access Common Crawl through WARC files, since Daft supports the format natively.
Crawl data is free for anyone to access from anywhere. The data is hosted by Amazon Web Services' Open Data Sets Sponsorships program on the bucket s3://commoncrawl/, located in the US-East-1 (Northern Virginia) AWS Region. The most performant way to access Common Crawl is through S3, so if you plan to process a lot of data you'll want to authenticate with an AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
However, Common Crawl data can also be accessed without authentication, anonymously via its HTTP endpoint.
import os
from IPython.display import clear_output
import daft
from daft.io import IOConfig, S3Config
IN_AWS = False
if os.environ.get("AWS_ACCESS_KEY_ID"):
    # Make sure to define your AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in your environment variables or in a .env file
    s3_config = S3Config(
        region_name="us-east-1",
        requester_pays=True,
        key_id=os.environ["AWS_ACCESS_KEY_ID"],
        access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
        anonymous=False,
    )
    IN_AWS = True
    # Only set the IO config when credentials exist; otherwise s3_config is undefined
    IO_CONFIG = IOConfig(s3=s3_config)
    daft.set_planning_config(default_io_config=IO_CONFIG)
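If you don't have credentials, no IOConfig is set and the loader below falls back to the public HTTP endpoint. Anonymous S3 reads are also possible; here is a minimal sketch of our own (not required by this tutorial), using S3Config's anonymous mode:
# Hedged sketch: unsigned (anonymous) S3 access to the public bucket.
# With a config like this you could experiment with S3 paths without credentials.
# anon_io_config = IOConfig(s3=S3Config(region_name="us-east-1", anonymous=True))
# daft.set_planning_config(default_io_config=anon_io_config)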
# Read the WARC files from the Common Crawl S3 bucket or HTTP endpoint
df_warc = daft.datasets.common_crawl("CC-MAIN-2025-33", in_aws=IN_AWS).limit(NUM_ROWS).collect()
# Collection can take ~1m 30s for non-AWS access
df_warc.show(3)
# Let's investigate the different types of payloads we have:
df_warc.select("WARC-Identified-Payload-Type").distinct().show()
Since we are primarily concerned with text, we will focus on text/html payloads, extracting the text content from the HTML body and normalizing it.
from daft import col
# Define a UDF to remove http headers from the payload
@daft.func()
def remove_http_headers(x: str) -> str:
    """Remove HTTP headers from the input string by splitting on the first double CRLF, returning the body or an empty string."""
    if x is None:
        return ""
    parts = x.split("\r\n\r\n", 1)  # split once so a body that itself contains CRLF-CRLF stays intact
    if len(parts) > 1:
        return parts[1]
    return ""
# Filter the dataframe to only include text/html payloads
df_html = df_warc.where(col("WARC-Identified-Payload-Type") == "text/html")
# Create a Daft Expression to decode WARC content from bytes to UTF-8 and strip HTTP headers
# Learn more about Daft Expressions: https://docs.daft.ai/en/stable/api/expressions/
warc_without_headers_expr = remove_http_headers(col("warc_content").try_decode("utf-8"))
# Separate the http headers from the payloads and filter out empty content
df_html = df_html.with_column("content_raw", warc_without_headers_expr).where(col("content_raw") != "")
from selectolax.parser import HTMLParser
from daft import DataType as dt
# Define a UDF to extract text from HTML content
@daft.func()
def extract_blocks(html: str) -> dt.list(dt.struct({"txt": dt.string(), "tag": dt.string()})):
    """Parse HTML using Selectolax, remove scripts/styles/noscripts, extract text blocks from key tags."""
    tree = HTMLParser(html)
    for n in tree.css("script,style,noscript"):
        n.decompose()
    all_nodes = tree.css(
        """title, article, main, p, h1, h2, h3, h4, h5, h6, li, div, section, img[alt], figcaption, caption, blockquote, table th, table td, pre, code, summary, meta[name="description"], meta[property="og:title"], meta[property="og:description"]"""
    )
    blocks = [
        {"txt": node.text(separator=" ", strip=True), "tag": node.tag}
        for node in all_nodes
        if node.text(separator=" ", strip=True)
    ]
    return blocks
df_text = (
    df_html.with_column("blocks", extract_blocks(col("content_raw")))
    .explode("blocks")
    .where(col("blocks")["txt"] != "")
    .where(col("blocks")["txt"].not_null())
    .with_column(
        index_col, col("WARC-Record-ID") + "-" + col("blocks")["tag"] + "-" + col("blocks")["txt"].hash()
    )  # Record ID + Tag + Text Hash
    .with_column(content_col, col("blocks")["txt"])
)
df_text = df_text.collect()
df_text.show(3)
# Drop unneeded columns
df_ready = df_text.select(index_col, content_col)
So far we have extracted the text of each HTML document into blocks. Next we normalize the text blocks to prepare for the MinHash operation.
Note: it is recommended to run your preprocessing pipeline separately from your MinHash deduplication workload.
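For example, you can persist the preprocessed blocks and start the dedupe job later from the saved copy. A sketch, with an illustrative path of our choosing:
# Sketch: persist preprocessed blocks so dedupe can run as a separate job.
# The path below is illustrative; point it at your own storage.
df_ready.write_parquet("blocks_preprocessed.parquet")
df_ready = daft.read_parquet("blocks_preprocessed.parquet")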
See docs: normalize
# Normalize text
df_norm = df_ready.with_column(
"content_normalized",
col(content_col).normalize(remove_punct=True, lowercase=True, nfd_unicode=True, white_space=True),
)
df_norm.select(index_col, content_col, "content_normalized").show(3)
Normally when you perform a MinHash on text data, you have to define the shingling strategy, hash functions, and permutation parameters manually.
Luckily, Daft has a built-in minhash expression.
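To build intuition for what this expression computes, here is a toy, pure-Python sketch. It is illustrative only, not Daft's implementation: we approximate the K seeded permutations by salting Python's built-in hash (which is itself salted per process, so this is fine only within a single session).
# Illustrative only: a toy MinHash, not Daft's implementation.
def toy_minhash(text: str, k: int = 4, ngram_size: int = 2) -> list[int]:
    tokens = text.split()
    # Word-level n-gram shingles
    shingles = {" ".join(tokens[i : i + ngram_size]) for i in range(len(tokens) - ngram_size + 1)}
    # One minimum per "permutation" (a salted hash stands in for each permutation)
    return [min(hash((salt, s)) for s in shingles) for salt in range(k)]

sig_a = toy_minhash("the quick brown fox jumps over the lazy dog")
sig_b = toy_minhash("the quick brown fox leaps over the lazy dog")
# The fraction of matching signature positions estimates Jaccard similarity
print(sum(x == y for x, y in zip(sig_a, sig_b)) / len(sig_a))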
# Calculate the minhash vectors
df_minhash = df_norm.with_column(
"min_hashes",
col("content_normalized").minhash(num_hashes=K, ngram_size=NGRAM_SIZE, seed=SEED, hash_function="xxhash"),
)
df_minhash.select(index_col, content_col, "min_hashes").show(3)
LSH Banding involves splitting each document's MinHash signature into bands of rows, where documents with identical bands are considered candidate pairs for similarity comparison. This technique dramatically reduces the number of comparisons needed by only comparing documents that share at least one identical band, making near-duplicate detection scalable for large datasets.
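For two documents with Jaccard similarity s, the probability of becoming a candidate pair under b bands of r rows is 1 - (1 - s^r)^b. A quick sketch of this S-curve, assuming the B = 8, R = 8 split we derive below:
# Probability that two documents with Jaccard similarity s share at least
# one identical band, given b bands of r rows each: 1 - (1 - s**r) ** b
def candidate_probability(s: float, b: int, r: int) -> float:
    return 1 - (1 - s**r) ** b

for s in (0.5, 0.7, 0.9):
    print(f"s={s}: P(candidate) = {candidate_probability(s, b=8, r=8):.3f}")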
Next, we will compute the optimal split of the K MinHash values into B bands of R rows each.
from scipy.integrate import quad as integrate
def optimal_param(
    threshold: float,
    num_perm: int,
    false_positive_weight: float = 0.5,
    false_negative_weight: float = 0.5,
):
    """Compute the optimal `MinHashLSH` parameter that minimizes the weighted sum of probabilities
    of false positive and false negative, taken from datasketch.

    Parameters
    ----------
    threshold : float
        The threshold for similarity.
    num_perm : int
        The number of permutations.
    false_positive_weight : float
        The weight of false positive.
    false_negative_weight : float
        The weight of false negative.

    Returns
    -------
    Tuple[int, int]
        The optimal `b` and `r` parameters:
        the number of bands, and the number of rows per band respectively.

    Examples
    --------
    >>> optimal_param(0.7, 256)
    (25, 10)
    """

    def false_positive_area(threshold: float, b: int, r: int):
        """Source: `datasketch.lsh`."""

        def area(s):
            return 1 - (1 - s ** float(r)) ** float(b)

        a, _ = integrate(area, 0.0, threshold)
        return a

    def false_negative_area(threshold: float, b: int, r: int):
        """Source: `datasketch.lsh`."""

        def area(s):
            return 1 - (1 - (1 - s ** float(r)) ** float(b))

        a, _ = integrate(area, threshold, 1.0)
        return a

    min_error = float("inf")
    opt = (0, 0)
    for b in range(1, num_perm + 1):
        max_r = int(num_perm / b)
        for r in range(1, max_r + 1):
            fp = false_positive_area(threshold, b, r)
            fn = false_negative_area(threshold, b, r)
            error = fp * false_positive_weight + fn * false_negative_weight
            if error < min_error:
                min_error = error
                opt = (b, r)
    return opt
# Choose B bands and R rows per band such that B · R = num_perm.
B, R = optimal_param(LSH_THRESHOLD, K) # Default 0.7 , 64
print(B, R, K)
# Verify that B * R = K
assert B * R == K
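As a sanity check, the midpoint of the LSH S-curve sits roughly at (1/B)^(1/R), so we can confirm the chosen split lands near our 0.7 target:
# Rule-of-thumb: the S-curve threshold is approximately (1/b) ** (1/r).
# For B = 8, R = 8 this is about 0.77, close to our 0.7 target.
approx_threshold = (1 / B) ** (1 / R)
print(f"approximate LSH threshold: {approx_threshold:.3f}")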
Before we move to band generation, we need to map our string index_col to an integer node ID to make downstream processing easier.
We will keep track of the mapping and introduce a new column with a monotonically increasing ID.
from daft.functions import monotonically_increasing_id
df_minhash = df_minhash.with_column("node_id", monotonically_increasing_id())
id_map = df_minhash.select(index_col, "node_id").distinct()
Previously we calculated the MinHash signature of each normalized content block: each word n-gram was hashed to an 8-byte integer, and we kept the minimum under each of K = 64 hash permutations.
Next we take those 64 values and chunk them into B = 8 bands of R = 8 values each.
# Band Generation
df_bands = df_minhash.with_column("bands", col("min_hashes").list.chunk(R))
df_bands.select(index_col, content_col, "min_hashes", "bands").show(3)
Now we will explode our bands into new rows, keeping track of their position in the band using band_idx.
@daft.func()
def get_band_idx(band: list[int], B: int) -> list[int]:
    """Return positional indices [0..B-1] so each band keeps its position after the explode."""
    return list(range(min(len(band), B)))
df_bands_exploded = df_bands.with_column("band_idx", get_band_idx(col("bands"), B)).explode("bands", "band_idx")
df_bands_exploded.select("node_id", "band_idx", "bands").show(3)
We then group the bands by their 'signature', which is the combination of the band index and the band values themselves. If two blocks are duplicates, we expect their signatures to match.
# Grouping Bands
df_grouped = df_bands_exploded.groupby(col("band_idx"), col("bands")).agg(col("node_id").agg_list().alias("nodes"))
df_grouped.select("band_idx", "bands", "nodes").show(3)
# Inspecting bands with multiple nodes
df_grouped.where(col("nodes").list.length() > 1).select("band_idx", "bands", "nodes").show(3)
Every band group whose nodes list has more than one entry is now a set of candidate duplicates. But there is something wrong... our nodes are repeated across different band indices!
To reduce our candidates into their unique set, we leverage a few tricks from graph theory to isolate the duplicates. Here we get to implement one of the most important algorithms in distributed computing: Connected Components in MapReduce and Beyond is a seminal 2014 paper written by researchers at Google.
Concretely, we'll collapse each band group into the edges of a simple graph, then follow the paper's star-contraction recipe (Kiveris et al., 2014): alternate Large-star and Small-star passes that repeatedly point each node at the smallest ID in its neighborhood. We repeat Large-star then Small-star until the edge set stops changing; the "parent" each node then points to is its component representative (typically the component's minimum ID). It's fast, scalable, and after a handful of rounds the clusters just fall out! A toy trace follows.
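Here is a toy, pure-Python trace of the Large-star step on a small path graph (dicts and sets only; this is an illustration of the idea, not the distributed Daft implementation that comes next):
# Toy illustration: Large-star passes on the path graph 1-2-3-4.
edges = {(1, 2), (2, 3), (3, 4)}

def neighborhoods(edge_set):
    nbrs = {}
    for u, v in edge_set:
        nbrs.setdefault(u, set()).add(v)
        nbrs.setdefault(v, set()).add(u)
    return nbrs

def toy_large_star(edge_set):
    out = set()
    for u, nb in neighborhoods(edge_set).items():
        m = min(nb | {u})  # smallest ID in u's extended neighborhood
        out |= {(v, m) for v in nb if v > u}  # re-point strictly larger neighbors at m
    return out

once = toy_large_star(edges)
print(sorted(once))                  # [(2, 1), (3, 1), (4, 2)]
print(sorted(toy_large_star(once)))  # [(2, 1), (3, 1), (4, 1)] -> a star rooted at node 1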
# First we must convert our list of nodes into an edge list
df_edges = (
    df_grouped.with_column("u", col("nodes").list.min())
    .explode("nodes")
    .select("u", v=col("nodes"))
    .where(col("u") != col("v"))
    .where(~col("u").is_null())
    .where(~col("v").is_null())
    .distinct()
)
df_edges.show(5)
from daft import DataFrame, Expression, struct
from daft.functions import when
def ee(u: Expression, v: Expression):
    """Create a struct Expression with fields 'u' and 'v' for representing edges."""
    return struct(u.alias("u"), v.alias("v"))


def canonicalize(edges: DataFrame) -> DataFrame:
    """Order edges so u < v and deduplicate for canonical representation."""
    return (
        edges.with_column("u_can", when(col("u") < col("v"), col("u")).otherwise(col("v")))
        .with_column("v_can", when(col("u") < col("v"), col("v")).otherwise(col("u")))
        .select(col("u_can").alias("u"), col("v_can").alias("v"))
        .distinct()
    )


def symmetrize(edges: DataFrame) -> DataFrame:
    """Make edge list undirected by adding reverse edges."""
    return edges.select("u", "v").union_all(edges.select(col("v").alias("u"), col("u").alias("v"))).collect()


def pairs_equal(a: DataFrame, b: DataFrame) -> bool:
    """Check if two DataFrames have identical (u, rep) pairs via anti-joins."""
    left_minus = a.join(b, on=["u", "rep"], how="anti").count_rows()
    right_minus = b.join(a, on=["u", "rep"], how="anti").count_rows()
    return (left_minus == 0) and (right_minus == 0)
We will iteratively compress the graph using the two alternating passes defined below, until convergence.
def large_star(edges: DataFrame) -> DataFrame:
    """Perform large-star operation: connect nodes to min in extended neighborhood."""
    # Step 1: Emit (u, v) and (v, u) to make the edge list undirected
    undirected = edges.select("u", "v").union_all(edges.select(col("v").alias("u"), col("u").alias("v"))).collect()
    # Step 2: Group by u and aggregate the list of v's
    neigh = undirected.groupby("u").agg_list("v").with_column("nbrs", col("v"))
    # Step 3: Compute m = min over nbrs union {u}
    neigh = neigh.with_column("m", col("nbrs").list.min())
    neigh = neigh.with_column(
        "m", when(col("m").is_null(), col("u")).otherwise(when(col("u") < col("m"), col("u")).otherwise(col("m")))
    )
    # Step 4: Emit (v, m(u)) for v > u
    out = (
        neigh.explode("nbrs")
        .where(col("nbrs") > col("u"))
        .select(col("nbrs").alias("u"), col("m").alias("v"))
        .where(col("u") != col("v"))
        .distinct()
        .collect()
    )
    return out


def small_star(edges: DataFrame) -> DataFrame:
    """Perform small-star operation: connect to min in direct smaller neighborhood."""
    # Step 1: Orient each edge so the smaller ID lands in 'u' and the larger in 'v'
    directed = (
        edges.select(when(col("u") < col("v"), ee(col("u"), col("v"))).otherwise(ee(col("v"), col("u"))).alias("e"))
        .select(col("e")["*"])
        .where(col("u") != col("v"))
        .distinct()
    )
    # Step 2: Group by u; nbrs are u's larger neighbors
    neigh = directed.groupby("u").agg_list("v").with_column("nbrs", col("v"))
    # Step 3: Compute m = min over nbrs union {u}
    neigh = neigh.with_column("m", col("nbrs").list.min())
    neigh = neigh.with_column(
        "m", when(col("m").is_null(), col("u")).otherwise(when(col("u") < col("m"), col("u")).otherwise(col("m")))
    )
    # Step 4: Emit (v, m(u)) for all v in N(u)
    out = (
        neigh.explode("nbrs")
        .select(col("nbrs").alias("u"), col("m").alias("v"))
        .where(col("u") != col("v"))
        .distinct()
        .collect()
    )
    return out
def check_canonical_set_equality(prev_edges: DataFrame, curr_edges: DataFrame) -> bool:
    """Check if two edge DataFrames represent the same set after canonicalization."""
    prev_can = canonicalize(prev_edges).collect().to_pydict()
    curr_can = canonicalize(curr_edges).collect().to_pydict()
    prev_set = set(zip(prev_can["u"], prev_can["v"]))
    curr_set = set(zip(curr_can["u"], curr_can["v"]))
    return prev_set == curr_set
# The Alternating Algorithm
b = df_edges
while True:
    a = large_star(b)
    b_next = small_star(a)
    if check_canonical_set_equality(b, b_next):
        b = b_next
        break
    b = b_next

b_final = b
clear_output()  # cleans up the cell output for this operation
After the alternating star operations converge, we have a stable edge list that implicitly defines connected components.
The final step is to turn this edge list into an explicit assignment table:
[node_id → component_representative]
We do this in three small, deterministic steps, mirrored in the code below, producing a table with columns ["u", "rep"], where:
- u is the original node ID,
- rep is the globally smallest node in its component (the canonical representative).

This table is what we use to filter duplicates: keep only the row whose index equals its rep, discarding the rest.
# Build the set of all unique node IDs that appear in the edge list
# (both as source 'u' and destination 'v')
nodes = (
    b.select(col("u").alias("u"))  # grab all source nodes
    .union_all(b.select(col("v").alias("u")))  # grab all destination nodes
    .distinct()  # deduplicate to get unique nodes
)

# For every node, compute the smallest node ID it is connected to
# (i.e., its tentative representative / root in the current component)
rep_map = b.groupby("u").agg(col("v").min().alias("rep"))  # group edges by source node, take the smallest neighbor

# Join each node with its tentative representative.
# Nodes that have no outgoing edges (and thus no entry in rep_map)
# become their own representative.
assignments = (
    nodes.join(rep_map, on="u", how="left")  # left join to keep all nodes
    .with_column(
        "rep",
        when(col("rep").is_null(), col("u")).otherwise(col("rep")),  # if no neighbor was found, use the node itself as rep
    )
    .select("u", "rep")  # keep only node and its rep
    .distinct()  # drop any duplicate pairs
    .collect()  # materialize the result
)
assignments.show()
igraph is a high-performance graph analysis library that provides robust implementations of fundamental graph algorithms. We use it here as the ground truth for connected-component detection, so we can validate our distributed implementation against a well-tested reference.
See docs: Connected Components
import igraph as ig
import pandas as pd
# Ensure integer dtype and materialize edges
pdf_edges = (
    df_edges.select(col("u").cast(daft.DataType.int64()), col("v").cast(daft.DataType.int64()))
    .where(~col("u").is_null())
    .where(~col("v").is_null())
    .to_pandas()
)
# Build explicit vertex list and index mapping to avoid dtype/label ambiguity
unique_nodes = pd.unique(pd.concat([pdf_edges["u"], pdf_edges["v"]], ignore_index=True))
# Convert to Python ints for stable hashing
node_ids = [int(x) for x in unique_nodes.tolist()]
id_to_idx = {nid: idx for idx, nid in enumerate(node_ids)}
# Map edges to contiguous indices
edges_idx = [(id_to_idx[int(u)], id_to_idx[int(v)]) for u, v in zip(pdf_edges["u"], pdf_edges["v"])]
g = ig.Graph(n=len(node_ids), edges=edges_idx, directed=False)
comps = g.connected_components(mode="weak")
clear_output()
# We can inspect the components to see how many there are and what they look like
print(comps)
import matplotlib.pyplot as plt

# Just grab the top 10 largest components and their nodes
# Get component sizes and sort by number of nodes
comp_sizes = [(i, len(comp)) for i, comp in enumerate(comps)]
top_comp_indices = [i for i, _ in sorted(comp_sizes, key=lambda x: x[1], reverse=True)[:10]]

# Get all nodes from these top components
top_nodes = set()
for comp_idx in top_comp_indices:
    comp_nodes = comps[comp_idx]
    top_nodes.update(comp_nodes)

# Create a subgraph from these nodes
subgraph = g.subgraph(list(top_nodes))

fig, ax = plt.subplots(figsize=(10, 10))
layout = subgraph.layout_fruchterman_reingold()
ig.plot(
    subgraph,
    target=ax,
    layout=layout,
    vertex_size=5,
    palette=ig.RainbowPalette(),
    vertex_color=list(map(int, ig.rescale(comps.membership, (0, 250)))),
    edge_width=0.5,
    bbox=(1000, 1000),  # bigger drawing box
    margin=40,
    autocurve=True,  # curves to reduce edge overlap
)
ours_grouped = assignments.groupby("rep").agg(col("u").agg_list().alias("members")).collect()
pdf = ours_grouped.to_pandas()
ours_comps = {frozenset(m) for m in pdf["members"]}
ig_comps = {frozenset(node_ids[i] for i in comp) for comp in comps}
if ours_comps == ig_comps:
    print(f"[VALIDATION] PASSED: components match igraph (n={len(ours_comps)})")
else:
    only_ours = ours_comps - ig_comps
    only_ig = ig_comps - ours_comps

    def _preview(sets, k=3):
        out = []
        for comp in list(sets)[:k]:
            out.append(sorted(int(c) for c in comp)[:10])
        return out

    print(f"[VALIDATION] MISMATCH: ours={len(ours_comps)} vs igraph={len(ig_comps)}")
    print(f"  examples only in ours: {_preview(only_ours)}")
    print(f"  examples only in igraph: {_preview(only_ig)}")
Why this is needed: after star contraction, a node's tentative representative may itself point at an even smaller node, so the assignment table is not yet globally reduced; this is also why the validation above can report a mismatch.
Algorithm: min-label propagation. Initialize every node's label to its current rep, then repeatedly lower each label to the minimum label in its neighborhood until nothing changes.
# Build an undirected view of the graph so labels can flow in both directions
E = symmetrize(b_final)

# Initialize labels from current assignments: rep becomes the working label per node
labels = assignments.select(col("u"), col("rep").alias("label")).collect()

lp_iters = 0
lp_max_iters = 100
while lp_iters < lp_max_iters:
    lp_iters += 1

    # For each node u, compute the minimum label among its neighbors
    nbr_min = (
        E.join(labels, left_on="v", right_on="u", how="left")
        .select(col("u").alias("node"), col("label"))
        .groupby("node")
        .agg(col("label").min().alias("nbr_min"))
        .collect()
    )

    # Lower each node's label to min(current_label, neighbor_min_label)
    labels_next = (
        labels.join(nbr_min, left_on="u", right_on="node", how="left")
        .with_column(
            "label",
            when(col("nbr_min").is_null(), col("label")).otherwise(
                when(col("label") <= col("nbr_min"), col("label")).otherwise(col("nbr_min"))
            ),
        )
        .select(col("u"), col("label"))
        .distinct()
        .collect()
    )

    # Convergence: compare (u, rep) pair sets before and after this pass
    a = assignments.select(col("u"), col("rep"))
    b = labels_next.select(col("u"), col("label").alias("rep"))
    if pairs_equal(a, b):
        break

    # Continue iterating with updated assignments/labels
    assignments = labels_next.select(col("u"), col("label").alias("rep")).collect()
    labels = labels_next

assignments_globally_reduced = assignments
clear_output()
Checking against igraph one more time:
ours_grouped = assignments_globally_reduced.groupby("rep").agg(col("u").agg_list().alias("members")).collect()
pdf = ours_grouped.to_pandas()
ours_comps = {frozenset(m) for m in pdf["members"]}
ig_comps = {frozenset(node_ids[i] for i in comp) for comp in comps}
if ours_comps == ig_comps:
    print(f"[VALIDATION] PASSED: components match igraph (n={len(ours_comps)})")
else:
    only_ours = ours_comps - ig_comps
    only_ig = ig_comps - ours_comps

    def _preview(sets, k=3):
        out = []
        for comp in list(sets)[:k]:
            out.append(sorted(int(c) for c in comp)[:10])
        return out

    print(f"[VALIDATION] MISMATCH: ours={len(ours_comps)} vs igraph={len(ig_comps)}")
    print(f"  examples only in ours: {_preview(only_ours)}")
    print(f"  examples only in igraph: {_preview(only_ig)}")
# First, collapse the assignments to a single integer representative per node
assignments_unique = assignments_globally_reduced.groupby("u").agg(col("rep").min()).collect()
# Join the assignments with the original ID mapping to get string representatives
a1 = assignments_unique.join(id_map.with_column_renamed(index_col, "__u_str"), left_on="u", right_on="node_id")
a2 = a1.join(id_map.with_column_renamed(index_col, "__rep_str"), left_on="rep", right_on="node_id")
assignments_unique_str = a2.select(col("__u_str").alias(index_col), col("__rep_str").alias("component"))
assignments_unique_str.show()
# Filter to columns of interest
df_content = df_text.select("WARC-Record-ID", index_col, "block")
# Join back to original df and filter to keep only rows where the row is its own representative or isolated
df_joined = df_content.join(assignments_unique_str, on=index_col, how="left").collect()
# Return the deduplicated dataset with only unique representatives
deduplicated_df = (
    df_joined.filter(col("component").is_null() | (col("component") == col(index_col))).exclude("component")
).collect()
deduplicated_df.show()
# Create a dataframe of the duplicates by filtering for rows that are NOT their own representative
duplicates_df = (
    df_joined.filter(col("component").not_null() & (col("component") != col(index_col)))
    .where(col("block") != " ")
    .exclude("component")
).collect()
print(f"Found {len(duplicates_df)} duplicate blocks")
print("\nSample of duplicates:")
duplicates_df.show(10)
In this notebook, we built an end-to-end, scalable deduplication pipeline for web text: reading WARC files, extracting and normalizing HTML text blocks, computing MinHash signatures, LSH banding, and running distributed connected components to collapse duplicate clusters.
Why this is helpful/important: deduplication is a high-leverage, top-of-funnel step for dataset quality, commonly used to improve generalization in LLM training, RAG, and search.
Where to take it next: scale NUM_ROWS up toward a full crawl, tune K, NGRAM_SIZE, and LSH_THRESHOLD for your precision/recall needs, and run the preprocessing and dedupe stages as separate jobs.
Outputs to expect:
- deduplicated_df: the deduplicated dataset (one representative per cluster), ready for downstream training
- duplicates_df: a sample of removed duplicates to spot-check clusters and validate quality