From 295d678be8853a52c3ec3da43d9265478d6632b3 Mon Sep 17 00:00:00 2001 From: Vinayakumar B Date: Thu, 18 Jun 2015 14:39:00 +0530 Subject: [PATCH] HADOOP-7139. Allow appending to existing SequenceFiles (Contributed by kanaka kumar avvaru) --- .../hadoop-common/CHANGES.txt | 3 + .../org/apache/hadoop/io/SequenceFile.java | 85 ++++- .../hadoop/io/TestSequenceFileAppend.java | 311 ++++++++++++++++++ 3 files changed, 394 insertions(+), 5 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileAppend.java diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 4b1b3825391..3430da62179 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -651,6 +651,9 @@ Release 2.8.0 - UNRELEASED HADOOP-11965. determine-flaky-tests needs a summary mode. (Yufei Gu via Yongjun Zhang) + HADOOP-7139. Allow appending to existing SequenceFiles + (kanaka kumar avvaru via vinayakumarb) + OPTIMIZATIONS HADOOP-11785. Reduce the number of listStatus operation in distcp diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java index 7a59149ff05..e37e855ed25 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java @@ -837,7 +837,9 @@ public class SequenceFile { DataOutputStream deflateOut = null; Metadata metadata = null; Compressor compressor = null; - + + private boolean appendMode = false; + protected Serializer keySerializer; protected Serializer uncompressedValSerializer; protected Serializer compressedValSerializer; @@ -909,6 +911,13 @@ public class SequenceFile { } } + static class AppendIfExistsOption extends Options.BooleanOption implements + Option { + AppendIfExistsOption(boolean value) { + super(value); + } + } + static class KeyClassOption extends Options.ClassOption implements Option { KeyClassOption(Class value) { super(value); @@ -958,7 +967,7 @@ public class SequenceFile { return codec; } } - + public static Option file(Path value) { return new FileOption(value); } @@ -984,6 +993,10 @@ public class SequenceFile { return new ReplicationOption(value); } + public static Option appendIfExists(boolean value) { + return new AppendIfExistsOption(value); + } + public static Option blockSize(long value) { return new BlockSizeOption(value); } @@ -1030,6 +1043,8 @@ public class SequenceFile { ProgressableOption progressOption = Options.getOption(ProgressableOption.class, opts); FileOption fileOption = Options.getOption(FileOption.class, opts); + AppendIfExistsOption appendIfExistsOption = Options.getOption( + AppendIfExistsOption.class, opts); FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts); StreamOption streamOption = Options.getOption(StreamOption.class, opts); KeyClassOption keyClassOption = @@ -1071,7 +1086,54 @@ public class SequenceFile { blockSizeOption.getValue(); Progressable progress = progressOption == null ? 
null : progressOption.getValue(); - out = fs.create(p, true, bufferSize, replication, blockSize, progress); + + if (appendIfExistsOption != null && appendIfExistsOption.getValue() + && fs.exists(p)) { + + // Read the file and verify header details + SequenceFile.Reader reader = new SequenceFile.Reader(conf, + SequenceFile.Reader.file(p), new Reader.OnlyHeaderOption()); + try { + + if (keyClassOption.getValue() != reader.getKeyClass() + || valueClassOption.getValue() != reader.getValueClass()) { + throw new IllegalArgumentException( + "Key/value class provided does not match the file"); + } + + if (reader.getVersion() != VERSION[3]) { + throw new VersionMismatchException(VERSION[3], + reader.getVersion()); + } + + if (metadataOption != null) { + LOG.info("MetaData Option is ignored during append"); + } + metadataOption = (MetadataOption) SequenceFile.Writer + .metadata(reader.getMetadata()); + + CompressionOption readerCompressionOption = new CompressionOption( + reader.getCompressionType(), reader.getCompressionCodec()); + + if (readerCompressionOption.value != compressionTypeOption.value + || !readerCompressionOption.codec.getClass().getName() + .equals(compressionTypeOption.codec.getClass().getName())) { + throw new IllegalArgumentException( + "Compression option provided does not match the file"); + } + + sync = reader.getSync(); + + } finally { + reader.close(); + } + + out = fs.append(p, bufferSize, progress); + this.appendMode = true; + } else { + out = fs + .create(p, true, bufferSize, replication, blockSize, progress); + } } else { out = streamOption.getValue(); } @@ -1159,7 +1221,7 @@ public class SequenceFile { out.write(sync); // write the sync bytes out.flush(); // flush header } - + /** Initialize. */ @SuppressWarnings("unchecked") void init(Configuration conf, FSDataOutputStream out, boolean ownStream, @@ -1214,7 +1276,12 @@ public class SequenceFile { } this.compressedValSerializer.open(deflateOut); } - writeFileHeader(); + + if (appendMode) { + sync(); + } else { + writeFileHeader(); + } } /** Returns the class of keys in this file. */ @@ -2045,6 +2112,14 @@ public class SequenceFile { /** Returns the compression codec of data in this file. */ public CompressionCodec getCompressionCodec() { return codec; } + private byte[] getSync() { + return sync; + } + + private byte getVersion() { + return version; + } + /** * Get the compression type for this file. * @return the compression type diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileAppend.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileAppend.java new file mode 100644 index 00000000000..4576642c9b0 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileAppend.java @@ -0,0 +1,311 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.SequenceFile.Reader; +import org.apache.hadoop.io.SequenceFile.Writer; +import org.apache.hadoop.io.SequenceFile.Writer.Option; +import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.io.serializer.JavaSerializationComparator; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestSequenceFileAppend { + + private static Configuration conf; + private static FileSystem fs; + private static Path ROOT_PATH = new Path(System.getProperty( + "test.build.data", "build/test/data")); + + @BeforeClass + public static void setUp() throws Exception { + conf = new Configuration(); + conf.set("io.serializations", + "org.apache.hadoop.io.serializer.JavaSerialization"); + conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem"); + fs = FileSystem.get(conf); + } + + @AfterClass + public static void tearDown() throws Exception { + fs.close(); + } + + @Test(timeout = 30000) + public void testAppend() throws Exception { + + Path file = new Path(ROOT_PATH, "testseqappend.seq"); + fs.delete(file, true); + + Text key1 = new Text("Key1"); + Text value1 = new Text("Value1"); + Text value2 = new Text("Updated"); + + SequenceFile.Metadata metadata = new SequenceFile.Metadata(); + metadata.set(key1, value1); + Writer.Option metadataOption = Writer.metadata(metadata); + + Writer writer = SequenceFile.createWriter(conf, + SequenceFile.Writer.file(file), + SequenceFile.Writer.keyClass(Long.class), + SequenceFile.Writer.valueClass(String.class), metadataOption); + + writer.append(1L, "one"); + writer.append(2L, "two"); + writer.close(); + + verify2Values(file); + + metadata.set(key1, value2); + + writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file), + SequenceFile.Writer.keyClass(Long.class), + SequenceFile.Writer.valueClass(String.class), + SequenceFile.Writer.appendIfExists(true), metadataOption); + + // Verify the Meta data is not changed + assertEquals(value1, writer.metadata.get(key1)); + + writer.append(3L, "three"); + writer.append(4L, "four"); + + writer.close(); + + verifyAll4Values(file); + + // Verify the Meta data readable after append + Reader reader = new Reader(conf, Reader.file(file)); + assertEquals(value1, reader.getMetadata().get(key1)); + reader.close(); + + // Verify failure if the compression details are different + try { + Option wrongCompressOption = Writer.compression(CompressionType.RECORD, + new GzipCodec()); + + writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file), + SequenceFile.Writer.keyClass(Long.class), + SequenceFile.Writer.valueClass(String.class), + SequenceFile.Writer.appendIfExists(true), wrongCompressOption); + writer.close(); + fail("Expected IllegalArgumentException for compression options"); + } catch (IllegalArgumentException IAE) { + // Expected exception. 
Ignore it + } + + try { + Option wrongCompressOption = Writer.compression(CompressionType.BLOCK, + new DefaultCodec()); + + writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file), + SequenceFile.Writer.keyClass(Long.class), + SequenceFile.Writer.valueClass(String.class), + SequenceFile.Writer.appendIfExists(true), wrongCompressOption); + writer.close(); + fail("Expected IllegalArgumentException for compression options"); + } catch (IllegalArgumentException IAE) { + // Expected exception. Ignore it + } + + fs.deleteOnExit(file); + } + + @Test(timeout = 30000) + public void testAppendRecordCompression() throws Exception { + + Path file = new Path(ROOT_PATH, "testseqappendblockcompr.seq"); + fs.delete(file, true); + + Option compressOption = Writer.compression(CompressionType.RECORD, + new GzipCodec()); + Writer writer = SequenceFile.createWriter(conf, + SequenceFile.Writer.file(file), + SequenceFile.Writer.keyClass(Long.class), + SequenceFile.Writer.valueClass(String.class), compressOption); + + writer.append(1L, "one"); + writer.append(2L, "two"); + writer.close(); + + verify2Values(file); + + writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file), + SequenceFile.Writer.keyClass(Long.class), + SequenceFile.Writer.valueClass(String.class), + SequenceFile.Writer.appendIfExists(true), compressOption); + + writer.append(3L, "three"); + writer.append(4L, "four"); + writer.close(); + + verifyAll4Values(file); + + fs.deleteOnExit(file); + } + + @Test(timeout = 30000) + public void testAppendBlockCompression() throws Exception { + + Path file = new Path(ROOT_PATH, "testseqappendblockcompr.seq"); + fs.delete(file, true); + + Option compressOption = Writer.compression(CompressionType.BLOCK, + new GzipCodec()); + Writer writer = SequenceFile.createWriter(conf, + SequenceFile.Writer.file(file), + SequenceFile.Writer.keyClass(Long.class), + SequenceFile.Writer.valueClass(String.class), compressOption); + + writer.append(1L, "one"); + writer.append(2L, "two"); + writer.close(); + + verify2Values(file); + + writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file), + SequenceFile.Writer.keyClass(Long.class), + SequenceFile.Writer.valueClass(String.class), + SequenceFile.Writer.appendIfExists(true), compressOption); + + writer.append(3L, "three"); + writer.append(4L, "four"); + writer.close(); + + verifyAll4Values(file); + + // Verify failure if the compression details are different or not Provided + try { + writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file), + SequenceFile.Writer.keyClass(Long.class), + SequenceFile.Writer.valueClass(String.class), + SequenceFile.Writer.appendIfExists(true)); + writer.close(); + fail("Expected IllegalArgumentException for compression options"); + } catch (IllegalArgumentException IAE) { + // Expected exception. Ignore it + } + + // Verify failure if the compression details are different + try { + Option wrongCompressOption = Writer.compression(CompressionType.RECORD, + new GzipCodec()); + + writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file), + SequenceFile.Writer.keyClass(Long.class), + SequenceFile.Writer.valueClass(String.class), + SequenceFile.Writer.appendIfExists(true), wrongCompressOption); + writer.close(); + fail("Expected IllegalArgumentException for compression options"); + } catch (IllegalArgumentException IAE) { + // Expected exception. 
Ignore it + } + + try { + Option wrongCompressOption = Writer.compression(CompressionType.BLOCK, + new DefaultCodec()); + + writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file), + SequenceFile.Writer.keyClass(Long.class), + SequenceFile.Writer.valueClass(String.class), + SequenceFile.Writer.appendIfExists(true), wrongCompressOption); + writer.close(); + fail("Expected IllegalArgumentException for compression options"); + } catch (IllegalArgumentException IAE) { + // Expected exception. Ignore it + } + + fs.deleteOnExit(file); + } + + @Test(timeout = 30000) + public void testAppendSort() throws Exception { + Path file = new Path(ROOT_PATH, "testseqappendSort.seq"); + fs.delete(file, true); + + Path sortedFile = new Path(ROOT_PATH, "testseqappendSort.seq.sort"); + fs.delete(sortedFile, true); + + SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, + new JavaSerializationComparator(), Long.class, String.class, conf); + + Option compressOption = Writer.compression(CompressionType.BLOCK, + new GzipCodec()); + Writer writer = SequenceFile.createWriter(conf, + SequenceFile.Writer.file(file), + SequenceFile.Writer.keyClass(Long.class), + SequenceFile.Writer.valueClass(String.class), compressOption); + + writer.append(2L, "two"); + writer.append(1L, "one"); + + writer.close(); + + writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file), + SequenceFile.Writer.keyClass(Long.class), + SequenceFile.Writer.valueClass(String.class), + SequenceFile.Writer.appendIfExists(true), compressOption); + + writer.append(4L, "four"); + writer.append(3L, "three"); + writer.close(); + + // Sort file after append + sorter.sort(file, sortedFile); + verifyAll4Values(sortedFile); + + fs.deleteOnExit(file); + fs.deleteOnExit(sortedFile); + } + + private void verify2Values(Path file) throws IOException { + Reader reader = new Reader(conf, Reader.file(file)); + assertEquals(1L, reader.next((Object) null)); + assertEquals("one", reader.getCurrentValue((Object) null)); + assertEquals(2L, reader.next((Object) null)); + assertEquals("two", reader.getCurrentValue((Object) null)); + assertNull(reader.next((Object) null)); + reader.close(); + } + + private void verifyAll4Values(Path file) throws IOException { + Reader reader = new Reader(conf, Reader.file(file)); + assertEquals(1L, reader.next((Object) null)); + assertEquals("one", reader.getCurrentValue((Object) null)); + assertEquals(2L, reader.next((Object) null)); + assertEquals("two", reader.getCurrentValue((Object) null)); + assertEquals(3L, reader.next((Object) null)); + assertEquals("three", reader.getCurrentValue((Object) null)); + assertEquals(4L, reader.next((Object) null)); + assertEquals("four", reader.getCurrentValue((Object) null)); + assertNull(reader.next((Object) null)); + reader.close(); + } +}
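
Usage note (not part of the patch): the new SequenceFile.Writer.appendIfExists option, exercised by the tests above, reopens an existing file with fs.append() instead of fs.create(). In append mode the writer does not rewrite the file header; it emits a sync marker and keeps the existing file's metadata (any metadata option passed in is ignored), and it requires the key/value classes, compression type and codec to match the existing header, otherwise an IllegalArgumentException is thrown. Below is a minimal sketch mirroring TestSequenceFileAppend; the class name and the temp-file path are illustrative only, and Long/String keys and values with Java serialization are taken from the tests.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

public class SequenceFileAppendExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Java serialization lets plain Long/String be used as key/value types,
    // as in TestSequenceFileAppend above.
    conf.set("io.serializations",
        "org.apache.hadoop.io.serializer.JavaSerialization");
    // The checksummed local filesystem does not support append(), so the
    // tests (and this sketch) use RawLocalFileSystem for file:// paths.
    conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");

    // Illustrative location; any filesystem that supports append() works.
    Path file = new Path(System.getProperty("java.io.tmpdir"), "appendable.seq");
    FileSystem fs = file.getFileSystem(conf);
    fs.delete(file, true);

    // First writer creates the file and writes one record.
    SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(file),
        SequenceFile.Writer.keyClass(Long.class),
        SequenceFile.Writer.valueClass(String.class));
    writer.append(1L, "one");
    writer.close();

    // Second writer reopens the same file in append mode. Key/value classes,
    // compression type and codec must match the existing file; the existing
    // header and metadata are kept, and new records are written after a sync
    // marker instead of overwriting the file.
    writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(file),
        SequenceFile.Writer.keyClass(Long.class),
        SequenceFile.Writer.valueClass(String.class),
        SequenceFile.Writer.appendIfExists(true));
    writer.append(2L, "two");
    writer.close();
  }
}

If the file does not exist yet, appendIfExists(true) falls back to the normal create path, so the same writer configuration can be used for both the first and subsequent runs.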