diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 6564d245513..fb6153f32f5 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -54,6 +54,9 @@ Release 2.6.1 - UNRELEASED HADOOP-11812. Implement listLocatedStatus for ViewFileSystem to speed up split calculation (gera) + HADOOP-11730. Regression: s3n read failure recovery broken. + (Takenori Sato via stevel) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java index e490edf8a22..663db2376b8 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java @@ -54,6 +54,7 @@ import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.s3.S3Exception; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryProxy; @@ -119,7 +120,7 @@ public class NativeS3FileSystem extends FileSystem { key); LOG.debug("{}", e, e); try { - seek(pos); + reopen(pos); result = in.read(); } catch (EOFException eof) { LOG.debug("EOF on input stream read: {}", eof, eof); @@ -148,7 +149,7 @@ public class NativeS3FileSystem extends FileSystem { } catch (IOException e) { LOG.info( "Received IOException while reading '{}'," + " attempting to reopen.", key); - seek(pos); + reopen(pos); result = in.read(b, off, len); } if (result > 0) { @@ -168,16 +169,21 @@ public class NativeS3FileSystem extends FileSystem { /** * Close the inner stream if not null. Even if an exception * is raised during the close, the field is set to null - * @throws IOException if raised by the close() operation. */ - private void closeInnerStream() throws IOException { - if (in != null) { - try { - in.close(); - } finally { - in = null; - } - } + private void closeInnerStream() { + IOUtils.closeStream(in); + in = null; + } + + /** + * Reopen a new input stream with the specified position + * @param pos the position to reopen a new stream + * @throws IOException + */ + private synchronized void reopen(long pos) throws IOException { + LOG.debug("Reopening key '{}' for reading at position '{}", key, pos); + InputStream newStream = store.retrieve(key, pos); + updateInnerStream(newStream, pos); } /** @@ -202,9 +208,7 @@ public class NativeS3FileSystem extends FileSystem { } if (pos != newpos) { // the seek is attempting to move the current position - LOG.debug("Opening key '{}' for reading at position '{}", key, newpos); - InputStream newStream = store.retrieve(key, newpos); - updateInnerStream(newStream, newpos); + reopen(newpos); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java index ac6b9ec3251..c6a6bc2a157 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java @@ -158,14 +158,15 @@ public abstract class NativeS3FileSystemContractBaseTest public void testRetryOnIoException() throws Exception { class TestInputStream extends InputStream { - boolean shouldThrow = false; + boolean shouldThrow = true; int throwCount = 0; int pos = 0; byte[] bytes; + boolean threwException = false; public TestInputStream() { bytes = new byte[256]; - for (int i = 0; i < 256; i++) { + for (int i = pos; i < 256; i++) { bytes[i] = (byte)i; } } @@ -175,8 +176,10 @@ public abstract class NativeS3FileSystemContractBaseTest shouldThrow = !shouldThrow; if (shouldThrow) { throwCount++; + threwException = true; throw new IOException(); } + assertFalse("IOException was thrown. InputStream should be reopened", threwException); return pos++; } @@ -185,9 +188,10 @@ public abstract class NativeS3FileSystemContractBaseTest shouldThrow = !shouldThrow; if (shouldThrow) { throwCount++; + threwException = true; throw new IOException(); } - + assertFalse("IOException was thrown. InputStream should be reopened", threwException); int sizeToRead = Math.min(len, 256 - pos); for (int i = 0; i < sizeToRead; i++) { b[i] = bytes[pos + i]; @@ -195,13 +199,20 @@ public abstract class NativeS3FileSystemContractBaseTest pos += sizeToRead; return sizeToRead; } + + public void reopenAt(long byteRangeStart) { + threwException = false; + pos = Long.valueOf(byteRangeStart).intValue(); + } + } - final InputStream is = new TestInputStream(); + final TestInputStream is = new TestInputStream(); class MockNativeFileSystemStore extends Jets3tNativeFileSystemStore { @Override public InputStream retrieve(String key, long byteRangeStart) throws IOException { + is.reopenAt(byteRangeStart); return is; } } @@ -226,8 +237,9 @@ public abstract class NativeS3FileSystemContractBaseTest } // Test to make sure the throw path was exercised. - // 144 = 128 + (128 / 8) - assertEquals(144, ((TestInputStream)is).throwCount); + // every read should have thrown 1 IOException except for the first read + // 144 = 128 - 1 + (128 / 8) + assertEquals(143, ((TestInputStream)is).throwCount); } }