Gemini Extraction Chain

LangChain-based information extraction using Google Gemini 2.5 Pro/Flash with caching and observability.

Overview

The juddges.extraction.gemini_chain module provides a production-ready extraction pipeline using Google's Gemini 2.5 models. It's designed for structured information extraction from legal documents with:

  • Caching to reduce API costs
  • Langfuse integration for observability
  • Schema-driven extraction
  • Batch processing support
  • Error logging with context (exceptions are logged and re-raised)

Key Features

  • Multiple Models: Gemini 2.5 Pro and Flash support
  • Response Caching: PostgreSQL (via POSTGRES_CACHE_URL) with SQLite fallback, to avoid redundant API calls
  • Langfuse Observability: Track extraction runs, costs, and performance
  • Structured Output: Native structured output via with_structured_output(), returned as dictionaries
  • Document Type Aware: Optimized prompts for judgments vs tax interpretations
  • Batch Extraction: Process multiple documents efficiently
  • Automatic Truncation: Cut documents longer than max_text_length characters (default 150,000)

Usage Examples

Basic Extraction

from juddges.extraction.gemini_chain import (
    GeminiExtractionChain,
    ExtractionSchema,
    DocumentType
)

# Initialize chain
chain = GeminiExtractionChain(
    model_name="gemini-2.5-flash",
    temperature=0.0,
    cache_path="cache/extraction.db"
)

# Define extraction schema
schema = ExtractionSchema(
    fields={
        "verdict_date": "date as ISO 8601, when the verdict was issued",
        "court": "string, name of the court",
        "case_number": "string, case identifier",
        "parties": "List[string], names of involved parties"
    },
    instructions="Focus on extracting factual information only.",
    language="polish"
)

# Extract from judgment
result = chain.extract(
    document_type=DocumentType.JUDGMENT,
    text="Sąd Okręgowy w Warszawie dnia 15 stycznia 2024...",
    schema=schema
)

print(result)
# {
#     "verdict_date": "2024-01-15",
#     "court": "Sąd Okręgowy w Warszawie",
#     "case_number": "...",
#     "parties": ["Jan Kowalski", "XYZ Bank"]
# }

Extraction with Langfuse Observability

from langfuse.callback import CallbackHandler

# Initialize Langfuse handler
langfuse_handler = CallbackHandler(
    public_key="pk-...",
    secret_key="sk-...",
    host="https://cloud.langfuse.com"
)

# Extract with tracing
result = chain.extract(
    document_type=DocumentType.JUDGMENT,
    text=judgment_text,
    schema=schema,
    langfuse_handler=langfuse_handler
)

# View trace in Langfuse dashboard

Batch Extraction

# Extract from multiple documents
texts = [judgment1, judgment2, judgment3]

results = chain.batch_extract(
    document_type=DocumentType.JUDGMENT,
    texts=texts,
    schema=schema,
    langfuse_handler=langfuse_handler
)

# Process results
for i, result in enumerate(results):
    print(f"Document {i}: {result}")

Tax Interpretation Extraction

# Different document type with specialized prompt
schema = ExtractionSchema(
    fields={
        "interpretation_date": "date as ISO 8601",
        "tax_authority": "string, issuing tax authority",
        "taxpayer": "string, name of taxpayer",
        "interpretation_subject": "string, subject of interpretation"
    },
    language="polish"
)

result = chain.extract(
    document_type=DocumentType.TAX_INTERPRETATION,
    text=tax_interpretation_text,
    schema=schema
)

API Reference

GeminiExtractionChain

GeminiExtractionChain(model_name: Literal['gemini-2.5-pro', 'gemini-2.5-flash', 'gemini-2.0-flash-exp', 'gemini-1.5-pro', 'gemini-1.5-flash'] = 'gemini-2.5-pro', project: Optional[str] = None, location: str = 'us-central1', temperature: float = 0.0, cache_path: Optional[str | Path] = None, max_output_tokens: Optional[int] = 8192, enable_thinking: bool = False)

LangChain extraction chain using Gemini 2.5 Pro with guaranteed valid JSON output.

Features:

  • Google Gemini 2.5 Pro/Flash model support
  • Native structured output via with_structured_output(), which guarantees valid JSON responses
  • Eliminates JSON parsing errors by using Gemini's response_schema API
  • Optional extended thinking mode for Gemini 2.5 models (disabled by default)
  • PostgreSQL caching (via POSTGRES_CACHE_URL env var) with SQLite fallback
  • Langfuse callback integration for observability
  • Document type-aware prompting
  • Dynamic Pydantic model generation from ExtractionSchema

Thinking Mode (Gemini 2.5 only):

  • Extended thinking mode shows the model's reasoning process before providing the answer
  • Can improve accuracy for complex reasoning tasks
  • Increases latency and token usage
  • Default: disabled (enable_thinking=False)
  • Recommended: keep disabled for structured extraction tasks, enable for complex reasoning

Cache Configuration:

  • Set POSTGRES_CACHE_URL environment variable for PostgreSQL caching
  • Falls back to SQLite if PostgreSQL is unavailable or not configured
  • Custom SQLite path can be specified via cache_path parameter

Example (default - no thinking):

>>> chain = GeminiExtractionChain(
...     model_name="gemini-2.5-pro",
...     cache_path="cache/extraction.db",  # SQLite fallback path
...     temperature=0.0,
... )
>>>
>>> schema = ExtractionSchema(
...     fields={
...         "verdict_date": "date as ISO 8601",
...         "court": "string, name of the court",
...     },
...     language="polish",
... )
>>>
>>> result = chain.extract(
...     document_type=DocumentType.JUDGMENT,
...     text="Sąd Najwyższy orzekł dnia 2024-01-15...",
...     schema=schema,
...     langfuse_handler=my_langfuse_handler,  # Optional
... )
>>> print(result)
# {"verdict_date": "2024-01-15", "court": "Sąd Najwyższy"}

Example (with thinking enabled):

>>> chain = GeminiExtractionChain(
...     model_name="gemini-2.5-pro",
...     enable_thinking=True,  # Enable extended thinking mode
... )
>>> # Model will show reasoning process in responses

PARAMETER DESCRIPTION
model_name

Gemini model to use (via Vertex AI)

TYPE: Literal['gemini-2.5-pro', 'gemini-2.5-flash', 'gemini-2.0-flash-exp', 'gemini-1.5-pro', 'gemini-1.5-flash'] DEFAULT: 'gemini-2.5-pro'

project

GCP project ID (defaults to VERTEX_PROJECT or gcloud default)

TYPE: Optional[str] DEFAULT: None

location

GCP region (default: us-central1)

TYPE: str DEFAULT: 'us-central1'

temperature

Sampling temperature (0.0 for deterministic)

TYPE: float DEFAULT: 0.0

cache_path

Path to SQLite cache file (used as fallback if PostgreSQL unavailable)

TYPE: Optional[str | Path] DEFAULT: None

max_output_tokens

Maximum tokens in response

TYPE: Optional[int] DEFAULT: 8192

enable_thinking

Enable extended thinking mode for Gemini 2.5 models (default: False). When enabled, the model shows its reasoning process before answering. This can improve accuracy for complex tasks but increases latency and token usage. Recommended for complex reasoning tasks, not for simple structured extraction.

TYPE: bool DEFAULT: False
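
According to the constructor source shown below, the thinking flag is only forwarded when the model name contains "2.5"; for any other model the constructor logs a warning and continues without it. A minimal sketch of that gate follows. The helper name thinking_applies is hypothetical and does not exist in the module.

```python
def thinking_applies(model_name: str, enable_thinking: bool) -> bool:
    """Return True when the thinking flag would actually be forwarded.

    Hypothetical helper mirroring the substring check in __init__:
    only model names containing "2.5" qualify.
    """
    return enable_thinking and "2.5" in model_name


print(thinking_applies("gemini-2.5-pro", True))    # True
print(thinking_applies("gemini-1.5-pro", True))    # False (warning path)
print(thinking_applies("gemini-2.5-flash", False)) # False (flag not requested)
```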

Environment Variables

  • POSTGRES_CACHE_URL: PostgreSQL connection string for LLM caching (preferred)
  • VERTEX_PROJECT: GCP project ID for Vertex AI
  • GOOGLE_CLOUD_PROJECT: Alternative GCP project ID
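
The project ID is resolved in a fixed order: the project argument first, then VERTEX_PROJECT, then GOOGLE_CLOUD_PROJECT. If none is set, the constructor logs a warning and relies on the gcloud default. The sketch below restates that order as a standalone function; resolve_project is a hypothetical name used only for illustration.

```python
import os
from typing import Mapping, Optional


def resolve_project(
    project: Optional[str] = None,
    env: Mapping[str, str] = os.environ,
) -> Optional[str]:
    """Mirror the lookup order used in __init__ (hypothetical helper).

    Returns None when nothing is configured, in which case the real
    constructor falls back to the gcloud default project.
    """
    return project or env.get("VERTEX_PROJECT") or env.get("GOOGLE_CLOUD_PROJECT")


env = {"VERTEX_PROJECT": "vertex-proj", "GOOGLE_CLOUD_PROJECT": "gcp-proj"}
print(resolve_project("explicit-proj", env))  # explicit-proj
print(resolve_project(None, env))             # vertex-proj
print(resolve_project(None, {}))              # None
```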

Source code in juddges/extraction/gemini_chain.py
def __init__(
    self,
    model_name: Literal[
        "gemini-2.5-pro",
        "gemini-2.5-flash",
        "gemini-2.0-flash-exp",
        "gemini-1.5-pro",
        "gemini-1.5-flash",
    ] = "gemini-2.5-pro",
    project: Optional[str] = None,
    location: str = "us-central1",
    temperature: float = 0.0,
    cache_path: Optional[str | Path] = None,
    max_output_tokens: Optional[int] = 8192,
    enable_thinking: bool = False,
):
    """Initialize Gemini extraction chain using Vertex AI.

    Args:
        model_name: Gemini model to use (via Vertex AI)
        project: GCP project ID (defaults to VERTEX_PROJECT or gcloud default)
        location: GCP region (default: us-central1)
        temperature: Sampling temperature (0.0 for deterministic)
        cache_path: Path to SQLite cache file (used as fallback if PostgreSQL unavailable)
        max_output_tokens: Maximum tokens in response
        enable_thinking: Enable extended thinking mode for Gemini 2.5 models (default: False).
                       When enabled, the model shows its reasoning process before answering.
                       This can improve accuracy for complex tasks but increases latency and token usage.
                       Recommended for complex reasoning tasks, not for simple structured extraction.

    Environment Variables:
        POSTGRES_CACHE_URL: PostgreSQL connection string for LLM caching (preferred)
        VERTEX_PROJECT: GCP project ID for Vertex AI
        GOOGLE_CLOUD_PROJECT: Alternative GCP project ID
    """
    import os

    self.model_name = model_name
    self.temperature = temperature
    self.max_output_tokens = max_output_tokens
    self.enable_thinking = enable_thinking

    # Get project from env or parameter
    self.project = project or os.getenv("VERTEX_PROJECT") or os.getenv("GOOGLE_CLOUD_PROJECT")
    if not self.project:
        logger.warning("No GCP project specified, will use gcloud default")

    self.location = location

    # Set up caching - prefer PostgreSQL via SQLAlchemy with MD5, fallback to SQLite
    postgres_url = os.getenv("POSTGRES_CACHE_URL")

    if postgres_url:
        try:
            from sqlalchemy import create_engine

            # Create PostgreSQL cache using SQLAlchemyMd5Cache (avoids 8KB index size limit)
            engine = create_engine(postgres_url)
            langchain.llm_cache = SQLAlchemyMd5Cache(engine)
            # Extract host/port from URL for logging (format: postgresql://user:pass@host:port/db)
            db_location = postgres_url.split("@")[1] if "@" in postgres_url else "configured"
            logger.info(f"Enabled LangChain PostgreSQL cache (SQLAlchemy MD5): {db_location}")
        except Exception as e:
            logger.warning(
                f"Failed to initialize PostgreSQL cache: {e}, falling back to SQLite"
            )
            # Fallback to SQLite
            default_cache = Path(".cache/langchain.db")
            default_cache.parent.mkdir(parents=True, exist_ok=True)
            langchain.llm_cache = SQLiteCache(database_path=str(default_cache))
            logger.info(f"Enabled LangChain SQLite cache (fallback): {default_cache}")
    elif cache_path:
        # Use custom SQLite path if provided
        cache_file = Path(cache_path)
        cache_file.parent.mkdir(parents=True, exist_ok=True)
        langchain.llm_cache = SQLiteCache(database_path=str(cache_file))
        logger.info(f"Enabled LangChain SQLite cache: {cache_file}")
    else:
        # Default SQLite cache
        default_cache = Path(".cache/langchain.db")
        default_cache.parent.mkdir(parents=True, exist_ok=True)
        langchain.llm_cache = SQLiteCache(database_path=str(default_cache))
        logger.info(f"Enabled LangChain SQLite cache: {default_cache}")

    # Initialize Vertex AI Gemini model (uses application default credentials)
    # Configure model kwargs for thinking mode if enabled
    model_kwargs = {}
    if self.enable_thinking and "2.5" in model_name:
        # Gemini 2.5 models support extended thinking mode
        # This enables the model to show its reasoning process
        model_kwargs["thinking"] = True
        logger.info(f"Extended thinking mode enabled for {model_name}")
    elif self.enable_thinking:
        logger.warning(
            f"Thinking mode requested but not supported for {model_name}. "
            "Only Gemini 2.5 models support extended thinking."
        )

    # Build kwargs for ChatVertexAI - only include model_kwargs if not empty
    llm_kwargs = {
        "model": model_name,
        "project": self.project,
        "location": self.location,
        "temperature": temperature,
        "max_tokens": max_output_tokens,
    }

    # Only add model_kwargs if there are any (empty dict causes issues with LangChain)
    if model_kwargs:
        llm_kwargs["model_kwargs"] = model_kwargs

    self.llm = ChatVertexAI(**llm_kwargs)

    logger.info(
        f"Initialized VertexAI GeminiExtractionChain with {model_name} "
        f"(project: {self.project}, location: {self.location}, "
        f"thinking: {self.enable_thinking})"
    )

Functions

extract

extract(document_type: DocumentType, text: str, schema: ExtractionSchema, langfuse_handler: Optional[BaseCallbackHandler] = None, max_text_length: int = 150000) -> dict[str, Any]

Extract structured information from document text.

PARAMETER DESCRIPTION
document_type

Type of document (judgment or tax interpretation)

TYPE: DocumentType

text

Full text of the document

TYPE: str

schema

Extraction schema defining fields and instructions

TYPE: ExtractionSchema

langfuse_handler

Optional Langfuse callback handler for observability

TYPE: Optional[BaseCallbackHandler] DEFAULT: None

max_text_length

Maximum text length to process (truncates if longer)

TYPE: int DEFAULT: 150000

RETURNS DESCRIPTION
dict[str, Any]

Dictionary with extracted information matching schema fields

Example

>>> schema = ExtractionSchema(
...     fields={
...         "verdict_date": "date as ISO 8601",
...         "court": "string, court name",
...         "case_number": "string, case identifier",
...     },
...     language="polish",
... )
>>> result = chain.extract(
...     document_type=DocumentType.JUDGMENT,
...     text="Sąd Okręgowy w Warszawie...",
...     schema=schema,
... )

Source code in juddges/extraction/gemini_chain.py
def extract(
    self,
    document_type: DocumentType,
    text: str,
    schema: ExtractionSchema,
    langfuse_handler: Optional[BaseCallbackHandler] = None,
    max_text_length: int = 150000,
) -> dict[str, Any]:
    """Extract structured information from document text.

    Args:
        document_type: Type of document (judgment or tax interpretation)
        text: Full text of the document
        schema: Extraction schema defining fields and instructions
        langfuse_handler: Optional Langfuse callback handler for observability
        max_text_length: Maximum text length to process (truncates if longer)

    Returns:
        Dictionary with extracted information matching schema fields

    Example:
        >>> schema = ExtractionSchema(
        ...     fields={
        ...         "verdict_date": "date as ISO 8601",
        ...         "court": "string, court name",
        ...         "case_number": "string, case identifier",
        ...     },
        ...     language="polish",
        ... )
        >>> result = chain.extract(
        ...     document_type=DocumentType.JUDGMENT,
        ...     text="Sąd Okręgowy w Warszawie...",
        ...     schema=schema,
        ... )
    """
    # Truncate text if too long
    if len(text) > max_text_length:
        logger.warning(f"Text length {len(text)} exceeds max {max_text_length}, truncating")
        text = text[:max_text_length]

    # Build chain for this document type with structured output
    chain = self._build_chain(document_type, schema)

    # Prepare input
    chain_input = {
        "text": text,
        "schema": schema.to_schema_string(),
        "language": schema.language,
        "additional_instructions": (
            f"\nAdditional instructions:\n{schema.instructions}" if schema.instructions else ""
        ),
    }

    # Execute with optional Langfuse callback
    config = {}
    if langfuse_handler:
        config["callbacks"] = [langfuse_handler]
        logger.debug("Executing extraction with Langfuse tracing (structured output mode)")

    try:
        result = chain.invoke(chain_input, config=config)

        # Check if result is None (API returned nothing)
        if result is None:
            error_msg = "API returned None - likely rate limit, timeout, or API error"
            logger.error(f"Extraction failed: {error_msg}")
            raise ValueError(error_msg)

        logger.info(
            f"Successfully extracted {len(result)} fields from {document_type.value} using structured output"
        )
        return result
    except Exception as e:
        # Enhanced error logging with exception details
        error_type = type(e).__name__
        error_details = {
            "error_type": error_type,
            "error_message": str(e),
            "document_type": document_type.value,
            "text_length": len(text),
        }

        # Check for specific API errors
        if hasattr(e, 'code'):
            error_details["http_code"] = e.code
        if hasattr(e, 'status_code'):
            error_details["status_code"] = e.status_code

        # Log detailed error
        logger.error(
            f"Extraction failed: {error_type} - {str(e)} | "
            f"Details: {error_details}"
        )
        raise

batch_extract

batch_extract(document_type: DocumentType, texts: list[str], schema: ExtractionSchema, langfuse_handler: Optional[BaseCallbackHandler] = None, max_text_length: int = 150000) -> list[dict[str, Any]]

Extract information from multiple documents in batch.

PARAMETER DESCRIPTION
document_type

Type of documents

TYPE: DocumentType

texts

List of document texts

TYPE: list[str]

schema

Extraction schema

TYPE: ExtractionSchema

langfuse_handler

Optional Langfuse callback handler

TYPE: Optional[BaseCallbackHandler] DEFAULT: None

max_text_length

Maximum text length per document

TYPE: int DEFAULT: 150000

RETURNS DESCRIPTION
list[dict[str, Any]]

List of extraction results as dictionaries

Source code in juddges/extraction/gemini_chain.py
def batch_extract(
    self,
    document_type: DocumentType,
    texts: list[str],
    schema: ExtractionSchema,
    langfuse_handler: Optional[BaseCallbackHandler] = None,
    max_text_length: int = 150000,
) -> list[dict[str, Any]]:
    """Extract information from multiple documents in batch.

    Args:
        document_type: Type of documents
        texts: List of document texts
        schema: Extraction schema
        langfuse_handler: Optional Langfuse callback handler
        max_text_length: Maximum text length per document

    Returns:
        List of extraction results as dictionaries
    """
    # Build chain with structured output
    chain = self._build_chain(document_type, schema)

    # Prepare batch inputs
    batch_inputs = [
        {
            "text": text[:max_text_length],
            "schema": schema.to_schema_string(),
            "language": schema.language,
            "additional_instructions": (
                f"\nAdditional instructions:\n{schema.instructions}"
                if schema.instructions
                else ""
            ),
        }
        for text in texts
    ]

    # Execute batch
    config = {}
    if langfuse_handler:
        config["callbacks"] = [langfuse_handler]

    try:
        results = chain.batch(batch_inputs, config=config)

        # Check for None results in batch
        if results is None or None in results:
            none_count = results.count(None) if results else len(texts)
            error_msg = f"API returned None for {none_count}/{len(texts)} documents - likely rate limit or API error"
            logger.error(f"Batch extraction failed: {error_msg}")
            raise ValueError(error_msg)

        logger.info(
            f"Successfully extracted from {len(results)} {document_type.value} documents using structured output"
        )
        return results
    except Exception as e:
        # Enhanced error logging
        error_type = type(e).__name__
        error_details = {
            "error_type": error_type,
            "error_message": str(e),
            "document_type": document_type.value,
            "batch_size": len(texts),
        }

        # Check for specific API errors
        if hasattr(e, 'code'):
            error_details["http_code"] = e.code
        if hasattr(e, 'status_code'):
            error_details["status_code"] = e.status_code
        if "429" in str(e):
            error_details["likely_cause"] = "Rate limit exceeded"
        elif "500" in str(e) or "503" in str(e):
            error_details["likely_cause"] = "API server error"
        elif "timeout" in str(e).lower():
            error_details["likely_cause"] = "Request timeout"

        logger.error(
            f"Batch extraction failed: {error_type} - {str(e)} | "
            f"Details: {error_details}"
        )
        raise
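
The likely_cause hint in batch_extract is derived by substring matching on the exception message. The sketch below isolates that logic so its behaviour is easier to see. The function name likely_cause is a hypothetical stand-in; the matching rules are copied from the source above.

```python
from typing import Optional


def likely_cause(message: str) -> Optional[str]:
    """Classify an error message the same way batch_extract does."""
    if "429" in message:
        return "Rate limit exceeded"
    if "500" in message or "503" in message:
        return "API server error"
    if "timeout" in message.lower():
        return "Request timeout"
    return None


print(likely_cause("429 Resource exhausted"))
print(likely_cause("Deadline exceeded: request Timeout"))
print(likely_cause("Permission denied"))
```

Because this is plain substring matching, a message that merely contains the digits (for example "limit of 1500 tokens") would also be labelled a server error, so the hint is best treated as a starting point for diagnosis.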

ExtractionSchema

Bases: BaseModel

Schema definition for information extraction.

ATTRIBUTE DESCRIPTION
fields

Dictionary mapping field names to their descriptions and types

TYPE: dict[str, str]

instructions

Additional instructions for the extraction process

TYPE: Optional[str]

language

Language of the document and extraction (e.g., 'polish', 'english')

TYPE: str

Functions

to_schema_string

to_schema_string() -> str

Convert schema to string format for prompt.

Source code in juddges/extraction/gemini_chain.py
def to_schema_string(self) -> str:
    """Convert schema to string format for prompt."""
    return "\n".join(f"{key}: {val}" for key, val in self.fields.items())
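
To make the prompt format concrete, the snippet below applies the same join expression to a plain dictionary, without importing the module. Each field becomes one "name: description" line, in insertion order.

```python
fields = {
    "verdict_date": "date as ISO 8601",
    "court": "string, name of the court",
}

# Same expression as to_schema_string(), applied to a plain dict.
schema_string = "\n".join(f"{key}: {val}" for key, val in fields.items())

print(schema_string)
# verdict_date: date as ISO 8601
# court: string, name of the court
```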

to_pydantic_model

to_pydantic_model(model_name: str = 'ExtractionOutput') -> type[BaseModel]

Convert schema to a Pydantic model for structured output.

Creates a dynamic Pydantic model with all fields as Optional[Any] to handle the variety of data types defined in the schema (strings, lists, dicts, etc.).

PARAMETER DESCRIPTION
model_name

Name for the generated Pydantic model

TYPE: str DEFAULT: 'ExtractionOutput'

RETURNS DESCRIPTION
type[BaseModel]

Dynamically created Pydantic BaseModel class

Source code in juddges/extraction/gemini_chain.py
def to_pydantic_model(self, model_name: str = "ExtractionOutput") -> type[BaseModel]:
    """Convert schema to a Pydantic model for structured output.

    Creates a dynamic Pydantic model with all fields as Optional[Any] to handle
    the variety of data types defined in the schema (strings, lists, dicts, etc.).

    Args:
        model_name: Name for the generated Pydantic model

    Returns:
        Dynamically created Pydantic BaseModel class
    """
    # Create field definitions - all fields are Optional[Any] to handle diverse types
    field_definitions = {
        field_name: (
            Optional[Any],
            Field(default=None, description=field_desc[:500]),
        )  # Truncate long descriptions
        for field_name, field_desc in self.fields.items()
    }

    # Create dynamic Pydantic model
    return create_model(
        model_name,
        **field_definitions,
        __doc__=f"Structured extraction output for {self.language} legal documents",
    )

DocumentType

Bases: str, Enum

Supported document types for extraction.

Schema Design Best Practices

Field Definitions

Be explicit and specific in field descriptions:

# Good: Specific type and format
"verdict_date": "date as ISO 8601 (YYYY-MM-DD), when the verdict was issued"

# Bad: Vague description
"verdict_date": "the date"

Enum Fields

Provide explicit choices:

schema = ExtractionSchema(
    fields={
        "judgment_type": "enum: one of [Wyrok, Postanowienie, Uchwała]",
        "finality": "enum: one of [Prawomocne, Nieprawomocne]"
    }
)

List Fields

Specify list format clearly:

schema = ExtractionSchema(
    fields={
        "parties": "List[string], names of all parties involved in the case",
        "legal_bases": "List[string], legal bases cited (e.g., 'Art. 123 KC')"
    }
)

Boolean Fields

Use clear true/false criteria:

schema = ExtractionSchema(
    fields={
        "appeal_allowed": "boolean, true if appeal is explicitly allowed, false otherwise"
    },
    instructions="Only mark boolean fields as true when explicitly confirmed."
)
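
Since to_pydantic_model types every field as Optional[Any], the type hints in field descriptions guide the model but are not enforced on the output. A light post-check can catch values that drift from the requested format. The following is a minimal sketch under that assumption; validate_result and ALLOWED_JUDGMENT_TYPES are hypothetical names, and the field names are taken from the examples above.

```python
from datetime import date
from typing import Any

# Allowed values copied from the enum example above.
ALLOWED_JUDGMENT_TYPES = ("Wyrok", "Postanowienie", "Uchwała")


def validate_result(result: dict[str, Any]) -> list[str]:
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []

    verdict_date = result.get("verdict_date")
    if verdict_date is not None:
        try:
            date.fromisoformat(verdict_date)
        except (TypeError, ValueError):
            problems.append(f"verdict_date is not a valid ISO 8601 date: {verdict_date!r}")

    judgment_type = result.get("judgment_type")
    if judgment_type is not None and judgment_type not in ALLOWED_JUDGMENT_TYPES:
        problems.append(f"judgment_type is not an allowed value: {judgment_type!r}")

    parties = result.get("parties")
    if parties is not None and not (
        isinstance(parties, list) and all(isinstance(p, str) for p in parties)
    ):
        problems.append("parties is not a list of strings")

    return problems


good = {"verdict_date": "2024-01-15", "judgment_type": "Wyrok", "parties": ["Jan Kowalski"]}
bad = {"verdict_date": "15.01.2024", "judgment_type": "Decyzja", "parties": "Jan Kowalski"}
print(validate_result(good))       # []
print(len(validate_result(bad)))   # 3
```

Missing fields are skipped rather than reported, because the generated model defaults every field to None.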

Caching

How Caching Works

The chain stores API responses in LangChain's LLM cache. It uses PostgreSQL when POSTGRES_CACHE_URL is set and SQLite otherwise:

chain = GeminiExtractionChain(
    cache_path="cache/extraction.db"  # SQLite database file
)

# First call: Makes API request
result1 = chain.extract(...)  # API call

# Second call with same input: Returns cached result
result2 = chain.extract(...)  # No API call (cached)
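
The hit/miss behaviour can be illustrated with a toy in-memory cache. This is a simplification and not the module's implementation: the real lookup is handled by LangChain's SQLiteCache or SQLAlchemyMd5Cache, and the key layout below (an MD5 of the model configuration plus prompt) is an assumption made for illustration. ToyLLMCache and fake_model are hypothetical names.

```python
import hashlib
from typing import Any, Callable


class ToyLLMCache:
    """In-memory stand-in for an LLM response cache (illustration only)."""

    def __init__(self) -> None:
        self._store: dict[str, Any] = {}
        self.api_calls = 0

    def _key(self, prompt: str, model_config: str) -> str:
        return hashlib.md5(f"{model_config}\n{prompt}".encode("utf-8")).hexdigest()

    def get_or_call(self, prompt: str, model_config: str, call_model: Callable[[str], Any]) -> Any:
        key = self._key(prompt, model_config)
        if key not in self._store:
            # Cache miss: this is where the real chain would hit the API.
            self.api_calls += 1
            self._store[key] = call_model(prompt)
        return self._store[key]


def fake_model(prompt: str) -> dict[str, str]:
    return {"court": "Sąd Okręgowy w Warszawie"}


cache = ToyLLMCache()
flash = "gemini-2.5-flash|temperature=0.0"
pro = "gemini-2.5-pro|temperature=0.0"

first = cache.get_or_call("Extract the court name...", flash, fake_model)   # miss
second = cache.get_or_call("Extract the court name...", flash, fake_model)  # hit
third = cache.get_or_call("Extract the court name...", pro, fake_model)     # miss

print(cache.api_calls)  # 2
```

Under this model, any change to the document text, schema, instructions, or model settings produces a different key and therefore a fresh API call.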

Cache Benefits

  • Cost Reduction: Avoid repeated API charges
  • Speed: Instant responses for cached queries
  • Reliability: Work offline with previously cached data

Cache Location

Default cache: .cache/langchain.db

Custom cache:

chain = GeminiExtractionChain(
    cache_path="my_cache/extraction.db"
)
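
The constructor source shows a detail that is easy to miss: when POSTGRES_CACHE_URL is set, cache_path is not consulted at all, and if PostgreSQL initialization fails the fallback is the default .cache/langchain.db rather than the custom path. The sketch below restates that decision as a pure function; describe_cache_backend and its postgres_available flag are hypothetical and exist only to make the branches testable.

```python
from typing import Mapping, Optional

DEFAULT_SQLITE_PATH = ".cache/langchain.db"


def describe_cache_backend(
    env: Mapping[str, str],
    cache_path: Optional[str] = None,
    postgres_available: bool = True,
) -> str:
    """Report which cache the constructor would end up using."""
    if env.get("POSTGRES_CACHE_URL"):
        if postgres_available:
            return "postgresql"
        # Fallback ignores cache_path and uses the default SQLite file.
        return f"sqlite:{DEFAULT_SQLITE_PATH}"
    if cache_path:
        return f"sqlite:{cache_path}"
    return f"sqlite:{DEFAULT_SQLITE_PATH}"


pg_env = {"POSTGRES_CACHE_URL": "postgresql://user:pass@host:5432/db"}
print(describe_cache_backend(pg_env, "my_cache/extraction.db"))
print(describe_cache_backend(pg_env, "my_cache/extraction.db", postgres_available=False))
print(describe_cache_backend({}, "my_cache/extraction.db"))
print(describe_cache_backend({}))
```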

Langfuse Integration

Setup Langfuse

import os

from langfuse.callback import CallbackHandler

handler = CallbackHandler(
    public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
    secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
    host="https://cloud.langfuse.com"
)

Tracked Metrics

Langfuse tracks:

  • Traces: Full extraction pipelines
  • Latency: Response times
  • Token Usage: Input/output tokens
  • Costs: API costs per extraction
  • Errors: Failed extractions

Viewing Results

Access Langfuse dashboard:

https://cloud.langfuse.com

Filter by:

  • Document type
  • Date range
  • Success/failure status
  • Cost thresholds

Error Handling

try:
    result = chain.extract(
        document_type=DocumentType.JUDGMENT,
        text=text,
        schema=schema
    )
except ValueError as e:
    # Raised by extract() when the API returns no result
    # (likely rate limit, timeout, or API error)
    print(f"Empty or invalid response: {e}")
except Exception as e:
    # API errors, parsing errors, etc.
    print(f"Extraction failed: {e}")
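
Since extract() re-raises after logging, transient failures such as rate limits can be handled by the caller. One common approach is a retry wrapper with exponential backoff. The sketch below is not part of the module; call_with_retries and its parameters are hypothetical, and the sleep function is injectable so the demo runs without waiting.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on any exception with exponential backoff."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # Out of attempts: surface the last error.
            sleep(base_delay * 2 ** attempt)
    raise ValueError("attempts must be at least 1")


# Demo with a stand-in for chain.extract that fails twice, then succeeds.
calls = {"n": 0}
delays: list[float] = []


def flaky_extract() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("429 rate limit")
    return "ok"


outcome = call_with_retries(flaky_extract, attempts=3, base_delay=0.5, sleep=delays.append)
print(outcome, delays)  # ok [0.5, 1.0]
```

In real use the callable would wrap the extraction, for example lambda: chain.extract(document_type=DocumentType.JUDGMENT, text=text, schema=schema).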
    # Check Langfuse for detailed trace

Model Selection

Gemini 2.5 Flash

chain = GeminiExtractionChain(
    model_name="gemini-2.5-flash"
)

Pros:

  • Faster responses
  • Lower cost
  • Good accuracy for structured tasks

Cons:

  • Slightly lower accuracy on complex cases

Gemini 2.5 Pro

chain = GeminiExtractionChain(
    model_name="gemini-2.5-pro"
)

Pros:

  • Highest accuracy
  • Better on complex documents
  • More reliable enum classification

Cons:

  • Higher cost
  • Slower responses

Performance Optimization

Batch Processing

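Process multiple documents with one batch_extract call. It delegates to the chain's batch() method, so each document is still sent as its own model request: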

# More efficient than individual extractions
results = chain.batch_extract(
    document_type=DocumentType.JUDGMENT,
    texts=texts,  # List of 10-100 documents
    schema=schema
)
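
Note that batch_extract raises if any result in the batch comes back as None, so one failed document discards the results for the whole call. Splitting a large list into smaller chunks limits how much work is lost on a failure. A minimal sketch follows; chunked is a hypothetical helper, not part of the module.

```python
from typing import Iterator, Sequence


def chunked(items: Sequence[str], size: int) -> Iterator[Sequence[str]]:
    """Yield consecutive slices of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


texts = [f"document {i}" for i in range(7)]
batches = list(chunked(texts, 3))
print([len(batch) for batch in batches])  # [3, 3, 1]
```

Each chunk can then be passed to chain.batch_extract(..., texts=batch, ...) inside its own try/except, extending a shared results list on success.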

Text Truncation

Long documents are automatically truncated:

result = chain.extract(
    document_type=DocumentType.JUDGMENT,
    text=very_long_text,
    schema=schema,
    max_text_length=150000  # Truncate at 150k chars
)
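
Truncation is a plain character slice (text[:max_text_length]), so anything past the limit is never seen by the model. extract() logs a warning when it truncates, while batch_extract() slices silently. To find affected documents before a batch run, a check along these lines can help; find_truncated is a hypothetical helper.

```python
def find_truncated(texts: list[str], max_text_length: int = 150_000) -> list[int]:
    """Return indices of texts that would be cut at max_text_length characters."""
    return [i for i, text in enumerate(texts) if len(text) > max_text_length]


sample = ["short text", "x" * 150_001, "y" * 150_000]
print(find_truncated(sample))  # [1]
```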

Temperature Control

Use temperature=0.0 (the default) for the most repeatable extraction:

chain = GeminiExtractionChain(
    temperature=0.0  # Deterministic outputs
)

Common Patterns

Production Extraction Pipeline

import logging
import os

from juddges.extraction.gemini_chain import (
    GeminiExtractionChain,
    ExtractionSchema,
    DocumentType
)
from langfuse.callback import CallbackHandler

logger = logging.getLogger(__name__)

# Initialize components.
# The constructor takes no api_key; it authenticates through Vertex AI
# application default credentials and reads VERTEX_PROJECT from the environment.
chain = GeminiExtractionChain(
    model_name="gemini-2.5-flash",
    cache_path="cache/production.db"
)

langfuse = CallbackHandler(
    public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
    secret_key=os.getenv("LANGFUSE_SECRET_KEY")
)

# Load schema.
# NOTE: from_file is not listed in the API reference above; if it is not
# available, construct ExtractionSchema(fields=..., language=...) directly.
schema = ExtractionSchema.from_file("schemas/judgment_schema.yaml")

# Extract with monitoring.
# `documents` is assumed to be an iterable of dicts with "id" and "text" keys.
results = []
for doc in documents:
    try:
        result = chain.extract(
            document_type=DocumentType.JUDGMENT,
            text=doc["text"],
            schema=schema,
            langfuse_handler=langfuse
        )
        results.append(result)
    except Exception as e:
        logger.error(f"Failed on doc {doc['id']}: {e}")
        continue

Cost-Optimized Extraction

# Use Flash model for bulk extraction
chain = GeminiExtractionChain(
    model_name="gemini-2.5-flash",  # Lower cost
    cache_path="cache/bulk.db",      # Enable caching
    max_output_tokens=4096           # Limit token usage
)

# Batch process for efficiency
results = chain.batch_extract(
    document_type=DocumentType.JUDGMENT,
    texts=texts,
    schema=schema
)