Batch Processing¶
pyrmute provides batch processing capabilities for migrating large datasets efficiently. This guide covers batch migration methods, streaming, parallel processing, and performance optimization.
Basic Batch Migration¶
Migrate multiple records at once using migrate_batch():
from pydantic import BaseModel
from pyrmute import ModelManager, ModelData
manager = ModelManager()
@manager.model("User", "1.0.0")
class UserV1(BaseModel):
name: str
@manager.model("User", "2.0.0")
class UserV2(BaseModel):
first_name: str
last_name: str
@manager.migration("User", "1.0.0", "2.0.0")
def split_name(data: ModelData) -> ModelData:
parts = data["name"].split(" ", 1)
return {
"first_name": parts[0],
"last_name": parts[1] if len(parts) > 1 else ""
}
# Migrate batch of users
old_users = [
{"name": "Alice Smith"},
{"name": "Bob Jones"},
{"name": "Carol White"},
]
new_users = manager.migrate_batch(
old_users,
"User",
"1.0.0",
"2.0.0"
)
print(new_users)
# [
# UserV2(first_name="Alice", last_name="Smith"),
# UserV2(first_name="Bob", last_name="Jones"),
# UserV2(first_name="Carol", last_name="White")
# ]
Key points:
- Takes an iterable of data dictionaries
- Returns a list of validated Pydantic models
- Processes records sequentially by default
- All records must migrate successfully (fails fast)
Return Types¶
pyrmute offers three return types for batch migrations:
Validated Models (Default)¶
Returns Pydantic model instances:
users = manager.migrate_batch(
old_users,
"User",
"1.0.0",
"2.0.0"
)
# Returns: List[UserV2]
# Each item is a validated Pydantic model
Use when:
- You need type safety
- You want Pydantic validation
- You'll use model methods/properties
Type-Safe Models¶
Returns specific model type for better type checking:
users: list[UserV2] = manager.migrate_batch_as(
old_users,
"User",
"1.0.0",
"2.0.0",
UserV2 # Explicit type
)
# Returns: List[UserV2]
# Type checkers know the exact type
Use when:
- You need IDE autocomplete
- You're using type checkers (mypy, pyright)
- You want explicit type annotations
Raw Dictionaries¶
Returns plain dictionaries without validation:
users = manager.migrate_batch_data(
old_users,
"User",
"1.0.0",
"2.0.0"
)
# Returns: List[dict]
# Raw migrated data, no validation
Use when:
- You'll validate later
- You need maximum performance
- You're serializing immediately to JSON/database
Streaming Large Datasets¶
For datasets that don't fit in memory, use streaming:
def load_users_from_database() -> None:
"""Generator that yields users from database."""
# Fetch in chunks to avoid loading all into memory
offset = 0
chunk_size = 1000
while True:
users = database.query(
"SELECT * FROM users LIMIT ? OFFSET ?",
chunk_size, offset
)
if not users:
break
for user in users:
yield user
offset += chunk_size
# Stream migration - processes in chunks
for migrated_user in manager.migrate_batch_streaming(
load_users_from_database(),
"User",
"1.0.0",
"2.0.0",
chunk_size=1000
):
# Save each migrated user immediately
database.save(migrated_user)
print(f"Migrated user: {migrated_user.first_name}")
Key benefits:
- Constant memory usage regardless of dataset size
- Can process millions of records
- Fails fast on first error
- Progress visible during processing
Streaming Variants¶
Three streaming methods match the three return types:
# Streaming with validated models (default)
for user in manager.migrate_batch_streaming(
data_source,
"User",
"1.0.0",
"2.0.0",
chunk_size=1000
):
process(user) # user is UserV2
# Streaming with type safety
for user in manager.migrate_batch_streaming_as(
data_source,
"User",
"1.0.0",
"2.0.0",
UserV2,
chunk_size=1000
):
process(user) # Type checker knows user is UserV2
# Streaming raw dictionaries
for user_data in manager.migrate_batch_data_streaming(
data_source,
"User",
"1.0.0",
"2.0.0",
chunk_size=1000
):
process(user_data) # user_data is dict
Chunk Size Selection¶
Choose chunk size based on your constraints:
# Small chunks (100-500) - Lower memory, more overhead
for user in manager.migrate_batch_streaming(
data_source,
"User",
"1.0.0",
"2.0.0",
chunk_size=100 # Good for: Memory-constrained environments
):
process(user)
# Medium chunks (1000-5000) - Balanced (default: 100)
for user in manager.migrate_batch_streaming(
data_source,
"User",
"1.0.0",
"2.0.0",
chunk_size=1000 # Good for: Most use cases
):
process(user)
# Large chunks (10000+) - Higher memory, less overhead
for user in manager.migrate_batch_streaming(
data_source,
"User",
"1.0.0",
"2.0.0",
chunk_size=10000 # Good for: Fast migrations, ample memory
):
process(user)
Parallel Processing¶
Enable parallel processing for CPU-intensive migrations:
# Sequential processing (default)
users = manager.migrate_batch(
old_users,
"User",
"1.0.0",
"2.0.0"
)
# Parallel processing with threads
users = manager.migrate_batch(
old_users,
"User",
"1.0.0",
"2.0.0",
parallel=True,
max_workers=4
)
# Parallel processing with processes
users = manager.migrate_batch(
old_users,
"User",
"1.0.0",
"2.0.0",
parallel=True,
use_processes=True,
max_workers=4
)
Threads vs. Processes¶
ThreadPoolExecutor (Default)¶
users = manager.migrate_batch(
old_users,
"User",
"1.0.0",
"2.0.0",
parallel=True,
use_processes=False, # Use threads (default)
max_workers=4
)
Use threads when:
- Migrations are I/O-bound (reading files, API calls)
- Working with shared state
- Lower overhead needed
- GIL isn't a bottleneck
Limitations:
- Python's GIL limits CPU parallelism
- Not effective for pure computation
ProcessPoolExecutor¶
users = manager.migrate_batch(
old_users,
"User",
"1.0.0",
"2.0.0",
parallel=True,
use_processes=True, # Use processes
max_workers=4
)
Use processes when:
- Migrations are CPU-intensive (complex transformations)
- Need true parallelism
- No shared state required
- Have enough memory
Limitations:
- Higher memory overhead (copies data)
- Slower startup time
- Serialization overhead
- Can't share state between processes
Worker Count Selection¶
import os
# Automatic: Use CPU count
users = manager.migrate_batch(
old_users,
"User",
"1.0.0",
"2.0.0",
parallel=True,
max_workers=None # Auto-detects CPU count
)
# Manual: Specific number
users = manager.migrate_batch(
old_users,
"User",
"1.0.0",
"2.0.0",
parallel=True,
max_workers=4 # Explicit count
)
# Conservative: Leave cores for other work
cpu_count = os.cpu_count() or 1
users = manager.migrate_batch(
old_users,
"User",
"1.0.0",
"2.0.0",
parallel=True,
max_workers=max(1, cpu_count - 2) # Reserve 2 cores
)
Performance Optimization¶
When to Use Parallel Processing¶
Use parallel processing when:
import time
# CPU-intensive migration
@manager.migration("Document", "1.0.0", "2.0.0")
def complex_transformation(data: ModelData) -> ModelData:
# Expensive text processing
text = data["content"]
# Complex transformations
processed = analyze_sentiment(text)
keywords = extract_keywords(text)
summary = generate_summary(text)
return {
**data,
"sentiment": processed,
"keywords": keywords,
"summary": summary
}
# Large dataset + CPU-intensive = Good candidate for parallelization
documents = manager.migrate_batch(
large_document_list,
"Document",
"1.0.0",
"2.0.0",
parallel=True,
use_processes=True,
max_workers=8
)
Don't use parallel processing when:
# Simple migration - overhead exceeds benefit
@manager.migration("User", "1.0.0", "2.0.0")
def simple_migration(data: ModelData) -> ModelData:
return {**data, "status": "active"}
# Sequential is faster for simple migrations
users = manager.migrate_batch(
users_list,
"User",
"1.0.0",
"2.0.0"
# parallel=False (default)
)
Benchmarking¶
Always benchmark to determine optimal settings:
import time
def benchmark_migration(
data: Iterable[ModelData],
parallel: bool = False,
use_processes: bool = False,
workers: int | None = None
) -> list[UserV2]:
"""Benchmark migration performance."""
start = time.time()
results = manager.migrate_batch(
data,
"User",
"1.0.0",
"2.0.0",
parallel=parallel,
use_processes=use_processes,
max_workers=workers
)
duration = time.time() - start
throughput = len(data) / duration
print(f"Mode: {'Parallel' if parallel else 'Sequential'}")
if parallel:
mode = "Processes" if use_processes else "Threads"
print(f" Type: {mode}, Workers: {workers}")
print(f" Duration: {duration:.2f}s")
print(f" Throughput: {throughput:.0f} records/sec")
return results
# Test different configurations
test_data = [{"name": f"User {i}"} for i in range(10000)]
print("Sequential:")
benchmark_migration(test_data)
print("\nThreads (4 workers):")
benchmark_migration(test_data, parallel=True, workers=4)
print("\nProcesses (4 workers):")
benchmark_migration(test_data, parallel=True, use_processes=True, workers=4)
Memory Management¶
Control memory usage with streaming:
# BAD - Loads entire dataset into memory
with open("large_file.json") as f:
all_data = json.load(f) # 10GB in memory!
users = manager.migrate_batch(all_data, "User", "1.0.0", "2.0.0")
# GOOD - Streams data, constant memory
def read_jsonl(filepath: str | Path) -> Generator[str]:
"""Read JSON lines file (one JSON per line)."""
with open(filepath) as f:
for line in f:
yield json.loads(line)
for user in manager.migrate_batch_streaming(
read_jsonl("large_file.jsonl"),
"User",
"1.0.0",
"2.0.0",
chunk_size=1000
):
database.save(user)
Error Handling¶
Fail Fast (Default)¶
By default, batch migrations fail on first error:
data = [
{"name": "Alice Smith"},
{"name": ""}, # Invalid - will cause failure
{"name": "Bob Jones"},
]
try:
users = manager.migrate_batch(data, "User", "1.0.0", "2.0.0")
except Exception as e:
print(f"Migration failed: {e}")
# Stops at first error
Graceful Error Handling¶
Handle errors manually for better control:
def migrate_with_error_handling(
data: Iterable[ModelData]
) -> tuple[list[UserV2], list[UserV2]]:
"""Migrate data with error tracking."""
successful = []
failed = []
for i, record in enumerate(data):
try:
migrated = manager.migrate(
record,
"User",
"1.0.0",
"2.0.0"
)
successful.append(migrated)
except Exception as e:
failed.append({
"index": i,
"data": record,
"error": str(e)
})
print(f"Failed to migrate record {i}: {e}")
print(f"Successful: {len(successful)}")
print(f"Failed: {len(failed)}")
return successful, failed
successful_users, failed_records = migrate_with_error_handling(user_data)
# Save successful migrations
for user in successful_users:
database.save(user)
# Log failures for manual review
for failure in failed_records:
logger.error(f"Migration failed", extra=failure)
Retry Logic¶
Implement retry logic for transient failures:
import time
def migrate_with_retry(
data: Iterable[ModelData], max_retries: int = 3
) -> list[UserV2]:
"""Migrate with retry logic."""
results = []
for record in data:
for attempt in range(max_retries):
try:
migrated = manager.migrate(
record,
"User",
"1.0.0",
"2.0.0"
)
results.append(migrated)
break # Success
except Exception as e:
if attempt == max_retries - 1:
# Final attempt failed
print(f"Failed after {max_retries} attempts: {e}")
raise
else:
# Retry with exponential backoff
wait_time = 2 ** attempt
print(f"Attempt {attempt + 1} failed, retrying in {wait_time}s")
time.sleep(wait_time)
return results
Common Patterns¶
Database Migration¶
def migrate_database_table(table_name: str, batch_size: int = 1000) -> None:
"""Migrate entire database table."""
def fetch_records() -> Generator[ModelData]:
"""Generator to fetch records in batches."""
offset = 0
while True:
records = database.query(
f"SELECT * FROM {table_name} LIMIT ? OFFSET ?",
batch_size, offset
)
if not records:
break
for record in records:
yield record
offset += batch_size
# Migrate and update in place
migrated_count = 0
for user in manager.migrate_batch_streaming(
fetch_records(),
"User",
"1.0.0",
"2.0.0",
chunk_size=batch_size
):
database.update(
f"UPDATE {table_name} SET first_name=?, last_name=? WHERE id=?",
user.first_name, user.last_name, user.id
)
migrated_count += 1
if migrated_count % 10000 == 0:
print(f"Migrated {migrated_count} records...")
print(f"Complete! Migrated {migrated_count} records total")
File Processing¶
import json
from pathlib import Path
def migrate_json_files(input_dir: Path, output_dir: Path) -> None:
"""Migrate all JSON files in a directory."""
output_dir.mkdir(parents=True, exist_ok=True)
for input_file in input_dir.glob("*.json"):
print(f"Processing {input_file.name}...")
with open(input_file) as f:
data = json.load(f)
if isinstance(data, list):
migrated = manager.migrate_batch(
data,
"User",
"1.0.0",
"2.0.0"
)
else:
# Single record
migrated = manager.migrate(
data,
"User",
"1.0.0",
"2.0.0"
)
output_file = output_dir / input_file.name
with open(output_file, "w") as f:
if isinstance(migrated, list):
json.dump([m.model_dump() for m in migrated], f, indent=2)
else:
json.dump(migrated.model_dump(), f, indent=2)
print(f" Saved to {output_file}")
migrate_json_files(Path("data/v1"), Path("data/v2"))
CSV Processing¶
import csv
def migrate_csv(input_path: str, output_path: str) -> None:
"""Migrate CSV file with streaming."""
def read_csv() -> Generator[ModelData]:
"""Generator to read CSV rows."""
with open(input_path, newline='') as f:
reader = csv.DictReader(f)
for row in reader:
yield row
# Migrate with streaming
with open(output_path, "w", newline="") as f:
writer = None
row_count = 0
for migrated in manager.migrate_batch_streaming(
read_csv(),
"User",
"1.0.0",
"2.0.0",
chunk_size=1000
):
# Initialize writer with field names from first record
if writer is None:
fieldnames = list(migrated.model_dump().keys())
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
# Write migrated row
writer.writerow(migrated.model_dump())
row_count += 1
if row_count % 10000 == 0:
print(f"Processed {row_count} rows...")
print(f"Complete! Migrated {row_count} rows")
migrate_csv("users_v1.csv", "users_v2.csv")
Progress Tracking¶
from tqdm import tqdm
def migrate_with_progress(
data: Iterable[ModelData], desc: str = "Migrating"
) -> list[UserV2]:
"""Migrate with progress bar."""
results = []
# Convert to list to get length for progress bar
data_list = list(data)
for record in tqdm(data_list, desc=desc):
migrated = manager.migrate(record, "User", "1.0.0", "2.0.0")
results.append(migrated)
return results
# Or with streaming
def migrate_streaming_with_progress(
data: Iterable[ModelData], total: int, desc: str = "Migrating"
) -> list[UserV2]:
"""Migrate streaming data with progress bar."""
results = []
with tqdm(total=total, desc=desc) as pbar:
for migrated in manager.migrate_batch_streaming(
data,
"User",
"1.0.0",
"2.0.0",
chunk_size=1000
):
results.append(migrated)
pbar.update(1)
return results
Performance Tips¶
- Use streaming for large datasets - Keeps memory constant
- Benchmark before parallelizing - Overhead can exceed benefits
- Use processes for CPU-bound - Threads for I/O-bound
- Tune chunk size - Larger chunks = less overhead, more memory
- Profile migrations - Identify bottlenecks in migration functions
- Consider raw dictionaries - Skip validation if not needed
- Batch database operations - Don't commit after every record
Monitoring and Logging¶
Add instrumentation to track migration performance:
import logging
import time
logger = logging.getLogger(__name__)
def migrate_with_metrics(
data: Iterable[ModelData],
model_name: str,
from_version: str,
to_version: str
) -> list[BaseModel]:
"""Migrate with detailed metrics."""
start_time = time.time()
record_count = 0
error_count = 0
try:
results = []
for record in data:
try:
migrated = manager.migrate(
record,
model_name,
from_version,
to_version
)
results.append(migrated)
record_count += 1
except Exception as e:
error_count += 1
logger.error(
f"Migration error",
extra={
"model": model_name,
"from_version": from_version,
"to_version": to_version,
"record": record,
"error": str(e)
}
)
duration = time.time() - start_time
throughput = record_count / duration if duration > 0 else 0
logger.info(
f"Migration complete",
extra={
"model": model_name,
"from_version": from_version,
"to_version": to_version,
"records": record_count,
"errors": error_count,
"duration_seconds": duration,
"throughput": throughput
}
)
return results
except Exception as e:
logger.exception("Migration failed catastrophically")
raise
Best Practices¶
- Test with production data samples - Before bulk migration
- Start with small batches - Verify correctness before scaling
- Monitor memory usage - Use streaming if memory grows
- Log failures - Track which records failed and why
- Have a rollback plan - Backup data before bulk migrations
- Measure performance - Benchmark different approaches
- Use type-safe variants - When you need type checking
Next Steps¶
Now that you understand batch processing:
Continue learning:
- Nested Models - Batch migrate data with nested models
- Discriminated Unions - Batch migrate polymorphic data
- Schema Generation - Generate schemas for batch operations
Related topics:
- Writing Migrations - Optimize migration functions for performance
- Testing Migrations - Test batch processing scenarios
API Reference:
ModelManagerAPI - CompleteModelManagerdetails- Exceptions - Exceptions pyrmute raises
- Types - Type alises exported by pyrmute