diff --git a/CHANGES.txt b/CHANGES.txt index 8eabd3e1b09..22541c2da60 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1008,6 +1008,10 @@ Trunk (unreleased changes) HADOOP-6181. Fix .eclipse.templates/.classpath for avro and jets3t jar files. (Carlos Valiente via szetszwo) + HADOOP-6196. Fix a bug in SequenceFile.Reader where syncing within the + header would cause the reader to read the sync marker as a record. (Jay + Booth via cdouglas) + Release 0.20.1 - Unreleased INCOMPATIBLE CHANGES diff --git a/src/java/org/apache/hadoop/io/SequenceFile.java b/src/java/org/apache/hadoop/io/SequenceFile.java index ae494f948ed..db77fa0ecfe 100644 --- a/src/java/org/apache/hadoop/io/SequenceFile.java +++ b/src/java/org/apache/hadoop/io/SequenceFile.java @@ -1397,6 +1397,7 @@ public class SequenceFile { private byte[] syncCheck = new byte[SYNC_HASH_SIZE]; private boolean syncSeen; + private long headerEnd; private long end; private int keyLength; private int recordLength; @@ -1546,6 +1547,7 @@ public class SequenceFile { if (version > 1) { // if version > 1 in.readFully(sync); // read sync bytes + headerEnd = in.getPos(); // record end of header } // Initialize... *not* if this we are constructing a temporary Reader @@ -2210,6 +2212,14 @@ public class SequenceFile { return; } + if (position < headerEnd) { + // seek directly to first record + in.seek(headerEnd); + // note the sync marker "seen" in the header + syncSeen = true; + return; + } + try { seek(position+4); // skip escape in.readFully(syncCheck); diff --git a/src/test/core/org/apache/hadoop/io/TestSequenceFileSync.java b/src/test/core/org/apache/hadoop/io/TestSequenceFileSync.java new file mode 100644 index 00000000000..933ee29cfea --- /dev/null +++ b/src/test/core/org/apache/hadoop/io/TestSequenceFileSync.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io; + +import java.io.File; +import java.io.IOException; +import java.util.Random; +import java.net.URI; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.Before; +import org.junit.Test; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import static org.junit.Assert.*; + +public class TestSequenceFileSync { + private static final int NUMRECORDS = 2000; + private static final int RECORDSIZE = 80; + private static final Random rand = new Random(); + + private final static String REC_FMT = "%d RECORDID %d : "; + + + private static void forOffset(SequenceFile.Reader reader, + IntWritable key, Text val, int iter, long off, int expectedRecord) + throws IOException { + val.clear(); + reader.sync(off); + reader.next(key, val); + assertEquals(key.get(), expectedRecord); + final String test = String.format(REC_FMT, expectedRecord, expectedRecord); + assertEquals("Invalid value " + val, 0, val.find(test, 0)); + } + + @Test + public void testLowSyncpoint() throws IOException { + final Configuration conf = new Configuration(); + final FileSystem fs = FileSystem.getLocal(conf); + final Path path = new Path(System.getProperty("test.build.data", "/tmp"), + "sequencefile.sync.test"); + final IntWritable input = new IntWritable(); + final Text val = new Text(); + SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, + IntWritable.class, Text.class); + try { + writeSequenceFile(writer, NUMRECORDS); + for (int i = 0; i < 5 ; i++) { + final SequenceFile.Reader reader = + new SequenceFile.Reader(fs, path, conf); + try { + forOffset(reader, input, val, i, 0, 0); + forOffset(reader, input, val, i, 65, 0); + forOffset(reader, input, val, i, 2000, 21); + forOffset(reader, input, val, i, 0, 0); + } finally { + reader.close(); + } + } + } finally { + fs.delete(path, false); + } + } + + public static void writeSequenceFile(SequenceFile.Writer writer, + int numRecords) throws IOException { + final IntWritable key = new IntWritable(); + final Text val = new Text(); + for (int numWritten = 0; numWritten < numRecords; ++numWritten) { + key.set(numWritten); + randomText(val, numWritten, RECORDSIZE); + writer.append(key, val); + } + writer.close(); + } + + static void randomText(Text val, int id, int recordSize) { + val.clear(); + final StringBuilder ret = new StringBuilder(recordSize); + ret.append(String.format(REC_FMT, id, id)); + recordSize -= ret.length(); + for (int i = 0; i < recordSize; ++i) { + ret.append(rand.nextInt(9)); + } + val.set(ret.toString()); + } +}