HADOOP-6254. Slow reads cause s3n to fail with SocketTimeoutException. Contributed by Andrew Hitchcock.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@888697 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
929e91a08c
commit
368b6cc85b
|
@ -62,6 +62,9 @@ Trunk (unreleased changes)
|
|||
HADOOP-6386. NameNode's HttpServer can't instantiate InetSocketAddress:
|
||||
IllegalArgumentException is thrown (cos)
|
||||
|
||||
HADOOP-6254. Slow reads cause s3n to fail with SocketTimeoutException.
|
||||
(Andrew Hitchcock via tomwhite)
|
||||
|
||||
Release 0.21.0 - Unreleased
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -86,20 +86,31 @@ public class NativeS3FileSystem extends FileSystem {
|
|||
static final String PATH_DELIMITER = Path.SEPARATOR;
|
||||
private static final int S3_MAX_LISTING_LENGTH = 1000;
|
||||
|
||||
private class NativeS3FsInputStream extends FSInputStream {
|
||||
static class NativeS3FsInputStream extends FSInputStream {
|
||||
|
||||
private NativeFileSystemStore store;
|
||||
private Statistics statistics;
|
||||
private InputStream in;
|
||||
private final String key;
|
||||
private long pos = 0;
|
||||
|
||||
public NativeS3FsInputStream(InputStream in, String key) {
|
||||
public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) {
|
||||
this.store = store;
|
||||
this.statistics = statistics;
|
||||
this.in = in;
|
||||
this.key = key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int read() throws IOException {
|
||||
int result = in.read();
|
||||
int result = -1;
|
||||
try {
|
||||
result = in.read();
|
||||
} catch (IOException e) {
|
||||
LOG.info("Received IOException while reading '" + key + "', attempting to reopen.");
|
||||
seek(pos);
|
||||
result = in.read();
|
||||
}
|
||||
if (result != -1) {
|
||||
pos++;
|
||||
}
|
||||
|
@ -112,7 +123,14 @@ public class NativeS3FileSystem extends FileSystem {
|
|||
public synchronized int read(byte[] b, int off, int len)
|
||||
throws IOException {
|
||||
|
||||
int result = in.read(b, off, len);
|
||||
int result = -1;
|
||||
try {
|
||||
result = in.read(b, off, len);
|
||||
} catch (IOException e) {
|
||||
LOG.info("Received IOException while reading '" + key + "', attempting to reopen.");
|
||||
seek(pos);
|
||||
result = in.read(b, off, len);
|
||||
}
|
||||
if (result > 0) {
|
||||
pos += result;
|
||||
}
|
||||
|
@ -514,7 +532,7 @@ public class NativeS3FileSystem extends FileSystem {
|
|||
Path absolutePath = makeAbsolute(f);
|
||||
String key = pathToKey(absolutePath);
|
||||
return new FSDataInputStream(new BufferedFSInputStream(
|
||||
new NativeS3FsInputStream(store.retrieve(key), key), bufferSize));
|
||||
new NativeS3FsInputStream(store, statistics, store.retrieve(key), key), bufferSize));
|
||||
}
|
||||
|
||||
// rename() and delete() use this method to ensure that the parent directory
|
||||
|
|
|
@ -19,12 +19,14 @@
|
|||
package org.apache.hadoop.fs.s3native;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystemContractBaseTest;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.s3native.NativeS3FileSystem.NativeS3FsInputStream;
|
||||
|
||||
public abstract class NativeS3FileSystemContractBaseTest
|
||||
extends FileSystemContractBaseTest {
|
||||
|
@ -148,5 +150,79 @@ public abstract class NativeS3FileSystemContractBaseTest
|
|||
assertEquals("Double default block size", newBlockSize,
|
||||
fs.getFileStatus(file).getBlockSize());
|
||||
}
|
||||
|
||||
public void testRetryOnIoException() throws Exception {
|
||||
class TestInputStream extends InputStream {
|
||||
boolean shouldThrow = false;
|
||||
int throwCount = 0;
|
||||
int pos = 0;
|
||||
byte[] bytes;
|
||||
|
||||
public TestInputStream() {
|
||||
bytes = new byte[256];
|
||||
for (int i = 0; i < 256; i++) {
|
||||
bytes[i] = (byte)i;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
shouldThrow = !shouldThrow;
|
||||
if (shouldThrow) {
|
||||
throwCount++;
|
||||
throw new IOException();
|
||||
}
|
||||
return pos++;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] b, int off, int len) throws IOException {
|
||||
shouldThrow = !shouldThrow;
|
||||
if (shouldThrow) {
|
||||
throwCount++;
|
||||
throw new IOException();
|
||||
}
|
||||
|
||||
int sizeToRead = Math.min(len, 256 - pos);
|
||||
for (int i = 0; i < sizeToRead; i++) {
|
||||
b[i] = bytes[pos + i];
|
||||
}
|
||||
pos += sizeToRead;
|
||||
return sizeToRead;
|
||||
}
|
||||
}
|
||||
|
||||
final InputStream is = new TestInputStream();
|
||||
|
||||
class MockNativeFileSystemStore extends Jets3tNativeFileSystemStore {
|
||||
@Override
|
||||
public InputStream retrieve(String key, long byteRangeStart) throws IOException {
|
||||
return is;
|
||||
}
|
||||
}
|
||||
|
||||
NativeS3FsInputStream stream = new NativeS3FsInputStream(new MockNativeFileSystemStore(), null, is, "");
|
||||
|
||||
// Test reading methods.
|
||||
byte[] result = new byte[256];
|
||||
for (int i = 0; i < 128; i++) {
|
||||
result[i] = (byte)stream.read();
|
||||
}
|
||||
for (int i = 128; i < 256; i += 8) {
|
||||
byte[] temp = new byte[8];
|
||||
int read = stream.read(temp, 0, 8);
|
||||
assertEquals(8, read);
|
||||
System.arraycopy(temp, 0, result, i, 8);
|
||||
}
|
||||
|
||||
// Assert correct
|
||||
for (int i = 0; i < 256; i++) {
|
||||
assertEquals((byte)i, result[i]);
|
||||
}
|
||||
|
||||
// Test to make sure the throw path was exercised.
|
||||
// 144 = 128 + (128 / 8)
|
||||
assertEquals(144, ((TestInputStream)is).throwCount);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue