HADOOP-9307. BufferedFSInputStream.read returns wrong results after certain seeks. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1482378 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b30d21bd13
commit
0e7dec2b39
|
@ -159,6 +159,9 @@ Release 2.0.5-beta - UNRELEASED
|
||||||
Service HEALTHY, and results in null data at ActiveBreadCrumb.
|
Service HEALTHY, and results in null data at ActiveBreadCrumb.
|
||||||
(Vinay and todd via todd)
|
(Vinay and todd via todd)
|
||||||
|
|
||||||
|
HADOOP-9307. BufferedFSInputStream.read returns wrong results
|
||||||
|
after certain seeks. (todd)
|
||||||
|
|
||||||
Release 2.0.4-alpha - 2013-04-25
|
Release 2.0.4-alpha - 2013-04-25
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -69,12 +69,17 @@ implements Seekable, PositionedReadable, HasFileDescriptor {
|
||||||
if( pos<0 ) {
|
if( pos<0 ) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// optimize: check if the pos is in the buffer
|
if (this.pos != this.count) {
|
||||||
long end = ((FSInputStream)in).getPos();
|
// optimize: check if the pos is in the buffer
|
||||||
long start = end - count;
|
// This optimization only works if pos != count -- if they are
|
||||||
if( pos>=start && pos<end) {
|
// equal, it's possible that the previous reads were just
|
||||||
this.pos = (int)(pos-start);
|
// longer than the total buffer size, and hence skipped the buffer.
|
||||||
return;
|
long end = ((FSInputStream)in).getPos();
|
||||||
|
long start = end - count;
|
||||||
|
if( pos>=start && pos<end) {
|
||||||
|
this.pos = (int)(pos-start);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// invalidate buffer
|
// invalidate buffer
|
||||||
|
|
|
@ -19,10 +19,13 @@ package org.apache.hadoop.fs;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem.Statistics;
|
import org.apache.hadoop.fs.FileSystem.Statistics;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.FileSystemTestHelper.*;
|
import static org.apache.hadoop.fs.FileSystemTestHelper.*;
|
||||||
|
|
||||||
import java.io.*;
|
import java.io.*;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
@ -38,6 +41,7 @@ public class TestLocalFileSystem {
|
||||||
= System.getProperty("test.build.data","build/test/data") + "/work-dir/localfs";
|
= System.getProperty("test.build.data","build/test/data") + "/work-dir/localfs";
|
||||||
|
|
||||||
private final File base = new File(TEST_ROOT_DIR);
|
private final File base = new File(TEST_ROOT_DIR);
|
||||||
|
private final Path TEST_PATH = new Path(TEST_ROOT_DIR, "test-file");
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private LocalFileSystem fileSys;
|
private LocalFileSystem fileSys;
|
||||||
|
|
||||||
|
@ -357,6 +361,73 @@ public class TestLocalFileSystem {
|
||||||
status = fileSys.getFileStatus(path);
|
status = fileSys.getFileStatus(path);
|
||||||
assertEquals(newModTime, status.getModificationTime());
|
assertEquals(newModTime, status.getModificationTime());
|
||||||
assertEquals(0, status.getAccessTime());
|
assertEquals(0, status.getAccessTime());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Regression test for HADOOP-9307: BufferedFSInputStream returning
|
||||||
|
* wrong results after certain sequences of seeks and reads.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testBufferedFSInputStream() throws IOException {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setClass("fs.file.impl", RawLocalFileSystem.class, FileSystem.class);
|
||||||
|
conf.setInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, 4096);
|
||||||
|
FileSystem fs = FileSystem.newInstance(conf);
|
||||||
|
|
||||||
|
byte[] buf = new byte[10*1024];
|
||||||
|
new Random().nextBytes(buf);
|
||||||
|
|
||||||
|
// Write random bytes to file
|
||||||
|
FSDataOutputStream stream = fs.create(TEST_PATH);
|
||||||
|
try {
|
||||||
|
stream.write(buf);
|
||||||
|
} finally {
|
||||||
|
stream.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
Random r = new Random();
|
||||||
|
|
||||||
|
FSDataInputStream stm = fs.open(TEST_PATH);
|
||||||
|
// Record the sequence of seeks and reads which trigger a failure.
|
||||||
|
int seeks[] = new int[10];
|
||||||
|
int reads[] = new int[10];
|
||||||
|
try {
|
||||||
|
for (int i = 0; i < 1000; i++) {
|
||||||
|
int seekOff = r.nextInt(buf.length);
|
||||||
|
int toRead = r.nextInt(Math.min(buf.length - seekOff, 32000));
|
||||||
|
|
||||||
|
seeks[i % seeks.length] = seekOff;
|
||||||
|
reads[i % reads.length] = toRead;
|
||||||
|
verifyRead(stm, buf, seekOff, toRead);
|
||||||
|
|
||||||
|
}
|
||||||
|
} catch (AssertionError afe) {
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
sb.append("Sequence of actions:\n");
|
||||||
|
for (int j = 0; j < seeks.length; j++) {
|
||||||
|
sb.append("seek @ ").append(seeks[j]).append(" ")
|
||||||
|
.append("read ").append(reads[j]).append("\n");
|
||||||
|
}
|
||||||
|
System.err.println(sb.toString());
|
||||||
|
throw afe;
|
||||||
|
} finally {
|
||||||
|
stm.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyRead(FSDataInputStream stm, byte[] fileContents,
|
||||||
|
int seekOff, int toRead) throws IOException {
|
||||||
|
byte[] out = new byte[toRead];
|
||||||
|
stm.seek(seekOff);
|
||||||
|
stm.readFully(out);
|
||||||
|
byte[] expected = Arrays.copyOfRange(fileContents, seekOff, seekOff+toRead);
|
||||||
|
if (!Arrays.equals(out, expected)) {
|
||||||
|
String s ="\nExpected: " +
|
||||||
|
StringUtils.byteToHexString(expected) +
|
||||||
|
"\ngot: " +
|
||||||
|
StringUtils.byteToHexString(out) +
|
||||||
|
"\noff=" + seekOff + " len=" + toRead;
|
||||||
|
fail(s);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue