Gemini Extraction Chain¶
LangChain-based information extraction using Google Gemini 2.5 Pro/Flash with caching and observability.
Overview¶
The juddges.extraction.gemini_chain module provides a production-ready extraction pipeline using Google's Gemini 2.5 models. It's designed for structured information extraction from legal documents with:
- Caching to reduce API costs
- Langfuse integration for observability
- Schema-driven extraction
- Batch processing support
- Automatic error handling
Key Features¶
- Multiple Models: Gemini 2.5 Pro and Flash support
- SQLite Caching: Avoid redundant API calls (cost savings)
- Langfuse Observability: Track extraction runs, costs, and performance
- Structured Output: Parse JSON responses to dictionaries
- Document Type Aware: Optimized prompts for judgments vs tax interpretations
- Batch Extraction: Process multiple documents efficiently
- Automatic Truncation: Handle documents exceeding token limits
Usage Examples¶
Basic Extraction¶
from juddges.extraction.gemini_chain import (
    GeminiExtractionChain,
    ExtractionSchema,
    DocumentType,
)
# Initialize chain
chain = GeminiExtractionChain(
    model_name="gemini-2.5-flash",
    temperature=0.0,
    cache_path="cache/extraction.db",
)
# Define extraction schema
schema = ExtractionSchema(
    fields={
        "verdict_date": "date as ISO 8601, when the verdict was issued",
        "court": "string, name of the court",
        "case_number": "string, case identifier",
        "parties": "List[string], names of involved parties",
    },
    instructions="Focus on extracting factual information only.",
    language="polish",
)
# Extract from judgment
result = chain.extract(
    document_type=DocumentType.JUDGMENT,
    text="Sąd Okręgowy w Warszawie dnia 15 stycznia 2024...",
    schema=schema,
)
print(result)
# {
#     "verdict_date": "2024-01-15",
#     "court": "Sąd Okręgowy w Warszawie",
#     "case_number": "...",
#     "parties": ["Jan Kowalski", "XYZ Bank"]
# }
Extraction with Langfuse Observability¶
from langfuse.callback import CallbackHandler
# Initialize Langfuse handler
langfuse_handler = CallbackHandler(
    public_key="pk-...",
    secret_key="sk-...",
    host="https://cloud.langfuse.com",
)
# Extract with tracing
result = chain.extract(
    document_type=DocumentType.JUDGMENT,
    text=judgment_text,
    schema=schema,
    langfuse_handler=langfuse_handler,
)
# View trace in Langfuse dashboard
Batch Extraction¶
# Extract from multiple documents
texts = [judgment1, judgment2, judgment3]
results = chain.batch_extract(
    document_type=DocumentType.JUDGMENT,
    texts=texts,
    schema=schema,
    langfuse_handler=langfuse_handler,
)
# Process results
for i, result in enumerate(results):
    print(f"Document {i}: {result}")
Tax Interpretation Extraction¶
# Different document type with specialized prompt
schema = ExtractionSchema(
    fields={
        "interpretation_date": "date as ISO 8601",
        "tax_authority": "string, issuing tax authority",
        "taxpayer": "string, name of taxpayer",
        "interpretation_subject": "string, subject of interpretation",
    },
    language="polish",
)
result = chain.extract(
    document_type=DocumentType.TAX_INTERPRETATION,
    text=tax_interpretation_text,
    schema=schema,
)
API Reference¶
GeminiExtractionChain¶
GeminiExtractionChain(model_name: Literal['gemini-2.5-pro', 'gemini-2.5-flash', 'gemini-2.0-flash-exp', 'gemini-1.5-pro', 'gemini-1.5-flash'] = 'gemini-2.5-pro', project: Optional[str] = None, location: str = 'us-central1', temperature: float = 0.0, cache_path: Optional[str | Path] = None, max_output_tokens: Optional[int] = 8192, enable_thinking: bool = False)
LangChain extraction chain using Gemini 2.5 Pro with guaranteed valid JSON output.
Features:
- Google Gemini 2.5 Pro/Flash model support
- Native structured output via with_structured_output(), which guarantees valid JSON responses
- Eliminates JSON parsing errors by using Gemini's response_schema API
- Optional extended thinking mode for Gemini 2.5 models (disabled by default)
- PostgreSQL caching (via POSTGRES_CACHE_URL env var) with SQLite fallback
- Langfuse callback integration for observability
- Document type-aware prompting
- Dynamic Pydantic model generation from ExtractionSchema
Thinking Mode (Gemini 2.5 only):
- Extended thinking mode shows the model's reasoning process before providing the answer
- Can improve accuracy for complex reasoning tasks
- Increases latency and token usage
- Default: disabled (enable_thinking=False)
- Recommended: keep disabled for structured extraction tasks, enable for complex reasoning
Cache Configuration:
- Set the POSTGRES_CACHE_URL environment variable for PostgreSQL caching
- Falls back to SQLite if PostgreSQL is unavailable or not configured
- A custom SQLite path can be specified via the cache_path parameter
Example (default - no thinking):
>>> chain = GeminiExtractionChain(
...     model_name="gemini-2.5-pro",
...     cache_path="cache/extraction.db",  # SQLite fallback path
...     temperature=0.0,
... )
>>>
>>> schema = ExtractionSchema(
...     fields={
...         "verdict_date": "date as ISO 8601",
...         "court": "string, name of the court",
...     },
...     language="polish",
... )
>>>
>>> result = chain.extract(
...     document_type=DocumentType.JUDGMENT,
...     text="Sąd Najwyższy orzekł dnia 2024-01-15...",
...     schema=schema,
...     langfuse_handler=my_langfuse_handler,  # Optional
... )
>>> print(result)
# {"verdict_date": "2024-01-15", "court": "Sąd Najwyższy"}
Example (with thinking enabled):
>>> chain = GeminiExtractionChain(
...     model_name="gemini-2.5-pro",
...     enable_thinking=True,  # Enable extended thinking mode
... )
>>> # Model will show reasoning process in responses
| PARAMETER | DESCRIPTION |
|---|---|
| model_name | Gemini model to use (via Vertex AI). TYPE: Literal['gemini-2.5-pro', 'gemini-2.5-flash', 'gemini-2.0-flash-exp', 'gemini-1.5-pro', 'gemini-1.5-flash'] DEFAULT: 'gemini-2.5-pro' |
| project | GCP project ID (defaults to VERTEX_PROJECT or gcloud default). TYPE: Optional[str] DEFAULT: None |
| location | GCP region. TYPE: str DEFAULT: 'us-central1' |
| temperature | Sampling temperature (0.0 for deterministic). TYPE: float DEFAULT: 0.0 |
| cache_path | Path to SQLite cache file (used as fallback if PostgreSQL is unavailable). TYPE: Optional[str \| Path] DEFAULT: None |
| max_output_tokens | Maximum tokens in response. TYPE: Optional[int] DEFAULT: 8192 |
| enable_thinking | Enable extended thinking mode for Gemini 2.5 models. When enabled, the model shows its reasoning process before answering. This can improve accuracy for complex tasks but increases latency and token usage. Recommended for complex reasoning tasks, not for simple structured extraction. TYPE: bool DEFAULT: False |
Environment Variables
- POSTGRES_CACHE_URL: PostgreSQL connection string for LLM caching (preferred)
- VERTEX_PROJECT: GCP project ID for Vertex AI
- GOOGLE_CLOUD_PROJECT: Alternative GCP project ID
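One way to picture how the project ID could be resolved from these settings is the sketch below. It is not the module's code: the helper name `resolve_project` is hypothetical, and the precedence between VERTEX_PROJECT and GOOGLE_CLOUD_PROJECT is an assumption, since the text only calls the latter an alternative.

```python
import os
from typing import Mapping, Optional


def resolve_project(
    explicit: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> Optional[str]:
    """Pick a GCP project ID: explicit argument first, then environment."""
    env = os.environ if env is None else env
    if explicit:
        return explicit
    # Assumed precedence: VERTEX_PROJECT before GOOGLE_CLOUD_PROJECT.
    for name in ("VERTEX_PROJECT", "GOOGLE_CLOUD_PROJECT"):
        value = env.get(name)
        if value:
            return value
    # Nothing configured: a real client would fall back to the gcloud default.
    return None


print(resolve_project(env={"GOOGLE_CLOUD_PROJECT": "demo-project"}))
```

Passing `env` explicitly keeps the helper testable without touching the real process environment.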
Source code in juddges/extraction/gemini_chain.py
Functions¶
extract¶
extract(document_type: DocumentType, text: str, schema: ExtractionSchema, langfuse_handler: Optional[BaseCallbackHandler] = None, max_text_length: int = 150000) -> dict[str, Any]
Extract structured information from document text.
| PARAMETER | DESCRIPTION |
|---|---|
| document_type | Type of document (judgment or tax interpretation). TYPE: DocumentType |
| text | Full text of the document. TYPE: str |
| schema | Extraction schema defining fields and instructions. TYPE: ExtractionSchema |
| langfuse_handler | Optional Langfuse callback handler for observability. TYPE: Optional[BaseCallbackHandler] DEFAULT: None |
| max_text_length | Maximum text length to process (truncates if longer). TYPE: int DEFAULT: 150000 |

| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | Dictionary with extracted information matching schema fields |
Example
>>> schema = ExtractionSchema(
...     fields={
...         "verdict_date": "date as ISO 8601",
...         "court": "string, court name",
...         "case_number": "string, case identifier",
...     },
...     language="polish",
... )
>>> result = chain.extract(
...     document_type=DocumentType.JUDGMENT,
...     text="Sąd Okręgowy w Warszawie...",
...     schema=schema,
... )
Source code in juddges/extraction/gemini_chain.py
batch_extract¶
batch_extract(document_type: DocumentType, texts: list[str], schema: ExtractionSchema, langfuse_handler: Optional[BaseCallbackHandler] = None, max_text_length: int = 150000) -> list[dict[str, Any]]
Extract information from multiple documents in batch.
| PARAMETER | DESCRIPTION |
|---|---|
| document_type | Type of documents. TYPE: DocumentType |
| texts | List of document texts. TYPE: list[str] |
| schema | Extraction schema. TYPE: ExtractionSchema |
| langfuse_handler | Optional Langfuse callback handler. TYPE: Optional[BaseCallbackHandler] DEFAULT: None |
| max_text_length | Maximum text length per document. TYPE: int DEFAULT: 150000 |

| RETURNS | DESCRIPTION |
|---|---|
| list[dict[str, Any]] | List of extraction results as dictionaries |
Source code in juddges/extraction/gemini_chain.py
ExtractionSchema¶
Bases: BaseModel
Schema definition for information extraction.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| fields | Dictionary mapping field names to their descriptions and types |
| instructions | Additional instructions for the extraction process |
| language | Language of the document and extraction (e.g., 'polish', 'english') |
Functions¶
to_schema_string¶
to_pydantic_model¶
Convert schema to a Pydantic model for structured output.
Creates a dynamic Pydantic model with all fields as Optional[Any] to handle the variety of data types defined in the schema (strings, lists, dicts, etc.).
| PARAMETER | DESCRIPTION |
|---|---|
| model_name | Name for the generated Pydantic model |

| RETURNS | DESCRIPTION |
|---|---|
| type[BaseModel] | Dynamically created Pydantic BaseModel class |
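The idea of generating a result type with every field optional can be sketched with the standard library alone. The block below is a stand-in, not the method's implementation: it swaps Pydantic for `dataclasses.make_dataclass`, and the helper name `build_result_type` is hypothetical.

```python
from dataclasses import asdict, field, make_dataclass
from typing import Any, Optional


def build_result_type(fields: dict[str, str], type_name: str = "ExtractionResult") -> type:
    """Create a class with one Optional[Any] attribute per schema field.

    Field names must be valid Python identifiers.
    """
    specs = [(name, Optional[Any], field(default=None)) for name in fields]
    return make_dataclass(type_name, specs)


Result = build_result_type(
    {
        "verdict_date": "date as ISO 8601",
        "court": "string, name of the court",
    }
)
# Fields the model could not fill simply stay None.
partial = Result(court="Sąd Najwyższy")
print(asdict(partial))
```

Defaulting every field to None means a partially filled response still yields a complete object, which matches the "all fields as Optional[Any]" description above.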
Source code in juddges/extraction/gemini_chain.py
DocumentType¶
Bases: str, Enum
Supported document types for extraction.
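Deriving an enum from both str and Enum lets members compare equal to plain strings and serialize to JSON directly. The sketch below illustrates that pattern; the class name `DocumentTypeSketch` and the member values are assumptions for illustration, since only the member names JUDGMENT and TAX_INTERPRETATION appear in the examples.

```python
import json
from enum import Enum


class DocumentTypeSketch(str, Enum):
    # Member values are assumed; the real values are not documented here.
    JUDGMENT = "judgment"
    TAX_INTERPRETATION = "tax_interpretation"


# Lookup by value returns the singleton member.
assert DocumentTypeSketch("judgment") is DocumentTypeSketch.JUDGMENT
# Because of the str base, members compare equal to plain strings.
assert DocumentTypeSketch.TAX_INTERPRETATION == "tax_interpretation"
# The str base also makes members JSON-serializable as their value.
print(json.dumps({"document_type": DocumentTypeSketch.JUDGMENT}))
```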
Schema Design Best Practices¶
Field Definitions¶
Be explicit and specific in field descriptions:
# Good: Specific type and format
"verdict_date": "date as ISO 8601 (YYYY-MM-DD), when the verdict was issued"
# Bad: Vague description
"verdict_date": "the date"
Enum Fields¶
Provide explicit choices:
schema = ExtractionSchema(
    fields={
        "judgment_type": "enum: one of [Wyrok, Postanowienie, Uchwała]",
        "finality": "enum: one of [Prawomocne, Nieprawomocne]",
    }
)
List Fields¶
Specify list format clearly:
schema = ExtractionSchema(
    fields={
        "parties": "List[string], names of all parties involved in the case",
        "legal_bases": "List[string], legal bases cited (e.g., 'Art. 123 KC')",
    }
)
Boolean Fields¶
Use clear true/false criteria:
schema = ExtractionSchema(
    fields={
        "appeal_allowed": "boolean, true if appeal is explicitly allowed, false otherwise"
    },
    instructions="Only mark boolean fields as true when explicitly confirmed.",
)
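Because the field descriptions above follow recognizable conventions (date, enum, List[string], boolean), extracted values can be sanity-checked against them after the fact. The following is a minimal sketch of such a check; `check_field` and `invalid_fields` are hypothetical helpers, not part of the module, and they only understand the description styles shown in this section.

```python
import re
from datetime import date
from typing import Any

ENUM_PATTERN = re.compile(r"enum: one of \[(.*?)\]")


def check_field(description: str, value: Any) -> bool:
    """Return True if value is plausible for the described field (None is allowed)."""
    if value is None:
        return True
    enum_match = ENUM_PATTERN.search(description)
    if enum_match:
        choices = [choice.strip() for choice in enum_match.group(1).split(",")]
        return value in choices
    lowered = description.strip().lower()
    if lowered.startswith("date"):
        try:
            date.fromisoformat(value)
        except (TypeError, ValueError):
            return False
        return True
    if lowered.startswith("boolean"):
        return isinstance(value, bool)
    if lowered.startswith("list["):
        return isinstance(value, list) and all(isinstance(v, str) for v in value)
    # Everything else is treated as a free-text string field.
    return isinstance(value, str)


def invalid_fields(fields: dict[str, str], result: dict[str, Any]) -> list[str]:
    """List the schema fields whose extracted value fails its check."""
    return [name for name, desc in fields.items() if not check_field(desc, result.get(name))]


fields = {
    "verdict_date": "date as ISO 8601 (YYYY-MM-DD), when the verdict was issued",
    "judgment_type": "enum: one of [Wyrok, Postanowienie, Uchwała]",
    "parties": "List[string], names of all parties involved in the case",
    "appeal_allowed": "boolean, true if appeal is explicitly allowed, false otherwise",
}
result = {
    "verdict_date": "15.01.2024",  # not ISO 8601
    "judgment_type": "Wyrok",
    "parties": ["Jan Kowalski"],
    "appeal_allowed": "yes",  # not a boolean
}
print(invalid_fields(fields, result))
```

A check like this is easiest to maintain when descriptions start with the type keyword, which is one more reason to keep field descriptions explicit.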
Caching¶
How Caching Works¶
The chain caches API responses. It uses PostgreSQL when POSTGRES_CACHE_URL is set and falls back to a SQLite file otherwise:
chain = GeminiExtractionChain(
    cache_path="cache/extraction.db"  # SQLite database file
)
# First call: Makes API request
result1 = chain.extract(...)  # API call
# Second call with same input: Returns cached result
result2 = chain.extract(...)  # No API call (cached)
Cache Benefits¶
- Cost Reduction: Avoid repeated API charges
- Speed: Instant responses for cached queries
- Reliability: Work offline with previously cached data
Cache Location¶
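Default cache: .cache/langchain.db
Custom cache: pass cache_path to the GeminiExtractionChain constructor, for example cache_path="cache/extraction.db".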
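The choice between the PostgreSQL cache, a custom SQLite path, and the default location can be summarized as a small decision function. This is a sketch under assumptions: `choose_cache_backend` is a hypothetical helper, and it only looks at configuration, so it does not model the fallback that happens when PostgreSQL is configured but unreachable.

```python
import os
from pathlib import Path
from typing import Mapping, Optional, Union

DEFAULT_SQLITE_PATH = Path(".cache/langchain.db")


def choose_cache_backend(
    cache_path: Optional[Union[str, Path]] = None,
    env: Optional[Mapping[str, str]] = None,
) -> tuple[str, str]:
    """Return (backend, location) based on configuration alone."""
    env = os.environ if env is None else env
    postgres_url = env.get("POSTGRES_CACHE_URL")
    if postgres_url:
        return ("postgres", postgres_url)
    sqlite_path = Path(cache_path) if cache_path is not None else DEFAULT_SQLITE_PATH
    return ("sqlite", sqlite_path.as_posix())


print(choose_cache_backend(cache_path="cache/extraction.db", env={}))
print(choose_cache_backend(env={}))
```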
Langfuse Integration¶
Setup Langfuse¶
import os
from langfuse.callback import CallbackHandler
handler = CallbackHandler(
    public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
    secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
    host="https://cloud.langfuse.com",
)
Tracked Metrics¶
Langfuse tracks:
- Traces: Full extraction pipelines
- Latency: Response times
- Token Usage: Input/output tokens
- Costs: API costs per extraction
- Errors: Failed extractions
Viewing Results¶
Open the Langfuse dashboard at the host configured for the handler (https://cloud.langfuse.com in the setup above).
Filter by:
- Document type
- Date range
- Success/failure status
- Cost thresholds
Error Handling¶
try:
    result = chain.extract(
        document_type=DocumentType.JUDGMENT,
        text=text,
        schema=schema,
    )
except ValueError as e:
    # Invalid document type or schema
    print(f"Validation error: {e}")
except Exception as e:
    # API errors, parsing errors, etc.
    print(f"Extraction failed: {e}")
    # Check Langfuse for detailed trace
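Transient API errors are often worth retrying, while validation errors are not. The sketch below wraps any zero-argument extraction callable in a retry loop with exponential backoff; `extract_with_retry` is a hypothetical helper, and the attempt count and delays are illustrative values rather than recommendations from the library.

```python
import time
from typing import Any, Callable


def extract_with_retry(
    extract_fn: Callable[[], Any],
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call extract_fn, retrying non-validation errors with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return extract_fn()
        except ValueError:
            # Invalid document type or schema: retrying cannot fix this.
            raise
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...
    raise AssertionError("unreachable")


# Demonstration with a stand-in function that fails twice, then succeeds.
state = {"calls": 0}
delays: list[float] = []


def flaky_extract() -> dict:
    state["calls"] += 1
    if state["calls"] < 3:
        raise RuntimeError("temporary API error")
    return {"court": "Sąd Najwyższy"}


outcome = extract_with_retry(flaky_extract, sleep=delays.append)
print(outcome, state["calls"], delays)
```

In real use, `extract_fn` could be a lambda that closes over `chain.extract(...)` with the desired arguments.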
Model Selection¶
Gemini 2.5 Flash (Recommended)¶
Pros:
- Faster responses
- Lower cost
- Good accuracy for structured tasks
Cons:
- Slightly lower accuracy on complex cases
Gemini 2.5 Pro¶
Pros:
- Highest accuracy
- Better on complex documents
- More reliable enum classification
Cons:
- Higher cost
- Slower responses
Performance Optimization¶
Batch Processing¶
Process multiple documents with a single batch_extract call:
# More efficient than individual extractions
results = chain.batch_extract(
    document_type=DocumentType.JUDGMENT,
    texts=texts,  # List of 10-100 documents
    schema=schema,
)
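For corpora larger than a comfortable batch, the texts can be split into fixed-size chunks and fed to the batch call one chunk at a time. The sketch below shows the chunking logic with a stand-in batch function; `chunked`, `extract_in_chunks`, and the chunk size are hypothetical, and the code assumes the batch call returns results in input order.

```python
from typing import Callable, Iterator


def chunked(items: list[str], size: int) -> Iterator[list[str]]:
    """Yield consecutive slices of items with at most size elements each."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


def extract_in_chunks(
    texts: list[str],
    batch_fn: Callable[[list[str]], list[dict]],
    size: int = 50,
) -> list[dict]:
    """Run batch_fn over each chunk and concatenate the results in order."""
    results: list[dict] = []
    for chunk in chunked(texts, size):
        results.extend(batch_fn(chunk))
    return results


# Stand-in for a real batch call; records how many texts each call received.
call_sizes: list[int] = []


def fake_batch(chunk: list[str]) -> list[dict]:
    call_sizes.append(len(chunk))
    return [{"length": len(text)} for text in chunk]


demo_results = extract_in_chunks(["a", "bb", "ccc", "dddd", "eeeee"], fake_batch, size=2)
print(call_sizes, demo_results)
```

With the real chain, `batch_fn` could be a lambda that forwards the chunk as `texts` to `chain.batch_extract` along with the document type and schema.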
Text Truncation¶
Long documents are automatically truncated:
result = chain.extract(
    document_type=DocumentType.JUDGMENT,
    text=very_long_text,
    schema=schema,
    max_text_length=150000,  # Truncate at 150k chars
)
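Character-based truncation of this kind can be sketched in a few lines. The helper below is an illustration only: `truncate_text` is a hypothetical name, and keeping the head of the document is an assumption, since the text does not say which part is retained.

```python
def truncate_text(text: str, max_text_length: int = 150_000) -> tuple[str, bool]:
    """Return (possibly shortened text, whether truncation happened)."""
    if max_text_length < 0:
        raise ValueError("max_text_length must be non-negative")
    if len(text) <= max_text_length:
        return text, False
    # Assumption: keep the beginning of the document and drop the rest.
    return text[:max_text_length], True


shortened, was_truncated = truncate_text("x" * 200_000)
print(len(shortened), was_truncated)
```

Returning the flag alongside the text makes it easy to log which documents were cut, which helps when judging whether late-document fields (such as costs or appeal notes) may have been lost.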
Temperature Control¶
Use temperature=0.0 (the constructor default) for near-deterministic extraction.
Related¶
- Evaluation Metrics - Evaluate extraction quality
- Gemini Tutorial - Complete tutorial
- Langfuse Setup - Observability setup
Common Patterns¶
Production Extraction Pipeline¶
import logging
import os
from juddges.extraction.gemini_chain import (
    GeminiExtractionChain,
    ExtractionSchema,
    DocumentType,
)
from langfuse.callback import CallbackHandler
logger = logging.getLogger(__name__)
# Initialize components
chain = GeminiExtractionChain(
    model_name="gemini-2.5-flash",
    # The constructor documented above takes a GCP project, not an API key
    project=os.getenv("VERTEX_PROJECT"),
    cache_path="cache/production.db",
)
langfuse = CallbackHandler(
    public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
    secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
)
# Load schema (from_file is not listed in the API reference above; confirm it exists in your version)
schema = ExtractionSchema.from_file("schemas/judgment_schema.yaml")
# Extract with monitoring; documents is an iterable of dicts with "id" and "text" keys
results = []
for doc in documents:
    try:
        result = chain.extract(
            document_type=DocumentType.JUDGMENT,
            text=doc["text"],
            schema=schema,
            langfuse_handler=langfuse,
        )
        results.append(result)
    except Exception as e:
        logger.error(f"Failed on doc {doc['id']}: {e}")
        continue
Cost-Optimized Extraction¶
# Use Flash model for bulk extraction
chain = GeminiExtractionChain(
    model_name="gemini-2.5-flash",  # Lower cost
    cache_path="cache/bulk.db",  # Enable caching
    max_output_tokens=4096,  # Limit token usage
)
# Batch process for efficiency
results = chain.batch_extract(
    document_type=DocumentType.JUDGMENT,
    texts=texts,
    schema=schema,
)