diff --git a/CHANGES.txt b/CHANGES.txt index d5d692f3ea8..f776ad27b0f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -62,6 +62,9 @@ Trunk (unreleased changes) HADOOP-6386. NameNode's HttpServer can't instantiate InetSocketAddress: IllegalArgumentException is thrown (cos) + HADOOP-6254. Slow reads cause s3n to fail with SocketTimeoutException. + (Andrew Hitchcock via tomwhite) + Release 0.21.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/src/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java b/src/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java index ea5a23b3b25..8898c889ae8 100644 --- a/src/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java +++ b/src/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java @@ -86,20 +86,31 @@ public class NativeS3FileSystem extends FileSystem { static final String PATH_DELIMITER = Path.SEPARATOR; private static final int S3_MAX_LISTING_LENGTH = 1000; - private class NativeS3FsInputStream extends FSInputStream { + static class NativeS3FsInputStream extends FSInputStream { + private NativeFileSystemStore store; + private Statistics statistics; private InputStream in; private final String key; private long pos = 0; - public NativeS3FsInputStream(InputStream in, String key) { + public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) { + this.store = store; + this.statistics = statistics; this.in = in; this.key = key; } @Override public synchronized int read() throws IOException { - int result = in.read(); + int result = -1; + try { + result = in.read(); + } catch (IOException e) { + LOG.info("Received IOException while reading '" + key + "', attempting to reopen."); + seek(pos); + result = in.read(); + } if (result != -1) { pos++; } @@ -112,7 +123,14 @@ public class NativeS3FileSystem extends FileSystem { public synchronized int read(byte[] b, int off, int len) throws IOException { - int result = in.read(b, off, len); + int result = -1; + try { + result = in.read(b, off, len); + } catch (IOException e) { + LOG.info("Received IOException while reading '" + key + "', attempting to reopen."); + seek(pos); + result = in.read(b, off, len); + } if (result > 0) { pos += result; } @@ -514,7 +532,7 @@ public class NativeS3FileSystem extends FileSystem { Path absolutePath = makeAbsolute(f); String key = pathToKey(absolutePath); return new FSDataInputStream(new BufferedFSInputStream( - new NativeS3FsInputStream(store.retrieve(key), key), bufferSize)); + new NativeS3FsInputStream(store, statistics, store.retrieve(key), key), bufferSize)); } // rename() and delete() use this method to ensure that the parent directory diff --git a/src/test/core/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java b/src/test/core/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java index 337eeaba5bb..8396c9e73dc 100644 --- a/src/test/core/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java +++ b/src/test/core/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java @@ -19,12 +19,14 @@ package org.apache.hadoop.fs.s3native; import java.io.IOException; +import java.io.InputStream; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystemContractBaseTest; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3native.NativeS3FileSystem.NativeS3FsInputStream; public abstract class NativeS3FileSystemContractBaseTest extends FileSystemContractBaseTest { @@ -148,5 +150,79 @@ public abstract class NativeS3FileSystemContractBaseTest assertEquals("Double default block size", newBlockSize, fs.getFileStatus(file).getBlockSize()); } + + public void testRetryOnIoException() throws Exception { + class TestInputStream extends InputStream { + boolean shouldThrow = false; + int throwCount = 0; + int pos = 0; + byte[] bytes; + + public TestInputStream() { + bytes = new byte[256]; + for (int i = 0; i < 256; i++) { + bytes[i] = (byte)i; + } + } + + @Override + public int read() throws IOException { + shouldThrow = !shouldThrow; + if (shouldThrow) { + throwCount++; + throw new IOException(); + } + return pos++; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + shouldThrow = !shouldThrow; + if (shouldThrow) { + throwCount++; + throw new IOException(); + } + + int sizeToRead = Math.min(len, 256 - pos); + for (int i = 0; i < sizeToRead; i++) { + b[i] = bytes[pos + i]; + } + pos += sizeToRead; + return sizeToRead; + } + } + + final InputStream is = new TestInputStream(); + + class MockNativeFileSystemStore extends Jets3tNativeFileSystemStore { + @Override + public InputStream retrieve(String key, long byteRangeStart) throws IOException { + return is; + } + } + + NativeS3FsInputStream stream = new NativeS3FsInputStream(new MockNativeFileSystemStore(), null, is, ""); + + // Test reading methods. + byte[] result = new byte[256]; + for (int i = 0; i < 128; i++) { + result[i] = (byte)stream.read(); + } + for (int i = 128; i < 256; i += 8) { + byte[] temp = new byte[8]; + int read = stream.read(temp, 0, 8); + assertEquals(8, read); + System.arraycopy(temp, 0, result, i, 8); + } + + // Assert correct + for (int i = 0; i < 256; i++) { + assertEquals((byte)i, result[i]); + } + + // Test to make sure the throw path was exercised. + // 144 = 128 + (128 / 8) + assertEquals(144, ((TestInputStream)is).throwCount); + } }