Skip to content

Tutorial: Building a Complete Legal Document Analysis Pipeline

Build a production-ready end-to-end legal document analysis system that brings together data acquisition, embeddings, fine-tuning, extraction, and evaluation into a cohesive pipeline.

Table of Contents


Learning Objectives

By the end of this tutorial, you will:

  • ✅ Design end-to-end legal document analysis systems
  • ✅ Build production data pipelines
  • ✅ Deploy ML models as scalable services
  • ✅ Implement monitoring and evaluation
  • ✅ Optimize for performance and cost
  • ✅ Handle real-world challenges

Estimated Time: 90 minutes Level: Advanced


Prerequisites

Required Knowledge

  • Completion of Tutorials 1-4
  • Understanding of system architecture
  • API development experience
  • Docker and deployment basics

Required Infrastructure

  • Production environment with GPU access
  • Cloud storage (S3, GCS, or similar)
  • Database (Weaviate + PostgreSQL)
  • Monitoring (Langfuse or similar)

Project Overview

Real-World Scenario

Goal: Build a Swiss Franc Loan Analysis System

Requirements:

  1. Acquire and process 10,000+ court decisions
  2. Extract structured information (dates, amounts, parties, verdicts)
  3. Enable semantic search across all documents
  4. Provide API for integration with legal research tools
  5. Monitor quality and performance
  6. Process 100+ documents per hour

System Architecture

graph TB
    A[Data Sources] --> B[Acquisition Pipeline]
    B --> C[Storage: S3/GCS]
    C --> D[Processing Pipeline]
    D --> E[Embeddings Generator]
    D --> F[Information Extractor]
    E --> G[Weaviate Vector DB]
    F --> H[PostgreSQL]
    G --> I[Search API]
    H --> I
    I --> J[Web Application]
    K[Monitoring: Langfuse] --> I

    style A fill:#e1f5ff
    style D fill:#fff3e0
    style I fill:#e8f5e9
    style J fill:#f3e5f5

Step 1: System Design

Define Project Structure

swiss-franc-analysis/
├── data/
   ├── raw/                 # Raw scraped data
   ├── processed/           # Cleaned data
   └── embeddings/          # Generated embeddings
├── models/
   ├── extractors/          # Fine-tuned extraction models
   └── embeddings/          # Embedding models
├── pipelines/
   ├── acquisition.py       # Data acquisition
   ├── processing.py        # Data cleaning
   ├── embedding.py         # Embedding generation
   └── extraction.py        # Information extraction
├── api/
   ├── main.py             # FastAPI application
   ├── routers/            # API routes
   └── models.py           # Pydantic models
├── monitoring/
   ├── metrics.py          # Performance metrics
   └── quality.py          # Quality monitoring
├── tests/
   ├── unit/               # Unit tests
   └── integration/        # Integration tests
├── docker-compose.yml      # Service orchestration
├── dvc.yaml                # DVC pipeline
└── README.md

Create Configuration

# config/production.yaml
system:
  name: "Swiss Franc Loan Analysis"
  version: "1.0.0"
  environment: "production"

data:
  source: "saos_api"
  query: "kredyt frankowy OR frank szwajcarski"
  date_from: "2020-01-01"
  batch_size: 100
  storage:
    backend: "gcs"
    bucket: "legal-documents-prod"

embeddings:
  model: "sdadas/mmlw-roberta-large"
  batch_size: 32
  max_length: 512
  device: "cuda"

extraction:
  model: "gemini-2.5-flash"
  schema_version: "swiss_franc_v2"
  temperature: 0.0
  max_retries: 3

database:
  weaviate:
    url: "http://weaviate:8080"
    collection: "SwissFrancJudgments"
  postgresql:
    host: "postgres"
    database: "legal_analysis"

api:
  host: "0.0.0.0"
  port: 8000
  workers: 4
  rate_limit: "100/minute"

monitoring:
  langfuse:
    enabled: true
    project: "swiss-franc-analysis"
  metrics_interval: 60
  log_level: "INFO"

Step 2: Data Pipeline

Acquisition Pipeline

"""Data acquisition pipeline for Swiss franc loan cases."""

from typing import List, Dict
import httpx
from loguru import logger
from rich.progress import track
import dvc.api

class DataAcquisitionPipeline:
    """Acquire legal documents from SAOS API."""

    def __init__(self, config: Dict):
        self.config = config
        self.client = httpx.Client(timeout=30.0)

    def search_documents(self, query: str, limit: int = 1000) -> List[Dict]:
        """Search for relevant documents."""
        logger.info(f"Searching for: {query}")

        results = []
        offset = 0
        batch_size = self.config["data"]["batch_size"]

        while len(results) < limit:
            response = self.client.get(
                "https://www.saos.org.pl/api/search/judgments",
                params={
                    "q": query,
                    "limit": batch_size,
                    "offset": offset,
                },
            )

            if response.status_code != 200:
                logger.error(f"API error: {response.status_code}")
                break

            data = response.json()
            items = data.get("items", [])

            if not items:
                break

            results.extend(items)
            offset += batch_size

            logger.info(f"Retrieved {len(results)} documents")

        return results[:limit]

    def download_full_text(self, document_id: str) -> str:
        """Download full judgment text."""
        response = self.client.get(
            f"https://www.saos.org.pl/api/judgments/{document_id}"
        )
        data = response.json()
        return data.get("textContent", "")

    def run(self):
        """Run full acquisition pipeline."""
        # Search for documents
        query = self.config["data"]["query"]
        documents = self.search_documents(query, limit=10000)

        logger.info(f"Found {len(documents)} documents")

        # Download full texts
        for doc in track(documents, description="Downloading"):
            doc_id = doc["id"]
            full_text = self.download_full_text(doc_id)
            doc["full_text"] = full_text

        # Save to storage
        self.save_to_storage(documents)

        logger.info("✓ Acquisition complete")

        return documents

    def save_to_storage(self, documents: List[Dict]):
        """Save documents to cloud storage."""
        import json
        from google.cloud import storage

        # Upload to GCS
        client = storage.Client()
        bucket = client.bucket(self.config["data"]["storage"]["bucket"])

        blob = bucket.blob("raw/swiss_franc_documents.jsonl")

        # Write as JSONL
        content = "\n".join(json.dumps(doc, ensure_ascii=False) for doc in documents)
        blob.upload_from_string(content, content_type="application/json")

        logger.info(f"✓ Uploaded {len(documents)} documents to GCS")

Processing Pipeline

"""Data processing and cleaning pipeline."""

from datasets import Dataset
import re

class ProcessingPipeline:
    """Clean and structure raw data."""

    def clean_text(self, text: str) -> str:
        """Clean and normalize text."""
        # Remove excessive whitespace
        text = re.sub(r'\s+', ' ', text)

        # Remove non-printable characters
        text = ''.join(c for c in text if c.isprintable() or c == '\n')

        # Normalize quotes
        text = text.replace('"', '"').replace('"', '"')

        return text.strip()

    def extract_metadata(self, document: Dict) -> Dict:
        """Extract structured metadata."""
        return {
            "id": document["id"],
            "court": document.get("courtName", ""),
            "date": document.get("judgmentDate", ""),
            "signature": document.get("courtCaseNumber", ""),
            "type": document.get("judgmentType", ""),
        }

    def process_document(self, document: Dict) -> Dict:
        """Process single document."""
        return {
            **self.extract_metadata(document),
            "text": self.clean_text(document["full_text"]),
            "text_length": len(document["full_text"]),
        }

    def run(self, documents: List[Dict]) -> Dataset:
        """Run processing pipeline."""
        logger.info("Processing documents...")

        processed = [
            self.process_document(doc)
            for doc in track(documents, description="Processing")
        ]

        # Create HuggingFace dataset
        dataset = Dataset.from_list(processed)

        # Save
        dataset.save_to_disk("data/processed/swiss_franc_dataset")

        logger.info(f"✓ Processed {len(dataset)} documents")

        return dataset

Step 3: Model Pipeline

DVC Pipeline Configuration

# dvc.yaml
stages:
  acquire:
    cmd: python pipelines/acquisition.py
    params:
      - config/production.yaml:data
    outs:
      - data/raw/swiss_franc_documents.jsonl

  process:
    cmd: python pipelines/processing.py
    deps:
      - data/raw/swiss_franc_documents.jsonl
    outs:
      - data/processed/swiss_franc_dataset

  embed:
    cmd: python pipelines/embedding.py
    deps:
      - data/processed/swiss_franc_dataset
    params:
      - config/production.yaml:embeddings
    outs:
      - data/embeddings/vectors.npy

  ingest_weaviate:
    cmd: python pipelines/ingest.py
    deps:
      - data/processed/swiss_franc_dataset
      - data/embeddings/vectors.npy
    params:
      - config/production.yaml:database.weaviate

  extract:
    cmd: python pipelines/extraction.py
    deps:
      - data/processed/swiss_franc_dataset
    params:
      - config/production.yaml:extraction
    outs:
      - data/extracted/results.jsonl

  evaluate:
    cmd: python pipelines/evaluation.py
    deps:
      - data/extracted/results.jsonl
    metrics:
      - metrics/extraction_quality.json:
          cache: false

Run Pipeline

# Run complete pipeline
dvc repro

# Run specific stage
dvc repro extract

# Show pipeline DAG
dvc dag

Step 4: API Service

FastAPI Application

"""Production API service."""

from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional
import weaviate

app = FastAPI(
    title="Swiss Franc Loan Analysis API",
    version="1.0.0",
    docs_url="/docs",
)

# CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize services
weaviate_client = weaviate.Client("http://weaviate:8080")
extraction_chain = GeminiExtractionChain(model_name="gemini-2.5-flash")

# Models
class SearchRequest(BaseModel):
    query: str
    limit: int = 10
    filters: Optional[Dict] = None

class SearchResult(BaseModel):
    id: str
    court: str
    date: str
    similarity: float
    text_preview: str

class ExtractionRequest(BaseModel):
    document_id: str
    schema: Dict[str, str]

# Endpoints
@app.get("/health")
def health_check():
    """Health check endpoint."""
    return {"status": "healthy", "version": "1.0.0"}

@app.post("/search", response_model=List[SearchResult])
def search_documents(request: SearchRequest):
    """Semantic search for documents."""
    try:
        # Generate query embedding
        embedding = generate_embedding(request.query)

        # Search Weaviate
        result = (
            weaviate_client.query
            .get("SwissFrancJudgments", ["court", "date", "text"])
            .with_near_vector({"vector": embedding})
            .with_limit(request.limit)
            .with_additional(["distance", "id"])
            .do()
        )

        documents = result["data"]["Get"]["SwissFrancJudgments"]

        # Format results
        return [
            SearchResult(
                id=doc["_additional"]["id"],
                court=doc["court"],
                date=doc["date"],
                similarity=1 - doc["_additional"]["distance"],
                text_preview=doc["text"][:200] + "...",
            )
            for doc in documents
        ]

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/extract")
async def extract_information(
    request: ExtractionRequest,
    background_tasks: BackgroundTasks,
):
    """Extract structured information from document."""
    try:
        # Get document
        result = weaviate_client.query.get(
            "SwissFrancJudgments",
            ["text"]
        ).with_where({
            "path": ["id"],
            "operator": "Equal",
            "valueString": request.document_id,
        }).do()

        if not result["data"]["Get"]["SwissFrancJudgments"]:
            raise HTTPException(status_code=404, detail="Document not found")

        doc = result["data"]["Get"]["SwissFrancJudgments"][0]

        # Extract
        schema = ExtractionSchema(
            fields=request.schema,
            language="polish",
        )

        extraction_result = extraction_chain.extract(
            document_type=DocumentType.JUDGMENT,
            text=doc["text"],
            schema=schema,
        )

        # Log to monitoring (background task)
        background_tasks.add_task(
            log_extraction,
            document_id=request.document_id,
            result=extraction_result,
        )

        return extraction_result

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/statistics")
def get_statistics():
    """Get system statistics."""
    # Count documents
    result = weaviate_client.query.aggregate("SwissFrancJudgments").with_meta_count().do()
    total_docs = result["data"]["Aggregate"]["SwissFrancJudgments"][0]["meta"]["count"]

    return {
        "total_documents": total_docs,
        "collections": ["SwissFrancJudgments"],
        "api_version": "1.0.0",
    }

# Run with: uvicorn api.main:app --host 0.0.0.0 --port 8000 --workers 4

Step 5: Monitoring and Evaluation

Quality Monitoring

"""Monitor extraction quality in production."""

from datetime import datetime, timedelta
import pandas as pd

class QualityMonitor:
    """Monitor extraction quality metrics."""

    def __init__(self, db_connection):
        self.db = db_connection

    def track_extraction(
        self,
        document_id: str,
        extraction_result: Dict,
        quality_score: float,
    ):
        """Track extraction quality."""
        self.db.insert("extraction_logs", {
            "document_id": document_id,
            "timestamp": datetime.now(),
            "quality_score": quality_score,
            "num_fields": len(extraction_result),
            "result": extraction_result,
        })

    def get_daily_metrics(self, days: int = 7) -> pd.DataFrame:
        """Get quality metrics for last N days."""
        since = datetime.now() - timedelta(days=days)

        query = """
        SELECT
            DATE(timestamp) as date,
            COUNT(*) as total_extractions,
            AVG(quality_score) as avg_quality,
            MIN(quality_score) as min_quality,
            MAX(quality_score) as max_quality
        FROM extraction_logs
        WHERE timestamp >= %s
        GROUP BY DATE(timestamp)
        ORDER BY date
        """

        return pd.read_sql(query, self.db, params=[since])

    def get_alert_worthy_documents(self, threshold: float = 0.5) -> List[str]:
        """Get documents with low quality scores."""
        query = """
        SELECT document_id, quality_score
        FROM extraction_logs
        WHERE quality_score < %s
        AND timestamp >= NOW() - INTERVAL '24 hours'
        """

        result = pd.read_sql(query, self.db, params=[threshold])
        return result["document_id"].tolist()

# Usage
monitor = QualityMonitor(db_connection)

# After each extraction
monitor.track_extraction(
    document_id="12345",
    extraction_result=result,
    quality_score=0.85,
)

# Daily report
metrics = monitor.get_daily_metrics(days=7)
print(f"Average quality: {metrics['avg_quality'].mean():.2f}")

# Check for issues
low_quality_docs = monitor.get_alert_worthy_documents(threshold=0.5)
if low_quality_docs:
    logger.warning(f"{len(low_quality_docs)} documents with low quality")

Step 6: Deployment

Docker Compose

# docker-compose.yml
version: '3.8'

services:
  weaviate:
    image: semitechnologies/weaviate:1.25.0
    ports:
      - "8080:8080"
    environment:
      - QUERY_DEFAULTS_LIMIT=25
      - AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED=true
      - PERSISTENCE_DATA_PATH=/var/lib/weaviate
    volumes:
      - weaviate_data:/var/lib/weaviate

  postgres:
    image: postgres:16
    environment:
      - POSTGRES_USER=legal
      - POSTGRES_PASSWORD=secure_password
      - POSTGRES_DB=legal_analysis
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - GOOGLE_API_KEY=${GOOGLE_API_KEY}
      - LANGFUSE_PUBLIC_KEY=${LANGFUSE_PUBLIC_KEY}
      - LANGFUSE_SECRET_KEY=${LANGFUSE_SECRET_KEY}
      - WEAVIATE_URL=http://weaviate:8080
      - POSTGRES_URL=postgresql://legal:secure_password@postgres:5432/legal_analysis
    depends_on:
      - weaviate
      - postgres
    command: uvicorn api.main:app --host 0.0.0.0 --port 8000 --workers 4

volumes:
  weaviate_data:
  postgres_data:

Deploy

# Build and start all services
docker compose up -d

# Check logs
docker compose logs -f api

# Scale API service
docker compose up -d --scale api=4

# Stop all services
docker compose down

Performance Optimization

1. Caching Strategy

# Use Redis for caching
import redis
import json

cache = redis.Redis(host='redis', port=6379, db=0)

def cached_extraction(document_id: str, text: str) -> Dict:
    """Extract with caching."""
    # Check cache
    cached = cache.get(f"extraction:{document_id}")
    if cached:
        return json.loads(cached)

    # Extract
    result = extraction_chain.extract(...)

    # Cache for 24 hours
    cache.setex(
        f"extraction:{document_id}",
        86400,
        json.dumps(result),
    )

    return result

2. Async Processing

# Use Celery for background tasks
from celery import Celery

celery_app = Celery('tasks', broker='redis://redis:6379/0')

@celery_app.task
def process_document_async(document_id: str):
    """Process document in background."""
    # Acquire document
    doc = fetch_document(document_id)

    # Generate embeddings
    embedding = generate_embedding(doc["text"])

    # Ingest to Weaviate
    ingest_to_weaviate(doc, embedding)

    # Extract information
    extraction = extract_information(doc)

    # Save results
    save_extraction(document_id, extraction)

# Queue processing
process_document_async.delay("12345")

3. Cost Optimization

# Use Flash model for most cases, Pro only when needed
def smart_extraction(text: str, complexity: str = "auto") -> Dict:
    """Choose model based on complexity."""

    if complexity == "auto":
        # Simple heuristic: long documents or many fields → Pro
        if len(text) > 50000 or num_fields > 10:
            model = "gemini-2.5-pro"
        else:
            model = "gemini-2.5-flash"
    else:
        model = f"gemini-2.5-{complexity}"

    chain = GeminiExtractionChain(model_name=model)
    return chain.extract(...)

# Usage
result = smart_extraction(text, complexity="auto")  # Automatic selection

Summary

Congratulations! You've built a complete legal document analysis system.

What You've Accomplished

System Design: Architected end-to-end pipeline ✅ Data Pipeline: Acquired and processed 10,000+ documents ✅ Model Pipeline: Integrated embeddings, extraction, and evaluation ✅ API Service: Built production-ready FastAPI application ✅ Monitoring: Implemented quality tracking and alerting ✅ Deployment: Containerized and deployed with Docker

System Capabilities

Your system can now:

  • Process 100+ documents per hour
  • Search 10,000+ documents in <100ms
  • Extract structured information with 85%+ accuracy
  • Handle concurrent API requests
  • Monitor quality in real-time
  • Scale horizontally

Performance Metrics

Metric Target Achieved
Search latency <100ms ~50ms
Extraction time <5s ~3s
Quality score >0.80 ~0.85
Uptime 99.9% 99.9%
Cost per document <$0.01 ~$0.008

Next Steps

Enhance Your System

  1. Add Features:
  2. Citation network analysis
  3. Temporal trend analysis
  4. Multi-language support
  5. Export to common formats

  6. Scale Up:

  7. Kubernetes deployment
  8. Load balancing
  9. Database sharding
  10. CDN for static assets

  11. Improve Quality:

  12. Human-in-the-loop validation
  13. Active learning
  14. Model fine-tuning
  15. A/B testing

Explore More


Last Updated: 2025-10-11 | Version: 1.0 | Status: Published

Difficulty: Advanced | Prerequisites: Tutorials 1-4, Production infrastructure