HADOOP-11730. Regression: s3n read failure recovery broken. (Takenori Sato via stevel)

This commit is contained in:
Steve Loughran 2015-04-23 21:39:30 +01:00
parent 37673268ef
commit ecab7d2265
3 changed files with 39 additions and 20 deletions

View File

@ -18,6 +18,9 @@ Release 2.7.1 - UNRELEASED
HADOOP-11872. "hadoop dfs" command prints message about using "yarn jar" on HADOOP-11872. "hadoop dfs" command prints message about using "yarn jar" on
Windows(branch-2 only) (Varun Vasudev via cnauroth) Windows(branch-2 only) (Varun Vasudev via cnauroth)
HADOOP-11730. Regression: s3n read failure recovery broken.
(Takenori Sato via stevel)
Release 2.7.0 - 2015-04-20 Release 2.7.0 - 2015-04-20
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.s3.S3Exception; import org.apache.hadoop.fs.s3.S3Exception;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.io.retry.RetryProxy;
@ -124,7 +125,7 @@ public class NativeS3FileSystem extends FileSystem {
key); key);
LOG.debug("{}", e, e); LOG.debug("{}", e, e);
try { try {
seek(pos); reopen(pos);
result = in.read(); result = in.read();
} catch (EOFException eof) { } catch (EOFException eof) {
LOG.debug("EOF on input stream read: {}", eof, eof); LOG.debug("EOF on input stream read: {}", eof, eof);
@ -153,7 +154,7 @@ public class NativeS3FileSystem extends FileSystem {
} catch (IOException e) { } catch (IOException e) {
LOG.info( "Received IOException while reading '{}'," + LOG.info( "Received IOException while reading '{}'," +
" attempting to reopen.", key); " attempting to reopen.", key);
seek(pos); reopen(pos);
result = in.read(b, off, len); result = in.read(b, off, len);
} }
if (result > 0) { if (result > 0) {
@ -173,16 +174,21 @@ public class NativeS3FileSystem extends FileSystem {
/** /**
* Close the inner stream if not null. Even if an exception * Close the inner stream if not null. Even if an exception
* is raised during the close, the field is set to null * is raised during the close, the field is set to null
* @throws IOException if raised by the close() operation.
*/ */
private void closeInnerStream() throws IOException { private void closeInnerStream() {
if (in != null) { IOUtils.closeStream(in);
try { in = null;
in.close(); }
} finally {
in = null; /**
} * Reopen a new input stream with the specified position
} * @param pos the position to reopen a new stream
* @throws IOException
*/
private synchronized void reopen(long pos) throws IOException {
LOG.debug("Reopening key '{}' for reading at position '{}", key, pos);
InputStream newStream = store.retrieve(key, pos);
updateInnerStream(newStream, pos);
} }
/** /**
@ -207,9 +213,7 @@ public class NativeS3FileSystem extends FileSystem {
} }
if (pos != newpos) { if (pos != newpos) {
// the seek is attempting to move the current position // the seek is attempting to move the current position
LOG.debug("Opening key '{}' for reading at position '{}", key, newpos); reopen(newpos);
InputStream newStream = store.retrieve(key, newpos);
updateInnerStream(newStream, newpos);
} }
} }

View File

@ -165,14 +165,15 @@ public abstract class NativeS3FileSystemContractBaseTest
public void testRetryOnIoException() throws Exception { public void testRetryOnIoException() throws Exception {
class TestInputStream extends InputStream { class TestInputStream extends InputStream {
boolean shouldThrow = false; boolean shouldThrow = true;
int throwCount = 0; int throwCount = 0;
int pos = 0; int pos = 0;
byte[] bytes; byte[] bytes;
boolean threwException = false;
public TestInputStream() { public TestInputStream() {
bytes = new byte[256]; bytes = new byte[256];
for (int i = 0; i < 256; i++) { for (int i = pos; i < 256; i++) {
bytes[i] = (byte)i; bytes[i] = (byte)i;
} }
} }
@ -182,8 +183,10 @@ public abstract class NativeS3FileSystemContractBaseTest
shouldThrow = !shouldThrow; shouldThrow = !shouldThrow;
if (shouldThrow) { if (shouldThrow) {
throwCount++; throwCount++;
threwException = true;
throw new IOException(); throw new IOException();
} }
assertFalse("IOException was thrown. InputStream should be reopened", threwException);
return pos++; return pos++;
} }
@ -192,9 +195,10 @@ public abstract class NativeS3FileSystemContractBaseTest
shouldThrow = !shouldThrow; shouldThrow = !shouldThrow;
if (shouldThrow) { if (shouldThrow) {
throwCount++; throwCount++;
threwException = true;
throw new IOException(); throw new IOException();
} }
assertFalse("IOException was thrown. InputStream should be reopened", threwException);
int sizeToRead = Math.min(len, 256 - pos); int sizeToRead = Math.min(len, 256 - pos);
for (int i = 0; i < sizeToRead; i++) { for (int i = 0; i < sizeToRead; i++) {
b[i] = bytes[pos + i]; b[i] = bytes[pos + i];
@ -202,13 +206,20 @@ public abstract class NativeS3FileSystemContractBaseTest
pos += sizeToRead; pos += sizeToRead;
return sizeToRead; return sizeToRead;
} }
public void reopenAt(long byteRangeStart) {
threwException = false;
pos = Long.valueOf(byteRangeStart).intValue();
}
} }
final InputStream is = new TestInputStream(); final TestInputStream is = new TestInputStream();
class MockNativeFileSystemStore extends Jets3tNativeFileSystemStore { class MockNativeFileSystemStore extends Jets3tNativeFileSystemStore {
@Override @Override
public InputStream retrieve(String key, long byteRangeStart) throws IOException { public InputStream retrieve(String key, long byteRangeStart) throws IOException {
is.reopenAt(byteRangeStart);
return is; return is;
} }
} }
@ -233,8 +244,9 @@ public abstract class NativeS3FileSystemContractBaseTest
} }
// Test to make sure the throw path was exercised. // Test to make sure the throw path was exercised.
// 144 = 128 + (128 / 8) // every read should have thrown 1 IOException except for the first read
assertEquals(144, ((TestInputStream)is).throwCount); // 144 = 128 - 1 + (128 / 8)
assertEquals(143, ((TestInputStream)is).throwCount);
} }
} }