Multiple avro output files with Hadoop streaming api

Mappers and reducers using the Hadoop streaming API generally have no direct control over how many output files are generated in the Hadoop Distributed File System (HDFS). Although it is possible to spawn Hadoop subprocesses that write to HDFS during the map and reduce phases, it is often easier and safer to write a custom output format in Java that produces multiple files. Here is an example output format that writes multiple Avro files.

    import java.io.IOException;
    import java.io.UnsupportedEncodingException;
    import java.util.Map;

    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.Decoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.mapred.AvroJob;
    import org.apache.avro.mapred.AvroOutputFormat;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordWriter;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.lib.MultipleOutputFormat;
    import org.apache.hadoop.util.Progressable;

    /**
     * Each line on the standard output stream consists of a key and a value
     * separated by the first tab character; both are of type
     * org.apache.hadoop.io.Text. This output format assumes that the value
     * is the JSON representation of an Avro record, and that the key is the
     * name of the output file the record is appended to.
     */
    public class MultipleAvroOutputFormat extends MultipleOutputFormat<Text, Text> {

      // Mapping from schema name to schema object.
      private static final Map<String, Schema> SCHEMATA;
      static {
        // Populate SCHEMATA from resources using org.apache.avro.Schema.Parser.
      }

      @Override
      protected String generateFileNameForKeyValue(
          Text key, Text value, String name) {
        // Assuming the hash partitioner is used, the same key always goes to
        // the same mapper/reducer, i.e., different mappers/reducers see
        // disjoint key sets. Using keys as output filenames therefore cannot
        // cause conflicts between mappers/reducers.
        return key.toString();
      }

      static <T> void configureDataFileWriter(DataFileWriter<T> writer, JobConf job)
          throws UnsupportedEncodingException {
        // Copy this method and its helper methods from
        // org.apache.avro.mapred.AvroOutputFormat.
      }

      @Override
      protected RecordWriter<Text, Text> getBaseRecordWriter(
          final FileSystem fs, final JobConf conf, final String name,
          final Progressable progressable) throws IOException {
        final boolean mapOnly = 0 == conf.getNumReduceTasks();
        // The value of MAP_OUTPUT_SCHEMA or OUTPUT_SCHEMA in the
        // configuration corresponds to a key in SCHEMATA.
        final String avsc = mapOnly
            ? conf.get(AvroJob.MAP_OUTPUT_SCHEMA, conf.get(AvroJob.OUTPUT_SCHEMA))
            : conf.get(AvroJob.OUTPUT_SCHEMA);
        final Schema schema = SCHEMATA.get(avsc);
        if (null == schema) {
          throw new IOException("invalid schema name on "
              + AvroJob.MAP_OUTPUT_SCHEMA
              + " and/or " + AvroJob.OUTPUT_SCHEMA + ".");
        }
        final GenericData model = AvroJob.createDataModel(conf);
        final DataFileWriter<GenericRecord> writer =
            new DataFileWriter<GenericRecord>(
                new GenericDatumWriter<GenericRecord>(schema, model));
        configureDataFileWriter(writer, conf);
        final Path path = FileOutputFormat.getTaskOutputPath(
            conf, name + AvroOutputFormat.EXT);
        writer.create(schema, path.getFileSystem(conf).create(path));
        return new RecordWriter<Text, Text>() {

          @Override
          public void close(Reporter reporter) throws IOException {
            writer.close();
          }

          @Override
          public void write(Text key, Text value) throws IOException {
            final String json = value.toString();
            final Decoder decoder = DecoderFactory.get().jsonDecoder(schema, json);
            final DatumReader<GenericRecord> reader =
                new GenericDatumReader<GenericRecord>(schema);
            final GenericRecord record = reader.read(null, decoder);
            writer.append(record);
          }
        };
      }
    }

So MultipleAvroOutputFormat extends org.apache.hadoop.mapred.lib.MultipleOutputFormat and overrides the filename generation scheme. Note that MultipleOutputFormat keeps all output files open until the task finishes, so it is not a good idea to use so many distinct keys that too many output files are open at the same time.
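For context, a streaming script that feeds this output format only needs to print `filename<TAB>json` lines to standard output. Here is a minimal sketch in Python; the `category` field used for routing is a hypothetical example, not something the output format above requires:

```python
import json
import sys

def route(line):
    """Turn one JSON record into a `filename<TAB>json` output line, so that
    MultipleAvroOutputFormat appends the record to the file named by the key."""
    record = json.loads(line)
    # Hypothetical routing rule: one output file per "category" field;
    # the field name is an assumption for illustration only.
    return record["category"] + "\t" + json.dumps(record)

if __name__ == "__main__":
    # In a real streaming job, records arrive on standard input.
    for line in sys.stdin:
        line = line.strip()
        if line:
            print(route(line))
```

With this convention, records tagged `clicks` and `views` end up in two separate Avro files, each validated against the schema the output format looks up in SCHEMATA.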
