diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 9f1ab9973ae..e29cb0f6caa 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -18,6 +18,9 @@ Release 2.7.1 - UNRELEASED HADOOP-11872. "hadoop dfs" command prints message about using "yarn jar" on Windows(branch-2 only) (Varun Vasudev via cnauroth) + HADOOP-11730. Regression: s3n read failure recovery broken. + (Takenori Sato via stevel) + Release 2.7.0 - 2015-04-20 INCOMPATIBLE CHANGES diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java index c34d53e4d08..0dc3ba17c09 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java @@ -54,6 +54,7 @@ import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.s3.S3Exception; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryProxy; @@ -124,7 +125,7 @@ public class NativeS3FileSystem extends FileSystem { key); LOG.debug("{}", e, e); try { - seek(pos); + reopen(pos); result = in.read(); } catch (EOFException eof) { LOG.debug("EOF on input stream read: {}", eof, eof); @@ -153,7 +154,7 @@ public class NativeS3FileSystem extends FileSystem { } catch (IOException e) { LOG.info( "Received IOException while reading '{}'," + " attempting to reopen.", key); - seek(pos); + reopen(pos); result = in.read(b, off, len); } if (result > 0) { @@ -173,16 +174,21 @@ public class NativeS3FileSystem extends FileSystem { /** * Close the inner stream if not null. Even if an exception * is raised during the close, the field is set to null - * @throws IOException if raised by the close() operation. */ - private void closeInnerStream() throws IOException { - if (in != null) { - try { - in.close(); - } finally { - in = null; - } - } + private void closeInnerStream() { + IOUtils.closeStream(in); + in = null; + } + + /** + * Reopen a new input stream with the specified position + * @param pos the position to reopen a new stream + * @throws IOException + */ + private synchronized void reopen(long pos) throws IOException { + LOG.debug("Reopening key '{}' for reading at position '{}", key, pos); + InputStream newStream = store.retrieve(key, pos); + updateInnerStream(newStream, pos); } /** @@ -207,9 +213,7 @@ public class NativeS3FileSystem extends FileSystem { } if (pos != newpos) { // the seek is attempting to move the current position - LOG.debug("Opening key '{}' for reading at position '{}", key, newpos); - InputStream newStream = store.retrieve(key, newpos); - updateInnerStream(newStream, newpos); + reopen(newpos); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java index f215219aee9..3b505157d2d 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java @@ -165,14 +165,15 @@ public abstract class NativeS3FileSystemContractBaseTest public void testRetryOnIoException() throws Exception { class TestInputStream extends InputStream { - boolean shouldThrow = false; + boolean shouldThrow = true; int throwCount = 0; int pos = 0; byte[] bytes; + boolean threwException = false; public TestInputStream() { bytes = new byte[256]; - for (int i = 0; i < 256; i++) { + for (int i = pos; i < 256; i++) { bytes[i] = (byte)i; } } @@ -182,8 +183,10 @@ public abstract class NativeS3FileSystemContractBaseTest shouldThrow = !shouldThrow; if (shouldThrow) { throwCount++; + threwException = true; throw new IOException(); } + assertFalse("IOException was thrown. InputStream should be reopened", threwException); return pos++; } @@ -192,9 +195,10 @@ public abstract class NativeS3FileSystemContractBaseTest shouldThrow = !shouldThrow; if (shouldThrow) { throwCount++; + threwException = true; throw new IOException(); } - + assertFalse("IOException was thrown. InputStream should be reopened", threwException); int sizeToRead = Math.min(len, 256 - pos); for (int i = 0; i < sizeToRead; i++) { b[i] = bytes[pos + i]; @@ -202,13 +206,20 @@ public abstract class NativeS3FileSystemContractBaseTest pos += sizeToRead; return sizeToRead; } + + public void reopenAt(long byteRangeStart) { + threwException = false; + pos = Long.valueOf(byteRangeStart).intValue(); + } + } - final InputStream is = new TestInputStream(); + final TestInputStream is = new TestInputStream(); class MockNativeFileSystemStore extends Jets3tNativeFileSystemStore { @Override public InputStream retrieve(String key, long byteRangeStart) throws IOException { + is.reopenAt(byteRangeStart); return is; } } @@ -233,8 +244,9 @@ public abstract class NativeS3FileSystemContractBaseTest } // Test to make sure the throw path was exercised. - // 144 = 128 + (128 / 8) - assertEquals(144, ((TestInputStream)is).throwCount); + // every read should have thrown 1 IOException except for the first read + // 144 = 128 - 1 + (128 / 8) + assertEquals(143, ((TestInputStream)is).throwCount); } }