Skip to content

Avro Schema Generator

Here's the reference information for the included Avro schema generator classes with all their parameters, attributes and methods.

You can import these classes directly from pyrmute:

from pyrmute import AvroSchemaGenerator, AvroExporter

pyrmute.AvroSchemaGenerator

AvroSchemaGenerator(
    namespace="com.example", include_docs=True
)

Bases: SchemaGeneratorBase[AvroSchemaDocument]

Generates Apache Avro schemas from Pydantic models.

Initialize the Avro schema generator.

PARAMETER DESCRIPTION
namespace

Avro namespace for generated schemas (e.g., "com.mycompany.events").

TYPE: str DEFAULT: 'com.example'

include_docs

Whether to include field descriptions in schemas.

TYPE: bool DEFAULT: True

Source code in src/pyrmute/avro_schema.py
def __init__(
    self: Self,
    namespace: str = "com.example",
    include_docs: bool = True,
) -> None:
    """Set up a generator that emits schemas under an Avro namespace.

    Args:
        namespace: Base Avro namespace stored on the instance and used for
            generated schemas (e.g., "com.mycompany.events").
        include_docs: Forwarded to the base-class initializer; whether
            field descriptions are included in schemas.
    """
    super().__init__(include_docs=include_docs)
    # String-keyed cache of enum schemas generated so far; starts out empty.
    self._generated_enum_schemas: dict[str, CachedAvroEnumSchema] = {}
    self.namespace = namespace

include_docs instance-attribute

include_docs = include_docs

AVRO_SYMBOL_REGEX class-attribute instance-attribute

AVRO_SYMBOL_REGEX = compile('[A-Za-z_][A-Za-z0-9_]*')

namespace instance-attribute

namespace = namespace

generate_schema

generate_schema(
    model, name, version=None, registry_name_map=None
)

Generate an Avro schema from a Pydantic model.

PARAMETER DESCRIPTION
model

Pydantic model class.

TYPE: type[BaseModel]

name

Model name.

TYPE: str

version

Optional version to append to the namespace as a `.v<version>` suffix (dots are replaced with underscores). This is often the model version.

TYPE: str | ModelVersion | None DEFAULT: None

registry_name_map

Optional mapping of class names to registry names.

TYPE: dict[str, str] | None DEFAULT: None

RETURNS DESCRIPTION
AvroSchemaDocument

Avro schema document.

Source code in src/pyrmute/avro_schema.py
def generate_schema(
    self: Self,
    model: type[BaseModel],
    name: str,
    version: str | ModelVersion | None = None,
    registry_name_map: dict[str, str] | None = None,
) -> AvroSchemaDocument:
    """Build the Avro schema document for a Pydantic model.

    Generator state is reset at the start of every call. When ``version``
    is truthy, the record namespace becomes ``<namespace>.v<version>`` with
    the dots of the version replaced by underscores; otherwise the
    generator's base namespace is used unchanged. Root models are handed
    off to the dedicated root-model generator.

    Args:
        model: Pydantic model class.
        name: Model name, used as the Avro record name.
        version: Optional namespace version. This is often the model
            version.
        registry_name_map: Optional mapping of class names to registry names.

    Returns:
        Avro schema document holding the record schema, its namespace and
        the enum schemas generated along the way.
    """
    class_name = model.__name__

    self._reset_state()
    self._register_model_name(class_name, name)
    self._current_model_class_name = class_name
    self._current_model_schema_name = name
    self._types_seen.add(class_name)

    if version:
        version_suffix = str(version).replace(".", "_")
        target_namespace = f"{self.namespace}.v{version_suffix}"
    else:
        target_namespace = self.namespace

    if TypeInspector.is_root_model(model):
        return self._generate_root_model_schema(model, name, target_namespace)

    self._collect_nested_models(model)
    for nested_name in self._nested_models:
        if nested_name == class_name:
            continue
        # Nested models are registered under their own class name.
        self._register_model_name(nested_name, nested_name)

    record: AvroRecordSchema = {
        "type": "record",
        "name": name,
        "namespace": target_namespace,
        "fields": [],
    }

    # "doc" is added after "fields" so the key order of the record is stable.
    docstring = model.__doc__
    if self.include_docs and docstring:
        record["doc"] = docstring.strip()

    record["fields"].extend(
        self._generate_field_schema(field_name, field_info, model)
        for field_name, field_info in model.model_fields.items()
    )

    enum_schemas = {
        enum_key: cached["schema"]
        for enum_key, cached in self._generated_enum_schemas.items()
    }
    return AvroSchemaDocument(
        main=record,
        namespace=target_namespace,
        enums=enum_schemas,
    )

pyrmute.AvroExporter

AvroExporter(
    registry, namespace="com.example", include_docs=True
)

Export Pydantic models to Avro schema files.

This class provides methods to export individual schemas, or all schemas from a model registry, to .avsc (Avro Schema) files.

Initialize the Avro exporter.

PARAMETER DESCRIPTION
registry

Model registry instance.

TYPE: Registry

namespace

Avro namespace for schemas.

TYPE: str DEFAULT: 'com.example'

include_docs

Whether to include documentation.

TYPE: bool DEFAULT: True

Source code in src/pyrmute/avro_schema.py
def __init__(
    self: Self,
    registry: Registry,
    namespace: str = "com.example",
    include_docs: bool = True,
) -> None:
    """Create an exporter bound to a model registry.

    Args:
        registry: Model registry instance the exporter reads models from.
        namespace: Avro namespace passed to the schema generator.
        include_docs: Whether the schema generator includes documentation.
    """
    self._registry = registry

    # One generator is created up front and stored on the exporter.
    schema_generator = AvroSchemaGenerator(
        namespace=namespace, include_docs=include_docs
    )
    self.generator = schema_generator

generator instance-attribute

generator = AvroSchemaGenerator(
    namespace=namespace, include_docs=include_docs
)

export_schema

export_schema(
    name,
    version,
    output_path=None,
    versioned_namespace=False,
)

Export a single model version as an Avro schema.

PARAMETER DESCRIPTION
name

Model name.

TYPE: str

version

Model version.

TYPE: str | ModelVersion

output_path

Optional file path to save schema.

TYPE: str | Path | None DEFAULT: None

versioned_namespace

Include model version in namespace. Default False.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
AvroRecordSchema

Avro record schema.

Example
exporter = AvroExporter(manager._registry, namespace="com.myapp")

# Export and save
schema = exporter.export_schema("User", "1.0.0", "schemas/user_v1.avsc")

# Or just get the schema
schema = exporter.export_schema("User", "1.0.0", versioned_namespace=True)
print(json.dumps(schema, indent=2))
Source code in src/pyrmute/avro_schema.py
def export_schema(
    self: Self,
    name: str,
    version: str | ModelVersion,
    output_path: str | Path | None = None,
    versioned_namespace: bool = False,
) -> AvroRecordSchema:
    """Export a single model version as an Avro schema.

    The model is looked up in the registry and converted by the exporter's
    generator. When ``output_path`` is given, the rendered document is also
    written to that file as UTF-8 text; missing parent directories are
    created first.

    Args:
        name: Model name.
        version: Model version.
        output_path: Optional file path to save schema.
        versioned_namespace: Include model version in namespace. Default False.

    Returns:
        Avro record schema (the ``main`` schema of the generated document).

    Example:
        ```python
        exporter = AvroExporter(manager._registry, namespace="com.myapp")

        # Export and save
        schema = exporter.export_schema("User", "1.0.0", "schemas/user_v1.avsc")

        # Or just get the schema
        schema = exporter.export_schema("User", "1.0.0", versioned_namespace=True)
        print(json.dumps(schema, indent=2))
        ```
    """
    model = self._registry.get_model(name, version)

    # The generator's ``version`` parameter defaults to None, so passing None
    # is the same as leaving the namespace unversioned.
    namespace_version = version if versioned_namespace else None
    document = self.generator.generate_schema(model, name, namespace_version)

    if output_path:
        target = Path(output_path)
        target.parent.mkdir(parents=True, exist_ok=True)
        # Schema files are JSON: write UTF-8 explicitly instead of relying on
        # the platform's locale encoding.
        target.write_text(document.to_string(), encoding="utf-8")

    return document.main

export_all_schemas

export_all_schemas(
    output_dir, indent=2, versioned_namespace=False
)

Export all registered models as Avro schemas.

PARAMETER DESCRIPTION
output_dir

Directory to save schema files.

TYPE: str | Path

indent

JSON indentation level.

TYPE: int DEFAULT: 2

versioned_namespace

Include model version in namespace. Default False.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
dict[str, dict[str, AvroRecordSchema]]

Dictionary mapping each model name to a dictionary of version strings to Avro record schemas.

Example
exporter = AvroExporter(manager._registry, namespace="com.myapp")
schemas = exporter.export_all_schemas("schemas/avro/")

# Creates files like:
# schemas/avro/User_v1_0_0.avsc
# schemas/avro/User_v2_0_0.avsc
# schemas/avro/Order_v1_0_0.avsc
Source code in src/pyrmute/avro_schema.py
def export_all_schemas(
    self: Self,
    output_dir: str | Path,
    indent: int = 2,
    versioned_namespace: bool = False,
) -> dict[str, dict[str, AvroRecordSchema]]:
    """Export all registered models as Avro schemas.

    Every version of every model listed by the registry is written to
    ``<output_dir>/<model>_v<version>.avsc`` (dots in the version replaced
    by underscores) as UTF-8 text. The output directory is created if it
    does not exist.

    Args:
        output_dir: Directory to save schema files.
        indent: JSON indentation level.
        versioned_namespace: Include model version in namespace. Default False.

    Returns:
        Dictionary mapping each model name to a dictionary of version
        strings to Avro record schemas.

    Example:
        ```python
        exporter = AvroExporter(manager._registry, namespace="com.myapp")
        schemas = exporter.export_all_schemas("schemas/avro/")

        # Creates files like:
        # schemas/avro/User_v1_0_0.avsc
        # schemas/avro/User_v2_0_0.avsc
        # schemas/avro/Order_v1_0_0.avsc
        ```
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    all_schemas: dict[str, dict[str, AvroRecordSchema]] = {}

    for model_name in self._registry.list_models():
        model_schemas: dict[str, AvroRecordSchema] = {}
        all_schemas[model_name] = model_schemas

        for version in self._registry.get_versions(model_name):
            model = self._registry.get_model(model_name, version)
            # The generator's ``version`` parameter defaults to None, so
            # passing None leaves the namespace unversioned.
            document = self.generator.generate_schema(
                model,
                model_name,
                version if versioned_namespace else None,
            )

            version_str = str(version).replace(".", "_")
            filepath = output_dir / f"{model_name}_v{version_str}.avsc"

            # Schema files are JSON: write UTF-8 explicitly instead of
            # relying on the platform's locale encoding.
            filepath.write_text(
                document.to_string(indent=indent), encoding="utf-8"
            )

            model_schemas[str(version)] = document.main

    return all_schemas