from_avro

Converts a binary column of Avro format into its corresponding catalyst value. The specified schema must match the read data, otherwise the behavior is undefined: it may fail or return an arbitrary result.

If jsonFormatSchema is not provided but both subject and schemaRegistryAddress are provided, the function converts a binary column of Schema Registry Avro format into its corresponding catalyst value.
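A minimal sketch of this lookup path, assuming an existing DataFrame df with a binary column value and a reachable Schema Registry; the subject name and registry address below are hypothetical:

```python
from pyspark.sql.avro.functions import from_avro

# No jsonFormatSchema is passed: the schema is fetched from the
# registry for the given subject. Subject and address are hypothetical.
decoded_df = df.select(
    from_avro(
        df.value,
        subject="events-value",
        schemaRegistryAddress="https://schema-registry.example.com:8081",
    ).alias("event")
)
```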

Syntax

from pyspark.sql.avro.functions import from_avro

from_avro(data, jsonFormatSchema=None, options=None, subject=None, schemaRegistryAddress=None)

Parameters

data (pyspark.sql.Column or str): The binary column containing Avro-encoded data.
jsonFormatSchema (str, optional): The Avro schema in JSON string format.
options (dict, optional): Options to control how the Avro record is parsed, and configuration for the Schema Registry client.
subject (str, optional): The subject in Schema Registry that the data belongs to.
schemaRegistryAddress (str, optional): The address (host and port) of the Schema Registry.

Options

mode (FAILFAST, PERMISSIVE): Error handling mode. Default: FAILFAST. In PERMISSIVE mode, corrupt records are set to NULL instead of raising an error.
compression (uncompressed, snappy, deflate, bzip2, xz, zstandard): Compression codec for encoding Avro data.
avroSchemaEvolutionMode (none, restart): Schema evolution mode. Default: none. When set to restart, the query throws an UnknownFieldException when the schema changes; restart the job to pick up the new schema. See Use schema evolution mode with from_avro.
recursiveFieldMaxDepth (-1 to 15): Maximum recursion depth along a single recursive path. Default: -1, which does not limit recursion depth. When a shared type is reachable from many distinct schema paths, schema expansion can cause the driver to run out of memory, because this option bounds depth on one path only. To work around this, set recursiveFieldMaxDepth to a lower value so the expanded schema stays bounded.
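As an illustrative sketch of recursiveFieldMaxDepth (the recursive schema and the DataFrame df are hypothetical; an active SparkSession is assumed), bounding a self-referencing record to three levels:

```python
from pyspark.sql.avro.functions import from_avro

# A self-referencing record: the `next` field refers back to Node.
recursive_schema = '''{"type":"record","name":"Node","fields":[
    {"name":"value","type":"long"},
    {"name":"next","type":["null","Node"]}]}'''

# Bound recursion along each path to depth 3 instead of the
# unlimited default of -1.
df.select(
    from_avro(
        df.avro,
        recursive_schema,
        options={"recursiveFieldMaxDepth": "3"},
    ).alias("node")
)
```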

Returns

pyspark.sql.Column: A new column containing the deserialized Avro data as the corresponding catalyst value.

Examples

Example 1: Deserializing an Avro binary column using a JSON schema

# Assumes an active SparkSession bound to `spark`.
from pyspark.sql import Row
from pyspark.sql.avro.functions import from_avro, to_avro

data = [(1, Row(age=2, name='Alice'))]
df = spark.createDataFrame(data, ("key", "value"))
avro_df = df.select(to_avro(df.value).alias("avro"))
json_format_schema = '''{"type":"record","name":"topLevelRecord","fields":
    [{"name":"avro","type":[{"type":"record","name":"value",
    "namespace":"topLevelRecord","fields":[{"name":"age","type":["long","null"]},
    {"name":"name","type":["string","null"]}]},"null"]}]}'''
avro_df.select(from_avro(avro_df.avro, json_format_schema).alias("value")).show(truncate=False)
+------------------+
|value             |
+------------------+
|{{2, Alice}}      |
+------------------+
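Example 2: Handling a corrupt record with PERMISSIVE mode

A sketch assuming the same active SparkSession as Example 1: ten 0xFF bytes are not a valid Avro varint for the long field, so the default FAILFAST mode would fail the query, while PERMISSIVE sets the corrupt record to NULL.

```python
from pyspark.sql.functions import lit
from pyspark.sql.avro.functions import from_avro

# A record schema with a single long field.
json_format_schema = '''{"type":"record","name":"topLevelRecord",
    "fields":[{"name":"age","type":"long"}]}'''

# Ten 0xFF bytes form a malformed varint, so decoding this value fails.
corrupt_df = spark.range(1).select(lit(bytearray([0xFF] * 10)).alias("avro"))

# Under PERMISSIVE, the unparseable record becomes NULL instead of
# failing the query.
corrupt_df.select(
    from_avro("avro", json_format_schema, options={"mode": "PERMISSIVE"})
    .alias("value")
).show()
```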