Using Apache Avro Data Files with CDH

Apache Avro is a serialization system. Avro supports rich data structures, a compact binary encoding, and a container file for sequences of Avro data (often referred to as Avro data files). Avro is language-independent and there are several language bindings for it, including Java, C, C++, Python, and Ruby.

Avro data files have the .avro extension. Make sure the files you create have this extension, because some tools use it to determine which files to process as Avro (for example, AvroInputFormat and AvroAsTextInputFormat for MapReduce and streaming).

Avro does not rely on generated code, so processing data imported from Flume or Sqoop 1 is simpler than using Hadoop Writables in SequenceFiles, where you must ensure that the generated classes are on the processing job classpath. Pig and Hive cannot easily process SequenceFiles with custom Writables, so users often resort to using text, which has disadvantages in compactness and compressibility. Generally, you cannot split compressed text, which makes it difficult to process efficiently using MapReduce.

All components in CDH that produce or consume files support Avro data files.

Compression for Avro Data Files

By default, Avro data files are not compressed, but Cloudera recommends enabling compression to reduce disk usage and increase read and write performance. Avro data files support Deflate and Snappy compression. Snappy is faster, but Deflate is slightly more compact.

You do not need to specify configuration to read a compressed Avro data file. However, to write an Avro data file, you must specify the type of compression. How you specify compression depends on the component.
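Reading needs no configuration because the writer records the codec in the file header, under the avro.codec metadata key. The following is a minimal sketch, using only the Python standard library, that decodes the header of an Avro data file and reports the codec. The function names (read_long, read_bytes, read_avro_metadata, avro_codec) are illustrative and are not part of any Avro library; the demo builds a tiny header in memory rather than opening a real file.

```python
import io

MAGIC = b"Obj\x01"  # first four bytes of every Avro data file


def read_long(stream):
    """Decode one Avro long (zigzag-encoded, variable-length)."""
    shift = 0
    accum = 0
    while True:
        raw = stream.read(1)
        if not raw:
            raise EOFError("unexpected end of Avro header")
        byte = raw[0]
        accum |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (accum >> 1) ^ -(accum & 1)


def read_bytes(stream):
    """Decode a length-prefixed byte string."""
    length = read_long(stream)
    data = stream.read(length)
    if len(data) != length:
        raise EOFError("unexpected end of Avro header")
    return data


def read_avro_metadata(stream):
    """Return the header metadata map of an Avro data file."""
    if stream.read(4) != MAGIC:
        raise ValueError("not an Avro data file")
    meta = {}
    while True:
        count = read_long(stream)
        if count == 0:  # a zero count terminates the map
            break
        if count < 0:  # negative count: a block byte size follows
            count = -count
            read_long(stream)
        for _ in range(count):
            key = read_bytes(stream).decode("utf-8")
            meta[key] = read_bytes(stream)
    return meta


def avro_codec(stream):
    """Return the codec name; a missing key means no compression."""
    meta = read_avro_metadata(stream)
    return meta.get("avro.codec", b"null").decode("utf-8")


# Demo: a hand-built header with one metadata entry and a 16-byte sync marker.
header = MAGIC + b"\x02" + b"\x14avro.codec" + b"\x0csnappy" + b"\x00" + bytes(16)
print(avro_codec(io.BytesIO(header)))  # prints "snappy"
```

To inspect a real file, pass a handle opened in binary mode, for example avro_codec(open(path, "rb")), where path is a local copy of the data file.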

Using Avro Data Files in Flume

The HDFSEventSink used to serialize event data onto HDFS supports plug-in implementations of the EventSerializer interface. Implementations of this interface have full control over the serialization format and can be used in cases where the default serialization format provided by the sink is insufficient.

An abstract implementation of the EventSerializer interface, called AbstractAvroEventSerializer, is provided with Flume. This class can be extended to support custom schemas for Avro serialization over HDFS. The FlumeEventAvroEventSerializer class provides a simple implementation that maps the events to a representation of a String header map and byte payload in Avro. Use this class by setting the serializer property of the sink as follows:

agent-name.sinks.sink-name.serializer = AVRO_EVENT

Using Avro Data Files in Hive

The following example demonstrates how to create a Hive table backed by Avro data files:

CREATE TABLE doctors
ROW FORMAT
SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
TBLPROPERTIES ('avro.schema.literal'='{
  "namespace": "testing.hive.avro.serde",
  "name": "doctors",
  "type": "record",
  "fields": [
    {
      "name":"number",
      "type":"int",
      "doc":"Order of playing the role"
    },
    {
      "name":"first_name",
      "type":"string",
      "doc":"first name of actor playing role"
    },
    {
      "name":"last_name",
      "type":"string",
      "doc":"last name of actor playing role"
    },
    {
      "name":"extra_field",
      "type":"string",
      "doc":"an extra field not in the original file",
      "default":"fishfingers and custard"
    }
  ]
}');

LOAD DATA LOCAL INPATH '/usr/share/doc/hive-0.7.1+42.55/examples/files/doctors.avro' INTO TABLE doctors;

You can also create an Avro-backed Hive table by using an Avro schema file:

CREATE TABLE my_avro_table(notused INT)
  ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
  WITH SERDEPROPERTIES (
    'avro.schema.url'='file:///tmp/schema.avsc')
  STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
  OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat';

avro.schema.url is a URL (here a file:// URL) pointing to an Avro schema file used for reading and writing. It could also be an hdfs: URL; for example, hdfs://hadoop-namenode-uri/examplefile.
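An Avro schema file is a plain JSON document. As a rough sketch, the following Python script writes a schema modeled on the doctors example above to a file and returns a file:// URL of the kind avro.schema.url expects. The write_schema helper and the temporary directory are assumptions made for illustration; in practice you would write to a location that Hive can read, such as the /tmp/schema.avsc path used above or an HDFS path.

```python
import json
import tempfile
from pathlib import Path

# Schema modeled on the doctors table shown earlier in this section.
SCHEMA = {
    "namespace": "testing.hive.avro.serde",
    "name": "doctors",
    "type": "record",
    "fields": [
        {"name": "number", "type": "int"},
        {"name": "first_name", "type": "string"},
        {"name": "last_name", "type": "string"},
    ],
}


def write_schema(schema, path):
    """Write the schema as JSON and return a file:// URL for it."""
    target = Path(path).resolve()
    with open(target, "w", encoding="utf-8") as handle:
        json.dump(schema, handle, indent=2)
    return target.as_uri()


# Demo: write the schema into a scratch directory and print its URL.
with tempfile.TemporaryDirectory() as scratch:
    url = write_schema(SCHEMA, Path(scratch) / "schema.avsc")
    print(url)
```

Serializing the schema with json.dump guarantees that the file is well-formed JSON, which avoids quoting mistakes that are easy to make when editing a schema by hand.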

To enable Snappy compression on output files, run the following before writing to the table:

SET hive.exec.compress.output=true;
SET avro.output.codec=snappy;

Also include the snappy-java JAR in --auxpath, which is located at /usr/lib/hive/lib/snappy-java-1.0.4.1.jar or /opt/cloudera/parcels/CDH/lib/hive/lib/snappy-java-1.0.4.1.jar.

Haivvreo SerDe has been merged into Hive as AvroSerDe and is no longer supported in its original form. schema.url and schema.literal have been changed to avro.schema.url and avro.schema.literal as a result of the merge. If you were using Haivvreo SerDe, you can use the Hive AvroSerDe with tables created with the Haivvreo SerDe. For example, if you have a table my_avro_table that uses the Haivvreo SerDe, add the following to make the table use the new AvroSerDe:

ALTER TABLE my_avro_table SET SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe';

ALTER TABLE my_avro_table SET FILEFORMAT
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat';

Using Avro Data Files in MapReduce

The Avro MapReduce API is an Avro module for running MapReduce programs that produce or consume Avro data files.

If you are using Maven, add the following dependency to your POM:

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-mapred</artifactId>
    <!-- Use the version that matches your cluster: 1.8.2-cdh6.0.0 for CDH 6.0.0, or 1.7.6-cdh5.12.0 for CDH 5.12.0. -->
    <version>1.8.2-cdh6.0.0</version>
    <classifier>hadoop2</classifier>
</dependency>

Then write your program, using the Avro MapReduce javadoc for guidance.

At run time, include the avro and avro-mapred JARs in the HADOOP_CLASSPATH and the avro, avro-mapred and paranamer JARs in -libjars.

To enable Snappy compression on output, call AvroJob.setOutputCodec(job, "snappy") when configuring the job. You must also include the snappy-java JAR in -libjars.

Using Avro Data Files in Pig

CDH provides AvroStorage for Avro integration in Pig.

To use it, first register the piggybank JAR file. This file is located in different places depending on whether you are installing with parcels or packages. For parcel installations, use the following REGISTER command:

REGISTER /opt/cloudera/parcels/CDH/lib/pig/piggybank.jar

For package installations, use the following REGISTER command:

REGISTER /usr/lib/pig/piggybank.jar

Then load Avro data files as follows:

a = LOAD 'my_file.avro' USING org.apache.pig.piggybank.storage.avro.AvroStorage();

Pig maps the Avro schema to a corresponding Pig schema.

You can store data in Avro data files with:

store b into 'output' USING org.apache.pig.piggybank.storage.avro.AvroStorage();

With store, Pig generates an Avro schema from the Pig schema. You can override the Avro schema by specifying it literally as a parameter to AvroStorage or by using the same schema as an existing Avro data file. See the Pig javadoc for details.

To store two relations in one script, pass a distinct index to each store function. For example:

set1 = load 'input1.txt' using PigStorage() as ( ... );
store set1 into 'set1' using org.apache.pig.piggybank.storage.avro.AvroStorage('index', '1');

set2 = load 'input2.txt' using PigStorage() as ( ... );
store set2 into 'set2' using org.apache.pig.piggybank.storage.avro.AvroStorage('index', '2');

For more information, search for "index" in the AvroStorage wiki.

To enable Snappy compression on output files, do the following before issuing the STORE statement:

SET mapred.output.compress true
SET mapred.output.compression.codec org.apache.hadoop.io.compress.SnappyCodec
SET avro.output.codec snappy

For more information, see the Pig wiki. The version numbers of the JAR files to register are different on that page, so adjust them as shown above.

Importing Avro Data Files in Sqoop 1

On the command line, use the following option to import data as Avro data files:

--as-avrodatafile

Sqoop 1 automatically generates an Avro schema that corresponds to the database table being imported.

To enable Snappy compression, add the following option:

--compression-codec snappy

Using Avro Data Files in Spark

Using Avro Data Files in Streaming Programs

To read Avro data files from a streaming program, specify org.apache.avro.mapred.AvroAsTextInputFormat as the input format. This format converts each datum in the Avro data file to a string. For a "bytes" schema, this is the raw bytes; in the general case, this is a single-line JSON representation.

To write to Avro data files from a streaming program, specify org.apache.avro.mapred.AvroTextOutputFormat as the output format. This format creates Avro data files with a "bytes" schema, where each datum is a tab-delimited key-value pair.
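As an illustration of both formats together, the following is a minimal sketch of a streaming mapper written in Python. It assumes the input records follow the doctors schema from the Hive example, that each input line holds one JSON record (possibly followed by a trailing tab, which the sketch strips), and that the output should be keyed on last_name. The map_lines function and the sample records are made up for illustration.

```python
import json


def map_lines(lines, key_field="last_name"):
    """Turn single-line JSON records into tab-delimited key-value lines."""
    for line in lines:
        text = line.strip()  # drops the newline and any trailing tab
        if not text:
            continue
        record = json.loads(text)
        yield "{}\t{}".format(record[key_field], 1)


# Demo with in-memory sample lines; a real mapper would pass sys.stdin
# to map_lines and print each result to standard output.
SAMPLE = [
    '{"number": 10, "first_name": "David", "last_name": "Tennant"}\t\n',
    '{"number": 11, "first_name": "Matt", "last_name": "Smith"}\t\n',
]
for output_line in map_lines(SAMPLE):
    print(output_line)
```

Each emitted line is a key, a tab, and a value, which matches the tab-delimited key-value pairs that AvroTextOutputFormat stores as "bytes" data.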

At run time, specify the avro, avro-mapred, and paranamer JARs in -libjars in the streaming command.

To enable Snappy compression on output files, set the property avro.output.codec to snappy. You must also include the snappy-java JAR in -libjars.