Data Management API
Comprehensive data management modules for loading, transforming, and storing legal documents in Weaviate.
Overview
The juddges.data package provides a complete data management layer including:
- Dataset Loading: Load and prepare datasets for ingestion
- Weaviate Operations: Database connections and operations
- Schema Definitions: Legal document schemas for vector database
- Stream Ingestion: Production-grade streaming ingestion pipeline
- Column Mapping: Flexible dataset-to-schema mapping
Core Modules
Data Loaders
Module: juddges.data.loaders
Load datasets with automatic column remapping for Weaviate ingestion.
Key Classes:
- DatasetLoader - Load chunk and document datasets
Key Features:
- HuggingFace Datasets integration
- Automatic column mapping
- Polars-based efficient processing
- Multi-dataset support
Common Use Cases:
```python
from juddges.data.loaders import DatasetLoader

# `config` is an EmbeddingConfig, built as in the Quick Start
loader = DatasetLoader(config)
doc_dataset = loader.load_document_dataset()
chunk_dataset = loader.load_chunk_dataset()
```
Base Weaviate Database
Module: juddges.data.base_weaviate_db
Base class for all Weaviate database operations with connection management and batch operations.
Key Classes:
- WeaviateDatabase - Base database class with common operations
Key Features:
- Connection pooling
- Collection management
- Batch operations
- Error handling
Common Use Cases:
```python
from juddges.data.base_weaviate_db import WeaviateDatabase

db = WeaviateDatabase(url="http://localhost:8080")
collection = db.client.collections.get("collection_name")
```
Judgments Database
Module: juddges.data.judgments_weaviate_db
Specialized Weaviate database for court judgment storage and retrieval.
Key Classes:
- WeaviateJudgmentsDatabase - Judgment-specific database operations
Key Features:
- 50+ field legal judgment schema
- UMAP coordinate support
- Chunk and document collections
- Cross-references between collections
Common Use Cases:
```python
from juddges.data.judgments_weaviate_db import WeaviateJudgmentsDatabase

db = WeaviateJudgmentsDatabase(url="http://localhost:8080")
judgments = db.judgments_collection
chunks = db.judgment_chunks_collection
```
Documents Database
Module: juddges.data.documents_weaviate_db
Generic document database for non-judgment documents.
Key Classes:
- WeaviateDocumentsDatabase - Generic document operations
Key Features:
- Flexible schema
- Generic document storage
- Embedding support
Stream Ingester
Module: juddges.data.stream_ingester
Production-grade streaming ingestion pipeline with error handling and progress tracking.
Key Classes:
- StreamIngester - Batch streaming with retry logic
Key Features:
- Memory-efficient streaming
- Automatic batching
- Error recovery
- Progress tracking
- Configurable batch sizes
Common Use Cases:
```python
from juddges.data.stream_ingester import StreamIngester

ingester = StreamIngester(
    db=db,
    batch_size=100,
    max_retries=3
)
ingester.ingest_stream(dataset)
```
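The batching-with-retry behaviour described above can be pictured with the plain-Python sketch below. It does not use the real `StreamIngester`. The names `insert_with_retries`, `stream_ingest` and `insert_fn`, and the exponential backoff policy, are hypothetical stand-ins. The sketch also assumes that `max_retries` counts retries after the first attempt.

```python
import time
from typing import Callable, Iterable, List, Sequence

# Hypothetical callback standing in for the database's batch-insert call.
InsertFn = Callable[[Sequence[dict]], None]


def insert_with_retries(
    batch: Sequence[dict],
    insert_fn: InsertFn,
    max_retries: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Insert one batch, retrying failures with exponential backoff.

    max_retries counts retries after the first attempt (an assumption).
    Returns the number of attempts used; re-raises the last error when
    every attempt fails.
    """
    attempt = 0
    while True:
        try:
            insert_fn(batch)
            return attempt + 1
        except Exception:
            if attempt >= max_retries:
                raise
            # Wait base_delay, 2*base_delay, 4*base_delay, ... between attempts.
            sleep(base_delay * 2 ** attempt)
            attempt += 1


def stream_ingest(
    records: Iterable[dict],
    insert_fn: InsertFn,
    batch_size: int = 100,
    max_retries: int = 3,
) -> int:
    """Consume records lazily, flushing every batch_size items; returns the count."""
    batch: List[dict] = []
    total = 0
    for record in records:
        batch.append(record)
        if len(batch) == batch_size:
            insert_with_retries(batch, insert_fn, max_retries)
            total += len(batch)
            batch = []  # start a fresh list; the flushed one is left untouched
    if batch:  # flush the final, partially filled batch
        insert_with_retries(batch, insert_fn, max_retries)
        total += len(batch)
    return total
```

Only one batch is held in memory at a time, which is what makes this kind of loop suitable for datasets larger than RAM.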
Dataset Factory
Module: juddges.data.dataset_factory
Factory for creating and managing datasets with different configurations.
Key Classes:
- DatasetFactory - Create datasets from configurations
Dataset Mapper
Module: juddges.data.dataset_mapper
Utilities for mapping between different dataset schemas.
Key Functions:
- Column remapping functions
- Schema validators
Schemas
Module: juddges.data.schemas
Pydantic models for data validation and schema definitions.
Key Classes:
- Schema definitions for legal documents
- Validation models
Utils
Module: juddges.data.utils
Utility functions for data processing.
Key Functions:
- Date parsing
- Field extraction
- Data cleaning
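Weaviate `DATE` properties expect RFC 3339 timestamps. A date-parsing helper therefore typically normalises source strings into that form. The sketch below shows one way to do this with the standard library. `to_rfc3339` and the accepted input formats (including the day-first `%d.%m.%Y` pattern) are assumptions, not the actual `juddges.data.utils` API.

```python
from datetime import datetime, timezone
from typing import Optional

# Hypothetical input formats; extend to whatever the source dataset uses.
_INPUT_FORMATS = ("%Y-%m-%d", "%d.%m.%Y", "%Y-%m-%dT%H:%M:%S")


def to_rfc3339(value: str) -> Optional[str]:
    """Parse a date string and return it as RFC 3339, or None if unparseable.

    Naive values are assumed to be UTC.
    """
    text = value.strip()
    for fmt in _INPUT_FORMATS:
        try:
            parsed = datetime.strptime(text, fmt)
        except ValueError:
            continue  # try the next known format
        return parsed.replace(tzinfo=timezone.utc).isoformat()
    return None
```

Returning `None` instead of raising lets the caller decide whether an unparseable date should drop the field or the whole record.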
Quick Start
Loading and Ingesting Data
```python
from juddges.config import EmbeddingConfig
from juddges.data.loaders import DatasetLoader
from juddges.data.judgments_weaviate_db import WeaviateJudgmentsDatabase
from juddges.data.stream_ingester import StreamIngester

# 1. Configure
config = EmbeddingConfig(
    dataset_name="juddges/pl-court-raw",
    agg_embeddings_dir="data/embeddings/agg",
    chunk_embeddings_dir="data/embeddings/chunks"
)

# 2. Load datasets
loader = DatasetLoader(config)
doc_dataset = loader.load_document_dataset()
chunk_dataset = loader.load_chunk_dataset()

# 3. Connect to Weaviate
db = WeaviateJudgmentsDatabase(url="http://localhost:8080")

# 4. Ingest with streaming
ingester = StreamIngester(db=db, batch_size=100)
ingester.ingest_documents(doc_dataset)
ingester.ingest_chunks(chunk_dataset)
```
Querying Judgments
```python
from datetime import datetime, timezone

from weaviate.classes.query import Filter

from juddges.data.judgments_weaviate_db import WeaviateJudgmentsDatabase

# Connect
db = WeaviateJudgmentsDatabase(url="http://localhost:8080")

# Semantic search (requires a text vectorizer configured on the collection)
results = db.judgments_collection.query.near_text(
    query="Swiss franc loan conversion",
    limit=10
)

# Filter by date range: the v4 client takes Filter objects, not GraphQL-style dicts
filtered = db.judgments_collection.query.fetch_objects(
    filters=Filter.by_property("judgment_date").greater_or_equal(
        datetime(2020, 1, 1, tzinfo=timezone.utc)
    ),
    limit=100
)
```
Data Flow
```mermaid
graph LR
    A[Raw Dataset] --> B[DatasetLoader]
    B --> C[Document Dataset]
    B --> D[Chunk Dataset]
    C --> E[StreamIngester]
    D --> E
    E --> F[Weaviate Judgments DB]
    F --> G[Judgments Collection]
    F --> H[Chunks Collection]
```
Architecture
Collection Structure
```text
Weaviate Instance
├── judgments
│   ├── Properties (50+ fields)
│   ├── Embeddings (document-level)
│   └── UMAP coordinates (x, y)
└── judgment_chunks
    ├── Properties (chunk metadata)
    ├── Embeddings (chunk-level)
    ├── UMAP coordinates (x, y)
    └── References → judgments
```
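One common way to wire the `judgment_chunks → judgments` reference is to derive object UUIDs deterministically from a stable key. A chunk can then point at its parent without first querying for it. The sketch below assumes that approach; it is not confirmed to be what `WeaviateJudgmentsDatabase` does. The namespace, the `judgment_id`/`chunk_index` keys and the helper names are hypothetical. The Weaviate Python client offers a similar helper, `weaviate.util.generate_uuid5`.

```python
import uuid
from collections import defaultdict
from typing import Dict, Iterable, List

# Hypothetical fixed namespace: changing it would change every generated UUID.
NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "juddges/judgments")


def judgment_uuid(judgment_id: str) -> uuid.UUID:
    """The same judgment_id always maps to the same UUID."""
    return uuid.uuid5(NAMESPACE, judgment_id)


def chunk_uuid(judgment_id: str, chunk_index: int) -> uuid.UUID:
    """Chunk UUIDs are derived from the parent ID plus the chunk position."""
    return uuid.uuid5(NAMESPACE, f"{judgment_id}#chunk-{chunk_index}")


def link_chunks(chunks: Iterable[dict]) -> Dict[uuid.UUID, List[uuid.UUID]]:
    """Group chunk UUIDs under the UUID of the judgment they reference."""
    refs: Dict[uuid.UUID, List[uuid.UUID]] = defaultdict(list)
    for chunk in chunks:
        parent = judgment_uuid(chunk["judgment_id"])
        refs[parent].append(chunk_uuid(chunk["judgment_id"], chunk["chunk_index"]))
    return dict(refs)
```

Because the IDs are reproducible, re-running ingestion targets the same objects instead of creating duplicates.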
Dataset Mapping Flow
```mermaid
graph TD
    A[HuggingFace Dataset] --> B{Mapping Exists?}
    B -->|Yes| C[Apply Column Mapping]
    B -->|No| D[Use Original Columns]
    C --> E[Validate Schema]
    D --> E
    E --> F[Ingest to Weaviate]
```
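The decision flow in the diagram can be sketched in plain Python as below. `resolve_columns` and the `required` set are hypothetical. The mapping is assumed to go from source column names (keys) to schema field names (values). This is consistent with how the Schema Mismatches check compares the mapping's keys against `dataset.column_names`.

```python
from typing import Iterable, List, Mapping, Set


def resolve_columns(
    dataset_name: str,
    columns: Iterable[str],
    mappings: Mapping[str, Mapping[str, str]],
    required: Set[str],
) -> List[str]:
    """Follow the mapping flow: rename if a mapping exists, then validate."""
    mapping = mappings.get(dataset_name)
    if mapping is not None:
        # "Apply Column Mapping": rename mapped columns, keep the rest as-is.
        resolved = [mapping.get(col, col) for col in columns]
    else:
        # "Use Original Columns": no mapping registered for this dataset.
        resolved = list(columns)
    # "Validate Schema": every required target field must be present.
    missing = required - set(resolved)
    if missing:
        raise ValueError(f"missing required columns: {sorted(missing)}")
    return resolved
```

Validating after renaming means both branches of the diagram go through the same check before anything reaches Weaviate.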
Related Documentation
Tutorials
- Getting Started - Complete setup guide
- Gemini Extraction - Extract structured data
How-To Guides
- Embed and Ingest - Complete workflow
- Deploy Weaviate - Database setup
- Universal Ingestion - Flexible ingestion
Reference
- Schema Mapping - Field definitions
- Dataset-Weaviate Mapping - Column mappings
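Common Patterns
Adding New Dataset
```python
from juddges.data.loaders import DatasetLoader
from juddges.data.stream_ingester import StreamIngester

# 1. Add a column mapping entry to DATASET_COLUMN_MAPPINGS in loaders.py
DATASET_COLUMN_MAPPINGS["your/dataset"] = {
    "judgment_id": "doc_id",
    "full_text": "content",
    # ... more mappings
}

# 2. Load dataset (`config` as in the Quick Start)
loader = DatasetLoader(config)
dataset = loader.load_document_dataset()

# 3. Ingest (`db` as in the Quick Start)
ingester = StreamIngester(db=db)
ingester.ingest_documents(dataset)
```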
Custom Collection Schema
```python
import weaviate.classes.config as wvcc

from juddges.data.base_weaviate_db import WeaviateDatabase


class CustomDatabase(WeaviateDatabase):
    async def async_create_collections(self):
        await self.async_safe_create_collection(
            name="custom_collection",
            properties=[
                wvcc.Property(
                    name="custom_field",
                    data_type=wvcc.DataType.TEXT,
                    index_searchable=True
                ),
                # ... more properties
            ],
            vectorizer_config=wvcc.Configure.Vectorizer.text2vec_transformers()
        )
```
Performance Tips
Batch Size Optimization
```python
# Small documents (< 1KB)
ingester = StreamIngester(db=db, batch_size=500)

# Medium documents (1-10KB)
ingester = StreamIngester(db=db, batch_size=100)

# Large documents (> 10KB)
ingester = StreamIngester(db=db, batch_size=50)
```
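The size thresholds above can be turned into a small helper that picks a batch size from a sample of documents. This is a sketch under stated assumptions. `choose_batch_size` and `average_size_bytes` are hypothetical helpers. "KB" is taken to mean 1024 bytes. Size is measured as UTF-8 bytes of the text, which matters for Polish text with multi-byte characters.

```python
from typing import Iterable

KB = 1024  # assumption: "KB" in the tips above means KiB


def choose_batch_size(avg_doc_bytes: float) -> int:
    """Map an average document size to the suggested batch sizes."""
    if avg_doc_bytes < 1 * KB:
        return 500
    if avg_doc_bytes <= 10 * KB:
        return 100
    return 50


def average_size_bytes(texts: Iterable[str]) -> float:
    """Mean UTF-8 size of a sample of documents."""
    sizes = [len(t.encode("utf-8")) for t in texts]
    if not sizes:
        raise ValueError("need at least one document to estimate size")
    return sum(sizes) / len(sizes)
```

For example, `StreamIngester(db=db, batch_size=choose_batch_size(average_size_bytes(sample_texts)))`, where `sample_texts` is a hypothetical list of a few hundred document texts.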
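Memory Management
```python
from datasets import load_dataset

# Stream large datasets instead of loading them fully into memory.
# streaming=True without a split returns a dict of splits, so pick one explicitly.
ds = load_dataset("large-dataset", split="train", streaming=True)
for batch in ds.iter(batch_size=100):
    ingester.ingest_batch(batch)
```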
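The same memory-bounded pattern works for any Python iterable, not just a streaming Hugging Face dataset. The sketch below uses a hypothetical `iter_batches` helper built on `itertools.islice`. It pulls at most `batch_size` items into memory at once. On Python 3.12+, `itertools.batched` offers the same behaviour but yields tuples.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def iter_batches(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield lists of up to batch_size items, pulling lazily from items."""
    if batch_size < 1:
        raise ValueError("batch_size must be positive")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:  # source exhausted
            return
        yield batch
```

Because nothing is materialised ahead of time, this also works on unbounded generators.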
Troubleshooting
Connection Issues
```python
# Test connection
from juddges.data.judgments_weaviate_db import WeaviateJudgmentsDatabase

try:
    db = WeaviateJudgmentsDatabase(url="http://localhost:8080")
    print("✓ Connected to Weaviate")
except Exception as e:
    print(f"✗ Connection failed: {e}")
```
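A single connection attempt can fail while the Weaviate container is still starting. The sketch below polls before connecting, using Weaviate's `/v1/.well-known/ready` HTTP endpoint. The helpers `http_ready` and `wait_until_ready` and the retry parameters are hypothetical and not part of `juddges`.

```python
import time
import urllib.request
from typing import Callable


def http_ready(base_url: str, timeout: float = 2.0) -> bool:
    """Return True if GET <base_url>/v1/.well-known/ready answers HTTP 200."""
    url = base_url.rstrip("/") + "/v1/.well-known/ready"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except OSError:  # URLError, HTTPError, refused connections and timeouts
        return False


def wait_until_ready(
    probe: Callable[[], bool],
    attempts: int = 10,
    delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Call probe() up to `attempts` times, sleeping `delay` seconds in between."""
    for attempt in range(attempts):
        if probe():
            return True
        if attempt < attempts - 1:
            sleep(delay)
    return False
```

For example, `wait_until_ready(lambda: http_ready("http://localhost:8080"))` could run before constructing `WeaviateJudgmentsDatabase`. A `False` result then points to a server that never came up rather than a client misconfiguration.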
Schema Mismatches
```python
from juddges.data.loaders import DATASET_COLUMN_MAPPINGS

# Validate dataset columns against the registered mapping
expected_columns = set(DATASET_COLUMN_MAPPINGS["your/dataset"].keys())
actual_columns = set(dataset.column_names)
missing = expected_columns - actual_columns
print(f"Missing columns: {missing}")
```
Ingestion Errors
```python
# Enable detailed logging
from loguru import logger

logger.add("ingestion.log", level="DEBUG")

ingester = StreamIngester(
    db=db,
    batch_size=10,  # Smaller batches for debugging
    max_retries=3
)
```
See Also:
- LLM API - Model loading and inference
- Extraction API - Information extraction
- Evaluation API - Evaluation metrics