docs/md_v2/advanced/multi-url-crawling.md
Heads Up: Crawl4AI supports advanced dispatchers for parallel or throttled crawling, providing dynamic rate limiting and memory usage checks. The built-in `arun_many()` function uses these dispatchers to handle concurrency efficiently.

When crawling many URLs:

- Use `arun()` in a loop (simple but less efficient)
- Use `arun_many()`, which efficiently handles multiple URLs with proper concurrency control

Why Dispatchers? Dispatchers decide how many crawls run at once, pace requests, and watch system resources, while optional components add rate limiting and live monitoring.
The first of these components is the RateLimiter. Its constructor looks like this:

```python
class RateLimiter:
    def __init__(
        self,
        # Random delay range between requests
        base_delay: Tuple[float, float] = (1.0, 3.0),

        # Maximum backoff delay
        max_delay: float = 60.0,

        # Retries before giving up
        max_retries: int = 3,

        # Status codes triggering backoff
        rate_limit_codes: List[int] = [429, 503]
    )
```
The RateLimiter is a utility that helps manage the pace of requests to avoid overloading servers or getting blocked due to rate limits. It operates internally to delay requests and handle retries but can be configured using its constructor parameters.
Parameters of the RateLimiter constructor:
1. base_delay (Tuple[float, float], default: (1.0, 3.0))
The range for a random delay (in seconds) between consecutive requests to the same domain. A random delay is chosen between base_delay[0] and base_delay[1] for each request.
Example: If base_delay = (2.0, 5.0), delays could be randomly chosen as 2.3s, 4.1s, etc.

2. max_delay (float, default: 60.0)
The maximum allowable delay when rate-limiting errors occur. As backoff grows, max_delay ensures the delay doesn't grow unreasonably high, capping it at this value.
Example: For max_delay = 30.0, even if backoff calculations suggest a delay of 45s, it will cap at 30s.

3. max_retries (int, default: 3)
The maximum number of retries for a request if rate-limiting errors occur. The RateLimiter retries the request up to this number of times before giving up.
Example: If max_retries = 3, the system retries a failed request three times before giving up.

4. rate_limit_codes (List[int], default: [429, 503])
A list of HTTP status codes that trigger the rate-limiting logic.
Example: If rate_limit_codes = [429, 503, 504], the crawler will back off on these three error codes.
How to Use the RateLimiter:
Here’s an example of initializing and using a RateLimiter in your project:
```python
from crawl4ai import RateLimiter

# Create a RateLimiter with custom settings
rate_limiter = RateLimiter(
    base_delay=(2.0, 4.0),       # Random delay between 2-4 seconds
    max_delay=30.0,              # Cap delay at 30 seconds
    max_retries=5,               # Retry up to 5 times on rate-limiting errors
    rate_limit_codes=[429, 503]  # Handle these HTTP status codes
)

# RateLimiter will handle delays and retries internally
# No additional setup is required for its operation
```
The RateLimiter integrates seamlessly with dispatchers like MemoryAdaptiveDispatcher and SemaphoreDispatcher, ensuring requests are paced correctly without user intervention. Its internal mechanisms manage delays and retries to avoid overwhelming servers while maximizing efficiency.
The CrawlerMonitor provides real-time visibility into crawling operations:
```python
from crawl4ai import CrawlerMonitor, DisplayMode

monitor = CrawlerMonitor(
    # Maximum rows in live display
    max_visible_rows=15,

    # DETAILED or AGGREGATED view
    display_mode=DisplayMode.DETAILED
)
```
Display Modes:

- DETAILED: shows individual task status, memory usage, and timing in the live table.
- AGGREGATED: shows summary statistics and overall progress in a compact view.
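Every example in this guide uses the detailed view; if you prefer the compact summary, the aggregated mode is a drop-in swap. A minimal sketch (the row count below is illustrative):

```python
from crawl4ai import CrawlerMonitor, DisplayMode

# Aggregated view: one summary panel instead of a row per task
summary_monitor = CrawlerMonitor(
    max_visible_rows=10,                   # illustrative value
    display_mode=DisplayMode.AGGREGATED
)

# Hand it to a dispatcher (see below) via monitor=summary_monitor
```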
The MemoryAdaptiveDispatcher automatically manages concurrency based on system memory usage:
```python
from crawl4ai.async_dispatcher import MemoryAdaptiveDispatcher

dispatcher = MemoryAdaptiveDispatcher(
    memory_threshold_percent=90.0,  # Pause if memory exceeds this
    check_interval=1.0,             # How often to check memory
    max_session_permit=10,          # Maximum concurrent tasks
    rate_limiter=RateLimiter(       # Optional rate limiting
        base_delay=(1.0, 2.0),
        max_delay=30.0,
        max_retries=2
    ),
    monitor=CrawlerMonitor(         # Optional monitoring
        max_visible_rows=15,
        display_mode=DisplayMode.DETAILED
    )
)
```
Constructor Parameters:
1. memory_threshold_percent (float, default: 90.0)
Specifies the memory usage threshold (as a percentage). If system memory usage exceeds this value, the dispatcher pauses crawling to prevent system overload.
2. check_interval (float, default: 1.0)
The interval (in seconds) at which the dispatcher checks system memory usage.
3. max_session_permit (int, default: 10)
The maximum number of concurrent crawling tasks allowed. This ensures resource limits are respected while maintaining concurrency.
4. memory_wait_timeout (float, default: 600.0)
Optional timeout (in seconds). If memory usage exceeds memory_threshold_percent for longer than this duration, a MemoryError is raised (see the sketch after this list).
5. rate_limiter (RateLimiter, default: None)
Optional rate-limiting logic to avoid server-side blocking (e.g., for handling 429 or 503 errors). See RateLimiter for details.
6. monitor (CrawlerMonitor, default: None)
Optional monitoring for real-time task tracking and performance insights. See CrawlerMonitor for details.
The SemaphoreDispatcher provides simple concurrency control with a fixed limit:
```python
from crawl4ai.async_dispatcher import SemaphoreDispatcher

dispatcher = SemaphoreDispatcher(
    max_session_permit=20,          # Maximum concurrent tasks
    rate_limiter=RateLimiter(       # Optional rate limiting
        base_delay=(0.5, 1.0),
        max_delay=10.0
    ),
    monitor=CrawlerMonitor(         # Optional monitoring
        max_visible_rows=15,
        display_mode=DisplayMode.DETAILED
    )
)
```
Constructor Parameters:
1. max_session_permit (int, default: 20)
The maximum number of concurrent crawling tasks allowed, irrespective of semaphore slots.
2. rate_limiter (RateLimiter, default: None)
Optional rate-limiting logic to avoid overwhelming servers. See RateLimiter for details.
3. monitor (CrawlerMonitor, default: None)
Optional monitoring for tracking task progress and resource usage. See CrawlerMonitor for details.
Batch processing (the default) collects every result once the whole crawl finishes:

```python
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
from crawl4ai import CrawlerMonitor, DisplayMode
from crawl4ai.async_dispatcher import MemoryAdaptiveDispatcher

async def crawl_batch():
    browser_config = BrowserConfig(headless=True, verbose=False)
    run_config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
        stream=False  # Default: get all results at once
    )
    dispatcher = MemoryAdaptiveDispatcher(
        memory_threshold_percent=70.0,
        check_interval=1.0,
        max_session_permit=10,
        monitor=CrawlerMonitor(
            display_mode=DisplayMode.DETAILED
        )
    )

    async with AsyncWebCrawler(config=browser_config) as crawler:
        # Get all results at once
        results = await crawler.arun_many(
            urls=urls,
            config=run_config,
            dispatcher=dispatcher
        )

        # Process all results after completion
        for result in results:
            if result.success:
                await process_result(result)
            else:
                print(f"Failed to crawl {result.url}: {result.error_message}")
```
Review:

- Uses MemoryAdaptiveDispatcher to manage concurrency and system memory.
- Streaming is disabled (stream=False), so all results are collected at once for post-processing.

Streaming mode lets you process each result as soon as it becomes available:

```python
async def crawl_streaming():
    browser_config = BrowserConfig(headless=True, verbose=False)
    run_config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
        stream=True  # Enable streaming mode
    )
    dispatcher = MemoryAdaptiveDispatcher(
        memory_threshold_percent=70.0,
        check_interval=1.0,
        max_session_permit=10,
        monitor=CrawlerMonitor(
            display_mode=DisplayMode.DETAILED
        )
    )

    async with AsyncWebCrawler(config=browser_config) as crawler:
        # Process results as they become available
        async for result in await crawler.arun_many(
            urls=urls,
            config=run_config,
            dispatcher=dispatcher
        ):
            if result.success:
                # Process each result immediately
                await process_result(result)
            else:
                print(f"Failed to crawl {result.url}: {result.error_message}")
```
Review:

- Uses MemoryAdaptiveDispatcher for concurrency and memory management.
- Streaming is enabled (stream=True), allowing real-time processing during crawling.

For a fixed concurrency limit, the same structure works with a SemaphoreDispatcher:

```python
async def crawl_with_semaphore(urls):
    browser_config = BrowserConfig(headless=True, verbose=False)
    run_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
    dispatcher = SemaphoreDispatcher(
        semaphore_count=5,
        rate_limiter=RateLimiter(
            base_delay=(0.5, 1.0),
            max_delay=10.0
        ),
        monitor=CrawlerMonitor(
            max_visible_rows=15,
            display_mode=DisplayMode.DETAILED
        )
    )

    async with AsyncWebCrawler(config=browser_config) as crawler:
        results = await crawler.arun_many(
            urls,
            config=run_config,
            dispatcher=dispatcher
        )
        return results
```
Review:

- Uses SemaphoreDispatcher to limit concurrency with a fixed number of slots.

You can also respect robots.txt across a batch of URLs by enabling check_robots_txt in the run config:

```python
import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, CacheMode

async def main():
    urls = [
        "https://example1.com",
        "https://example2.com",
        "https://example3.com"
    ]

    config = CrawlerRunConfig(
        cache_mode=CacheMode.ENABLED,
        check_robots_txt=True,  # Will respect robots.txt for each URL
        semaphore_count=3       # Max concurrent requests
    )

    async with AsyncWebCrawler() as crawler:
        results = await crawler.arun_many(urls, config=config)
        for result in results:
            if result.success:
                print(f"Successfully crawled {result.url}")
            elif result.status_code == 403 and "robots.txt" in result.error_message:
                print(f"Skipped {result.url} - blocked by robots.txt")
            else:
                print(f"Failed to crawl {result.url}: {result.error_message}")

if __name__ == "__main__":
    asyncio.run(main())
```
Review:

- Respects robots.txt rules for ethical and legal web crawling.
- Uses check_robots_txt=True to validate each URL against robots.txt before crawling.
- Limits concurrency (semaphore_count=3).

Each crawl result includes dispatch information:
```python
@dataclass
class DispatchResult:
    task_id: str
    memory_usage: float
    peak_memory: float
    start_time: datetime
    end_time: datetime
    error_message: str = ""
```
Access via result.dispatch_result:
```python
for result in results:
    if result.success:
        dr = result.dispatch_result
        print(f"URL: {result.url}")
        print(f"Memory: {dr.memory_usage:.1f}MB")
        print(f"Duration: {dr.end_time - dr.start_time}")
```
When crawling diverse content types, you often need different configurations for different URLs. For example:
```python
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, MatchMode
from crawl4ai.processors.pdf import PDFContentScrapingStrategy
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy
from crawl4ai.content_filter_strategy import PruningContentFilter
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator

async def crawl_mixed_content():
    # Configure different strategies for different content
    configs = [
        # PDF files - specialized extraction
        CrawlerRunConfig(
            url_matcher="*.pdf",
            scraping_strategy=PDFContentScrapingStrategy()
        ),

        # Blog/article pages - content filtering
        CrawlerRunConfig(
            url_matcher=["*/blog/*", "*/article/*"],
            markdown_generator=DefaultMarkdownGenerator(
                content_filter=PruningContentFilter(threshold=0.48)
            )
        ),

        # Dynamic pages - JavaScript execution
        CrawlerRunConfig(
            url_matcher=lambda url: 'github.com' in url,
            js_code="window.scrollTo(0, 500);"
        ),

        # API endpoints - JSON extraction
        CrawlerRunConfig(
            url_matcher=lambda url: 'api' in url or url.endswith('.json'),
            # Custom settings for JSON extraction
        ),

        # Default config for everything else
        CrawlerRunConfig()  # No url_matcher means it matches ALL URLs (fallback)
    ]

    # Mixed URLs
    urls = [
        "https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/pdf/dummy.pdf",
        "https://blog.python.org/",
        "https://github.com/microsoft/playwright",
        "https://httpbin.org/json",
        "https://example.com/"
    ]

    async with AsyncWebCrawler() as crawler:
        results = await crawler.arun_many(
            urls=urls,
            config=configs  # Pass list of configs
        )

        for result in results:
            print(f"{result.url}: {len(result.markdown)} chars")
```
Important: A CrawlerRunConfig without url_matcher (or with url_matcher=None) matches ALL URLs. This makes it perfect as a default/fallback configuration.
The url_matcher parameter supports three types of patterns:
```python
# Simple glob patterns
"*.pdf"                    # Any PDF file
"*/api/*"                  # Any URL with /api/ in path
"https://*.example.com/*"  # Subdomain matching
"*://example.com/blog/*"   # Any protocol

# Complex logic with lambdas
lambda url: url.startswith('https://') and 'secure' in url
lambda url: len(url) > 50 and url.count('/') > 5
lambda url: any(domain in url for domain in ['api.', 'data.', 'feed.'])

# Combine multiple conditions
CrawlerRunConfig(
    url_matcher=[
        "https://*",                          # Must be HTTPS
        lambda url: 'internal' in url,        # Must contain 'internal'
        lambda url: not url.endswith('.pdf')  # Must not be PDF
    ],
    match_mode=MatchMode.AND  # ALL conditions must match
)
```
A larger example combines URL-specific configs with a memory-adaptive dispatcher and rate limiting for a news-site crawl:

```python
async def crawl_news_site():
    dispatcher = MemoryAdaptiveDispatcher(
        memory_threshold_percent=70.0,
        rate_limiter=RateLimiter(base_delay=(1.0, 2.0))
    )

    configs = [
        # Homepage - light extraction
        CrawlerRunConfig(
            url_matcher=lambda url: url.rstrip('/') == 'https://news.ycombinator.com',
            css_selector="nav, .headline",
            extraction_strategy=None
        ),

        # Article pages - full extraction
        CrawlerRunConfig(
            url_matcher="*/article/*",
            extraction_strategy=CosineStrategy(
                semantic_filter="article content",
                word_count_threshold=100
            ),
            screenshot=True,
            excluded_tags=["nav", "aside", "footer"]
        ),

        # Author pages - metadata focus
        CrawlerRunConfig(
            url_matcher="*/author/*",
            extraction_strategy=JsonCssExtractionStrategy({
                "name": "h1.author-name",
                "bio": ".author-bio",
                "articles": "article.post-card h2"
            })
        ),

        # Everything else
        CrawlerRunConfig()
    ]

    async with AsyncWebCrawler() as crawler:
        results = await crawler.arun_many(
            urls=news_urls,
            config=configs,
            dispatcher=dispatcher
        )
```
Remember: a CrawlerRunConfig without url_matcher matches ALL URLs. Use the is_match() method to test patterns:
```python
config = CrawlerRunConfig(url_matcher="*.pdf")
print(config.is_match("https://example.com/doc.pdf"))  # True

default_config = CrawlerRunConfig()  # No url_matcher
print(default_config.is_match("https://any-url.com"))  # True - matches everything!
```
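The same method helps when you hold a list of configs: you can check which one would apply to a given URL. The sketch below assumes configs are evaluated in order with the first match winning, which is why the fallback config goes last; the helper is illustrative:

```python
from crawl4ai import CrawlerRunConfig

configs = [
    CrawlerRunConfig(url_matcher="*.pdf"),                      # index 0: PDFs
    CrawlerRunConfig(url_matcher=["*/blog/*", "*/article/*"]),  # index 1: blog/article pages
    CrawlerRunConfig()                                          # index 2: fallback, matches everything
]

def matching_config_index(url, configs):
    # Index of the first config whose matcher accepts the URL (illustrative helper)
    for i, config in enumerate(configs):
        if config.is_match(url):
            return i
    return None

print(matching_config_index("https://example.com/report.pdf", configs))   # 0
print(matching_config_index("https://example.com/blog/post-1", configs))  # 1
print(matching_config_index("https://example.com/about", configs))        # 2
```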
To summarize:

1. Two Dispatcher Types:
- MemoryAdaptiveDispatcher: adjusts concurrency based on system memory usage.
- SemaphoreDispatcher: enforces a fixed concurrency limit.

2. Optional Components:
- RateLimiter: paces requests and backs off on rate-limit responses.
- CrawlerMonitor: real-time visibility into crawl progress.

3. Key Benefits:
- Memory-aware concurrency control.
- Built-in rate limiting with retries and backoff.
- Live monitoring of task status and resource usage.
- URL-specific configurations via url_matcher.

Choose the dispatcher that best fits your needs: MemoryAdaptiveDispatcher when system memory pressure is a concern, SemaphoreDispatcher when you simply want a fixed number of concurrent tasks.