Leitfaden für verteiltes Web Crawling

Entdecken Sie verteilte Web-Crawling-Strategien, Architekturen und praktische Beispiele für die Ausführung skalierbarer Scraper auf mehreren Rechnern.
27 min lesen
Guide to Distributed Web Crawling blog image

Verteiltes Web-Crawling ist eine Strategie zur Skalierung von Web-Scrapern über mehrere Rechner hinweg und überwindet damit die Einschränkungen von Single-Node-Crawlern. In diesem Artikel werden wir erkunden:

  • Verteiltes Web-Crawling vs. Web-Crawling mit einem einzigen Knoten
  • Die Kernarchitektur des verteilten Webcrawling
  • Beispiele aus der Praxis für verteiltes Web-Crawling
  • Umsetzungsstrategien und bewährte Verfahren
  • Häufige Fallstricke und wie man sie behebt

TL;DR: Verteiltes Web-Crawling nutzt einen Cluster von Rechnern, um Websites parallel zu crawlen und löst damit Skalierbarkeits- und Geschwindigkeitsprobleme, die Crawler mit nur einem Knoten nicht bewältigen können. Es bietet einen höheren Durchsatz und eine höhere Zuverlässigkeit (kein einzelner Engpass) auf Kosten der zusätzlichen architektonischen Komplexität und des Overheads.

Verteiltes Crawling im Vergleich zu Single-Node Crawling

Die meisten Crawling-Projekte benötigen keine verteilten Systeme, und dennoch verschwenden Teams regelmäßig Monate mit dem Aufbau komplexer verteilter Architekturen, obwohl ein einzelner Server ausreichen würde.

Bei einem Single-Node-Crawler übernimmt ein einziger Rechner das Abrufen, Parsen und Speichern. Diese Art von System ist einfacher zu entwickeln und zu warten und spart Ihnen Geld. Für das Abrufen von 60-500 Seiten pro Minute ist es großartig, aber wenn Ihr Crawling-Bedarf wächst, wird ein einzelner Knoten zu einem Engpass, da Sie durch CPU-, Speicher- und Netzwerkbeschränkungen eingeschränkt werden.

Im Gegensatz dazu verteilen verteilte Crawler die Arbeit auf mehrere Knoten und ermöglichen so gleichzeitige Abrufe in großem Umfang, hohe Geschwindigkeit und verbesserte Fehlertoleranz. Wenn ein Worker ausfällt, laufen die anderen weiter, was die Zuverlässigkeit erhöht. Der Nachteil ist, dass verteilte Systeme Nachrichtenwarteschlangen, die Synchronisierung einer URL-Grenze und ein sorgfältiges Design erfordern, um Doppelarbeit oder eine Überlastung der Zielseiten zu vermeiden.

Umfassender Vergleich

Aspekt Einzelknoten Verteilt
Leistung Durchschnittlich 4 Sekunden/Seite, 60-120 Seiten/Minute 30x schneller, 50.000+ Anfragen/Sekunde
Skalierbarkeit Begrenzt durch die Ressourcen einer einzelnen Maschine Lineare Skalierung über Knoten hinweg
Fehlertoleranz Einzelne Fehlerstelle Automatische Ausfallsicherung, Selbstheilung
Geografische Verteilung Fester Standort Einsatz über mehrere Regionen hinweg
Nutzung der Ressourcen Nur vertikale Skalierung Horizontale Skalierung optimiert
Komplexität Einfache Einrichtung, minimaler Overhead Komplexe Orchestrierung, höhere Betriebskosten
Kosten Geringere Erstinvestition Höhere Infrastrukturkosten, besserer ROI im großen Maßstab
Wartung Minimale operative Belastung Erfordert Fachwissen über verteilte Systeme
Datenverarbeitung Nur lokale Verarbeitung Parallele Verarbeitung über Knoten hinweg
Anti-Detektion Begrenzte IP-Rotation Erweiterte Proxy-Verwaltung, Fingerprinting

Sollten Sie sich für eine verteilte Lösung entscheiden? (Ein Entscheidungsbaum)

Ein Entscheidungsbaum, der zeigt, ob der Vertrieb der richtige Ansatz ist

Kernbausteine & Architektur

Wenn Sie sich für verteiltes Crawling entschieden haben, ist der nächste Schritt die Aufschlüsselung dessen, was Sie eigentlich bauen wollen. Stellen Sie sich das so vor, als würden Sie ein leistungsstarkes Rennteam zusammenstellen, bei dem jede Komponente eine bestimmte Aufgabe hat und alle nahtlos zusammenarbeiten müssen. Hier sind die wichtigsten Komponenten, die Sie zum Aufbau eines verteilten Crawling-Systems benötigen:

Planer / Warteschlange (Das Gehirn)

Das Herzstück eines verteilten Crawlers ist ein Scheduler oder eine Aufgaben-Warteschlange, die die Arbeit zwischen den Knoten koordiniert und in der sich Ihre URLs befinden, bevor sie gecrawlt werden. Eine Scheduler-Komponente kann auch für Höflichkeit (Timing) und Wiederholungen sorgen. Sie können zum Beispiel domänenspezifische Warteschlangen implementieren, um sicherzustellen, dass eine Website nicht von allen Arbeitern auf einmal bearbeitet wird.

Bei den Terminplanern haben Sie drei Hauptoptionen, jede mit ihrer eigenen Persönlichkeit:

  • Kafka: Das ist so etwas wie der Schwergewichts-Champion. Es ist für massiven Durchsatz ausgelegt und bewältigt mühelos Millionen von Nachrichten pro Sekunde. Die Schönheit liegt in seinem logbasierten Design, das sich perfekt für die Verwaltung Ihrer URL-Grenze eignet. Sie können nach Domänen partitionieren, um Ihr Crawling übersichtlich zu halten.
  • RabbitMQ: Dies ist wie ein Schweizer Taschenmesser. Flexibleres Routing als Kafka, mit Funktionen wie Prioritätswarteschlangen. RabbitMQ verfügt über In-Memory-Speicher, ist also schneller für kleinere Arbeitslasten. Ideal, wenn Sie verschiedene Crawling-Strategien für verschiedene Arten von Inhalten benötigen.
  • Celery: Der beste Freund des Python-Entwicklers. Diese Option ist nicht so effizient wie die anderen, aber sie ist einfach zu verwenden. Celery eignet sich perfekt für das Prototyping oder mittelgroße Crawlings, wenn Sie schnell etwas zum Laufen bringen müssen.

URL-Grenze und Deduplizierung: Das Gedächtnis des Crawlers

Haben Sie schon einmal versehentlich die gleiche Seite 1.000 Mal gecrawlt? Das ist der Punkt, an dem die Deduplizierung Sie rettet. Sie müssen nachverfolgen, was Sie gesehen haben, und dabei die Höflichkeit des Servers respektieren, damit Sie nicht immer wieder auf dieselbe Domain stoßen.

Redis-Sets können perfekte Genauigkeit bieten, verbrauchen aber viel Speicherplatz. Bloom-Filter verbrauchen 90 % weniger Speicher (1,2 GB gegenüber 12 GB+ für eine Milliarde URLs), haben aber gelegentlich falsch-positive Ergebnisse (sie könnten sagen, dass Sie eine URL nicht gesehen haben, obwohl Sie sie gesehen haben), daher sollten Sie diese Redis-Implementierung wählen:

class DistributedURLFrontier:
    def __init__(self, redis_client):
        self.redis = redis_client

    def add_url(self, url, priority=0):
        domain = urlparse(url).netloc

        # Skip if already seen
        if self.redis.sismember("seen_urls", url):
            return

        # Mark as seen and queue by domain
        self.redis.sadd("seen_urls", url)
        self.redis.lpush(f"queue:{domain}", url)
        self.redis.zadd("priority_queue", {url: priority})

    def get_next_url(self):
        # Get highest priority URL
        result = self.redis.zrevrange("priority_queue", 0, 0)
        if not result:
            return None

        url = result[0]
        domain = urlparse(url).netloc

        # Respect crawl delay (1 second between requests per domain)
        last_crawl = self.redis.get(f"last_crawl:{domain}")
        if last_crawl and time.time() - float(last_crawl) < 1.0:
            return None

        # Remove from queues and update last crawl time
        self.redis.zrem("priority_queue", url)
        self.redis.rpop(f"queue:{domain}")
        self.redis.set(f"last_crawl:{domain}", time.time())

        return url

Worker Nodes (Der Muskel)

Arbeiterknoten sind die Arbeitspferde Ihres Crawlings. Sie sind die Prozesse oder Maschinen, die die eigentliche Crawling-Arbeit ausführen, wie das Abrufen von URLs und die Verarbeitung der Inhalte. Jeder Worker führt die gleiche Crawling-Logik aus (z. B. dasselbe Python-Skript oder dieselbe Anwendung), aber sie arbeiten parallel an verschiedenen URLs aus der Warteschlange.

Um das Beste aus Ihren Workern herauszuholen, müssen Sie sie zustandslos halten, d. h. jeder Status (besuchte URLs, Ergebnisse usw.) wird in einem gemeinsamen Speicher abgelegt oder über Nachrichten weitergegeben. Auf diese Weise kann jeder Worker jede Aufgabe übernehmen, und wenn ein Worker ausfällt, springen andere sofort ein, ohne dass es zu einem Ausfall kommt.

class DistributedWorker:
    def __init__(self, worker_id, max_concurrent=50):
        self.worker_id = worker_id
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=30),
            connector=aiohttp.TCPConnector(limit=100)
        )

    async def crawl_batch(self, urls):
        tasks = [self.crawl_url(url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

    async def crawl_url(self, url):
        async with self.semaphore:
            try:
                async with self.session.get(url) as response:
                    content = await response.text()
                    return {'url': url, 'content': content, 'status': response.status}
            except Exception as e:
                return {'url': url, 'error': str(e)}

Pro-Tipp: Bei Workern ist es wichtig, nicht für alles einen Vorschlaghammer zu verwenden. Sie sollten leichtgewichtige HTTP-Worker für statisches HTML und schwere Puppeteer-Worker für JavaScript-gerenderte Seiten verwenden. Verschiedene Tools, verschiedene Worker-Pools. Mit unserem umfassenden Leitfaden zur Proxy-Auswahl können Sie ganz einfach die richtigen Proxy-Typen für Ihre Worker-Flotte auswählen.

Speicherschicht (Das Lager)

Die Speicherebene ist der Ort, an dem Sie die gecrawlten Daten und Metadaten speichern, und sie besteht häufig aus zwei Teilen:

  • Content Storage verwaltet die große Menge an Roh-HTML, JSON-Antworten, Bildern und PDFs. Betrachten Sie ihn als Ihr digitales Lager. Objektspeicher wie S3, Google Cloud Storage oder HDFS zeichnen sich hier aus, da sie unendlich skalierbar sind und gleichzeitige Schreibvorgänge von mehreren Arbeitern mühelos verarbeiten können.
  • DerMetadatenspeicher enthält das strukturierte Gold, das Sie extrahiert haben – geparste Felder, Entitätsbeziehungen, Crawl-Zeitstempel und Erfolgs-/Misserfolgsstatus. Diese Daten werden in Datenbanken gespeichert, die für Abfragen und Aktualisierungen optimiert sind, nicht nur für das Speichervolumen.

Verteilte Crawler benötigen einen Speicher, der massive gleichzeitige Schreibvorgänge bewältigen kann, ohne zu erdrücken. Objektspeicher wie S3 oder Google Cloud Storage eignen sich hervorragend für Rohinhalte, da sie unendlich skalierbar sind, während NoSQL-Datenbanken (MongoDB, Cassandra) oder SQL strukturierte Metadaten effektiv verarbeiten.

Überwachung und Alarmierung

Der Betrieb eines verteilten Crawlers erfordert einen Einblick in die Leistung des Systems. Sie können Prometheus und Grafana verwenden, um umfassende Überwachungs-Dashboards zu erstellen, die Crawl-Raten, Erfolgsraten, Antwortzeiten und Warteschlangentiefen verfolgen. Zu den wichtigsten Metriken gehören Anfragen pro Sekunde nach Domäne, 95-prozentige Antwortzeiten und Trends bei der Warteschlangengröße.

Anti-Bot & Ausweichschicht

Web-Crawling in großem Maßstab bedeutet ein ständiges Katz-und-Maus-Spiel mit Anti-Bot-Systemen. Sie benötigen drei Verteidigungsschichten: IP-Rotation über Tausende von Proxies in Privathaushalten und Rechenzentren, Randomisierung der Fingerabdrücke von Benutzer-Agenten und Browsersignaturen sowie Nachahmung von Verhaltensweisen, um Erkennungsmuster zu vermeiden.

Bright Data Web Unlocker bietet Erkennungsfunktionen auf Unternehmensniveau mit einer Erfolgsquote von über 99 %, die durch automatische CAPTCHA-Auflösung, IP-Rotation und Browser-Fingerprinting erreicht wird. Sein API-basierter Ansatz vereinfacht die Integration und bewältigt gleichzeitig komplexe Anti-Bot-Herausforderungen.

class BrightDataWebUnlocker:
    def crawl_url(self, url: str, options: Dict = None) -> Dict:
        payload = {
            "url": url,
            "zone": self.zone,
            "format": "raw",
            "country": "US",
            "render_js": True,
            "wait_for_selector": ".content"
        }

        response = requests.post(
            self.base_url,
            headers={"Authorization": f"Bearer {self.api_key}"},
            json=payload,
            timeout=60
        )

Die fortschrittliche Proxy-Rotation implementiert Zustandsprüfungen, geografische Optimierung und Fehlerbehebung in Proxy-Pools für Privatanwender, Rechenzentren und mobile Geräte. Ein erfolgreiches Proxy-Management erfordert mehr als 1000 IPs mit intelligenten Rotationsalgorithmen.

Bei der Vermeidung von Fingerabdrücken werden Benutzer-Agenten, Browser-Fingerabdrücke und Netzwerkcharakteristika nach dem Zufallsprinzip verändert, um eine Erkennung durch ausgeklügelte Anti-Bot-Systeme zu verhindern. Dazu gehören TLS-Fingerprint-Rotation, Spoofing von Canvas-Fingerprints und die Simulation von Verhaltensmustern.

Praktische Anwendungsfälle mit Codebeispielen

Im Folgenden werden zwei häufige Anwendungsfälle für verteilte Crawler untersucht und anhand von Codeschnipseln erläutert, wie diese implementiert werden können. Der Einfachheit halber werden wir in den Beispielen Python und Celery verwenden, aber die Prinzipien gelten allgemein.

Anwendungsfall 1: Preisüberwachung im E-Commerce

Stellen Sie sich vor, Sie verfolgen jeden Tag die Preise der Wettbewerber auf 50.000 Produktseiten. Wenn Sie versuchen, all diese URLs mit einem einzigen Rechner abzufragen, müssen Sie mit mehr als 12 Stunden Crawling rechnen, vorausgesetzt, dass nichts kaputt geht. Hinzu kommt, dass die meisten E-Commerce-Websites Sie nach ein paar Tausend Schnellschuss-Anfragen von derselben IP-Adresse aus sperren.

Hier ist das verteilte Crawling hilfreich. Anstelle eines einzigen überlasteten Rechners verteilen Sie diese 50 000 URLs auf Dutzende von Mitarbeitern, die jeweils unterschiedliche IP-Adressen verwenden. Was früher einen halben Tag dauerte, ist jetzt in 2 bis 3 Stunden erledigt, und Sie bleiben unter dem Radar von Anti-Bot-Systemen.

Die Einrichtung ist ganz einfach. Sie müssen die URL-Listen Ihrer Konkurrenten pflegen (aus Sitemaps oder Discovery Crawls) und dann etwas wie Celery mit Redis verwenden, um die Arbeit zu verteilen. Jeden Morgen stellen Sie alle 50.000 URLs in eine Warteschlange, und Ihre Arbeiter machen sich an die Arbeit. Worker 1 kümmert sich um Nike-Laufschuhe, Worker 2 um Adidas-Sneakers, Worker 3 um Puma-Preise. Alle gleichzeitig, alle von verschiedenen IPs.

from celery import Celery
import requests
from bs4 import BeautifulSoup
import random
import time
import re
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Initialize Celery app with Redis as broker
app = Celery('price_monitor', broker='redis://localhost:6379/0')

# Realistic user agents for rotation
USER_AGENTS = [
   "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
   "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
   "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
   "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15"
]

# Proxy pool (replace with your actual proxy service)
PROXY_POOL = [
   "<http://proxy1:8080>",
   "<http://proxy2:8080>",
   "<http://proxy3:8080>",
   # Add your proxy endpoints here
]

def get_session_with_retries():
   """Create a session with retry strategy and random proxy."""
   session = requests.Session()

   # Retry strategy for resilience
   retry_strategy = Retry(
       total=3,
       backoff_factor=1,
       status_forcelist=[429, 500, 502, 503, 504],
   )
   adapter = HTTPAdapter(max_retries=retry_strategy)
   session.mount("http://", adapter)
   session.mount("https://", adapter)

   # Random proxy rotation
   if PROXY_POOL:
       proxy = random.choice(PROXY_POOL)
       session.proxies = {"http": proxy, "https": proxy}

   return session

@app.task(bind=True, max_retries=3)
def fetch_product_price(self, url, site_config=None):
   """Fetches product price with full anti-detection measures."""

   # Human-like delay before starting
   time.sleep(random.uniform(2, 8))

   # Randomized headers to avoid fingerprinting
   headers = {
       "User-Agent": random.choice(USER_AGENTS),
       "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
       "Accept-Language": "en-US,en;q=0.9",
       "Accept-Encoding": "gzip, deflate, br",
       "Connection": "keep-alive",
       "Upgrade-Insecure-Requests": "1",
       "Sec-Fetch-Dest": "document",
       "Sec-Fetch-Mode": "navigate",
       "Sec-Fetch-Site": "none",
       "Cache-Control": "max-age=0"
   }

   try:
       session = get_session_with_retries()
       resp = session.get(url, headers=headers, timeout=30)
       resp.raise_for_status()

       # Parse the page for price
       soup = BeautifulSoup(resp.text, 'html.parser')
       price_value = extract_price(soup, url, site_config)

       if price_value:
           # Store in database (implement your storage logic here)
           store_price_data(url, price_value, resp.status_code)
           return {"url": url, "price": price_value, "status": "success"}
       else:
           return {"url": url, "error": "Price not found", "status": "failed"}

   except requests.exceptions.RequestException as e:
       print(f"Request failed for {url}: {e}")

       # Retry with exponential backoff
       if self.request.retries < self.max_retries:
           raise self.retry(countdown=60 * (2 ** self.request.retries))

       return {"url": url, "error": str(e), "status": "failed"}

def extract_price(soup, url, site_config=None):
   """Extract price using multiple strategies."""

   # Site-specific selectors (customize for each competitor)
   price_selectors = [
       ".price", ".product-price", ".current-price", ".sale-price",
       "[data-price]", ".price-current", ".price-now", ".offer-price"
   ]

   # Try configured selectors first
   if site_config and site_config.get('price_selector'):
       price_selectors.insert(0, site_config['price_selector'])

   price_text = None
   for selector in price_selectors:
       price_elem = soup.select_one(selector)
       if price_elem:
           price_text = price_elem.get_text(strip=True)
           break

   # Try data attributes as fallback
   if not price_text:
       price_elem = soup.find(attrs={"data-price": True})
       if price_elem:
           price_text = price_elem.get("data-price")

   if not price_text:
       return None

   # Clean and parse price
   return parse_price(price_text)

def parse_price(price_text):
   """Parse price from various formats."""
   # Remove common currency symbols and whitespace
   cleaned = re.sub(r'[^\\d.,]', '', price_text)

   # Handle formats like "1,299.99" or "1299.99"
   try:
       # Remove commas and convert to float
       if ',' in cleaned and '.' in cleaned:
           # Format: 1,299.99
           price_value = float(cleaned.replace(',', ''))
       elif ',' in cleaned:
           # Could be European format: 1299,99
           if cleaned.count(',') == 1 and len(cleaned.split(',')[1]) == 2:
               price_value = float(cleaned.replace(',', '.'))
           else:
               # Format: 1,299 (no cents)
               price_value = float(cleaned.replace(',', ''))
       else:
           price_value = float(cleaned)

       return price_value

   except ValueError:
       print(f"Could not parse price from: {price_text}")
       return None

def store_price_data(url, price, status_code):
   """Store price data in your database."""
   # Implement your storage logic here
   # Could be PostgreSQL, MongoDB, or any other database
   print(f"Storing: {url} -> ${price} (Status: {status_code})")

# Site-specific configurations for better accuracy
SITE_CONFIGS = {
   "competitor1.com": {"price_selector": ".price-box .price"},
   "competitor2.com": {"price_selector": "[data-testid='price']"},
   "competitor3.com": {"price_selector": ".product-price-value"},
}

def get_site_config(url):
   """Get site-specific configuration."""
   for domain, config in SITE_CONFIGS.items():
       if domain in url:
           return config
   return None

# Load your 50k product URLs (from database, file, or API)
def load_product_urls():
   """Load URLs from your data source."""
   # Replace with your actual data loading logic
   urls = [
       "<https://competitor1.com/product/123>",
       "<https://competitor2.com/product/456>",
       # ... 49,998 more URLs
   ]
   return urls

# Main execution: dispatch all crawling tasks
def start_daily_price_monitoring():
   """Start the daily price monitoring job."""
   product_urls = load_product_urls()

   print(f"Starting crawl for {len(product_urls)} URLs...")

   for url in product_urls:
       site_config = get_site_config(url)
       fetch_product_price.delay(url, site_config)

   print("All tasks queued successfully!")

# Run with: python -m celery worker -A price_monitor --loglevel=info
# Start monitoring with: start_daily_price_monitoring()

Im erweiterten Code oben ist fetch_product_price eine robuste Celery-Aufgabe, die für die Preisüberwachung im Unternehmensmaßstab entwickelt wurde. Durch den Aufruf von delay(url, site_config) für jede URL stellen wir die Aufgaben in eine Warteschlange in Redis, wo 100+ Arbeiter sie sofort abrufen können. Der verteilte Ansatz verwandelt einen 12-stündigen Crawl auf einer einzelnen Maschine in einen 2-3-stündigen Vorgang für Ihre Mitarbeiterflotte.

Wichtige Überlegungen zur Produktion:

  • Die Proxy-Verwaltung ist von entscheidender Bedeutung: Dieses Beispiel enthält einen PROXY_POOL, der die IPs pro Anfrage rotiert, was bei 50.000 URLs unerlässlich ist. Ohne diese Funktion werden die Zielseiten im Wesentlichen von einer IP aus angegriffen, was eine Blockade garantiert.
  • Ratenbegrenzung pro Domain: Selbst bei einer Verteilung lösen 50.000 URLs von einer Wettbewerber-Website einen Alarm aus, wenn sie alle innerhalb von Minuten eintreffen. Wir berücksichtigen menschenähnliche Verzögerungen(time.sleep(random.uniform(2, 8)), aber auch eine domänenspezifische Drosselung.
  • Planung und Überwachung. Verwenden Sie Celery Beat für die tägliche Planung oder integrieren Sie es in Airflow für komplexe Arbeitsabläufe. Die Funktion start_daily_price_monitoring() kann über Cron oder Ihre Orchestrierungsplattform ausgelöst werden.
  • Integration der Datenpipeline. Nach jedem Crawl speichert die Funktion store_price_data() die Ergebnisse in Ihrer Datenbank.
  • Fehlertoleranz. Der Code enthält eine Wiederholungslogik mit exponentiellem Backoff, aber planen Sie auch Teilausfälle ein. Wenn 5 % der URLs durchweg fehlschlagen, sollten Sie untersuchen, ob diese Produkte eingestellt oder verschoben wurden oder ob diese spezifischen Websites stärkere Anti-Bot-Maßnahmen haben, die andere Ansätze erfordern.

Anwendungsfall 2: SEO und Marktforschung

SEO und Marktforschung erfordern das Crawlen von Millionen von Seiten in zwei entscheidenden Bereichen: Inhaltsanalyse und Suchmaschinenüberwachung. Dabei geht es nicht nur um Scraping, sondern um den Aufbau von Wettbewerbsinformationen, die Schnelligkeit, Unauffälligkeit und Präzision erfordern.

Wenn Sie die Erwähnung von Schlüsselwörtern auf 1 Million Konkurrenzseiten verfolgen und gleichzeitig die SERP-Rankings für Hunderte von Zielschlüsselwörtern täglich überwachen wollen, würde ein einzelner Rechner Wochen brauchen und innerhalb von Stunden blockiert werden. Dies schreit geradezu nach einer verteilten Architektur.

Der Ansatz des verteilten Web-Crawling kann in zwei Ströme aufgeteilt werden:

  • Intelligente Inhalte: Durchsuchen Sie Websites von Mitbewerbern, Nachrichten und Branchenblogs, um Schlüsselwortdichte, Inhaltslücken und Markttrends zu ermitteln.
  • SERP-Überwachung: Überwachen Sie die Google/Bing-Rankings für Ihre Ziel-Keywords, verfolgen Sie die Positionen Ihrer Mitbewerber und Änderungen der SERP-Funktionen
from celery import Celery
import requests
from bs4 import BeautifulSoup
import redis
import hashlib
import json
import time
import random
import re
from urllib.parse import urljoin, urlparse
from dataclasses import dataclass
from typing import List, Dict, Optional
import logging

# Initialize Celery and Redis
app = Celery('seo_intelligence', broker='redis://localhost:6379/0')
redis_client = redis.Redis(host='localhost', port=6379, db=1)

# Anti-detection configurations
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 Safari/605.1.15"
]

PROXY_POOL = [
    "<http://user:[email protected]:8080>",
    "<http://user:[email protected]:8080>",
    # Add your proxy endpoints
]

@dataclass
class KeywordData:
    keyword: str
    frequency: int
    context: List[str]  # Surrounding text snippets
    url: str
    domain: str

@dataclass
class SERPResult:
    keyword: str
    position: int
    title: str
    url: str
    snippet: str
    domain: str

class SEOCrawler:
    def __init__(self):
        self.session = self._create_session()
        
    def _create_session(self):
        session = requests.Session()
        if PROXY_POOL:
            proxy = random.choice(PROXY_POOL)
            session.proxies = {"http": proxy, "https": proxy}
        return session
    
    def _get_headers(self):
        return {
            "User-Agent": random.choice(USER_AGENTS),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.9",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Cache-Control": "max-age=0"
        }

# Deduplication utilities
def get_url_hash(url: str) -> str:
    """Generate consistent hash for URL deduplication."""
    return hashlib.md5(url.encode()).hexdigest()

def is_url_processed(url: str) -> bool:
    """Check if URL was already processed today."""
    url_hash = get_url_hash(url)
    today = time.strftime("%Y-%m-%d")
    return redis_client.exists(f"processed:{today}:{url_hash}")

def mark_url_processed(url: str):
    """Mark URL as processed with 24h expiry."""
    url_hash = get_url_hash(url)
    today = time.strftime("%Y-%m-%d")
    redis_client.setex(f"processed:{today}:{url_hash}", 86400, 1)

# Stream 1: Content Intelligence Crawling
@app.task(bind=True, max_retries=3)
def crawl_content_for_keywords(self, url: str, target_keywords: List[str]):
    """Crawl a page and extract keyword intelligence."""
    
    # Skip if already processed today
    if is_url_processed(url):
        return {"status": "skipped", "reason": "already_processed", "url": url}
    
    # Human-like delay
    time.sleep(random.uniform(3, 7))
    
    try:
        crawler = SEOCrawler()
        response = crawler.session.get(
            url, 
            headers=crawler._get_headers(), 
            timeout=30
        )
        response.raise_for_status()
        
        # Extract content and analyze keywords
        soup = BeautifulSoup(response.text, 'html.parser')
        content_data = extract_keyword_intelligence(soup, url, target_keywords)
        
        # Store results
        store_keyword_data(content_data)
        mark_url_processed(url)
        
        return {
            "status": "success",
            "url": url,
            "keywords_found": len(content_data),
            "total_mentions": sum(kd.frequency for kd in content_data)
        }
        
    except Exception as e:
        logging.error(f"Content crawl failed for {url}: {e}")
        if self.request.retries < self.max_retries:
            raise self.retry(countdown=60 * (2 ** self.request.retries))
        return {"status": "failed", "url": url, "error": str(e)}

def extract_keyword_intelligence(soup: BeautifulSoup, url: str, keywords: List[str]) -> List[KeywordData]:
    """Extract keyword data from page content."""
    # Remove script and style elements
    for script in soup(["script", "style", "nav", "footer", "header"]):
        script.decompose()
    
    # Get clean text content
    text = soup.get_text()
    text = re.sub(r'\\s+', ' ', text).strip().lower()
    
    domain = urlparse(url).netloc
    keyword_data = []
    
    for keyword in keywords:
        keyword_lower = keyword.lower()
        
        # Find all occurrences
        pattern = r'\\b' + re.escape(keyword_lower) + r'\\b'
        matches = list(re.finditer(pattern, text))
        
        if matches:
            # Extract context around each match
            contexts = []
            for match in matches[:5]:  # Limit to first 5 for performance
                start = max(0, match.start() - 100)
                end = min(len(text), match.end() + 100)
                context = text[start:end].strip()
                contexts.append(context)
            
            keyword_data.append(KeywordData(
                keyword=keyword,
                frequency=len(matches),
                context=contexts,
                url=url,
                domain=domain
            ))
    
    return keyword_data

# Stream 2: SERP Tracking
@app.task(bind=True, max_retries=3)
def track_serp_rankings(self, keyword: str, search_engine: str = "google"):
    """Track SERP positions for a keyword."""
    
    time.sleep(random.uniform(5, 10))  # Longer delay for search engines
    
    try:
        crawler = SEOCrawler()
        
        if search_engine == "google":
            search_url = f"<https://www.google.com/search?q={keyword}&num=20>"
        else:  # Bing
            search_url = f"<https://www.bing.com/search?q={keyword}&count=20>"
        
        # Special headers for search engines
        headers = crawler._get_headers()
        headers.update({
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Referer": "<https://www.google.com/>" if search_engine == "google" else "<https://www.bing.com/>"
        })
        
        response = crawler.session.get(search_url, headers=headers, timeout=30)
        response.raise_for_status()
        
        # Parse SERP results
        soup = BeautifulSoup(response.text, 'html.parser')
        serp_data = parse_serp_results(soup, keyword, search_engine)
        
        # Store SERP data
        store_serp_data(serp_data)
        
        return {
            "status": "success",
            "keyword": keyword,
            "results_found": len(serp_data),
            "search_engine": search_engine
        }
        
    except Exception as e:
        logging.error(f"SERP tracking failed for '{keyword}': {e}")
        if self.request.retries < self.max_retries:
            raise self.retry(countdown=120 * (2 ** self.request.retries))
        return {"status": "failed", "keyword": keyword, "error": str(e)}

def parse_serp_results(soup: BeautifulSoup, keyword: str, search_engine: str) -> List[SERPResult]:
    """Parse search engine results page."""
    results = []
    position = 1
    
    if search_engine == "google":
        # Google result selectors
        result_elements = soup.select('div.g')
        
        for element in result_elements:
            title_elem = element.select_one('h3')
            link_elem = element.select_one('a[href]')
            snippet_elem = element.select_one('.VwiC3b, .s3v9rd')
            
            if title_elem and link_elem:
                url = link_elem.get('href', '')
                if url.startswith('/url?q='):
                    url = url.split('/url?q=')[1].split('&')[0]
                
                results.append(SERPResult(
                    keyword=keyword,
                    position=position,
                    title=title_elem.get_text(strip=True),
                    url=url,
                    snippet=snippet_elem.get_text(strip=True) if snippet_elem else "",
                    domain=urlparse(url).netloc if url else ""
                ))
                position += 1
                
                if position > 20:  # Limit to top 20
                    break
    
    else:  # Bing
        result_elements = soup.select('.b_algo')
        
        for element in result_elements:
            title_elem = element.select_one('h2 a')
            snippet_elem = element.select_one('.b_caption p')
            
            if title_elem:
                url = title_elem.get('href', '')
                
                results.append(SERPResult(
                    keyword=keyword,
                    position=position,
                    title=title_elem.get_text(strip=True),
                    url=url,
                    snippet=snippet_elem.get_text(strip=True) if snippet_elem else "",
                    domain=urlparse(url).netloc if url else ""
                ))
                position += 1
                
                if position > 20:
                    break
    
    return results

# Data storage functions
def store_keyword_data(keyword_data: List[KeywordData]):
    """Store keyword intelligence in database."""
    for kd in keyword_data:
        data = {
            "keyword": kd.keyword,
            "frequency": kd.frequency,
            "context": kd.context,
            "url": kd.url,
            "domain": kd.domain,
            "crawled_at": time.time()
        }
        # Store in your preferred database (PostgreSQL, MongoDB, etc.)
        redis_client.lpush(f"keyword_data:{kd.keyword}", json.dumps(data))
        print(f"Stored: {kd.keyword} found {kd.frequency} times on {kd.domain}")

def store_serp_data(serp_data: List[SERPResult]):
    """Store SERP tracking data."""
    for result in serp_data:
        data = {
            "keyword": result.keyword,
            "position": result.position,
            "title": result.title,
            "url": result.url,
            "snippet": result.snippet,
            "domain": result.domain,
            "tracked_at": time.time()
        }
        redis_client.lpush(f"serp_data:{result.keyword}", json.dumps(data))
        print(f"SERP: '{result.keyword}' -> #{result.position} {result.domain}")

# Orchestration functions
def start_content_intelligence_crawl(urls: List[str], keywords: List[str]):
    """Launch content crawling across 1M+ URLs."""
    print(f"Starting content intelligence crawl for {len(urls)} URLs...")
    
    for url in urls:
        crawl_content_for_keywords.delay(url, keywords)
    
    print(f"Queued {len(urls)} content crawling tasks")

def start_serp_tracking(keywords: List[str], search_engines: List[str] = ["google", "bing"]):
    """Launch SERP tracking for target keywords."""
    print(f"Starting SERP tracking for {len(keywords)} keywords...")
    
    for keyword in keywords:
        for engine in search_engines:
            track_serp_rankings.delay(keyword, engine)
    
    print(f"Queued {len(keywords) * len(search_engines)} SERP tracking tasks")

# Example usage
if __name__ == "__main__":
    # Target keywords for analysis
    target_keywords = [
        "artificial intelligence", "machine learning", "data science",
        "cloud computing", "cybersecurity", "digital transformation"
    ]
    
    # URLs to crawl for content intelligence (load from your database)
    content_urls = [
        "<https://techcrunch.com/ai>",
        "<https://venturebeat.com/ai>",
        "<https://competitor-blog.com/insights>",
        # ... 999,997 more URLs
    ]
    
    # Keywords to track in SERPs
    serp_keywords = [
        "best AI tools 2025", "enterprise machine learning",
        "data analytics platform", "cloud security solutions"
    ]
    
    # Launch both crawling streams
    start_content_intelligence_crawl(content_urls, target_keywords)
    start_serp_tracking(serp_keywords)

Wichtige Überlegungen zur Produktion:

  • Intelligente Deduplizierung: Das System verwendet Redis mit 24-stündigem Ablaufdatum, um zu vermeiden, dass derselbe Inhalt täglich neu gecrawlt wird. Für eine tiefer gehende Deduplizierung sollten Sie ein Content-Hashing in Betracht ziehen, um Seiten zu erkennen, deren URLs sich geändert haben, die aber denselben Inhalt enthalten.
  • Bereichsbezogene Ratenbegrenzung: Beim Crawling von SERPs ist besondere Vorsicht geboten, da die Suchmaschinen aggressiver bei der Blockierung vorgehen. Unser Beispiel beinhaltet längere Verzögerungen (5-10 Sekunden) für Suchanfragen gegenüber dem Crawling von Inhalten (3-7 Sekunden).
  • Verfolgung von SERP-Funktionen: Der Parser verarbeitet sowohl Google- als auch Bing-Ergebnisse, aber Sie können ihn erweitern, um Featured Snippets, Local Packs und andere SERP-Funktionen zu verfolgen, die Ihre Sichtbarkeitsstrategie beeinflussen.
  • Integration von Datenpipelines: Speichern Sie die Ergebnisse in Ihrer bevorzugten Datenbank (PostgreSQL für relationale Analysen, MongoDB für flexible Schemata).

Bewährte Praktiken

Respektieren Sie robots.txt oder tragen Sie die Konsequenzen

Lesen Sie robots.txt, bevor Sie URLs in die Warteschlange stellen, und halten Sie sich strikt an die Crawl-Verzögerungsrichtlinien. Wenn Sie diese ignorieren, wird Ihr gesamter IP-Bereich schneller auf die schwarze Liste gesetzt, als Sie “verteilter Crawler” sagen können. Integrieren Sie die robots.txt-Prüfung direkt in die URL-Grenze und überlassen Sie diese Aufgabe nicht dem Arbeitsknoten.

Neben der Einhaltung von robots.txt sollten Sie auch umfassende Strategien zur Vermeidung von Erkennungen in Ihrer gesamten verteilten Flotte implementieren.

Immer für 3 AM Debugging protokollieren

Wenn Ihr Crawl um Mitternacht endet, brauchen Sie Metadaten: URL, HTTP-Status, Latenzzeit, Proxy-ID, Worker-ID und Zeitstempel für jede einzelne Anfrage. JSON-strukturierte Protokolle retten Ihren Verstand. Die Frage ist nicht, ob Sie einen Fehler in der Produktion beheben müssen, sondern nur wann.

Alles validieren, nichts trauen

Die Schema-Validierung der extrahierten Daten ist für das Überleben Ihrer verteilten Web-Crawler unerlässlich, denn schon eine einzige fehlerhafte Antwort kann Ihren gesamten Datensatz vergiften. Überprüfen Sie Feldtypen, erforderliche Felder und die Aktualität der Daten bei der Aufnahme. Fangen Sie Datenmüll frühzeitig auf, oder er wird Ihre Analyse noch Monate später beeinträchtigen.

Die Verschuldung der Geschwindigkeit rücksichtslos bekämpfen

Verteilte Systeme verrotten schnell. Sie müssen monatliche Bereinigungen von veralteten Redis-Schlüsseln, fehlgeschlagenen Aufgabenwarteschlangen und verwaisten Arbeitsprozessen planen. Tote URLs häufen sich, Proxy-Pools werden durch blockierte IPs verschmutzt, und Speicherlecks bei den Arbeitsprozessen häufen sich mit der Zeit. Die Wartung ist nicht glamourös, aber sie hält Ihren Crawler gesund. Technische Schulden in Crawlern nehmen exponentiell zu. Kümmern Sie sich also darum, bevor Ihr System zusammenbricht.

Häufige Fallstricke beim verteilten Crawling und wie man sie vermeidet

Bei der Verwendung von verteiltem Web-Crawling gibt es zahlreiche Fallstricke, weshalb die meisten Ingenieure nach Alternativen suchen, wie z. B. den Datensätzen von Bright Data. Einige dieser Fallstricke sind:

Die “Single Point of Failure”-Falle

Es ist keine gute Idee, alles um eine Redis-Instanz oder einen Master-Koordinator herum aufzubauen. Wenn sie stirbt, bleibt der gesamte Crawl stehen.

Behebung: Verwenden Sie Redis Cluster oder mehrere Broker-Instanzen. Entwerfen Sie so, dass der Koordinator verschwindet, so dass die Arbeiter mit dem Ausfall des Brokers umgehen können und sich automatisch wieder verbinden.

Die Todesspirale der Wiederholungsversuche

Wenn fehlgeschlagene URLs sofort in die Hauptwarteschlange zurückkehren, entsteht eine Endlosschleife, die unterbrochene Endpunkte behindert und Ihre Pipeline verstopft.

Behebung: Getrennte Warteschlangen für Wiederholungsversuche mit exponentiellem Backoff. Erster Wiederholungsversuch nach 1 Minute, dann 5, dann 30. Nach 3 Fehlversuchen in eine Warteschlange für tote Briefe zur manuellen Überprüfung senden.

Der Trugschluss, dass alle Arbeitnehmer gleich sind

Bei der Aufgabenverteilung nach dem Round-Robin-Prinzip wird davon ausgegangen, dass jeder Mitarbeiter über die gleiche Netzwerkgeschwindigkeit, Proxy-Qualität und Verarbeitungsleistung verfügt. Die Realität ist oft unübersichtlicher.

Behebung: Implementieren Sie eine Bewertung der Arbeiter auf der Grundlage von Erfolgsrate, Latenz und Durchsatz. Leiten Sie schwierigere Aufträge an Ihre besten Mitarbeiter weiter.

Die Zeitbombe des Speicherverlusts

Worker, die nie neu gestartet werden, sammeln Speicherlecks an, insbesondere beim Parsen von fehlerhaftem HTML oder bei der Verarbeitung großer Antworten. Wenn sie sich selbst überlassen werden, verschlechtert sich die Leistung Ihres verteilten Web-Crawlings, bis die Worker abstürzen.

Behebung: Neustart der Worker nach der Bearbeitung von 1000 Aufgaben oder alle 4 Stunden. Überwachen Sie die Speichernutzung und implementieren Sie Unterbrecher.

Schlussfolgerung

Sie haben nun die Grundlage für verteiltes Crawling, das sich auf Millionen von Seiten skalieren lässt. Um Ihr Verständnis für die Grundlagen des Web-Crawling zu vertiefen, die verteilten Systemen zugrunde liegen, lesen Sie unseren umfassenden Web-Crawler-Überblick.

Die Architektur ist einfach, aber die brutale Wahrheit ist, dass 90 % der Teams immer noch scheitern, weil sie die Komplexität eines verteilten Web-Crawling-Systems zur Erkennung unterschätzen. Die Verwaltung tausender Proxys, die Rotation von Fingerabdrücken und die Handhabung von CAPTCHAs wird zu einem Vollzeit-Albtraum für Ingenieure, der von der Gewinnung wertvoller Daten ablenkt.

Genau aus diesem Grund gibt es die Web Unlocker API von Bright Data. Anstatt Monate mit dem Aufbau einer Proxy-Infrastruktur zu verbringen, die jede Woche ausfällt, leiten Ihre verteilten Mitarbeiter Anfragen einfach über die API von Web Unlocker mit einer Erfolgsquote von über 99 % weiter.

Keine Proxy-Verwaltung, keine Fingerabdruckrotation, keine CAPTCHA-Auflösung – nur zuverlässige Datenextraktion in großem Umfang. Ihr Technikteam konzentriert sich auf die Entwicklung der Geschäftslogik, während Bright Data das Katz-und-Maus-Spiel mit Anti-Bot-Systemen übernimmt.

Die Rechnung ist einfach: Ein selbst entwickelter Erkennungsschutz kostet monatelang Entwicklungszeit und verursacht laufende Wartungsprobleme, während Web Unlocker nur einen Bruchteil davon kostet und gleichzeitig eine Zuverlässigkeit auf Unternehmensniveau bietet. Hören Sie also auf, das Rad neu zu erfinden, und beginnen Sie mit der Gewinnung von Erkenntnissen. Holen Sie sich noch heute Ihr kostenloses Bright Data-Konto und verwandeln Sie Ihren verteilten Crawler von einer Wartungslast in einen Wettbewerbsvorteil.