From 5001565459796698a1038cfbff31f4e0d56644c4 Mon Sep 17 00:00:00 2001 From: Owen O'Malley Date: Wed, 6 Oct 2010 06:16:31 +0000 Subject: [PATCH] HADOOP-6984. Combine the compress kind and the codec in the same option for SequenceFiles. (cdouglas via omalley) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1004900 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 3 + src/java/org/apache/hadoop/io/ArrayFile.java | 2 +- .../org/apache/hadoop/io/BloomMapFile.java | 15 ++- src/java/org/apache/hadoop/io/MapFile.java | 33 +++-- .../org/apache/hadoop/io/SequenceFile.java | 113 ++++++++---------- src/java/org/apache/hadoop/io/SetFile.java | 2 +- 6 files changed, 75 insertions(+), 93 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 6918b894f38..fbb766fb4c6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -253,6 +253,9 @@ Trunk (unreleased changes) HADOOP-6989. Correct the parameter for SetFile to set the value type for SetFile to be NullWritable instead of the key. (cdouglas via omalley) + HADOOP-6984. Combine the compress kind and the codec in the same option + for SequenceFiles. (cdouglas via omalley) + Release 0.21.1 - Unreleased IMPROVEMENTS diff --git a/src/java/org/apache/hadoop/io/ArrayFile.java b/src/java/org/apache/hadoop/io/ArrayFile.java index a0ab2422ba6..bee5fd2cb43 100644 --- a/src/java/org/apache/hadoop/io/ArrayFile.java +++ b/src/java/org/apache/hadoop/io/ArrayFile.java @@ -54,7 +54,7 @@ public Writer(Configuration conf, FileSystem fs, super(conf, new Path(file), keyClass(LongWritable.class), valueClass(valClass), - compressionType(compress), + compression(compress), progressable(progress)); } diff --git a/src/java/org/apache/hadoop/io/BloomMapFile.java b/src/java/org/apache/hadoop/io/BloomMapFile.java index d1431e04771..ab68ce7f60b 100644 --- a/src/java/org/apache/hadoop/io/BloomMapFile.java +++ b/src/java/org/apache/hadoop/io/BloomMapFile.java @@ -89,8 +89,7 @@ public Writer(Configuration conf, FileSystem fs, String dirName, Class valClass, CompressionType compress, CompressionCodec codec, Progressable progress) throws IOException { this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), - compressionType(compress), compressionCodec(codec), - progressable(progress)); + compression(compress, codec), progressable(progress)); } @Deprecated @@ -99,7 +98,7 @@ public Writer(Configuration conf, FileSystem fs, String dirName, Class valClass, CompressionType compress, Progressable progress) throws IOException { this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), - compressionType(compress), progressable(progress)); + compression(compress), progressable(progress)); } @Deprecated @@ -108,7 +107,7 @@ public Writer(Configuration conf, FileSystem fs, String dirName, Class valClass, CompressionType compress) throws IOException { this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), - compressionType(compress)); + compression(compress)); } @Deprecated @@ -117,8 +116,8 @@ public Writer(Configuration conf, FileSystem fs, String dirName, CompressionType compress, CompressionCodec codec, Progressable progress) throws IOException { this(conf, new Path(dirName), comparator(comparator), - valueClass(valClass), compressionType(compress), - compressionCodec(codec), progressable(progress)); + valueClass(valClass), compression(compress, codec), + progressable(progress)); } @Deprecated @@ -126,7 +125,7 @@ public Writer(Configuration conf, FileSystem fs, String dirName, WritableComparator comparator, Class valClass, CompressionType compress, Progressable progress) throws IOException { this(conf, new Path(dirName), comparator(comparator), - valueClass(valClass), compressionType(compress), + valueClass(valClass), compression(compress), progressable(progress)); } @@ -135,7 +134,7 @@ public Writer(Configuration conf, FileSystem fs, String dirName, WritableComparator comparator, Class valClass, CompressionType compress) throws IOException { this(conf, new Path(dirName), comparator(comparator), - valueClass(valClass), compressionType(compress)); + valueClass(valClass), compression(compress)); } @Deprecated diff --git a/src/java/org/apache/hadoop/io/MapFile.java b/src/java/org/apache/hadoop/io/MapFile.java index 105b62763bd..ad36730a136 100644 --- a/src/java/org/apache/hadoop/io/MapFile.java +++ b/src/java/org/apache/hadoop/io/MapFile.java @@ -113,7 +113,7 @@ public Writer(Configuration conf, FileSystem fs, String dirName, CompressionType compress, Progressable progress) throws IOException { this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), - compressionType(compress), progressable(progress)); + compression(compress), progressable(progress)); } /** Create the named map for keys of the named class. @@ -125,8 +125,7 @@ public Writer(Configuration conf, FileSystem fs, String dirName, CompressionType compress, CompressionCodec codec, Progressable progress) throws IOException { this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), - compressionType(compress), compressionCodec(codec), - progressable(progress)); + compression(compress, codec), progressable(progress)); } /** Create the named map for keys of the named class. @@ -137,7 +136,7 @@ public Writer(Configuration conf, FileSystem fs, String dirName, Class keyClass, Class valClass, CompressionType compress) throws IOException { this(conf, new Path(dirName), keyClass(keyClass), - valueClass(valClass), compressionType(compress)); + valueClass(valClass), compression(compress)); } /** Create the named map using the named key comparator. @@ -159,7 +158,7 @@ public Writer(Configuration conf, FileSystem fs, String dirName, WritableComparator comparator, Class valClass, SequenceFile.CompressionType compress) throws IOException { this(conf, new Path(dirName), comparator(comparator), - valueClass(valClass), compressionType(compress)); + valueClass(valClass), compression(compress)); } /** Create the named map using the named key comparator. @@ -171,7 +170,7 @@ public Writer(Configuration conf, FileSystem fs, String dirName, SequenceFile.CompressionType compress, Progressable progress) throws IOException { this(conf, new Path(dirName), comparator(comparator), - valueClass(valClass), compressionType(compress), + valueClass(valClass), compression(compress), progressable(progress)); } @@ -184,8 +183,8 @@ public Writer(Configuration conf, FileSystem fs, String dirName, SequenceFile.CompressionType compress, CompressionCodec codec, Progressable progress) throws IOException { this(conf, new Path(dirName), comparator(comparator), - valueClass(valClass), compressionType(compress), - compressionCodec(codec), progressable(progress)); + valueClass(valClass), compression(compress, codec), + progressable(progress)); } // our options are a superset of sequence file writer options @@ -221,13 +220,14 @@ public static SequenceFile.Writer.Option valueClass(Class value) { } public static - SequenceFile.Writer.Option compressionType(CompressionType value) { - return SequenceFile.Writer.compressionType(value); + SequenceFile.Writer.Option compression(CompressionType type) { + return SequenceFile.Writer.compression(type); } public static - SequenceFile.Writer.Option compressionCodec(CompressionCodec value) { - return SequenceFile.Writer.compressionCodec(value); + SequenceFile.Writer.Option compression(CompressionType type, + CompressionCodec codec) { + return SequenceFile.Writer.compression(type, codec); } public static SequenceFile.Writer.Option progressable(Progressable value) { @@ -274,11 +274,10 @@ public Writer(Configuration conf, this.data = SequenceFile.createWriter(conf, dataOptions); SequenceFile.Writer.Option[] indexOptions = - Options.prependOptions(opts, - SequenceFile.Writer.file(indexFile), - SequenceFile.Writer.keyClass(keyClass), - SequenceFile.Writer.valueClass(LongWritable.class), - SequenceFile.Writer.compressionType(CompressionType.BLOCK)); + Options.prependOptions(opts, SequenceFile.Writer.file(indexFile), + SequenceFile.Writer.keyClass(keyClass), + SequenceFile.Writer.valueClass(LongWritable.class), + SequenceFile.Writer.compression(CompressionType.BLOCK)); this.index = SequenceFile.createWriter(conf, indexOptions); } diff --git a/src/java/org/apache/hadoop/io/SequenceFile.java b/src/java/org/apache/hadoop/io/SequenceFile.java index f66acf7532c..3197d3fcabf 100644 --- a/src/java/org/apache/hadoop/io/SequenceFile.java +++ b/src/java/org/apache/hadoop/io/SequenceFile.java @@ -252,22 +252,23 @@ static public void setDefaultCompressionType(Configuration job, */ public static Writer createWriter(Configuration conf, Writer.Option... opts ) throws IOException { - Writer.CompressionTypeOption compressionOption = - Options.getOption(Writer.CompressionTypeOption.class, opts); + Writer.CompressionOption compressionOption = + Options.getOption(Writer.CompressionOption.class, opts); CompressionType kind; if (compressionOption != null) { kind = compressionOption.getValue(); } else { kind = getDefaultCompressionType(conf); + opts = Options.prependOptions(opts, Writer.compression(kind)); } switch (kind) { - default: - case NONE: - return new Writer(conf, kind, opts); - case RECORD: - return new RecordCompressWriter(conf, kind, opts); - case BLOCK: - return new BlockCompressWriter(conf, kind, opts); + default: + case NONE: + return new Writer(conf, opts); + case RECORD: + return new RecordCompressWriter(conf, opts); + case BLOCK: + return new BlockCompressWriter(conf, opts); } } @@ -311,7 +312,7 @@ public static Writer createWriter(Configuration conf, Writer.Option... opts CompressionType compressionType) throws IOException { return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass), Writer.valueClass(valClass), - Writer.compressionType(compressionType)); + Writer.compression(compressionType)); } /** @@ -335,7 +336,7 @@ public static Writer createWriter(Configuration conf, Writer.Option... opts Progressable progress) throws IOException { return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass), Writer.valueClass(valClass), - Writer.compressionType(compressionType), + Writer.compression(compressionType), Writer.progressable(progress)); } @@ -360,8 +361,7 @@ public static Writer createWriter(Configuration conf, Writer.Option... opts CompressionCodec codec) throws IOException { return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass), Writer.valueClass(valClass), - Writer.compressionType(compressionType), - Writer.compressionCodec(codec)); + Writer.compression(compressionType, codec)); } /** @@ -388,8 +388,7 @@ public static Writer createWriter(Configuration conf, Writer.Option... opts Progressable progress, Metadata metadata) throws IOException { return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass), Writer.valueClass(valClass), - Writer.compressionType(compressionType), - Writer.compressionCodec(codec), + Writer.compression(compressionType, codec), Writer.progressable(progress), Writer.metadata(metadata)); } @@ -425,8 +424,7 @@ public static Writer createWriter(Configuration conf, Writer.Option... opts Writer.bufferSize(bufferSize), Writer.replication(replication), Writer.blockSize(blockSize), - Writer.compressionType(compressionType), - Writer.compressionCodec(codec), + Writer.compression(compressionType, codec), Writer.progressable(progress), Writer.metadata(metadata)); } @@ -454,8 +452,7 @@ public static Writer createWriter(Configuration conf, Writer.Option... opts Progressable progress) throws IOException { return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass), Writer.valueClass(valClass), - Writer.compressionType(compressionType), - Writer.compressionCodec(codec), + Writer.compression(compressionType, codec), Writer.progressable(progress)); } @@ -481,8 +478,7 @@ public static Writer createWriter(Configuration conf, Writer.Option... opts CompressionCodec codec, Metadata metadata) throws IOException { return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass), Writer.valueClass(valClass), - Writer.compressionType(compressionType), - Writer.compressionCodec(codec), + Writer.compression(compressionType, codec), Writer.metadata(metadata)); } @@ -506,8 +502,7 @@ public static Writer createWriter(Configuration conf, Writer.Option... opts CompressionCodec codec) throws IOException { return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass), Writer.valueClass(valClass), - Writer.compressionType(compressionType), - Writer.compressionCodec(codec)); + Writer.compression(compressionType, codec)); } @@ -839,23 +834,23 @@ static class ProgressableOption extends Options.ProgressableOption } } - private static class CompressionTypeOption implements Option { + private static class CompressionOption implements Option { private final CompressionType value; - CompressionTypeOption(CompressionType value) { + private final CompressionCodec codec; + CompressionOption(CompressionType value) { + this(value, null); + } + CompressionOption(CompressionType value, CompressionCodec codec) { this.value = value; + this.codec = (CompressionType.NONE != value && null == codec) + ? new DefaultCodec() + : codec; } CompressionType getValue() { return value; } - } - - private static class CompressionCodecOption implements Option { - private final CompressionCodec value; - CompressionCodecOption(CompressionCodec value) { - this.value = value; - } - CompressionCodec getValue() { - return value; + CompressionCodec getCodec() { + return codec; } } @@ -895,25 +890,23 @@ public static Option metadata(Metadata value) { return new MetadataOption(value); } - public static Option compressionType(CompressionType value) { - return new CompressionTypeOption(value); - } - - public static Option compressionCodec(CompressionCodec value) { - return new CompressionCodecOption(value); + public static Option compression(CompressionType value) { + return new CompressionOption(value); } + public static Option compression(CompressionType value, + CompressionCodec codec) { + return new CompressionOption(value, codec); + } + /** * Construct a uncompressed writer from a set of options. * @param conf the configuration to use - * @param compressionType the compression type being used * @param options the options used when creating the writer * @throws IOException if it fails */ Writer(Configuration conf, - CompressionType compressionType, Option... opts) throws IOException { - this.compress = compressionType; BlockSizeOption blockSizeOption = Options.getOption(BlockSizeOption.class, opts); BufferSizeOption bufferSizeOption = @@ -928,10 +921,10 @@ public static Option compressionCodec(CompressionCodec value) { Options.getOption(KeyClassOption.class, opts); ValueClassOption valueClassOption = Options.getOption(ValueClassOption.class, opts); - CompressionCodecOption compressionCodecOption = - Options.getOption(CompressionCodecOption.class, opts); MetadataOption metadataOption = Options.getOption(MetadataOption.class, opts); + CompressionOption compressionTypeOption = + Options.getOption(CompressionOption.class, opts); // check consistency of options if ((fileOption == null) == (streamOption == null)) { throw new IllegalArgumentException("file or stream must be specified"); @@ -968,13 +961,8 @@ public static Option compressionCodec(CompressionCodec value) { Object.class : valueClassOption.getValue(); Metadata metadata = metadataOption == null ? new Metadata() : metadataOption.getValue(); - CompressionCodec codec; - if (compressionType == CompressionType.NONE) { - codec = null; - } else { - codec = compressionCodecOption == null ? - new DefaultCodec() : compressionCodecOption.getValue(); - } + this.compress = compressionTypeOption.getValue(); + final CompressionCodec codec = compressionTypeOption.getCodec(); if (codec != null && (codec instanceof GzipCodec) && !NativeCodeLoader.isNativeCodeLoaded() && @@ -1207,9 +1195,8 @@ public synchronized long getLength() throws IOException { static class RecordCompressWriter extends Writer { RecordCompressWriter(Configuration conf, - CompressionType compressionType, Option... options) throws IOException { - super(conf, compressionType, options); + super(conf, options); } /** Append a key/value pair. */ @@ -1276,9 +1263,8 @@ static class BlockCompressWriter extends Writer { private final int compressionBlockSize; BlockCompressWriter(Configuration conf, - CompressionType compressionType, Option... options) throws IOException { - super(conf, compressionType, options); + super(conf, options); compressionBlockSize = conf.getInt("io.seqfile.compress.blocksize", 1000000); keySerializer.close(); @@ -2756,14 +2742,10 @@ private void flush(int count, int bytesProcessed, } long segmentStart = out.getPos(); - Writer writer = createWriter(conf, - Writer.stream(out), - Writer.keyClass(keyClass), - Writer.valueClass(valClass), - Writer.compressionType(compressionType), - Writer.compressionCodec(codec), - Writer.metadata(done ? metadata : - new Metadata())); + Writer writer = createWriter(conf, Writer.stream(out), + Writer.keyClass(keyClass), Writer.valueClass(valClass), + Writer.compression(compressionType, codec), + Writer.metadata(done ? metadata : new Metadata())); if (!done) { writer.sync = null; // disable sync on temp files @@ -2943,8 +2925,7 @@ public Writer cloneFileAttributes(Path inputFile, Path outputFile, Writer.file(outputFile), Writer.keyClass(keyClass), Writer.valueClass(valClass), - Writer.compressionType(compress), - Writer.compressionCodec(codec), + Writer.compression(compress, codec), Writer.progressable(prog)); return writer; } diff --git a/src/java/org/apache/hadoop/io/SetFile.java b/src/java/org/apache/hadoop/io/SetFile.java index e4d261a387d..ed3babe95b6 100644 --- a/src/java/org/apache/hadoop/io/SetFile.java +++ b/src/java/org/apache/hadoop/io/SetFile.java @@ -60,7 +60,7 @@ public Writer(Configuration conf, FileSystem fs, String dirName, super(conf, new Path(dirName), comparator(comparator), valueClass(NullWritable.class), - compressionType(compress)); + compression(compress)); } /** Append a key to a set. The key must be strictly greater than the