HADOOP-7139. Allow appending to existing SequenceFiles (Contributed by kanaka kumar avvaru)

(cherry picked from commit 295d678be8)
(cherry picked from commit 80697e4f32)
(cherry picked from commit b3546b6034)

Conflicts:
	hadoop-common-project/hadoop-common/CHANGES.txt

(cherry picked from commit e9c8d8c58516aa64589cd44e9e5dd0a23ba72a17)
This commit is contained in:
Vinayakumar B 2015-06-18 14:39:00 +05:30 committed by Vinod Kumar Vavilapalli
parent 193d8d3667
commit 4f53c98ca4
3 changed files with 394 additions and 5 deletions

View File

@ -8,6 +8,9 @@ Release 2.6.1 - UNRELEASED
IMPROVEMENTS IMPROVEMENTS
HADOOP-7139. Allow appending to existing SequenceFiles
(kanaka kumar avvaru via vinayakumarb)
OPTIMIZATIONS OPTIMIZATIONS
HADOOP-11238. Update the NameNode's Group Cache in the background when HADOOP-11238. Update the NameNode's Group Cache in the background when

View File

@ -836,6 +836,8 @@ public class SequenceFile {
Metadata metadata = null; Metadata metadata = null;
Compressor compressor = null; Compressor compressor = null;
private boolean appendMode = false;
protected Serializer keySerializer; protected Serializer keySerializer;
protected Serializer uncompressedValSerializer; protected Serializer uncompressedValSerializer;
protected Serializer compressedValSerializer; protected Serializer compressedValSerializer;
@ -907,6 +909,13 @@ public class SequenceFile {
} }
} }
static class AppendIfExistsOption extends Options.BooleanOption implements
Option {
AppendIfExistsOption(boolean value) {
super(value);
}
}
static class KeyClassOption extends Options.ClassOption implements Option { static class KeyClassOption extends Options.ClassOption implements Option {
KeyClassOption(Class<?> value) { KeyClassOption(Class<?> value) {
super(value); super(value);
@ -982,6 +991,10 @@ public class SequenceFile {
return new ReplicationOption(value); return new ReplicationOption(value);
} }
public static Option appendIfExists(boolean value) {
return new AppendIfExistsOption(value);
}
public static Option blockSize(long value) { public static Option blockSize(long value) {
return new BlockSizeOption(value); return new BlockSizeOption(value);
} }
@ -1028,6 +1041,8 @@ public class SequenceFile {
ProgressableOption progressOption = ProgressableOption progressOption =
Options.getOption(ProgressableOption.class, opts); Options.getOption(ProgressableOption.class, opts);
FileOption fileOption = Options.getOption(FileOption.class, opts); FileOption fileOption = Options.getOption(FileOption.class, opts);
AppendIfExistsOption appendIfExistsOption = Options.getOption(
AppendIfExistsOption.class, opts);
FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts); FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
StreamOption streamOption = Options.getOption(StreamOption.class, opts); StreamOption streamOption = Options.getOption(StreamOption.class, opts);
KeyClassOption keyClassOption = KeyClassOption keyClassOption =
@ -1069,7 +1084,54 @@ public class SequenceFile {
blockSizeOption.getValue(); blockSizeOption.getValue();
Progressable progress = progressOption == null ? null : Progressable progress = progressOption == null ? null :
progressOption.getValue(); progressOption.getValue();
out = fs.create(p, true, bufferSize, replication, blockSize, progress);
if (appendIfExistsOption != null && appendIfExistsOption.getValue()
&& fs.exists(p)) {
// Read the file and verify header details
SequenceFile.Reader reader = new SequenceFile.Reader(conf,
SequenceFile.Reader.file(p), new Reader.OnlyHeaderOption());
try {
if (keyClassOption.getValue() != reader.getKeyClass()
|| valueClassOption.getValue() != reader.getValueClass()) {
throw new IllegalArgumentException(
"Key/value class provided does not match the file");
}
if (reader.getVersion() != VERSION[3]) {
throw new VersionMismatchException(VERSION[3],
reader.getVersion());
}
if (metadataOption != null) {
LOG.info("MetaData Option is ignored during append");
}
metadataOption = (MetadataOption) SequenceFile.Writer
.metadata(reader.getMetadata());
CompressionOption readerCompressionOption = new CompressionOption(
reader.getCompressionType(), reader.getCompressionCodec());
if (readerCompressionOption.value != compressionTypeOption.value
|| !readerCompressionOption.codec.getClass().getName()
.equals(compressionTypeOption.codec.getClass().getName())) {
throw new IllegalArgumentException(
"Compression option provided does not match the file");
}
sync = reader.getSync();
} finally {
reader.close();
}
out = fs.append(p, bufferSize, progress);
this.appendMode = true;
} else {
out = fs
.create(p, true, bufferSize, replication, blockSize, progress);
}
} else { } else {
out = streamOption.getValue(); out = streamOption.getValue();
} }
@ -1212,7 +1274,12 @@ public class SequenceFile {
} }
this.compressedValSerializer.open(deflateOut); this.compressedValSerializer.open(deflateOut);
} }
writeFileHeader();
if (appendMode) {
sync();
} else {
writeFileHeader();
}
} }
/** Returns the class of keys in this file. */ /** Returns the class of keys in this file. */
@ -2043,6 +2110,14 @@ public class SequenceFile {
/** Returns the compression codec of data in this file. */ /** Returns the compression codec of data in this file. */
public CompressionCodec getCompressionCodec() { return codec; } public CompressionCodec getCompressionCodec() { return codec; }
private byte[] getSync() {
return sync;
}
private byte getVersion() {
return version;
}
/** /**
* Get the compression type for this file. * Get the compression type for this file.
* @return the compression type * @return the compression type

View File

@ -0,0 +1,311 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.SequenceFile.Writer.Option;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.serializer.JavaSerializationComparator;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestSequenceFileAppend {
private static Configuration conf;
private static FileSystem fs;
private static Path ROOT_PATH = new Path(System.getProperty(
"test.build.data", "build/test/data"));
@BeforeClass
public static void setUp() throws Exception {
conf = new Configuration();
conf.set("io.serializations",
"org.apache.hadoop.io.serializer.JavaSerialization");
conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
fs = FileSystem.get(conf);
}
@AfterClass
public static void tearDown() throws Exception {
fs.close();
}
@Test(timeout = 30000)
public void testAppend() throws Exception {
Path file = new Path(ROOT_PATH, "testseqappend.seq");
fs.delete(file, true);
Text key1 = new Text("Key1");
Text value1 = new Text("Value1");
Text value2 = new Text("Updated");
SequenceFile.Metadata metadata = new SequenceFile.Metadata();
metadata.set(key1, value1);
Writer.Option metadataOption = Writer.metadata(metadata);
Writer writer = SequenceFile.createWriter(conf,
SequenceFile.Writer.file(file),
SequenceFile.Writer.keyClass(Long.class),
SequenceFile.Writer.valueClass(String.class), metadataOption);
writer.append(1L, "one");
writer.append(2L, "two");
writer.close();
verify2Values(file);
metadata.set(key1, value2);
writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
SequenceFile.Writer.keyClass(Long.class),
SequenceFile.Writer.valueClass(String.class),
SequenceFile.Writer.appendIfExists(true), metadataOption);
// Verify the Meta data is not changed
assertEquals(value1, writer.metadata.get(key1));
writer.append(3L, "three");
writer.append(4L, "four");
writer.close();
verifyAll4Values(file);
// Verify the Meta data readable after append
Reader reader = new Reader(conf, Reader.file(file));
assertEquals(value1, reader.getMetadata().get(key1));
reader.close();
// Verify failure if the compression details are different
try {
Option wrongCompressOption = Writer.compression(CompressionType.RECORD,
new GzipCodec());
writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
SequenceFile.Writer.keyClass(Long.class),
SequenceFile.Writer.valueClass(String.class),
SequenceFile.Writer.appendIfExists(true), wrongCompressOption);
writer.close();
fail("Expected IllegalArgumentException for compression options");
} catch (IllegalArgumentException IAE) {
// Expected exception. Ignore it
}
try {
Option wrongCompressOption = Writer.compression(CompressionType.BLOCK,
new DefaultCodec());
writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
SequenceFile.Writer.keyClass(Long.class),
SequenceFile.Writer.valueClass(String.class),
SequenceFile.Writer.appendIfExists(true), wrongCompressOption);
writer.close();
fail("Expected IllegalArgumentException for compression options");
} catch (IllegalArgumentException IAE) {
// Expected exception. Ignore it
}
fs.deleteOnExit(file);
}
@Test(timeout = 30000)
public void testAppendRecordCompression() throws Exception {
Path file = new Path(ROOT_PATH, "testseqappendblockcompr.seq");
fs.delete(file, true);
Option compressOption = Writer.compression(CompressionType.RECORD,
new GzipCodec());
Writer writer = SequenceFile.createWriter(conf,
SequenceFile.Writer.file(file),
SequenceFile.Writer.keyClass(Long.class),
SequenceFile.Writer.valueClass(String.class), compressOption);
writer.append(1L, "one");
writer.append(2L, "two");
writer.close();
verify2Values(file);
writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
SequenceFile.Writer.keyClass(Long.class),
SequenceFile.Writer.valueClass(String.class),
SequenceFile.Writer.appendIfExists(true), compressOption);
writer.append(3L, "three");
writer.append(4L, "four");
writer.close();
verifyAll4Values(file);
fs.deleteOnExit(file);
}
@Test(timeout = 30000)
public void testAppendBlockCompression() throws Exception {
Path file = new Path(ROOT_PATH, "testseqappendblockcompr.seq");
fs.delete(file, true);
Option compressOption = Writer.compression(CompressionType.BLOCK,
new GzipCodec());
Writer writer = SequenceFile.createWriter(conf,
SequenceFile.Writer.file(file),
SequenceFile.Writer.keyClass(Long.class),
SequenceFile.Writer.valueClass(String.class), compressOption);
writer.append(1L, "one");
writer.append(2L, "two");
writer.close();
verify2Values(file);
writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
SequenceFile.Writer.keyClass(Long.class),
SequenceFile.Writer.valueClass(String.class),
SequenceFile.Writer.appendIfExists(true), compressOption);
writer.append(3L, "three");
writer.append(4L, "four");
writer.close();
verifyAll4Values(file);
// Verify failure if the compression details are different or not Provided
try {
writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
SequenceFile.Writer.keyClass(Long.class),
SequenceFile.Writer.valueClass(String.class),
SequenceFile.Writer.appendIfExists(true));
writer.close();
fail("Expected IllegalArgumentException for compression options");
} catch (IllegalArgumentException IAE) {
// Expected exception. Ignore it
}
// Verify failure if the compression details are different
try {
Option wrongCompressOption = Writer.compression(CompressionType.RECORD,
new GzipCodec());
writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
SequenceFile.Writer.keyClass(Long.class),
SequenceFile.Writer.valueClass(String.class),
SequenceFile.Writer.appendIfExists(true), wrongCompressOption);
writer.close();
fail("Expected IllegalArgumentException for compression options");
} catch (IllegalArgumentException IAE) {
// Expected exception. Ignore it
}
try {
Option wrongCompressOption = Writer.compression(CompressionType.BLOCK,
new DefaultCodec());
writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
SequenceFile.Writer.keyClass(Long.class),
SequenceFile.Writer.valueClass(String.class),
SequenceFile.Writer.appendIfExists(true), wrongCompressOption);
writer.close();
fail("Expected IllegalArgumentException for compression options");
} catch (IllegalArgumentException IAE) {
// Expected exception. Ignore it
}
fs.deleteOnExit(file);
}
@Test(timeout = 30000)
public void testAppendSort() throws Exception {
Path file = new Path(ROOT_PATH, "testseqappendSort.seq");
fs.delete(file, true);
Path sortedFile = new Path(ROOT_PATH, "testseqappendSort.seq.sort");
fs.delete(sortedFile, true);
SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
new JavaSerializationComparator<Long>(), Long.class, String.class, conf);
Option compressOption = Writer.compression(CompressionType.BLOCK,
new GzipCodec());
Writer writer = SequenceFile.createWriter(conf,
SequenceFile.Writer.file(file),
SequenceFile.Writer.keyClass(Long.class),
SequenceFile.Writer.valueClass(String.class), compressOption);
writer.append(2L, "two");
writer.append(1L, "one");
writer.close();
writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
SequenceFile.Writer.keyClass(Long.class),
SequenceFile.Writer.valueClass(String.class),
SequenceFile.Writer.appendIfExists(true), compressOption);
writer.append(4L, "four");
writer.append(3L, "three");
writer.close();
// Sort file after append
sorter.sort(file, sortedFile);
verifyAll4Values(sortedFile);
fs.deleteOnExit(file);
fs.deleteOnExit(sortedFile);
}
private void verify2Values(Path file) throws IOException {
Reader reader = new Reader(conf, Reader.file(file));
assertEquals(1L, reader.next((Object) null));
assertEquals("one", reader.getCurrentValue((Object) null));
assertEquals(2L, reader.next((Object) null));
assertEquals("two", reader.getCurrentValue((Object) null));
assertNull(reader.next((Object) null));
reader.close();
}
private void verifyAll4Values(Path file) throws IOException {
Reader reader = new Reader(conf, Reader.file(file));
assertEquals(1L, reader.next((Object) null));
assertEquals("one", reader.getCurrentValue((Object) null));
assertEquals(2L, reader.next((Object) null));
assertEquals("two", reader.getCurrentValue((Object) null));
assertEquals(3L, reader.next((Object) null));
assertEquals("three", reader.getCurrentValue((Object) null));
assertEquals(4L, reader.next((Object) null));
assertEquals("four", reader.getCurrentValue((Object) null));
assertNull(reader.next((Object) null));
reader.close();
}
}