HADOOP-11730. Regression: s3n read failure recovery broken. (Takenori Sato via stevel)
This commit is contained in:
parent
d1253f695d
commit
a6a5d1d6b5
|
@ -125,6 +125,9 @@ Release 2.7.1 - UNRELEASED
|
||||||
HADOOP-11872. "hadoop dfs" command prints message about using "yarn jar" on
|
HADOOP-11872. "hadoop dfs" command prints message about using "yarn jar" on
|
||||||
Windows(branch-2 only) (Varun Vasudev via cnauroth)
|
Windows(branch-2 only) (Varun Vasudev via cnauroth)
|
||||||
|
|
||||||
|
HADOOP-11730. Regression: s3n read failure recovery broken.
|
||||||
|
(Takenori Sato via stevel)
|
||||||
|
|
||||||
Release 2.7.0 - 2015-04-20
|
Release 2.7.0 - 2015-04-20
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -54,6 +54,7 @@ import org.apache.hadoop.fs.LocalDirAllocator;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.fs.s3.S3Exception;
|
import org.apache.hadoop.fs.s3.S3Exception;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicies;
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||||
import org.apache.hadoop.io.retry.RetryProxy;
|
import org.apache.hadoop.io.retry.RetryProxy;
|
||||||
|
@ -124,7 +125,7 @@ public class NativeS3FileSystem extends FileSystem {
|
||||||
key);
|
key);
|
||||||
LOG.debug("{}", e, e);
|
LOG.debug("{}", e, e);
|
||||||
try {
|
try {
|
||||||
seek(pos);
|
reopen(pos);
|
||||||
result = in.read();
|
result = in.read();
|
||||||
} catch (EOFException eof) {
|
} catch (EOFException eof) {
|
||||||
LOG.debug("EOF on input stream read: {}", eof, eof);
|
LOG.debug("EOF on input stream read: {}", eof, eof);
|
||||||
|
@ -153,7 +154,7 @@ public class NativeS3FileSystem extends FileSystem {
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.info( "Received IOException while reading '{}'," +
|
LOG.info( "Received IOException while reading '{}'," +
|
||||||
" attempting to reopen.", key);
|
" attempting to reopen.", key);
|
||||||
seek(pos);
|
reopen(pos);
|
||||||
result = in.read(b, off, len);
|
result = in.read(b, off, len);
|
||||||
}
|
}
|
||||||
if (result > 0) {
|
if (result > 0) {
|
||||||
|
@ -173,16 +174,21 @@ public class NativeS3FileSystem extends FileSystem {
|
||||||
/**
|
/**
|
||||||
* Close the inner stream if not null. Even if an exception
|
* Close the inner stream if not null. Even if an exception
|
||||||
* is raised during the close, the field is set to null
|
* is raised during the close, the field is set to null
|
||||||
* @throws IOException if raised by the close() operation.
|
|
||||||
*/
|
*/
|
||||||
private void closeInnerStream() throws IOException {
|
private void closeInnerStream() {
|
||||||
if (in != null) {
|
IOUtils.closeStream(in);
|
||||||
try {
|
|
||||||
in.close();
|
|
||||||
} finally {
|
|
||||||
in = null;
|
in = null;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
/**
|
||||||
|
* Reopen a new input stream with the specified position
|
||||||
|
* @param pos the position to reopen a new stream
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private synchronized void reopen(long pos) throws IOException {
|
||||||
|
LOG.debug("Reopening key '{}' for reading at position '{}", key, pos);
|
||||||
|
InputStream newStream = store.retrieve(key, pos);
|
||||||
|
updateInnerStream(newStream, pos);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -207,9 +213,7 @@ public class NativeS3FileSystem extends FileSystem {
|
||||||
}
|
}
|
||||||
if (pos != newpos) {
|
if (pos != newpos) {
|
||||||
// the seek is attempting to move the current position
|
// the seek is attempting to move the current position
|
||||||
LOG.debug("Opening key '{}' for reading at position '{}", key, newpos);
|
reopen(newpos);
|
||||||
InputStream newStream = store.retrieve(key, newpos);
|
|
||||||
updateInnerStream(newStream, newpos);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -165,14 +165,15 @@ public abstract class NativeS3FileSystemContractBaseTest
|
||||||
|
|
||||||
public void testRetryOnIoException() throws Exception {
|
public void testRetryOnIoException() throws Exception {
|
||||||
class TestInputStream extends InputStream {
|
class TestInputStream extends InputStream {
|
||||||
boolean shouldThrow = false;
|
boolean shouldThrow = true;
|
||||||
int throwCount = 0;
|
int throwCount = 0;
|
||||||
int pos = 0;
|
int pos = 0;
|
||||||
byte[] bytes;
|
byte[] bytes;
|
||||||
|
boolean threwException = false;
|
||||||
|
|
||||||
public TestInputStream() {
|
public TestInputStream() {
|
||||||
bytes = new byte[256];
|
bytes = new byte[256];
|
||||||
for (int i = 0; i < 256; i++) {
|
for (int i = pos; i < 256; i++) {
|
||||||
bytes[i] = (byte)i;
|
bytes[i] = (byte)i;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -182,8 +183,10 @@ public abstract class NativeS3FileSystemContractBaseTest
|
||||||
shouldThrow = !shouldThrow;
|
shouldThrow = !shouldThrow;
|
||||||
if (shouldThrow) {
|
if (shouldThrow) {
|
||||||
throwCount++;
|
throwCount++;
|
||||||
|
threwException = true;
|
||||||
throw new IOException();
|
throw new IOException();
|
||||||
}
|
}
|
||||||
|
assertFalse("IOException was thrown. InputStream should be reopened", threwException);
|
||||||
return pos++;
|
return pos++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -192,9 +195,10 @@ public abstract class NativeS3FileSystemContractBaseTest
|
||||||
shouldThrow = !shouldThrow;
|
shouldThrow = !shouldThrow;
|
||||||
if (shouldThrow) {
|
if (shouldThrow) {
|
||||||
throwCount++;
|
throwCount++;
|
||||||
|
threwException = true;
|
||||||
throw new IOException();
|
throw new IOException();
|
||||||
}
|
}
|
||||||
|
assertFalse("IOException was thrown. InputStream should be reopened", threwException);
|
||||||
int sizeToRead = Math.min(len, 256 - pos);
|
int sizeToRead = Math.min(len, 256 - pos);
|
||||||
for (int i = 0; i < sizeToRead; i++) {
|
for (int i = 0; i < sizeToRead; i++) {
|
||||||
b[i] = bytes[pos + i];
|
b[i] = bytes[pos + i];
|
||||||
|
@ -202,13 +206,20 @@ public abstract class NativeS3FileSystemContractBaseTest
|
||||||
pos += sizeToRead;
|
pos += sizeToRead;
|
||||||
return sizeToRead;
|
return sizeToRead;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void reopenAt(long byteRangeStart) {
|
||||||
|
threwException = false;
|
||||||
|
pos = Long.valueOf(byteRangeStart).intValue();
|
||||||
}
|
}
|
||||||
|
|
||||||
final InputStream is = new TestInputStream();
|
}
|
||||||
|
|
||||||
|
final TestInputStream is = new TestInputStream();
|
||||||
|
|
||||||
class MockNativeFileSystemStore extends Jets3tNativeFileSystemStore {
|
class MockNativeFileSystemStore extends Jets3tNativeFileSystemStore {
|
||||||
@Override
|
@Override
|
||||||
public InputStream retrieve(String key, long byteRangeStart) throws IOException {
|
public InputStream retrieve(String key, long byteRangeStart) throws IOException {
|
||||||
|
is.reopenAt(byteRangeStart);
|
||||||
return is;
|
return is;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -233,8 +244,9 @@ public abstract class NativeS3FileSystemContractBaseTest
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test to make sure the throw path was exercised.
|
// Test to make sure the throw path was exercised.
|
||||||
// 144 = 128 + (128 / 8)
|
// every read should have thrown 1 IOException except for the first read
|
||||||
assertEquals(144, ((TestInputStream)is).throwCount);
|
// 144 = 128 - 1 + (128 / 8)
|
||||||
|
assertEquals(143, ((TestInputStream)is).throwCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue