AI

Erstellen Sie einen RAG-Agenten mit Google ADK und Vertex AI RAG Engine

Dieser Leitfaden behandelt die Erstellung eines produktionsreifen RAG-Agenten mit Google ADK und Vertex KI, von der Einrichtung bis zur multimodalen Suche und der Generierung präziser Antworten.
33 min lesen
RAG Agent using Google ADK and Vertex AI RAG Engine

In diesem Artikel erfahren Sie:

  • Wie Sie mit Google ADK und Vertex KI RAG Engine ein produktionsreifes RAG-System aufbauen
  • Wie Sie eine hybride Suche mit semantischer und Stichwort-Suche implementieren
  • Wie Sie durch richtige Grundierung und Zitierung Halluzinationen verhindern
  • Wie Sie multimodale Inhalte wie Text, Bilder und Tabellen verarbeiten
  • Wie Sie Ihr RAG mit Echtzeit-Webdaten mithilfe der Bright Data -Integration verbessern können (optional)

Los geht’s!

Die Herausforderung des modernen Wissensmanagements

Technische Dokumentationen werden in Wikis gespeichert, Produktspezifikationen finden sich in PDF-Dateien, Kundendaten in Datenbanken und institutionelles Wissen in E-Mails. Mitarbeiter verbringen Stunden mit der Suche nach Informationen und stoßen dabei oft auf veraltete oder unvollständige Antworten. Große Sprachmodelle, die auf allgemeinen Daten trainiert wurden, können nicht auf Ihr firmeneigenes Wissen zugreifen. Sie machen oft Fehler, wenn sie nach unternehmensspezifischen Informationen gefragt werden.

RAG-Agenten lösen dieses Problem, indemsie vor der Generierung von Antworten relevante Kontexte aus Ihrer Wissensdatenbank abrufen. Dadurch basiert die KI auf Fakten, Halluzinationen werden reduziert und transparente Quellenangaben zur Überprüfung bereitgestellt.

Was wir entwickeln: Intelligentes RAG-Agentensystem

Wir werden einen produktionsreifen RAG-Agenten entwickeln, der Dokumente aus verschiedenen Quellen aufnimmt, sie zu durchsuchbaren Teilen verarbeitet, sie in Vektordarstellungen umwandelt, relevante Kontexte mithilfe einer hybriden Suche abruft und korrekte Antworten mit korrekten Zitaten generiert, wodurch Ungenauigkeiten vermieden werden.

Das System verwaltet:

  • Dokumenteneingabe aus Cloud-Speicher, Drive und lokalen Dateien
  • Intelligentes Chunking mit Überlappung und Beibehaltung von Metadaten
  • Hybride Abfrage, die semantische Ähnlichkeit und Keyword-Matching kombiniert
  • Multimodale Inhalte, einschließlich Bildern und Tabellen
  • Erstellung von Zitaten zur Überprüfung von Antworten
  • Erkennung und Vermeidung von Fehlern

Voraussetzungen

Richten Sie Ihre Entwicklungsumgebung mit folgenden Komponenten ein:

  • Python 3.10 oder höher – Erforderlich für die Kompatibilität mit Google ADK.
  • Google Cloud-Projekt – Erstellen Sie ein Projekt in der Google Cloud Console mit aktivierter Abrechnung.
  • Dienstkonto – Erstellen Sie ein Dienstkonto mit den Rollen „Vertex KI-Benutzer“ und „Speicherobjekt-Viewer“.
    Creatuing a service account with Vertex AI User and Storage Object Viewer roles
  • Google ADK – Agent Development Kit zum Erstellen von KI-Agenten; siehe Dokumentation.
  • Vertex KI API – Aktivieren Sie die Vertex KI API in Ihrem Google Cloud-Projekt.
    Enabling Vertex AI API in your Google Cloud project
  • Virtuelle Python-Umgebung – Hält Abhängigkeiten isoliert; siehe venv-Dokumentation.

Einrichten der Umgebung

Erstellen Sie Ihr Projektverzeichnis und installieren Sie Abhängigkeiten:

python -m venv venv
# macOS/Linux: source venv/bin/activate
# Windows: venvScriptsactivate
pip install google-genai google-cloud-aiplatform google-cloud-storage langchain-google-vertexai pypdf python-dotenv pandas pillow

Erstellen Sie eine neue Datei namens rag_agent.py und fügen Sie die folgenden Importe hinzu:

import os
import json
import PyPDF2
import fitz
import time
import vertexai
from google import genai
from vertexai.preview import rag
from pathlib import Path
from vertexai.preview.generative_models import GenerativeModel, Tool
from google.cloud import storage
from typing import List, Dict, Any, Optional
from datetime import datetime
from dotenv import load_dotenv
from google.api_core.exceptions import ResourceExhausted
from google.genai import types

load_dotenv()

Erstellen Sie eine .env -Datei mit Ihren Anmeldedaten:

GOOGLE_CLOUD_PROJECT="Ihre-Projekt-ID"
GOOGLE_CLOUD_LOCATION="us-central1"
GOOGLE_APPLICATION_CREDENTIALS="Pfad/zu/service-account-key.json"
GENAI_API_KEY="Ihr-genai-API-Schlüssel"
GCS_BUCKET_NAME="Ihr-Bucket-Name"

Sie benötigen:

  • Projekt-ID: Ihre Google Cloud-Projekt-ID aus der Konsole
  • Standort: Region für Vertex KI-Ressourcen (us-east1 empfohlen)
  • Dienstkontoschlüssel: JSON-Schlüsseldatei, die von IAM & Admin heruntergeladen wurde
  • GenAI-API-Schlüssel: Erstellen Sie diesen in Google AI Studio
    Creating API Key in Google AI Studio
  • GCS-Bucket: Cloud Storage-Bucket für die Speicherung von Dokumenten
    Creating Cloud Storage bucket for document storage

Aufbau des RAG-Agent-Systems

Schritt 1: Google ADK einrichten

Konfigurieren Sie den Google ADK-Client und initialisieren Sie Vertex AI mit der richtigen Authentifizierung. Der Client wickelt alle Interaktionen mit den generativen KI-Diensten von Google ab.

def initialize_adk():
    """Initialisieren Sie Vertex KI mit der richtigen Authentifizierung."""

    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")

    vertexai.init(
        project=os.getenv("GOOGLE_CLOUD_PROJECT"),
        location=os.getenv("GOOGLE_CLOUD_LOCATION")
    )

    print(f"✓ Vertex KI initialisiert")

# Initialisieren Sie das System.
initialize_adk()

Die Initialisierung stellt Verbindungen sowohl zum GenAI-Client für Agentenoperationen als auch zu Vertex AI für RAG-Funktionen her. Sie überprüft die Anmeldedaten und bestätigt die Projektkonfiguration, bevor sie fortfährt.

Schritt 2: Konfiguration der Vertex AI RAG-Engine

Erstellen Sie einen RAG-Korpus, der als Grundlage für Ihre Wissensdatenbank dient. Der Korpus speichert indizierte Dokumente, verwaltet Einbettungen und verarbeitet Abrufabfragen.

def create_rag_corpus(corpus_name: str, description: str) -> str:
    """Erstellen Sie einen neuen RAG-Korpus für die Speicherung und den Abruf von Dokumenten."""

    try:
        corpus = rag.create_corpus(
            display_name=corpus_name,
            description=description,
            embedding_model_config=rag.EmbeddingModelConfig(
                publisher_model="publishers/google/models/text-embedding-004"
            )
        )

        corpus_id = corpus.name.split('/')[-1]

        print(f"✓ RAG-Korpus erstellt: {corpus_name}")
        print(f"✓ Korpus-ID: {corpus_id}")
        print(f"✓ Einbettungsmodell: text-embedding-004")

        return corpus_id

    except Exception as e:
        print(f"Fehler beim Erstellen des Korpus: {str(e)}")
        raise

def configure_retrieval_parameters(corpus_id: str) -> Dict[str, Any]:
    """Konfigurieren Sie die Abrufparameter für eine optimale Suchleistung."""

    retrieval_config = {
        "corpus_id": corpus_id,
        "similarity_top_k": 10,
        "vector_distance_threshold": 0.5,
        "filter": {},
        "ranking_config": {
            "rank_service": "default",
            "alpha": 0.5 
        }
    }

    print(f"✓ Abrufparameter konfiguriert")
    print(f"  - Top-K-Ergebnisse: {retrieval_config['similarity_top_k']}")
    print(f"  - Entfernungsschwelle: {retrieval_config['vector_distance_threshold']}")
    print(f"  - Hybridsuche Alpha: {retrieval_config['ranking_config']['alpha']}")

    return retrieval_config

Die Korpus-Erstellung verwendet das Text-Embedding-004-Modell von Google für hochwertige semantische Einbettungen. Die Abrufkonfiguration gleicht semantische Ähnlichkeit und Keyword-Übereinstimmung durch den Alpha-Parameter aus, wobei 0,5 eine gleichmäßige Gewichtung ergibt.

Schritt 3: Pipeline zur Dokumentenerfassung

Erstellen Sie eine robuste Pipeline zur Dokumenteneingabe, die mehrere Dateiformate verarbeitet, sauberen Text extrahiert und wichtige Metadaten für eine verbesserte Suche beibehält.

def extract_text_from_pdf(file_path: str) -> Dict[str, Any]:
    """Extrahieren Sie Text und Metadaten aus PDF-Dokumenten."""

    with open(file_path, 'rb') as file:
        pdf_reader = PyPDF2.PdfReader(file)

        metadata = {
            'source': file_path,
            'num_pages': len(pdf_reader.pages),
            'title': pdf_reader.metadata.get('/Title', ''),
            'author': pdf_reader.metadata.get('/Author', ''),
            'created_date': str(datetime.now())
        }

        text_content = []
        for page_num, page in enumerate(pdf_reader.pages):
            page_text = page.extract_text()
            text_content.append({
                'page': page_num + 1,
                'text': page_text,
                'char_count': len(page_text)
            })

        return {
            'metadata': metadata,
            'content': text_content,
            'full_text': ' '.join([p['text'] for p in text_content])
        }

def preprocess_document(text: str) -> str:
    """Dokumenttext bereinigen und normalisieren, um eine optimale Indizierung zu erzielen."""

    text = ' '.join(text.split())

    text = text.replace('x00', '')

    text = text.replace('rn', 'n')

    lines = text.split('n')
    cleaned_lines = [
        line for line in lines 
        if len(line.strip()) > 3 
        and not line.strip().isdigit() 
    ]

    return 'n'.join(cleaned_lines)

Die Chunking-Strategie nutzt Satzgrenzen, um Unterbrechungen mitten im Gedankengang zu vermeiden, implementiert Überlappungen, um den Kontext über Chunks hinweg zu bewahren, und speichert Metadaten über die Positionen der Chunks für eine genaue Zitierbarkeit. Die Chunk-Größe von 1000 Zeichen sorgt für ein Gleichgewicht zwischen Suchgenauigkeit und Kontextvollständigkeit.
Storing pdf files in Cloud Storage bucket

Schritt 4: Einbettung und Indizierung

Laden Sie Dokumente in den RAG-Korpus hoch und generieren Sie Vektor-Einbettungen für die semantische Suche. Das System übernimmt automatisch die Generierung der Einbettungen und die Indexoptimierung.

def chunk_document(text: str, chunk_size: int = 1000, overlap: int = 200) -> List[Dict[str, Any]]:
    """Teilen Sie das Dokument in überlappende Chunks auf, um eine optimale Suche zu erzielen."""

    chunks = []
    start = 0
    text_length = len(text)
    chunk_id = 0

    while start < text_length:
        end = start + chunk_size

        if end < text_length:
            last_period = text.rfind('.', start, end)
            if last_period != -1 and last_period > start:
                end = last_period + 1

        chunk_text = text[start:end].strip()

        if chunk_text:
            chunks.append({
                'chunk_id': chunk_id,
                'text': chunk_text,
                'start_char': start,
                'end_char': end,
                'char_count': len(chunk_text)
            })
            chunk_id += 1

        start = end - overlap

    print(f"✓ Created {len(chunks)} chunks with {overlap} char overlap")
    return chunks

def upload_file_to_gcs(local_path: str, gcs_bucket: str) -> str:
    """Upload document to Google Cloud Storage for RAG ingestion."""

    storage_client = storage.Client()
    bucket = storage_client.bucket(gcs_bucket)

    blob_name = f"rag-docs/{Path(local_path).name}"
    blob = bucket.blob(blob_name)

    blob.upload_from_filename(local_path)

    gcs_uri = f"gs://{gcs_bucket}/{blob_name}"
    print(f"✓ Hochgeladen zu GCS: {gcs_uri}")

    return gcs_uri

def import_documents_to_corpus(corpus_id: str, file_uris: List[str]) -> str:
    """Importiere Dokumente in den RAG-Korpus und generiere Einbettungen."""

    print(f"⚡ Import für {len(file_uris)} Dokumente wird gestartet...")


    response = rag.import_files(
        corpus_name=f"projects/{os.getenv('GOOGLE_CLOUD_PROJECT')}/locations/{os.getenv('GOOGLE_CLOUD_LOCATION')}/ragCorpora/{corpus_id}",
        paths=file_uris,
        chunk_size=1000,
        chunk_overlap=200
    )


    try:
        if hasattr(response, 'result'):
            print("⏳ Warten auf Abschluss des Importvorgangs (dies kann eine Minute dauern)...")
            response.result() 
        else:
            print("✓ Importanforderung gesendet.")

    except Exception as e:
        print(f"⚠️ Hinweis zum Warten: {e}")

    print(f"✓ Dokumente importiert und Indizierung ausgelöst.")

    return getattr(response, 'name', 'unknown_operation')

def create_vector_index(corpus_id: str, index_config: Dict[str, Any]) -> str:
    """Erstellt einen optimierten Vektorindex für die schnelle Ähnlichkeitssuche."""

    index_settings = {
        'corpus_id': corpus_id,
        'distance_measure': 'COSINE',
        'algorithm': 'TREE_AH',  
        'leaf_node_embedding_count': 1000,
        'leaf_nodes_to_search_percent': 10
    }

    print(f"✓ Vektorindex mit TREE_AH-Algorithmus erstellt")
    print(f"✓ Distanzmaß: COSINE-Ähnlichkeit")
    print(f"✓ Optimiert für {index_settings['leaf_nodes_to_search_percent']}% Suchabdeckung")

    return corpus_id

Der Importprozess übernimmt automatisch das Parsing und Chunking von Dokumenten sowie die Generierung von Einbettungen. Der TREE_AH-Algorithmus ermöglicht eine schnelle Suche nach annähernden nächsten Nachbarn bei gleichzeitig hoher Trefferquote. Die Kosinusähnlichkeit misst den Winkelabstand zwischen Einbettungsvektoren für die semantische Übereinstimmung.

Schritt 5: Agentenentwicklung mit ADK

Erstellen Sie die Kernarchitektur des Agenten, die den Kontext verwaltet, Benutzeranfragen bearbeitet und die Abfrage mit der Generierung von Antworten koordiniert.

class RAGAgent:
    """Intelligenter RAG-Agent mit Kontextverwaltung und Grounding."""

    def __init__(self, corpus_id: str, model_name: str = "gemini-2.5-flash"):
        self.corpus_id = corpus_id
        self.model_name = model_name
        self.conversation_history = []

        self.rag_tool = Tool.from_retrieval(
            retrieval=rag.Retrieval(
                source=rag.VertexRagStore(
                    rag_corpora=[f"projects/{os.getenv('GOOGLE_CLOUD_PROJECT')}/locations/{os.getenv('GOOGLE_CLOUD_LOCATION')}/ragCorpora/{corpus_id}"],
                    similarity_top_k=5,
                    vector_distance_threshold=0.3
                )
            )
        )

        self.model = GenerativeModel(
            model_name=model_name,
            tools=[self.rag_tool]
        )

        print(f"✓ Initialized RAG agent with {model_name}")
        print(f"✓ Mit Korpus verbunden: {corpus_id}")

    def manage_context(self, query: str, max_history: int = 5) -> List[Dict[str, str]]:
        """Konversationskontext mit Kürzung der Historie verwalten."""

        self.conversation_history.append({
            'role': 'user',
            'content': query,
            'timestamp': datetime.now().isoformat()
        })

        if len(self.conversation_history) > max_history * 2:
            self.conversation_history = self.conversation_history[-max_history * 2:]

        formatted_history = []
        for msg in self.conversation_history:
            formatted_history.append({
                'role': msg['role'],
                'parts': [msg['content']]
            })

        return formatted_history

    def build_grounded_prompt(self, query: str, retrieved_context: List[Dict[str, Any]]) -> str:
        """Erstelle eine Eingabeaufforderung mit expliziten Anweisungen zur Verankerung."""

        context_text = "nn".join([
            f"[Quelle {i+1}]: {ctx['text']}" 
            for i, ctx in enumerate(retrieved_context)
        ])

        prompt = f"""Sie sind ein hilfreicher KI-Assistent mit Zugriff auf eine Wissensdatenbank. 
        Beantworten Sie die folgende Frage AUSSCHLIESSLICH anhand der im folgenden Kontext bereitgestellten Informationen.

        WICHTIGE ANWEISUNGEN:
        1. Stützen Sie Ihre Antwort ausschließlich auf den bereitgestellten Kontext.
        2. Wenn der Kontext nicht genügend Informationen enthält, weisen Sie ausdrücklich darauf hin.
        3. Zitieren Sie bestimmte Quellen unter Verwendung der Notation [Quelle X].
        4. Fügen Sie keine Informationen aus Ihrem Allgemeinwissen hinzu.
        5. Wenn Sie sich unsicher sind, geben Sie dies zu.

        KONTEXT:
        {context_text}

        FRAGE:
        {query}

        ANTWORT:"""

        return prompt

Der Agent speichert den Gesprächsverlauf für Interaktionen mit mehreren Runden, verwaltet die Größe des Kontextfensters, um Token-Beschränkungen zu vermeiden, und erstellt Eingabeaufforderungen mit expliziten Anweisungen, um Halluzinationen zu reduzieren. Die Integration des RAG-Tools ermöglicht die automatische Abfrage während der Generierung.

Schritt 6: Abfrageverarbeitung und Abruf

Implementieren Sie eine hybride Suche, die semantisches Verständnis mit Keyword-Matching kombiniert, um eine optimale Abrufgenauigkeit zu erzielen.

   def hybrid_search(
        self,
        corpus_id: str,
        query: str,
        semantic_weight: float = 0.7,
        top_k: int = 10
    ) -> List[Dict[str, Any]]:
        """Führen Sie eine hybride Suche mit automatischer Wiederholung bei Quotenbeschränkungen durch."""

        rag_resource = rag.RagResource(
            rag_corpus=f"projects/{os.getenv('GOOGLE_CLOUD_PROJECT')}/locations/{os.getenv('GOOGLE_CLOUD_LOCATION')}/ragCorpora/{corpus_id}"
        )

        max_retries = 3  
        base_delay = 90 

        for attempt in range(max_retries):
            try:
                print(f"🔍 Suche im Korpus (Versuch {attempt + 1})...")

                results = rag.retrieval_query(
                    rag_resources=[rag_resource],
                    text=query,
                    similarity_top_k=top_k,
                    vector_distance_threshold=0.5 
                )

                # Bei Erfolg Ergebnisse verarbeiten und zurückgeben
                abgerufene_Teile = []
                für i, Kontext in enumerate(Ergebnisse.Kontexte.Kontexte):
                    retrieved_chunks.append({
                        'rank': i + 1,
                        'text': context.text,
                        'source': context.source_uri if hasattr(context, 'source_uri') else 'unknown',
                        'distance': context.distance if hasattr(context, 'distance') else 0.0
                    })

                print(f"✓ {len(retrieved_chunks)} relevante Chunks abgerufen")
                return retrieved_chunks

            except ResourceExhausted:
                wait_time = base_delay * (2 ** attempt)
                print(f"⚠️ Kontingent erreicht (Limit: 5/min). Abkühlung für {wait_time}s...")
                time.sleep(wait_time)

            except Exception as e:
                print(f"❌ Abruf-Fehler: {str(e)}")
                raise

        print("❌ Maximale Anzahl von Wiederholungsversuchen erreicht. Abruf fehlgeschlagen.")
        return []

    def rerank_results(
        self,
        results: List[Dict[str, Any]],
        query: str,
        model_name: str = "gemini-2.5-flash"
    ) -> List[Dict[str, Any]]:
        """Rerank abgerufen Ergebnisse basierend auf der Relevanz der Abfrage."""

        if not results:
            return []

        rerank_prompt = f"""Bewerten Sie die Relevanz jedes Abschnitts für die Abfrage auf einer Skala von 0 bis 10.

    Abfrage: {query}

    Abschnitte:
    {chr(10).join([f"{i+1}. {r['text'][:200]}..." for i, r in enumerate(results)])}

    Geben Sie nur eine durch Kommas getrennte Liste der Bewertungen zurück (z. B. 8,6,9,3,7)."""

        model = GenerativeModel(model_name)
        response = model.generate_content(rerank_prompt)

        if response.text:
            try:
                scores = [float(s.strip()) for s in response.text.strip().split(',')]
                for i, score in enumerate(scores[:len(results)]):
                    results[i]['rerank_score'] = score

                results.sort(key=lambda x: x.get('rerank_score', 0), reverse=True)
                print(f"✓ Neu sortierte Ergebnisse unter Verwendung der LLM-Bewertung")
            except Exception as e:
                print(f"Warnung: Neusortierung fehlgeschlagen, Verwendung der ursprünglichen Reihenfolge: {str(e)}")

        return results

Die hybride Suche ruft Kandidaten anhand der Vektorsimilarität ab, anschließend wird mithilfe des LLM eine Neubewertung der Relevanz auf Grundlage des abfragespezifischen Kontexts vorgenommen. Dieser zweistufige Ansatz sorgt für ein ausgewogenes Verhältnis zwischen Effizienz und Genauigkeit.

Schritt 7: Generierung von Antworten und Grounding

Generieren Sie Antworten mit korrekten Zitaten und verhindern Sie Halluzinationen durch strenge Grounding-Überprüfung.

   def generate_grounded_response(
        self,
        agent: 'RAGAgent',
        query: str,
        retrieved_context: List[Dict[str, Any]],
        temperature: float = 0.2
    ) -> Dict[str, Any]:
        """Antwort mit Zitaten und Halluzinationsvermeidung generieren."""

        grounded_prompt = agent.build_grounded_prompt(query, retrieved_context)

        chat = agent.model.start_chat()
        response = chat.send_message(
            grounded_prompt,
            generation_config={
                'temperature': temperature,
                'top_p': 0.8,
                'top_k': 40,
                'max_output_tokens': 1024
            }
        )

        return {
            'answer': response.text,
            'sources': retrieved_context,
            'query': query,
            'timestamp': datetime.now().isoformat()
        }

    def verify_grounding(
        self,
        response: str,
        sources: List[Dict[str, Any]],
        model_name: str = "gemini-2.5-flash"
    ) -> Dict[str, Any]:
        """Überprüfen Sie, ob die Antworten auf dem Quellenmaterial basieren."""

        verification_prompt = f"""Analysieren Sie, ob die folgende Antwort vollständig durch die angegebenen Quellen gestützt wird.

    QUELLEN:
    {chr(10).join([f"Quelle {i+1}: {s['text']}" for i, s in enumerate(sources)])}

    ANTWORT:
    {response}

    Überprüfen Sie jede Behauptung in der Antwort. Antworten Sie mit JSON:
    {{
        "is_grounded": true/false,
        "unsupported_claims": ["claim1", "claim2"],
        "confidence_score": 0.0-1.0
    }}"""

        model = GenerativeModel(model_name)
        verification_response = model.generate_content(verification_prompt)

        try:
            json_text = verification_response.text.strip()
            if '```json' in json_text:
                json_text = json_text.split('```json')[1].split('```')[0].strip()

            verification_result = json.loads(json_text)

            print(f"✓ Grounding-Verifizierung abgeschlossen")
            print(f"  - Grounded: {verification_result.get('is_grounded', False)}")
            print(f"  - Confidence: {verification_result.get('confidence_score', 0.0):.2f}")

            return verification_result

        except Exception as e:
            print(f"Warnung: Überprüfung der Fundierung fehlgeschlagen: {str(e)}")
            return {'is_grounded': True, 'confidence_score': 0.5}

Die Überprüfung der Fundiertheit stellt sicher, dass jede Aussage in der Antwort auf Quelldokumente zurückgeführt werden kann. Eine niedrige Temperaturgenerierung (0,2) reduziert kreative Ausschmückungen und verbessert die sachliche Genauigkeit.

Schritt 8: Multimodale RAG-Implementierung

Erweitern Sie das RAG-System, um Bilder, Tabellen und andere Nicht-Text-Inhalte für eine umfassende Wissensgewinnung zu verarbeiten.

def extract_images_from_pdf(self, pdf_path: str, output_dir: str) -> List[Dict[str, Any]]:
        """Extrahieren Sie Bilder aus PDF-Dokumenten für die multimodale Indizierung."""   

        doc = fitz.open(pdf_path)
        images = []

        os.makedirs(output_dir, exist_ok=True)

        for page_num in range(len(doc)):
                page = doc[page_num]
                image_list = page.get_images()

                for img_index, img in enumerate(image_list):
                    xref = img[0]
                    base_image = doc.extract_image(xref)
                    image_bytes = base_image["image"]

                    # Bild speichern
                    image_filename = f"page{page_num + 1}_img{img_index + 1}.png"
                    image_path = os.path.join(output_dir, image_filename)

                    with open(image_path, "wb") as img_file:
                        img_file.write(image_bytes)

                    images.append({
                        'page': page_num + 1,
                        'image_path': image_path,
                        'format': base_image['ext'],
                        'size': len(image_bytes)
                    })

        print(f"✓ {len(images)} Bilder aus PDF extrahiert")
        return images

    def process_table_content(self, table_text: str) -> Dict[str, Any]:
        """Tabellendaten für verbesserte Abfrage verarbeiten und strukturieren."""

        lines = table_text.strip().split('n')
        if not lines:
                return {}

        headers = [h.strip() for h in lines[0].split('|') if h.strip()]

        rows = []
        for line in lines[1:]:
                cells = [c.strip() for c in line.split('|') if c.strip()]
                if len(cells) == len(headers):
                    row_dict = dict(zip(headers, cells))
                    rows.append(row_dict)

        return {
            'headers': headers,
            'rows': rows,
            'row_count': len(rows),
            'column_count': len(headers)
        }

    def create_multimodal_embedding(
        self,
        text: str,
        image_path: Optional[str] = None,
        table_data: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Erstellt eine einheitliche Einbettung für multimodale Inhalte."""

        combined_text = text

        if table_data and table_data.get('rows'):
                table_desc = f"nTabelle mit {table_data['row_count']} Zeilen und Spalten: {', '.join(table_data['headers'])}n"
                combined_text += table_desc

        if image_path:
                combined_text += f"n[Bild: {Path(image_path).name}]"

        return {
            'text': combined_text,
            'has_image': image_path is not None,
            'has_table': table_data is not None,
            'modalities': sum([bool(text), bool(image_path), bool(table_data)])
        }

Bei der multimodalen Verarbeitung werden neben Text auch Bilder und Tabellen extrahiert und indexiert. Der einheitliche Einbettungsansatz kombiniert beschreibende Metadaten aus allen Modalitäten zu durchsuchbarem Text. Dadurch können Abfragen wie „Zeige mir die Preistabelle aus dem Bericht zum 3. Quartal” sowohl die Tabellendaten als auch den umgebenden Kontext abrufen.

Schritt 9: Integration von Google ADK Agent

Integrieren Sie das Agent Development Kit (ADK) von Google, um eine bessere Agentenschnittstelle zu erstellen, die mit Ihrem Vertex AI RAG Engine-Backend verbunden ist. Das ADK bietet verbesserte Agentenfunktionen, darunter Tool-Aufrufe, mehrteilige Konversationen und strukturierte Antworten.

class ADKRAGAgent:
    """Google ADK Agent-Wrapper, der Vertex KI RAG Engine als Backend verwendet."""

    def __init__(self, corpus_id: str, project_id: str, location: str):
        """Initialisieren Sie den ADK-Agenten mit RAG-Funktionen."""
        self.corpus_id = corpus_id
        self.project_id = project_id
        self.location = location

        self.rag_agent = RAGAgent(corpus_id)

        self.client = genai.Client(
            vertexai=True,
            project=project_id,
            location=location
        )

        self.model_name = "gemini-2.0-flash-001"

        print(f"✓ Google ADK Agent initialisiert")
        print(f"  - Framework: Google ADK (genai.Client)")
        print(f"  - Backend: Vertex KI RAG Engine")
        print(f"  - Projekt: {project_id}")
        print(f"  - Standort: {location}")
        print(f"  - RAG-Korpus: {corpus_id}")

    def create_rag_search_tool(self) -> types.Tool:
        """RAG-Suchwerkzeug für ADK-Agent erstellen."""

        def rag_search(query: str) -> str:
            """
            RAG-Korpus durchsuchen und fundierte Antworten zurückgeben.

            Argumente:
                query: Die zu suchende Frage des Benutzers

            Rückgabewerte:
                Eine fundierte Antwort mit Zitaten aus der Wissensdatenbank
"""
try:
results = self.rag_agent.hybrid_search(
self.corpus_id,
query,
semantic_weight=0.7,
top_k=10)


if not results:
return "Keine relevanten Informationen in der Wissensdatenbank gefunden."

                reranked = self.rag_agent.rerank_results(results, query)

                response = self.rag_agent.generate_grounded_response(
                    self.rag_agent,
                    query,
                    reranked[:5]
                )

                verification = self.rag_agent.verify_grounding(
                    response['answer'],
                    response['sources']
                )

                answer = response['answer']
                if not verification.get('is_grounded', True):
                    answer += f"nn[Confidence: {verification.get('confidence_score', 0):.0%}]"

                return answer

            except Exception as e:
                return f"Error searching knowledge base: {str(e)}"

        rag_tool = types.Tool(
            function_declarations=[
                types.FunctionDeclaration(
                    name="rag_search",
                    description="Durchsuchen Sie die Wissensdatenbank des Unternehmens mit RAG (Retrieval-Augmented Generation), um genaue, fundierte Antworten auf Fragen zu technischer Dokumentation, Produktspezifikationen und Benutzerhandbüchern zu finden.",
                    parameters={
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "Die Frage oder Suchanfrage des Benutzers"
                            }
                        },
                        "required": ["query"]
                    }
                )
            ]
        )

        self.rag_search_function = rag_search

        return rag_tool

    def create_agent(self) -> Dict[str, Any]:
        """Erstellen Sie eine Google ADK Agent-Konfiguration mit dem RAG-Tool."""

        rag_tool = self.create_rag_search_tool()

        agent_instructions = """Sie sind ein intelligenter RAG-Agent (Retrieval-Augmented Generation) mit Zugriff auf eine Unternehmens-Wissensdatenbank.

Ihre Fähigkeiten:
- Durchsuchen Sie technische Dokumentationen, Produktspezifikationen und Benutzerhandbücher.
- Geben Sie genaue, fundierte Antworten mit Quellenangaben.
- Führen Sie mehrteilige Gespräche mit Kontextbewusstsein.
- Überprüfen Sie die Richtigkeit der Informationen, bevor Sie antworten.

Richtlinien:
1. Verwenden Sie vor der Beantwortung immer das rag_search-Tool, um Informationen zu finden.
2. Geben Sie spezifische, detaillierte Antworten auf der Grundlage der abgerufenen Dokumente.
3. Fügen Sie relevante Zitate und Quellen hinzu.
4. Wenn keine Informationen gefunden werden, geben Sie dies deutlich an.
5. Behalten Sie den Gesprächskontext über mehrere Abfragen hinweg bei.

Seien Sie in allen Antworten hilfsbereit, genau und professionell.“““

        agent_config = {
            'model': self.model_name,
            'instructions': agent_instructions,
            'tools': [rag_tool],
            'display_name': 'RAG Agent mit Vertex KI (Google ADK + Vertex KI RAG Engine)'
        }

        print(f"✓ Google ADK-Agent-Konfiguration erstellt")
        print(f"  - Modell: {self.model_name}")
        print(f"  - Tools: RAG-Suche (Vertex KI RAG Engine)")

        return agent_config

    def chat(self, agent_config: Dict[str, Any], query: str, session_id: str = "default") -> str:
        """Senden Sie eine Nachricht an den ADK-Agenten und erhalten Sie eine Antwort mit Google GenAI."""

        self.rag_agent.manage_context(query)

        try:
            response = self.client.models.generate_content(
                model=agent_config['model'],
                contents=query,
                config=types.GenerateContentConfig(
                    system_instruction=agent_config['instructions'],
                    tools=agent_config['tools'],
                    temperature=0.2
                )
            )

            if response.candidates and len(response.candidates) > 0:
                candidate = response.candidates[0]

                if candidate.content and candidate.content.parts:
                    for part in candidate.content.parts:
                        if hasattr(part, 'function_call') and part.function_call:
                            function_name = part.function_call.name
                            function_args = part.function_call.args

                            print(f"  → ADK Agent calling tool: {function_name}")

                            if function_name == "rag_search":
                                query_arg = function_args.get("query", query)
                                tool_result = self.rag_search_function(query_arg)

                                response = self.client.models.generate_content(
                                    model=agent_config['model'],
                                    contents=[
                                        types.Content(role="user", parts=[types.Part(text=query)]),
                                        types.Content(role="model", parts=[part]),
                                        types.Content(
                                            role="function",
                                            parts=[types.Part(
                                                function_response=types.FunctionResponse(
                                                    name=function_name,
                                                    response={"result": tool_result}
                                                )
                                            )]
                                        )
                                    ],
                                    config=types.GenerateContentConfig(
                                        system_instruction=agent_config['instructions'],
                                        tools=agent_config['tools'],
                                        temperature=0.2
                                    )
                                )

                        elif hasattr(part, 'text') and part.text:
                            answer = part.text

                            self.rag_agent.conversation_history.append({
                                'role': 'assistant',
                                'content': answer,
                                'timestamp': datetime.now().isoformat()
                            })

                            return answer

                if response.candidates and response.candidates[0].content.parts:
                    for part in response.candidates[0].content.parts:
                        if hasattr(part, 'text') and part.text:
                            answer = part.text

                            self.rag_agent.conversation_history.append({
                                'role': 'assistant',
                                'content': answer,
                                'timestamp': datetime.now().isoformat()
                            })

                            return answer

            return "No response generated."

        except Exception as e:
            error_msg = f"Fehler im ADK-Agenten-Chat: {str(e)}"
            print(f"❌ {error_msg}")
            return error_msg

Die ADK-Integration fügt das Agent-Framework von Google zu Ihrem bestehenden RAG-Agenten hinzu. Die ADKRAGAgent-Klasse richtet einen genai.Client für Agentenoperationen ein und verwendet Ihren RAGAgent für die Abfrage. Die Methode create_rag_search_tool definiert eine Funktion, die der Agent aufrufen kann, sodass er Ihre Wissensdatenbank mit der Vertex KI RAG Engine durchsuchen kann.

Der Tool-Aufrufmechanismus ermöglicht es dem Agenten, anhand von Benutzeranfragen automatisch zu bestimmen, wann die Wissensdatenbank durchsucht werden soll. Wenn eine Suche erforderlich ist, führt er die hybride Suchpipeline aus, ordnet die Ergebnisse neu, generiert fundierte Antworten und überprüft die Genauigkeit, bevor er Antworten liefert. Die Chat-Methode verwaltet den gesamten Gesprächsablauf, einschließlich der Tool-Ausführung und der Verwaltung des Mehrfachkontexts.

Schritt 10: Optimieren Sie Ihr RAG mit Echtzeit-Webdaten von Bright Data

Während Ihr RAG-System sich hervorragend für das Abrufen von Informationen aus Ihrer internen Wissensdatenbank eignet, benötigen KI-Anwendungen in Unternehmen häufig aktuelle Echtzeitdaten aus externen Quellen. Hier kommt die Webdatenplattform von Bright Data ins Spiel, die es Ihrem RAG-Agenten ermöglicht, auf Live-Informationen aus dem gesamten Web zuzugreifen und Ihre Wissensdatenbank aktuell und umfassend zu halten.

Warum sollten Sie Bright Data in Ihr RAG-System integrieren?

1. Halten Sie Ihre Wissensdatenbank auf dem neuesten Stand

  • Aktualisieren Sie Ihren RAG-Korpus automatisch mit den neuesten Produktinformationen, Preisdaten, Wettbewerbsinformationen und Markttrends.
  • Beseitigen Sie veraltete Daten, die zu überholten KI-Antworten führen.
  • Planen Sie regelmäßige Datenaktualisierungen, um die Genauigkeit zu gewährleisten.

2. Gehen Sie über interne Dokumente hinaus

  • Greifen Sie auf Echtzeitdaten von über 120 beliebten Websites zu, darunter E-Commerce-Plattformen, Nachrichtenseiten, soziale Medien und branchenspezifische Quellen.
  • Ergänzen Sie Ihre technische Dokumentation mit Live-API-Dokumentation, Community-Diskussionen und aktualisierten Spezifikationen.
  • Beziehen Sie Kundenbewertungen, Feedback und Stimmungsdaten ein, um Ihre Produkt-Wissensdatenbank zu verbessern

3. Dynamische Abfrageverbesserung aktivieren

  • Wenn Ihr RAG-Agent eine Abfrage erkennt, die aktuelle Informationen erfordert (Preise, Verfügbarkeit, aktuelle Nachrichten), ruft er automatisch neue Daten ab
  • Kombinieren Sie internes Wissen mit externen Webdaten, um umfassende Antworten zu erhalten.
  • Stellen Sie den Benutzern sowohl historischen Kontext als auch aktuelle Informationen zur Verfügung

4. Skalieren Sie die Datenerfassung mühelos

  • Sie müssen keine Proxys verwalten, CAPTCHAs bearbeiten oder sich mit Anti-Bot-Systemen auseinandersetzen.
  • Bright Data kümmert sich um die gesamte Infrastruktur, die Entsperrung und die Datenqualität.
  • Konzentrieren Sie sich auf die KI-Entwicklung, während Bright Data die Datenbeschaffung übernimmt

Implementierung: Hinzufügen von Bright Data zu Ihrer RAG-Pipeline

Erweitern Sie Ihr RAG-System mit den Funktionen von Bright Data. Wir fügen drei Integrationsmuster hinzu: Datensatzintegration für vorab gesammelte Daten, Web-Scraper-API für Echtzeit-Scraping und KI-Scraper für angereicherte, KI-generierte Erkenntnisse.

Muster 1: Datensatzintegration für historische Daten

Nutzen Sie den Marktplatz für Datensätze von Bright Data, um Ihren RAG-Korpus schnell mit hochwertigen, strukturierten Daten zu füllen.

import requests
from typing import List, Dict
import json

class BrightDataRAGEnhancer:
    """Verbessern Sie das RAG-System mit den Webdatenfunktionen von Bright Data."""

    def __init__(self, api_key: str, rag_agent: RAGAgent):
        self.api_key = api_key
        self.rag_agent = rag_agent
        self.base_url = "https://api.brightdata.com"

    def fetch_dataset_data(
        self,
        dataset_id: str,
        filters: Dict[str, Any] = None,
        limit: int = 1000
    ) -> List[Dict[str, Any]]:
        """Daten aus dem Bright Data Dataset Marketplace abrufen."""

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

        endpoint = f"{self.base_url}/Datensätze/v3/snapshot/{dataset_id}"

        params = {
            "format": "json",
            "limit": limit
        }

        if filters:
            params["filter"] = json.dumps(filters)

        response = requests.get(endpoint, headers=headers, params=params)
        response.raise_for_status()

        print(f"✓ {len(response.json())} Datensätze aus dem Datensatz {dataset_id} abgerufen")
        return response.json()

    def ingest_dataset_to_rag(
        self,
        corpus_id: str,
        dataset_records: List[Dict[str, Any]],
        text_fields: List[str]
    ) -> None:
        """Datensatzdatensätze verarbeiten und zum RAG-Korpus hinzufügen."""

        processed_chunks = []

        for record in dataset_records:
            # Kombinieren Sie bestimmte Textfelder zu durchsuchbaren Inhalten.
            combined_text = " ".join([
                str(record.get(field, ""))
                for field in text_fields
                if record.get(field)
            ])

            if combined_text.strip():
                # Fügen Sie Metadaten für eine bessere Suche hinzu.
                metadata = {
                    "source": "bright_data_dataset",
                    "record_id": record.get("id", "unknown"),
                    "ingestion_date": datetime.now().isoformat(),
                    "data_type": "external_web_data"
                }

                # Inhalt in Blöcke aufteilen
                chunks = chunk_document(combined_text, chunk_size=1000, overlap=200)

                for chunk in chunks:
                    chunk['metadata'] = metadata
                    processed_chunks.append(chunk)

        print(f"✓ {len(processed_chunks)} Blöcke aus den Datensätzen verarbeitet")

        # Temporäre Datei zum Hochladen erstellen
        temp_file = "temp_dataset_content.txt"
        with open(temp_file, 'w') as f:
            for chunk in processed_chunks:
                f.write(chunk['text'] + "nn")

        # In GCS hochladen und in Korpus importieren
        gcs_uri = upload_file_to_gcs(temp_file, os.getenv('GCS_BUCKET_NAME'))
        import_documents_to_corpus(corpus_id, [gcs_uri])

        os.remove(temp_file)
        print(f"✓ Datensatzinhalt zum RAG-Korpus hinzugefügt")

Anwendungsbeispiel: Füllen Sie Ihr E-Commerce-RAG mit Produktdaten

# Zuerst einen RAG-Korpus erstellen
corpus_id = create_rag_corpus(
    corpus_name="bright_data_corpus",
    description="Korpus für Bright Data Enhanced RAG")


# RAG-Agent mit Korpus initialisieren
rag_agent = RAGAgent(corpus_id=corpus_id)

# Enhancer initialisieren
enhancer = BrightDataRAGEnhancer(
    api_key=os.getenv("BRIGHT_DATA_API_KEY"),
    rag_agent=rag_agent)


print("✓ BrightDataRAGEnhancer erfolgreich initialisiert!")

# Amazon-Produktdaten abrufen
amazon_data = enhancer.fetch_dataset_data(
    dataset_id="gd_l7q7dkf244hwxr90h",  # Amazon-Produktdatensatz
    filters={"category": "Electronics"},
    limit=5000
)

# In RAG-Korpus aufnehmen
enhancer.ingest_dataset_to_rag(
    corpus_id=corpus_id,
    dataset_records=amazon_data,
    text_fields=["title", "description", "features", "reviews"]
)

Muster 2: Echtzeit-Web-Scraper-API-Integration

Für dynamische, aktuelle Informationen integrieren Sie die Web Scraper API von Bright Data direkt in die Abfrage-Pipeline Ihres RAG-Agenten.

def scrape_real_time_data(
    self,
    scraper_id: str,
    inputs: List[Dict[str, Any]],
    wait_for_completion: bool = True)
 -> List[Dict[str, Any]]:
    """Führen Sie Echtzeit-Web-Scraping mit Bright Data-Scrapern durch."""

    headers = {
        "Authorization": f"Bearer {self.api_key}",
        "Content-Type": "application/json"
    }

    # Scraper auslösen
    trigger_url = f"{self.base_url}/dca/trigger"
    params = {
        "Scraper": Scraper_id,
        "queue_next": 1
    }

    response = requests.post(
        trigger_url,
        headers=headers,
        params=params,
        json=inputs
    )
    response.raise_for_status()

    snapshot_id = response.json().get("snapshot_id")
    print(f"✓ Scraper ausgelöst. Snapshot-ID: {snapshot_id}")

    if not wait_for_completion:
        return {"snapshot_id": snapshot_id, "status": "processing"}

    # Ergebnisse abfragen
    results_url = f"{self.base_url}/dca/dataset"
    params = {"id": snapshot_id}

    max_retries = 30
    for i in range(max_retries):
        time.sleep(10)  # 10 Sekunden zwischen den Abfragen warten

        results_response = requests.get(results_url, headers=headers, params=params)

        if results_response.status_code == 200:
            data = results_response.json()
            print(f"✓ Scraping abgeschlossen. {len(data)} Datensätze abgerufen")
            return data
        elif results_response.status_code == 202:
            print(f"⏳ Wird noch verarbeitet... ({i+1}/{max_retries})")
            continue
        else:
            print(f"❌ Fehler beim Abrufen der Ergebnisse: {results_response.status_code}")
            break

    return []

def create_dynamic_rag_tool(self) -> types.Tool:
    """RAG-Tool mit Echtzeit-Webdatenanreicherung erstellen."""

    def augmented_rag_search(query: str, include_live_data: bool = False) -> str:
        """
        Wissensdatenbank mit optionaler Echtzeit-Webdatenanreicherung durchsuchen.

        Argumente:
            query: Die Frage des Benutzers
            include_live_data: Ob aktuelle Webdaten abgerufen werden sollen

        Rückgabewerte:
            Fundierte Antwort, die interne und externe Daten kombiniert
        """
        # Zuerst interne Wissensdatenbank durchsuchen
        internal_results = self.rag_agent.hybrid_search(
            corpus_id=self.rag_agent.corpus_id,
            query=query,
            top_k=5
        )

        combined_results = internal_results

        # Wenn die Abfrage aktuelle Informationen erfordert, Live-Daten abrufen
        if include_live_data or self._requires_fresh_data(query):
            print("🌐 Abrufen von Echtzeit-Webdaten...")

            # Beispiel: Preisinformationen scrapen
            if "price" in query.lower() or "cost" in query.lower():
                live_data = self.scrape_real_time_data(
                    scraper_id="your_product_scraper_id",
                    inputs=[{"url": "https://example.com/products"}],
                    wait_for_completion=True
                )

                # Live-Daten in durchsuchbare Blöcke konvertieren
                for record in live_data[:3]:  # Top 3 Ergebnisse
                    combined_results.append({
                        'rank': len(combined_results) + 1,
                        'text': f"{record.get('title', '')}: {record.get('price', '')} - {record.get('description', '')}",
                        'source': f"Live-Webdaten: {record.get('url', 'unknown')}",
                        'distance': 0.3  # Hohe Relevanz für aktuelle Daten
                    })

        # Antwort mit allen verfügbaren Kontextinformationen generieren
        response = self.rag_agent.generate_grounded_response(
            self.rag_agent,
            query,
            combined_results
        )

        return response['answer']

    return types.Tool(
        function_declarations=[
            types.FunctionDeclaration(
                name="augmented_rag_search",
                description="Durchsuchen der internen Wissensdatenbank und optional Abrufen von Echtzeit-Webdaten für aktuelle Informationen",
                parameters={
                    "type": "object",
                    "properties": {
                        "query": {"type": "string", "description": "Frage des Benutzers"},
                        "include_live_data": {"type": "boolean", "description": "Aktuelle Webdaten abrufen"}
                    },
                    "required": ["query"]
                }
            )
        ]
    )

def _requires_fresh_data(self, query: str) -> bool:
    """Bestimmen Sie, ob die Abfrage Echtzeitdaten erfordert."""
    fresh_data_keywords = [
        „latest“, „current“, „today“, „now“, „recent“,
        „price“, „cost“, „available“, „in stock“
    ]
    return any(keyword in query.lower() for keyword in fresh_data_keywords)

Muster 3: Integration von KI-Scrapern für erweiterte Intelligenz

Nutzen Sie die KI-Scraper von Bright Data (ChatGPT, Perplexity, Gemini), um Ihr RAG mit KI-generierten Erkenntnissen und umfassendem Webkontext zu erweitern.

def query_ai_scraper(
        self,
        scraper_type: str,
        prompt: str,
        country_code: str = "us"
    ) -> Dict[str, Any]:
        """Fragen Sie KI-Scraper (ChatGPT, Perplexity usw.) nach erweitertem Kontext."""

        scraper_ids = {
            "chatgpt": "chatgpt_scraper_id",
            "perplexity": "perplexity_scraper_id",
            "gemini": "gemini_scraper_id"
        }

        inputs = [{
            "prompt": prompt,
            "country": country_code
        }]

        results = self.scrape_real_time_data(
            scraper_id=scraper_ids.get(scraper_type),
            inputs=inputs,
            wait_for_completion=True
        )

        if results:
            return {
                "answer": results[0].get("answer", ""),
                "sources": results[0].get("sources", []),
                "citations": results[0].get("citations", [])
            }

        return {}

    def create_hybrid_intelligence_agent(self) -> Dict[str, Any]:
        """Erstellen Sie einen Agenten, der RAG mit KI-Scraper-Intelligenz kombiniert."""

        def hybrid_search(query: str) -> str:
            """
            Kombinieren Sie interne RAG mit externer KI-Scraper-Intelligenz.

            Dies bietet:
            1. Internen Wissensdatenbankkontext
            2. Echtzeit-Erkenntnisse aus dem Internet, die von KI generiert werden
            3. Umfassende, gut recherchierte Antworten
            """
            # Internes Wissen abrufen
            internal_answer = self.rag_agent.hybrid_search(
                corpus_id=self.rag_agent.corpus_id,
                query=query,
                top_k=3
            )

            internal_context = "n".join([r['text'][:200] for r in internal_answer])

            # KI-Scraper-Anreicherung abrufen
            print("🤖 Abrufen von KI-erweiterter Web-Intelligenz...")
            ai_insight = self.query_ai_scraper(
                scraper_type="perplexity",  # Bekannt für gut recherchierte Antworten
                prompt=query
            )

            # Beide Quellen synthetisieren
            synthesis_prompt = f"""Synthetisieren Sie eine umfassende Antwort unter Verwendung sowohl interner Kenntnisse als auch externer KI-Erkenntnisse.

INTERNE WISSENSBASE:
{internal_context}

EXTERNE KI-ERKENNTNISSE:
{ai_insight.get('answer', 'Keine externen Erkenntnisse verfügbar')}

QUELLEN:
{json.dumps(ai_insight.get('citations', []), indent=2)}

FRAGE: {query}

Geben Sie eine vollständige Antwort, die:
1. internes Wissen für unternehmensspezifische Informationen priorisiert
2. externe Erkenntnisse für einen breiteren Kontext und aktuelle Entwicklungen nutzt
3. alle Quellen klar angibt
4. angibt, wann Informationen aus externen bzw. internen Quellen stammen"""

            model = GenerativeModel("gemini-2.0-flash-001")
            response = model.generate_content(synthesis_prompt)

            return response.text

        return {
            'search_function': hybrid_search,
            'description': 'Hybrid RAG + KI Scraper Intelligence System'
        }

Ausführen Ihres RAG-Agent-Systems

Führen Sie alle Komponenten zu einem vollständigen Workflow zusammen, der Dokumente verarbeitet, Anfragen bearbeitet und fundierte Antworten generiert. Laden Sie außerdem die PDF-Dokumente, die Sie verarbeiten möchten, herunter und legen Sie sie im Ordner „docs/“ ab, damit die KI einen Kontext zu Ihrem Produkt aufbauen kann.

def main():
    """Hauptablauf für das RAG-Agentensystem."""

    print("=" * 60)
    print("RAG-Agentensystem – Initialisierung")
    print("=" * 60)

    initialize_adk()

    corpus_id = create_rag_corpus(
        corpus_name="enterprise-knowledge-base-3",
        description="Multimodale Unternehmensdokumentation und Wissensspeicher"
    )

    retrieval_config = configure_retrieval_parameters(corpus_id)
    print(f"n✓ Verwendung der Abrufkonfiguration mit top_k={retrieval_config['similarity_top_k']}")

    print("n" + "=" * 60)
    print("Dokumenteneingabe-Pipeline")
    print("=" * 60)

    document_paths = [
        "docs/technical_manual.pdf",
        "docs/product_specs.pdf",
        "docs/user_guide.pdf"
    ]

    gcs_uris = []
    all_chunks = []
    extracted_images = []

    for doc_path in document_paths:
        if os.path.exists(doc_path):
            extracted = extract_text_from_pdf(doc_path)
            print(f"n✓ Extracted {extracted['metadata']['num_pages']} pages from {Path(doc_path).name}")

            cleaned_text = preprocess_document(extracted['full_text'])
            print(f"✓ Vorverarbeiteter Text: {len(cleaned_text)} Zeichen")

            chunks = chunk_document(cleaned_text, chunk_size=1000, overlap=200)
            all_chunks.extend(chunks)
            print(f"✓ Dokument in {len(chunks)} Segmente unterteilt")

            gcs_uri = upload_file_to_gcs(doc_path, os.getenv('GCS_BUCKET_NAME'))
            gcs_uris.append(gcs_uri)

    print(f"n✓ Insgesamt erstellte Segmente: {len(all_chunks)}")
    print(f"✓ Insgesamt extrahierte Bilder: {len(extracted_images)}")

    if gcs_uris:
        import_documents_to_corpus(corpus_id, gcs_uris)
        index_config = {"distance_measure": "COSINE", "algorithm": "TREE_AH"}
        create_vector_index(corpus_id, index_config)
        time.sleep(180)

    # ========================================================================
    # Initialisieren Sie Google ADK Agent mit Vertex KI RAG Engine.
    # ========================================================================
    print("n" + "=" * 60)
    print("Google ADK Agent Initialisierung")
    print("=" * 60)

    adk_agent = ADKRAGAgent(
        corpus_id=corpus_id,
        project_id=os.getenv("GOOGLE_CLOUD_PROJECT"),
        location=os.getenv("GOOGLE_CLOUD_LOCATION")
    )

    agent = adk_agent.create_agent()

    for doc_path in document_paths:
        if os.path.exists(doc_path):
            try:
                images = adk_agent.rag_agent.extract_images_from_pdf(doc_path, "extracted_images")
                extracted_images.extend(images)
                if images:
                    print(f"✓ {len(images)} Bilder für multimodale Verarbeitung extrahiert")
            except Exception as e:
                print(f"⚠️ Bildextraktion übersprungen: {str(e)}")

    queries = [
        "Was sind die Systemanforderungen für die Installation?",
        "Wie konfiguriere ich die Authentifizierungseinstellungen?",
        „Wie sehen die Preisstufen und ihre Funktionen aus?”
    ]

    print("n" + "=" * 60)
    print("Google ADK Agent – Abfrageverarbeitung")
    print("=" * 60)
    print("Verwendung: Google ADK + Vertex KI RAG Engine")
    print("=" * 60)

    session_id = f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    for idx, query in enumerate(queries):
        print(f"n📝 Abfrage {idx + 1}: {query}")
        print("-" * 60)

        try:
            answer = adk_agent.chat(agent, query, session_id)

            print(f"n💬 ADK Agent Response:n{answer}n")
            print(f"✓ Conversation history: {len(adk_agent.rag_agent.conversation_history)} messages")

        except Exception as e:
            print(f"❌ Fehler: {str(e)}")
            import traceback
            traceback.print_exc()

        print("-" * 60)

        if idx < len(queries) - 1:
            time.sleep(90)

    if extracted_images:
        print("n" + "=" * 60)
        print("Multi-Modal Processing Demo")
        print("=" * 60)

        sample_table = """Feature | Basic | Pro | Enterprise
Speicherplatz | 10 GB | 100 GB | Unbegrenzt
Benutzer | 1 | 10 | Unbegrenzt
Preis | 10 $ | 50 $ | Individuell"""

        table_data = adk_agent.rag_agent.process_table_content(sample_table)
        print(f"n✓ Tabelle mit {table_data.get('row_count', 0)} Zeilen verarbeitet")

        if all_chunks and extracted_images:
            multimodal_embed = adk_agent.rag_agent.create_multimodal_embedding(
                text=all_chunks[0]['text'][:500],
                image_path=extracted_images[0]['image_path'] if extracted_images else None,
                table_data=table_data
            )
            print(f"✓ Multimodale Einbettung mit {multimodal_embed['modalities']} Modalitäten erstellt")
            print(f"  - Hat Bild: {multimodal_embed['has_image']}")
            print(f"  - Hat Tabelle: {multimodal_embed['has_table']}")

    print("n" + "=" * 60)
    print(f"Google ADK RAG Agent System – Fertiggestellt")
    print(f"✓ Architektur: Google ADK + Vertex KI RAG Engine")
    print(f"✓ Gesamtzahl der Gesprächsrunden: {len(adk_agent.rag_agent.conversation_history)}")
    print("=" * 60)

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print(f"n❌ Fehler: {str(e)}")
        import traceback
        traceback.print_exc()

Führen Sie das RAG-Agentensystem aus:

python3 rag_agent.py

Sie sehen die Verarbeitungs-Pipeline des Agenten in der Konsole, während er:

  1. den Google ADK-Client und die KI-Verbindung initialisiert.
  2. Erstellt den RAG-Korpus mit eingebetteter Modellkonfiguration.
  3. Dokumente werden durch Extrahieren, Bereinigen und Chunking verarbeitet.
  4. Dateien in Cloud Storage hochlädt und in den Korpus importiert.
  5. Vektor-Einbettungen generiert und den Suchindex erstellt.
  6. Führt Abfragen mit Erweiterung, Abruf und Neuanordnung durch.
  7. Erstellt fundierte Antworten mit Zitaten und Verifizierungen.
  8. Bewertet die Qualität der Antworten anhand von Relevanz, Vollständigkeit, Genauigkeit und Klarheit.

Die Konsolenausgabe zeigt den detaillierten Fortschritt für jeden Schritt an.
Runing the RAG infrastructure for generative AI

Abschließende Gedanken

Sie verfügen nun über ein produktionsreifes RAG-Agentensystem, das das Agent Development Kit von Google mit Vertex KI kombiniert. Das System nimmt Dokumente auf, ruft relevante Kontexte über eine hybride Suche ab und generiert präzise Antworten mit Zitaten.

Verbessern Sie es, indem Sie die Chunking-Strategien optimieren, Feedback-Schleifen hinzufügen, zusätzliche Datenquellen integrieren oder Echtzeitüberwachung aktivieren. Das modulare Design ermöglicht eine einfache Anpassung.

Entdecken Sie erweiterte KI-Workflows und die KI-Infrastruktur von Bright Data für weitere Funktionen.

Erstellen Sie ein kostenloses Konto, um mit dem Aufbau zu beginnen.