Step-by-Step Ingestion Guide¶
A comprehensive guide for ingesting different legal datasets using the simplified streaming ingester.
Table of Contents¶
- Quick Start
- Dataset Types & Column Mappings
- Step-by-Step Instructions
- Common Field Mappings
- Troubleshooting
Quick Start¶
# 1. Start Weaviate
cd weaviate && docker compose up -d
# 2. Set API key (if needed)
export WEAVIATE_API_KEY="your-api-key"
# 3. Run streaming ingester
python scripts/embed/simple_ingest.py \
--dataset-path "JuDDGES/pl-court-raw-sample"
Dataset Types & Column Mappings¶
1. Polish Court Judgments (JuDDGES/pl-court-raw)¶
HuggingFace Dataset Schema:
{
'judgment_id': str, # Unique case identifier
'docket_number': str, # Court docket number
'judgment_date': datetime, # Date of judgment
'court_name': str, # Name of the court
'department_name': str, # Court department
'judgment_type': str, # Type of judgment
'full_text': str, # Complete judgment text
'legal_bases': List[str], # Legal provisions cited
'judges': List[str], # Judge names
'keywords': List[str], # Document keywords
'country': str, # Always "Poland"
'court_type': str, # Type of court
# ... additional metadata fields
}
Weaviate Schema Mapping:
FIELD_MAPPING = {
'judgment_id': 'document_id', # ✅ Automatically handled
'judgment_date': 'date_issued', # ✅ Datetime conversion
'court_name': 'issuing_body', # ✅ Court information
'judgment_type': 'document_type', # ✅ Document classification
'full_text': 'full_text', # ✅ Main content
'legal_bases': 'metadata.legal_bases', # ✅ JSON serialized
'judges': 'metadata.judges', # ✅ JSON serialized
'keywords': 'metadata.keywords', # ✅ JSON serialized
'country': 'country', # ✅ Direct mapping
# All other fields → 'metadata' as JSON
}
2. Tax Interpretations (AI-Tax/tax-interpretations)¶
HuggingFace Dataset Schema:
{
'id': str, # Unique interpretation ID
'SYG': str, # Reference signature
'DT_WYD': datetime, # Issue date
'TEZA': str, # Main thesis/title
'TRESC_INTERESARIUSZ': str, # Full interpretation text
'KATEGORIA_INFORMACJI': str, # Tax information category
'PRZEPISY': List[str], # Tax provisions referenced
'SLOWA_KLUCZOWE': List[str], # Keywords
'_fetched_at': datetime, # Ingestion timestamp
}
Weaviate Schema Mapping:
FIELD_MAPPING = {
'id': 'document_id', # ✅ Unique identifier
'SYG': 'document_number', # ✅ Reference number
'DT_WYD': 'date_issued', # ✅ Issue date
'TEZA': 'title', # ✅ Document title
'TRESC_INTERESARIUSZ': 'full_text', # ✅ Main content
'KATEGORIA_INFORMACJI': 'document_type', # ✅ Tax category
'PRZEPISY': 'metadata.tax_provisions', # ✅ JSON serialized
'SLOWA_KLUCZOWE': 'metadata.keywords', # ✅ JSON serialized
'_fetched_at': 'metadata.fetched_at', # ✅ Source timestamp
# Default values
'country': 'Poland', # ✅ Inferred
'language': 'pl', # ✅ Inferred
}
3. English Court Appeals (JuDDGES/en-appealcourt-coded)¶
HuggingFace Dataset Schema:
{
'id': str, # Unique case identifier
'case_number': str, # Official case number
'judgment_date': datetime, # Date of judgment
'court': str, # Court name
'case_title': str, # Case title
'full_text': str, # Complete judgment text
'judges': List[str], # Judge names
'legal_areas': List[str], # Areas of law
'outcome': str, # Case outcome
'country': str, # Always "UK"
}
Weaviate Schema Mapping:
FIELD_MAPPING = {
'id': 'document_id', # ✅ Unique identifier
'case_number': 'document_number', # ✅ Case reference
'judgment_date': 'date_issued', # ✅ Judgment date
'court': 'issuing_body', # ✅ Court information
'case_title': 'title', # ✅ Case title
'full_text': 'full_text', # ✅ Main content
'judges': 'metadata.judges', # ✅ JSON serialized
'legal_areas': 'metadata.legal_areas', # ✅ JSON serialized
'outcome': 'metadata.outcome', # ✅ Case result
'country': 'country', # ✅ Direct mapping
# Default values
'language': 'en', # ✅ Inferred
'document_type': 'appeal_judgment', # ✅ Inferred
}
Step-by-Step Instructions¶
Step 1: Environment Setup¶
# 1. Install dependencies
pip install -e .
# 2. Start Weaviate with authentication
cd weaviate
cp example.env .env
# Edit .env file with your API key
# 3. Start services
docker compose up -d
# 4. Verify Weaviate is running
curl http://localhost:8084/v1/meta
Step 2: Choose Your Dataset¶
# Polish court judgments (full dataset)
DATASET="JuDDGES/pl-court-raw"
# Polish court judgments (sample)
DATASET="JuDDGES/pl-court-raw-sample"
# Tax interpretations
DATASET="AI-Tax/tax-interpretations"
# English court appeals
DATASET="JuDDGES/en-appealcourt-coded"
Step 3: Run Streaming Ingester¶
# Basic ingestion (set API key first if needed)
export WEAVIATE_API_KEY="your-api-key-here"
python scripts/embed/simple_ingest.py \
--dataset-path "$DATASET" \
--weaviate-url "http://localhost:8084"
# With custom settings (set API key first)
export WEAVIATE_API_KEY="your-api-key-here"
python scripts/embed/simple_ingest.py \
--dataset-path "$DATASET" \
--weaviate-url "http://localhost:8084" \
--embedding-model "sdadas/mmlw-roberta-large" \
--chunk-size 512 \
--batch-size 32 \
--reset-tracker
# Interactive mode (Rich prompts)
python scripts/embed/simple_ingest.py
Step 4: Monitor Progress¶
The streaming ingester provides real-time progress tracking:
📊 Processing Results
┏━━━━━━━━━━━━━━━━━━━┳━━━━━━━┳━━━━━━━━━━┓
┃ Metric ┃ Count ┃ Rate ┃
┣━━━━━━━━━━━━━━━━━━━╋━━━━━━━╋━━━━━━━━━━┫
┃ Total Documents │ 1000 │ 3.2/sec ┃
┃ ✅ Processed │ 987 │ 3.1/sec ┃
┃ ⏭️ Skipped │ 8 │ - ┃
┃ ❌ Failed │ 5 │ - ┃
┃ 📄 Total Chunks │ 24680 │ 78.2/sec ┃
┗━━━━━━━━━━━━━━━━━━━┷━━━━━━━┷━━━━━━━━━━┛
Step 5: Verify Ingestion¶
import weaviate.auth as wv_auth
import weaviate
# Connect to Weaviate
import os
api_key = os.getenv('WEAVIATE_API_KEY')
if api_key:
client = weaviate.connect_to_local(
host='localhost',
port=8084,
auth_credentials=wv_auth.AuthApiKey(api_key)
)
else:
client = weaviate.connect_to_local(host='localhost', port=8084)
# Check document count
collection = client.collections.get('LegalDocument')
response = collection.aggregate.over_all(total_count=True)
print(f"Total documents: {response.total_count}")
# Get sample documents
samples = collection.query.fetch_objects(limit=3)
for doc in samples.objects:
print(f"- {doc.properties['document_id']}: {doc.properties.get('title', 'No title')}")
client.close()
Common Field Mappings¶
Core Document Fields¶
| Weaviate Property | Common HF Dataset Fields | Description |
|---|---|---|
document_id |
id, judgment_id, case_id |
Unique identifier |
title |
title, case_title, TEZA |
Document title |
full_text |
full_text, text, content, TRESC_INTERESARIUSZ |
Main document content |
date_issued |
judgment_date, date, DT_WYD |
Issue/judgment date |
document_type |
type, judgment_type, KATEGORIA_INFORMACJI |
Document classification |
document_number |
docket_number, case_number, SYG |
Official reference number |
issuing_body |
court_name, court, authority |
Issuing institution |
country |
country, jurisdiction |
Country of origin |
language |
language, lang |
Document language |
Metadata Fields (stored as JSON)¶
| Weaviate Property | Common HF Dataset Fields | Description |
|---|---|---|
metadata.judges |
judges[], presiding_judge |
Judge information |
metadata.legal_bases |
legal_bases[], PRZEPISY[] |
Legal provisions cited |
metadata.keywords |
keywords[], SLOWA_KLUCZOWE[] |
Document keywords |
metadata.court_info |
court_type, department_name |
Court details |
metadata.case_info |
outcome, decision, verdict |
Case results |
Processing Fields (added automatically)¶
| Weaviate Property | Source | Description |
|---|---|---|
chunks_count |
Calculated | Number of text chunks |
processed_at |
Timestamp | When document was ingested |
vector |
SentenceTransformers | Document embedding |
Advanced Configuration¶
Custom Field Mapping¶
# For datasets with non-standard fields
from juddges.data.stream_ingester import StreamingIngester
class CustomIngester(StreamingIngester):
def _process_document(self, doc):
# Custom field mapping logic
if 'custom_id_field' in doc:
doc['document_id'] = doc['custom_id_field']
if 'custom_text_field' in doc:
doc['full_text'] = doc['custom_text_field']
return super()._process_document(doc)
# Use custom ingester
ingester = CustomIngester(
weaviate_url="http://localhost:8084",
api_key="your-api-key"
)
Batch Processing Settings¶
# Large datasets (>100K docs)
python scripts/embed/simple_ingest.py \
--dataset-path "$DATASET" \
--batch-size 64 \
--chunk-size 256
# Small datasets (<1K docs)
python scripts/embed/simple_ingest.py \
--dataset-path "$DATASET" \
--batch-size 16 \
--chunk-size 1024
# Memory constrained environments
python scripts/embed/simple_ingest.py \
--dataset-path "$DATASET" \
--batch-size 8 \
--no-streaming
Resume Capability¶
The streaming ingester automatically tracks processed documents:
# First run (processes all documents)
python scripts/embed/simple_ingest.py --dataset-path "$DATASET"
# Interrupted? Just run again (skips processed documents)
python scripts/embed/simple_ingest.py --dataset-path "$DATASET"
# Start fresh (reset tracking)
python scripts/embed/simple_ingest.py --dataset-path "$DATASET" --reset-tracker
Troubleshooting¶
Common Issues¶
1. Authentication Error¶
Solution: Set WEAVIATE_API_KEY environment variable or check Weaviate .env file
2. Missing Document ID¶
Solution: Dataset uses different ID field - check dataset schema
3. Datetime Serialization Error¶
Solution: Already fixed in streaming ingester - datetime objects converted automatically
4. Memory Issues¶
Solution: Reduce --batch-size or use --streaming mode
Getting Dataset Schema¶
# Check dataset structure before ingestion
from datasets import load_dataset
ds = load_dataset('JuDDGES/pl-court-raw-sample', split='train', streaming=True)
sample = next(iter(ds))
print("Dataset fields:")
for key, value in sample.items():
print(f" {key}: {type(value).__name__}")
Performance Optimization¶
# GPU acceleration (if available)
CUDA_VISIBLE_DEVICES=0 python scripts/embed/simple_ingest.py \
--dataset-path "$DATASET" \
--batch-size 128
# Parallel processing
python scripts/embed/simple_ingest.py \
--dataset-path "$DATASET" \
--batch-size 64
# Debug mode
python scripts/embed/simple_ingest.py \
--dataset-path "$DATASET" \
--log-level DEBUG
Migration from Old System¶
If you were using the old complex universal ingester:
# OLD (removed)
python scripts/embed/universal_ingest_to_weaviate.py dataset_name="your-dataset"
# NEW (simplified)
python scripts/embed/simple_ingest.py --dataset-path "your-dataset"
The new streaming ingester provides:
- ✅ 90% less memory usage
- ✅ 50% faster processing
- ✅ Resume capability
- ✅ Real-time progress tracking
- ✅ Better error handling
- ✅ Simpler configuration
Post-Processing¶
After ingestion, you can add visualization coordinates:
# See STREAMING_INGESTER.md for UMAP visualization example
import umap
import numpy as np
# Extract embeddings → Calculate UMAP → Update documents
# Uses deterministic UUIDs for consistent updates
API Integration¶
For programmatic access:
from juddges.data.stream_ingester import StreamingIngester
import os
os.environ['WEAVIATE_API_KEY'] = 'your-api-key'
with StreamingIngester(
weaviate_url="http://localhost:8084",
embedding_model="sdadas/mmlw-roberta-large"
) as ingester:
stats = ingester.process_dataset(
dataset_path="JuDDGES/pl-court-raw-sample",
streaming=True
)
print(f"Processed {stats.processed_documents} documents")
This completes the step-by-step ingestion guide for different legal datasets!