How to Build a Web Scraping Pipeline for AI Training Data
Prerequisites
- Python 3.10+ installed
- Experience with web scraping fundamentals (HTTP, HTML parsing)
- Familiarity with data processing pipelines
- Basic understanding of machine learning data requirements
- Access to cloud storage (S3, GCS, or equivalent)
Define Your Data Requirements
from dataclasses import dataclass, field
from enum import Enum


class ContentType(Enum):
    ARTICLE = "article"
    DOCUMENTATION = "documentation"
    FORUM = "forum"
    CODE = "code"
    ACADEMIC = "academic"
    NEWS = "news"


@dataclass
class DataRequirements:
    """Define what your training data pipeline should produce."""
    target_content_types: list[ContentType]
    languages: list[str] = field(default_factory=lambda: ["en"])
    min_content_length: int = 500  # characters
    max_content_length: int = 200_000
    min_quality_score: float = 0.6  # 0-1 scale
    dedup_strategy: str = "minhash"  # exact, minhash, simhash
    target_size_gb: float = 100.0
    freshness_cutoff: str = "2024-01-01"  # ignore content before this date
    excluded_domains: list[str] = field(default_factory=list)
    required_metadata: list[str] = field(
        default_factory=lambda: ["url", "title", "content", "language", "scraped_at"]
    )


# Example: fine-tuning data for a technical writing model
tech_writing_reqs = DataRequirements(
    target_content_types=[ContentType.DOCUMENTATION, ContentType.ARTICLE],
    languages=["en"],
    min_content_length=1000,
    min_quality_score=0.75,
    target_size_gb=50.0,
    excluded_domains=["pinterest.com", "quora.com"],  # Low signal-to-noise
)

Tip: Start with a 1% sample of your target crawl. Process it through your entire pipeline before scaling. Issues in data quality, deduplication, or parsing are much cheaper to fix at small scale.
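One way to draw that sample assumes you already have a list of candidate URLs. Hash each URL and keep it when the hash falls in the lowest 1% of the range. `in_sample` and `sample_urls` are hypothetical helpers, not part of any library. Because selection depends only on the URL, the same pages land in the pilot on every rerun. That lets you compare pipeline changes on identical input.

```python
import hashlib


def in_sample(url: str, rate: float = 0.01) -> bool:
    """Deterministically decide whether a URL belongs to the pilot sample."""
    # Map the URL to a stable integer in [0, 10_000) via SHA-256
    bucket = int(hashlib.sha256(url.encode("utf-8")).hexdigest(), 16) % 10_000
    return bucket < rate * 10_000


def sample_urls(urls: list[str], rate: float = 0.01) -> list[str]:
    """Keep roughly `rate` of the URLs; the same URLs are chosen on every run."""
    return [u for u in urls if in_sample(u, rate)]


if __name__ == "__main__":
    candidates = [f"https://example.com/page/{i}" for i in range(100_000)]
    pilot = sample_urls(candidates)
    print(f"{len(pilot)} of {len(candidates)} URLs selected for the pilot run")
```

Raising `rate` later keeps every URL from the smaller sample, because a bucket below 100 is also below 1,000.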
Select and Prioritize Sources
import json
from dataclasses import dataclass
from datetime import date

# ContentType is the enum defined in the previous step


@dataclass
class CrawlSource:
    domain: str
    content_type: ContentType
    priority: int  # 1 = highest
    estimated_pages: int
    requires_auth: bool = False
    rate_limit_rps: float = 1.0  # requests per second
    robots_txt_compliant: bool = True
    notes: str = ""


# Build a prioritized source list
sources = [
    CrawlSource(
        domain="docs.python.org",
        content_type=ContentType.DOCUMENTATION,
        priority=1,
        estimated_pages=15_000,
        rate_limit_rps=2.0,
    ),
    CrawlSource(
        domain="developer.mozilla.org",
        content_type=ContentType.DOCUMENTATION,
        priority=1,
        estimated_pages=50_000,
        rate_limit_rps=2.0,
    ),
    CrawlSource(
        domain="arxiv.org",
        content_type=ContentType.ACADEMIC,
        priority=2,
        estimated_pages=2_000_000,
        rate_limit_rps=0.5,
        notes="Use bulk access endpoint for papers, respect rate limits",
    ),
    CrawlSource(
        domain="stackoverflow.com",
        content_type=ContentType.FORUM,
        priority=2,
        estimated_pages=50_000_000,
        rate_limit_rps=1.0,
        notes="Use data dump instead of crawling; available under CC-BY-SA",
    ),
]


def save_source_manifest(sources: list[CrawlSource], filepath: str) -> None:
    """Save source list for reproducibility."""
    manifest = {
        # Record the actual creation date instead of a hard-coded string
        "created_at": date.today().isoformat(),
        "total_sources": len(sources),
        "estimated_total_pages": sum(s.estimated_pages for s in sources),
        "sources": [
            {
                "domain": s.domain,
                "content_type": s.content_type.value,
                "priority": s.priority,
                "estimated_pages": s.estimated_pages,
                "rate_limit_rps": s.rate_limit_rps,
                "requires_auth": s.requires_auth,
                "notes": s.notes,
            }
            for s in sources
        ],
    }
    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(manifest, f, indent=2)

Tip: Check if a data dump already exists before crawling. Wikipedia, Stack Overflow, Common Crawl, and many government sites offer bulk downloads that are faster, cheaper, and more respectful than crawling.
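Working from a dump instead of crawling can be as simple as streaming an XML file. This sketch pulls questions out of a Stack Exchange `Posts.xml` file. It assumes the dump's usual layout: a `<posts>` root with one `<row>` per post. Each row carries `Id`, `PostTypeId` (`1` for questions, `2` for answers), `Title`, and an HTML `Body` attribute. Check those field names against the dump you actually download. `iter_posts` is a hypothetical helper. `iterparse` combined with `elem.clear()` keeps memory use low even on multi-gigabyte files.

```python
import io
import xml.etree.ElementTree as ET
from typing import BinaryIO, Iterator


def iter_posts(source: BinaryIO, post_type_id: str = "1") -> Iterator[dict]:
    """Stream posts of one type from a Stack Exchange Posts.xml dump."""
    for _event, elem in ET.iterparse(source, events=("end",)):
        if elem.tag == "row" and elem.get("PostTypeId") == post_type_id:
            yield {
                "id": elem.get("Id"),
                "title": elem.get("Title", ""),
                "body_html": elem.get("Body", ""),  # HTML: clean it like a crawled page
            }
        # Free the element's attributes once it has been processed
        elem.clear()


if __name__ == "__main__":
    sample = (
        b'<?xml version="1.0" encoding="utf-8"?><posts>'
        b'<row Id="1" PostTypeId="1" Title="How do I X?" Body="&lt;p&gt;Question&lt;/p&gt;" />'
        b'<row Id="2" PostTypeId="2" Body="&lt;p&gt;Answer&lt;/p&gt;" />'
        b"</posts>"
    )
    for post in iter_posts(io.BytesIO(sample)):
        print(post["id"], post["title"])
```

For a real dump, pass `open("Posts.xml", "rb")` in place of the in-memory sample.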
Build the Crawler
import asyncio
import hashlib
import time
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from urllib.parse import urldefrag, urljoin, urlparse

from curl_cffi.requests import AsyncSession
from selectolax.parser import HTMLParser


@dataclass
class CrawlResult:
    url: str
    status_code: int
    content: str
    title: str
    language: str
    content_length: int
    scraped_at: str
    content_hash: str
    outlinks: list[str] = field(default_factory=list)


class WebCrawler:
    def __init__(
        self,
        max_concurrent: int = 10,
        default_rate_limit: float = 1.0,
        output_dir: str = "./crawl_output",
        proxy: str | None = None,
    ):
        self.max_concurrent = max_concurrent
        self.default_rate_limit = default_rate_limit
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.proxy = proxy
        self.visited: set[str] = set()
        self.domain_last_request: dict[str, float] = {}
        self.domain_rate_limits: dict[str, float] = {}
        # One lock per domain, so concurrent tasks for the same domain cannot
        # read the same timestamp and fire at the same moment
        self.domain_locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
        self.semaphore = asyncio.Semaphore(max_concurrent)

    def set_rate_limit(self, domain: str, rps: float):
        self.domain_rate_limits[domain] = rps

    async def _wait_for_rate_limit(self, domain: str):
        rate_limit = self.domain_rate_limits.get(domain, self.default_rate_limit)
        min_interval = 1.0 / rate_limit
        async with self.domain_locks[domain]:
            # time.monotonic() is not affected by system clock changes
            last_request = self.domain_last_request.get(domain)
            if last_request is not None:
                elapsed = time.monotonic() - last_request
                if elapsed < min_interval:
                    await asyncio.sleep(min_interval - elapsed)
            self.domain_last_request[domain] = time.monotonic()

    def _extract_content(self, html: str, url: str, status_code: int = 200) -> CrawlResult:
        tree = HTMLParser(html)
        # Remove non-content elements
        for node in tree.css("script, style, nav, footer, header, aside, .sidebar, .ad"):
            node.decompose()
        # Extract main content, falling back to <body>
        main = tree.css_first("main, article, .content, #content, .post-content")
        content_node = main if main else tree.css_first("body")
        content = content_node.text(separator="\n", strip=True) if content_node else ""
        # Extract title
        title_el = tree.css_first("title")
        title = title_el.text(strip=True) if title_el else ""
        # Detect language from <html lang="...">, which may be missing or empty
        html_el = tree.css_first("html")
        lang_attr = html_el.attributes.get("lang") if html_el else None
        lang = (lang_attr or "en")[:2].lower()
        # Extract outbound links: urljoin resolves absolute, root-relative and
        # page-relative hrefs; fragments are dropped so #anchors are not new URLs
        outlinks = []
        for link in tree.css("a[href]"):
            href = link.attributes.get("href") or ""
            absolute, _fragment = urldefrag(urljoin(url, href))
            if absolute.startswith(("http://", "https://")):
                outlinks.append(absolute)
        content_hash = hashlib.sha256(content.encode("utf-8")).hexdigest()
        return CrawlResult(
            url=url,
            status_code=status_code,
            content=content,
            title=title,
            language=lang,
            content_length=len(content),
            scraped_at=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            content_hash=content_hash,
            outlinks=outlinks,
        )

    async def fetch(self, url: str) -> CrawlResult | None:
        if url in self.visited:
            return None  # already fetched (or attempted) in this crawl
        self.visited.add(url)
        domain = urlparse(url).netloc
        async with self.semaphore:
            await self._wait_for_rate_limit(domain)
            proxies = {"https": self.proxy, "http": self.proxy} if self.proxy else None
            try:
                # AsyncSession awaits the network without blocking the event loop,
                # so other fetches keep running while this one waits
                async with AsyncSession(impersonate="chrome") as session:
                    response = await session.get(url, proxies=proxies, timeout=15)
                if response.status_code != 200:
                    return None
                return self._extract_content(response.text, url, response.status_code)
            except Exception as e:
                print(f"Failed to fetch {url}: {e}")
                return None

Tip: For large-scale crawls (millions of pages), consider using a distributed task queue like Celery or a dedicated crawling framework like Scrapy. The async approach shown here works well up to ~100K pages.
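If you move to Scrapy, the politeness controls in `WebCrawler` map onto Scrapy's built-in settings. This sketch derives them from a per-source `rate_limit_rps` value like the one on `CrawlSource`. The helper `scrapy_settings_for` and the `JOBDIR` path are hypothetical. The setting names themselves are standard Scrapy settings. The function only builds a dict, so it runs without Scrapy installed.

```python
def scrapy_settings_for(rate_limit_rps: float, max_concurrent: int = 10) -> dict:
    """Translate the crawler's politeness parameters into Scrapy settings."""
    if rate_limit_rps <= 0:
        raise ValueError("rate_limit_rps must be positive")
    delay = 1.0 / rate_limit_rps
    return {
        "ROBOTSTXT_OBEY": True,
        "CONCURRENT_REQUESTS": max_concurrent,  # global cap, like the semaphore
        "CONCURRENT_REQUESTS_PER_DOMAIN": 1,    # one in-flight request per site
        # Seconds between requests to the same site; by default Scrapy
        # randomizes each wait between 0.5x and 1.5x this value
        "DOWNLOAD_DELAY": delay,
        "AUTOTHROTTLE_ENABLED": True,            # back off when a server slows down
        "AUTOTHROTTLE_START_DELAY": delay,
        "DOWNLOAD_TIMEOUT": 15,                 # same 15 s timeout as fetch()
        "JOBDIR": "crawls/training-data-1",     # hypothetical path; enables pause/resume
    }
```

Inside a `scrapy.Spider` subclass, you would assign the result to the class attribute: `custom_settings = scrapy_settings_for(2.0)`.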
Clean and Deduplicate Content
import hashlib
import re

from datasketch import MinHash, MinHashLSH


class ContentCleaner:
    """Clean and normalize crawled text content."""

    # Patterns to remove. Each one deletes from the match to the end of its line,
    # so a body sentence that mentions e.g. "privacy policy" is removed as well.
    BOILERPLATE_PATTERNS = [
        r"Cookie\s+Policy.*?(?:\n|\Z)",
        r"Subscribe\s+to\s+our\s+newsletter.*?(?:\n|\Z)",
        r"Share\s+on\s+(?:Facebook|Twitter|LinkedIn).*?(?:\n|\Z)",
        r"Copyright\s+©.*?(?:\n|\Z)",
        r"All\s+rights\s+reserved.*?(?:\n|\Z)",
        r"Terms\s+of\s+(?:Service|Use).*?(?:\n|\Z)",
        r"Privacy\s+Policy.*?(?:\n|\Z)",
    ]

    def clean(self, text: str) -> str:
        # Remove boilerplate patterns (case-insensitive)
        for pattern in self.BOILERPLATE_PATTERNS:
            text = re.sub(pattern, "", text, flags=re.IGNORECASE)
        # Normalize spaces and tabs, then trim each line
        text = re.sub(r"[ \t]+", " ", text)
        lines = [line.strip() for line in text.split("\n")]
        # Remove lines that are just URLs
        lines = [line for line in lines if not re.match(r"^https?://\S+$", line)]
        # Remove very short lines (likely navigation remnants) but keep blank lines
        # as paragraph breaks; note this also drops short headings and code lines
        lines = [line for line in lines if len(line) > 20 or line == ""]
        # Collapse runs of blank lines last, since the filters above can create new ones
        text = "\n".join(lines)
        text = re.sub(r"\n{3,}", "\n\n", text)
        return text.strip()


class Deduplicator:
    """Remove exact and near-duplicate documents using MinHash LSH."""

    def __init__(self, threshold: float = 0.8, num_perm: int = 128):
        self.threshold = threshold
        self.num_perm = num_perm
        self.lsh = MinHashLSH(threshold=threshold, num_perm=num_perm)
        self.exact_hashes: set[str] = set()
        self.doc_count = 0

    def _get_minhash(self, text: str) -> MinHash:
        m = MinHash(num_perm=self.num_perm)
        # Use word-level 5-grams (shingles) for near-duplicate detection
        words = text.lower().split()
        for i in range(len(words) - 4):
            ngram = " ".join(words[i:i + 5])
            m.update(ngram.encode("utf-8"))
        return m

    def is_duplicate(self, text: str) -> bool:
        # Check exact duplicate first (fast)
        exact_hash = hashlib.sha256(text.encode("utf-8")).hexdigest()
        if exact_hash in self.exact_hashes:
            return True
        self.exact_hashes.add(exact_hash)
        # Check near-duplicate with MinHash LSH
        minhash = self._get_minhash(text)
        if self.lsh.query(minhash):
            return True
        # Not a duplicate: add to index under a unique key
        self.doc_count += 1
        self.lsh.insert(f"doc_{self.doc_count}", minhash)
        return False


# Usage in the pipeline
cleaner = ContentCleaner()
dedup = Deduplicator(threshold=0.85)


def process_document(raw_text: str, min_length: int = 500) -> str | None:
    """Clean and deduplicate a single document. Returns None if filtered."""
    cleaned = cleaner.clean(raw_text)
    if len(cleaned) < min_length:
        return None  # Too short after cleaning
    if dedup.is_duplicate(cleaned):
        return None  # Duplicate content
    return cleaned

Tip: Install datasketch with: pip install datasketch. For datasets over 10M documents, use datasketch's Redis storage backend for MinHash LSH to avoid memory issues.
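To build intuition for what `threshold=0.85` means, this sketch computes the exact Jaccard similarity over the same word 5-gram shingles that `_get_minhash` hashes. MinHash LSH estimates this quantity without comparing every pair of documents. Running the exact version on a few known pairs from your pilot crawl is a cheap way to sanity-check the threshold. `shingles` and `jaccard` are illustrative helpers, not part of datasketch.

```python
def shingles(text: str, n: int = 5) -> set[str]:
    """Word n-grams, built the same way as Deduplicator._get_minhash."""
    words = text.lower().split()
    return {" ".join(words[i:i + n]) for i in range(len(words) - n + 1)}


def jaccard(a: str, b: str, n: int = 5) -> float:
    """Exact Jaccard similarity |A ∩ B| / |A ∪ B| of the two shingle sets."""
    sa, sb = shingles(a, n), shingles(b, n)
    if not sa and not sb:
        return 1.0  # two texts too short to shingle are treated as identical
    return len(sa & sb) / len(sa | sb)


if __name__ == "__main__":
    original = " ".join(f"w{i}" for i in range(30))
    edited = original.rsplit(" ", 1)[0] + " changed"  # replace the last word
    # 30 words give 26 shingles each; only the final shingle differs,
    # so the similarity is 25 / 27, above a 0.85 threshold
    print(round(jaccard(original, edited), 3))
```

A single edit near the middle of a short document changes up to five shingles, so short pages drift below the threshold much faster than long ones.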
Validate Data Quality
import math
import re
from collections import Counter
from urllib.parse import urlparse


class QualityScorer:
    """Score document quality on a 0-1 scale using multiple signals."""

    HIGH_AUTHORITY = (".edu", ".gov", ".org", "wikipedia.org", "arxiv.org",
                      "github.com", "stackoverflow.com", "mozilla.org")
    LOW_AUTHORITY = (".xyz", ".info", ".click", "blogspot.com")

    def score(self, text: str, url: str = "") -> dict:
        signals = {
            "length_score": self._length_score(text),
            "language_coherence": self._language_coherence(text),
            "information_density": self._information_density(text),
            "formatting_quality": self._formatting_quality(text),
            "domain_authority": self._domain_authority(url),
        }
        # Weighted average (weights sum to 1.0)
        weights = {
            "length_score": 0.15,
            "language_coherence": 0.25,
            "information_density": 0.25,
            "formatting_quality": 0.15,
            "domain_authority": 0.20,
        }
        overall = sum(signals[k] * weights[k] for k in signals)
        return {"overall": round(overall, 3), **signals}

    def _length_score(self, text: str) -> float:
        length = len(text)
        if length < 200:
            return 0.1
        if length < 500:
            return 0.3
        if length < 2000:
            return 0.7
        if length < 20000:
            return 1.0
        return 0.8  # Very long documents sometimes have quality issues

    def _language_coherence(self, text: str) -> float:
        """Estimate language quality with cheap heuristics (not a true perplexity score)."""
        words = text.lower().split()
        if len(words) < 50:
            return 0.3
        # Vocabulary diversity: root type-token ratio (unique words / sqrt(total words))
        unique_words = len(set(words))
        ttr = unique_words / math.sqrt(len(words))
        ttr_score = min(ttr / 10.0, 1.0)
        # Average word length (extreme values indicate non-natural text)
        avg_word_len = sum(len(w) for w in words) / len(words)
        word_len_score = 1.0 if 3.5 < avg_word_len < 7.0 else 0.5
        # Sentence structure: average words per non-empty sentence
        sentences = [s for s in re.split(r"[.!?]+", text) if s.strip()]
        avg_sent_len = len(words) / max(len(sentences), 1)
        sent_score = 1.0 if 10 < avg_sent_len < 35 else 0.5
        return (ttr_score + word_len_score + sent_score) / 3

    def _information_density(self, text: str) -> float:
        """Estimate how much unique information the text contains."""
        words = text.lower().split()
        if len(words) < 50:
            return 0.2
        # Repetition penalty: count words that each make up more than 2% of all tokens
        word_counts = Counter(words)
        highly_repeated = sum(1 for c in word_counts.values() if c > len(words) * 0.02)
        repetition_penalty = min(highly_repeated / 20.0, 0.5)
        # Content word ratio (exclude common stop words and very short tokens)
        stop_words = {"the", "a", "an", "is", "are", "was", "were", "in", "on", "at",
                      "to", "for", "of", "and", "or", "but", "not", "with", "this", "that"}
        content_words = [w for w in words if w not in stop_words and len(w) > 2]
        content_ratio = len(content_words) / len(words)
        return max(0.0, content_ratio - repetition_penalty)

    def _formatting_quality(self, text: str) -> float:
        """Check for well-structured content."""
        has_paragraphs = text.count("\n\n") > 2
        has_reasonable_lines = all(len(line) < 500 for line in text.split("\n") if line.strip())
        no_excessive_caps = sum(1 for c in text if c.isupper()) / max(len(text), 1) < 0.3
        # Booleans count as 0/1, so this is the fraction of checks passed
        return (has_paragraphs + has_reasonable_lines + no_excessive_caps) / 3

    @staticmethod
    def _host_matches(host: str, pattern: str) -> bool:
        # ".edu"-style patterns match a TLD suffix; bare domains match the
        # domain itself or any of its subdomains
        if pattern.startswith("."):
            return host.endswith(pattern)
        return host == pattern or host.endswith("." + pattern)

    def _domain_authority(self, url: str) -> float:
        """Score based on domain reputation (simplified)."""
        # Match against the hostname only; a substring test on the whole URL would
        # rate "www.educative.io" as ".edu" or any path containing ".org" as high authority
        host = (urlparse(url).hostname or "").lower()
        if any(self._host_matches(host, p) for p in self.HIGH_AUTHORITY):
            return 0.9
        if any(self._host_matches(host, p) for p in self.LOW_AUTHORITY):
            return 0.3
        return 0.6  # Default for unknown domains

Tip: For production pipelines, consider using a small classifier model (like a fine-tuned DistilBERT) for quality scoring instead of heuristics. Train it on a few thousand manually labeled examples from your crawl. The heuristic approach here is a good starting point before you have labeled data.
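When you pick documents for labeling, sampling only the ones the heuristics already rate highly would give a classifier few negative examples. One simple alternative, offered here as an assumption rather than a prescribed method, is to stratify by heuristic score. Split the 0-1 range into equal-width buckets and draw the same number of documents from each. `stratified_sample` is a hypothetical helper that takes `(doc_id, score)` pairs, such as the `overall` value returned by `QualityScorer.score`.

```python
import random
from collections import defaultdict


def stratified_sample(
    scored_docs: list[tuple[str, float]],
    per_bucket: int,
    num_buckets: int = 5,
    seed: int = 0,
) -> list[tuple[str, float]]:
    """Draw up to `per_bucket` documents from each equal-width score bucket."""
    buckets: dict[int, list[tuple[str, float]]] = defaultdict(list)
    for doc_id, score in scored_docs:
        # Clamp so a score of exactly 1.0 lands in the top bucket
        idx = min(max(int(score * num_buckets), 0), num_buckets - 1)
        buckets[idx].append((doc_id, score))

    rng = random.Random(seed)  # fixed seed keeps the labeling set reproducible
    sample: list[tuple[str, float]] = []
    for idx in sorted(buckets):
        items = buckets[idx]
        sample.extend(rng.sample(items, min(per_bucket, len(items))))
    return sample
```

With five buckets and `per_bucket=600`, you get about 3,000 documents spread evenly across the score range, assuming each bucket has that many.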
Build the Storage Pipeline
import asyncio
import gzip
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlparse


@dataclass
class TrainingDocument:
    url: str
    title: str
    content: str
    language: str
    content_type: str
    quality_score: float
    content_hash: str
    word_count: int
    scraped_at: str
    source_domain: str


class DataStore:
    """Store training data in compressed JSONL format with partitioning."""

    def __init__(self, base_dir: str, partition_size: int = 10_000):
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)
        self.partition_size = partition_size
        self.buffer: list[TrainingDocument] = []
        self.total_written = 0
        self.partition_idx = 0

    def add(self, doc: TrainingDocument):
        self.buffer.append(doc)
        if len(self.buffer) >= self.partition_size:
            self._flush()

    def _flush(self):
        if not self.buffer:
            return
        # Partition by (UTC) date and a running file index
        date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        partition_dir = self.base_dir / f"date={date_str}"
        partition_dir.mkdir(exist_ok=True)
        filepath = partition_dir / f"part-{self.partition_idx:05d}.jsonl.gz"
        with gzip.open(filepath, "wt", encoding="utf-8") as f:
            for doc in self.buffer:
                f.write(json.dumps(asdict(doc), ensure_ascii=False) + "\n")
        self.total_written += len(self.buffer)
        self.partition_idx += 1
        print(f"Wrote {len(self.buffer)} docs to {filepath} (total: {self.total_written})")
        self.buffer = []

    def finalize(self):
        """Flush remaining buffer and write manifest."""
        self._flush()
        manifest = {
            "total_documents": self.total_written,
            "partitions": self.partition_idx,
            "created_at": datetime.now(timezone.utc).isoformat(),
            "base_dir": str(self.base_dir),
        }
        manifest_path = self.base_dir / "manifest.json"
        with open(manifest_path, "w", encoding="utf-8") as f:
            json.dump(manifest, f, indent=2)
        print(f"Finalized: {self.total_written} documents in {self.partition_idx} partitions")


# Full pipeline integration, using DataRequirements, WebCrawler, ContentCleaner,
# Deduplicator and QualityScorer from the previous steps
def run_pipeline(
    urls: list[str],
    requirements: DataRequirements,
    output_dir: str,
    content_type: str = "article",
):
    crawler = WebCrawler(max_concurrent=5, output_dir="./raw_crawl")
    cleaner = ContentCleaner()
    dedup = Deduplicator(threshold=0.85)
    scorer = QualityScorer()
    store = DataStore(base_dir=output_dir)

    def is_excluded(url: str) -> bool:
        host = urlparse(url).hostname or ""
        return any(host == d or host.endswith("." + d) for d in requirements.excluded_domains)

    async def process_urls():
        fetches = [crawler.fetch(url) for url in urls if not is_excluded(url)]
        # Run fetches concurrently (bounded by the crawler's semaphore and
        # per-domain rate limits) and handle each result as it completes
        for next_result in asyncio.as_completed(fetches):
            result = await next_result
            if not result or result.content_length < requirements.min_content_length:
                continue
            if result.language not in requirements.languages:
                continue
            cleaned = cleaner.clean(result.content)
            if not requirements.min_content_length <= len(cleaned) <= requirements.max_content_length:
                continue
            if dedup.is_duplicate(cleaned):
                continue
            quality = scorer.score(cleaned, result.url)
            if quality["overall"] < requirements.min_quality_score:
                continue
            doc = TrainingDocument(
                url=result.url,
                title=result.title,
                content=cleaned,
                language=result.language,
                content_type=content_type,
                quality_score=quality["overall"],
                content_hash=result.content_hash,
                word_count=len(cleaned.split()),
                scraped_at=result.scraped_at,
                source_domain=urlparse(result.url).netloc,
            )
            store.add(doc)

    try:
        asyncio.run(process_urls())
    finally:
        store.finalize()  # write buffered documents even if the crawl is interrupted

Tip: JSONL with gzip compression gives you 5-10x size reduction and is readable by virtually every data tool. For datasets over 1TB, switch to Parquet with pyarrow for columnar storage and faster loading in training frameworks.
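Reading the partitions back mirrors `DataStore._flush`. This generator assumes the `date=YYYY-MM-DD/part-NNNNN.jsonl.gz` layout that `DataStore` writes. It streams one document at a time, so a large dataset never has to fit in memory. `iter_documents` and its `min_quality` filter are illustrative, not part of any library.

```python
import gzip
import json
from pathlib import Path
from typing import Iterator


def iter_documents(base_dir: str, min_quality: float = 0.0) -> Iterator[dict]:
    """Yield documents from every partition under base_dir, in sorted path order."""
    for path in sorted(Path(base_dir).glob("date=*/part-*.jsonl.gz")):
        with gzip.open(path, "rt", encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue  # tolerate blank lines
                doc = json.loads(line)
                if doc.get("quality_score", 0.0) >= min_quality:
                    yield doc
```

The same loop can also feed a one-off conversion to Parquet once the dataset outgrows JSONL.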
Handle Anti-Bot at Scale
Tip: Track your per-domain success rate. Any domain with a success rate below 80% is likely running an anti-bot system your current setup cannot handle. Prioritize these domains for real-device infrastructure.
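One minimal way to track this is to record one outcome per fetch attempt, for instance whether `WebCrawler.fetch` returned a result. Keep attempt and success counts per domain. Flag a domain below the 80% line only once it has enough attempts for the rate to mean something. `SuccessTracker` and its `min_attempts` cutoff are hypothetical.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class DomainStats:
    attempts: int = 0
    successes: int = 0

    @property
    def success_rate(self) -> float:
        return self.successes / self.attempts if self.attempts else 0.0


class SuccessTracker:
    """Per-domain fetch success rates, flagging domains below a threshold."""

    def __init__(self, threshold: float = 0.8, min_attempts: int = 50):
        self.threshold = threshold
        self.min_attempts = min_attempts  # ignore domains with too few attempts to judge
        self.stats: defaultdict[str, DomainStats] = defaultdict(DomainStats)

    def record(self, domain: str, success: bool) -> None:
        stats = self.stats[domain]
        stats.attempts += 1
        stats.successes += int(success)

    def flagged_domains(self) -> list[str]:
        """Domains with enough attempts whose success rate is below the threshold."""
        return sorted(
            domain
            for domain, stats in self.stats.items()
            if stats.attempts >= self.min_attempts and stats.success_rate < self.threshold
        )
```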
Address Ethical and Legal Considerations
import re
from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser


class EthicalCrawler:
    """Wrapper that enforces ethical crawling practices."""

    USER_AGENT = "TrainingDataBot/1.0 (+https://yourcompany.com/bot)"

    def __init__(self):
        # None marks a domain whose robots.txt could not be fetched
        self.robots_cache: dict[str, RobotFileParser | None] = {}

    def can_fetch(self, url: str) -> bool:
        """Check robots.txt before fetching any URL."""
        parsed = urlparse(url)
        domain = f"{parsed.scheme}://{parsed.netloc}"
        if domain not in self.robots_cache:
            rp = RobotFileParser()
            rp.set_url(f"{domain}/robots.txt")
            try:
                rp.read()  # blocking network call; run it in a thread from async code
            except Exception:
                rp = None
            # Cache failures too, so an unreachable robots.txt is not re-requested for every URL
            self.robots_cache[domain] = rp
        rp = self.robots_cache[domain]
        if rp is None:
            return True  # If robots.txt is unavailable, allow
        return rp.can_fetch(self.USER_AGENT, url)

    @staticmethod
    def strip_pii(text: str) -> str:
        """Remove common PII patterns from text (simple regexes; expect some misses)."""
        # Email addresses
        text = re.sub(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", "[EMAIL]", text)
        # Social Security Numbers (checked before phone numbers so the more specific pattern wins)
        text = re.sub(r"\b\d{3}-\d{2}-\d{4}\b", "[SSN]", text)
        # Phone numbers (US format without parentheses; also matches any bare 10-digit number)
        text = re.sub(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b", "[PHONE]", text)
        return text

    @staticmethod
    def get_crawl_delay(robots_parser: RobotFileParser) -> float:
        """Respect the crawl-delay directive if set; default to 1 second."""
        delay = robots_parser.crawl_delay(EthicalCrawler.USER_AGENT)
        return float(delay) if delay is not None else 1.0

Tip: Publish a webpage explaining your bot at the URL referenced in your user-agent string. Include contact information and an opt-out mechanism. This builds trust with site operators and reduces the chance of being proactively blocked.
FAQ
How much training data do I need?
It depends on the task and base model. For domain adaptation of an existing LLM, 1-10GB of high-quality, domain-specific text is often sufficient. For training a specialized model from scratch, 50-500GB is typical. Quality matters more than quantity: 5GB of clean, relevant text often outperforms 50GB of noisy, general-purpose content.
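To turn a size target such as `target_size_gb` into a crawl budget, you can extrapolate from a pilot run. Divide the bytes of cleaned text you kept by the number of pages you fetched, then scale that yield to the target. The sketch assumes decimal gigabytes (10^9 bytes) of uncompressed UTF-8 text. It also assumes the pilot's yield per page holds at full scale. `pages_needed` is a hypothetical helper.

```python
import math


def pages_needed(target_gb: float, pilot_pages_fetched: int, pilot_bytes_kept: int) -> int:
    """Estimate how many pages to fetch to reach target_gb of cleaned text."""
    if pilot_pages_fetched <= 0 or pilot_bytes_kept <= 0:
        raise ValueError("the pilot run must have fetched pages and kept some text")
    # Average yield per fetched page, counting pages that were filtered out
    bytes_per_page = pilot_bytes_kept / pilot_pages_fetched
    return math.ceil(target_gb * 1e9 / bytes_per_page)
```

For example, with made-up pilot numbers: 10,000 fetched pages yield 20 MB of kept text, or 2,000 bytes per page. Reaching a 50GB target then means fetching about 25 million pages. You can measure `pilot_bytes_kept` as the sum of `len(doc.content.encode("utf-8"))` over the documents the pipeline stored.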
Is scraping web content for AI training legal?
The legal landscape is evolving rapidly. In the US, several lawsuits are testing whether scraping copyrighted content for training constitutes fair use. The EU AI Act requires training data documentation for certain AI categories. Japan has relatively permissive rules for AI training. The safest approach: prefer openly licensed content, respect robots.txt, document your sources, and consult legal counsel for your jurisdiction.
How do I collect data from sites protected by Cloudflare and similar services?
Cloudflare and similar services use JavaScript challenges, TLS fingerprinting, and behavioral analysis to block bots. A standard Python HTTP client is typically blocked on the first request. You need either a headless browser with stealth configuration (resource-intensive) or real-device infrastructure that produces authentic fingerprints (reliable but costs more). For a training data pipeline targeting thousands of domains, real-device infrastructure is more practical than configuring per-site browser evasion.
Should I use Common Crawl or build my own crawler?
Common Crawl is an excellent starting point: it provides petabytes of pre-crawled web data for free. Use it when you need broad web coverage and can accept data that is weeks to months old. Build your own crawler when you need fresh data, specific domains not well covered by Common Crawl, or data from sites that block the Common Crawl bot. Many pipelines combine both: Common Crawl for breadth, custom crawling for targeted depth.
How do I handle deduplication across multiple crawl runs?
Use content hashing (SHA-256) for exact deduplication across runs, and store the hashes in a persistent database. For near-duplicate detection across runs, maintain a persistent MinHash LSH index (backed by Redis or similar). When processing a new crawl batch, check each document against the existing index before adding it to your dataset. This prevents gradual quality degradation from content that changes slightly between crawls.
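Here is a minimal version of the persistent exact-hash store. It uses SQLite from the standard library as a stand-in for whatever database you run. The table name, the `run_id` column and the `PersistentHashIndex` class are hypothetical. `INSERT OR IGNORE` on a primary key makes the check and the insert a single statement.

```python
import hashlib
import sqlite3


class PersistentHashIndex:
    """Exact-duplicate index that survives across crawl runs."""

    def __init__(self, db_path: str):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS seen_hashes ("
            " hash TEXT PRIMARY KEY,"
            " first_seen_run TEXT NOT NULL)"
        )

    def add_if_new(self, text: str, run_id: str) -> bool:
        """Record the text's SHA-256; return True only if it was never seen before."""
        digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
        cur = self.conn.execute(
            "INSERT OR IGNORE INTO seen_hashes (hash, first_seen_run) VALUES (?, ?)",
            (digest, run_id),
        )
        # Committing per insert keeps the sketch simple; batch commits for throughput
        self.conn.commit()
        return cur.rowcount == 1  # 0 when the hash was already present

    def close(self) -> None:
        self.conn.close()
```

Pointing `db_path` at a file such as `seen_hashes.db` (a hypothetical name) lets each new run skip documents stored by earlier runs.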
What is the most cost-effective infrastructure for a large crawl?
For the compute layer, spot/preemptible cloud instances provide the best cost per crawl. For the network layer, the tradeoff is between DIY proxy management (cheapest but highest maintenance) and managed infrastructure (more expensive but reliable). At scale, the total cost is dominated by proxy/infrastructure costs, not compute. Real-device infrastructure like Archonum provides the highest success rates, which reduces wasted compute on failed requests and retries.
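Under one simplifying assumption, the effect of success rate on cost is easy to quantify. Assume every attempt is billed, attempts are independent, and you retry until a request succeeds. The expected number of attempts per page is then 1 / success_rate. So the effective cost per page is the per-request cost divided by the success rate. `cost_per_successful_page` is a hypothetical helper, and the prices in the usage block are made up. The calculation also ignores compute time spent on failed attempts.

```python
def cost_per_successful_page(cost_per_request: float, success_rate: float) -> float:
    """Expected cost of one successfully fetched page when failures are retried."""
    if not 0.0 < success_rate <= 1.0:
        raise ValueError("success_rate must be in (0, 1]")
    # Attempts until the first success follow a geometric distribution
    # with mean 1 / success_rate
    return cost_per_request / success_rate


if __name__ == "__main__":
    # Hypothetical prices: a cheap option that often fails vs. a pricier one that rarely does
    cheap = cost_per_successful_page(cost_per_request=0.001, success_rate=0.4)
    reliable = cost_per_successful_page(cost_per_request=0.002, success_rate=0.95)
    print(f"cheap: ${cheap:.5f}/page, reliable: ${reliable:.5f}/page")
```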
Access the Full Web for AI Training Data
Archonum's real-device infrastructure handles the anti-bot challenge at scale. Route your training data pipeline through real smartphones for 99.9% success rates across every domain and protection system.