HADOOP-17812. NPE in S3AInputStream read() after failure to reconnect to store (#3222)

This improves error handling after multiple failures reading data
-when the read fails and attempts to reconnect() also fail.

Contributed by Bobby Wang.

Change-Id: If17dee395ad6b9b7c738021bad20d0a13eb4011e
This commit is contained in:
Bobby Wang 2021-07-31 03:04:11 +08:00 committed by Steve Loughran
parent f2cec5cb88
commit 904cdd0b00
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
2 changed files with 38 additions and 10 deletions

View File

@ -418,15 +418,21 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
int byteRead = invoker.retry("read", pathStr, true,
() -> {
int b;
// When exception happens before re-setting wrappedStream in "reopen" called
// by onReadFailure, then wrappedStream will be null. But the **retry** may
// re-execute this block and cause NPE if we don't check wrappedStream
if (wrappedStream == null) {
reopen("failure recovery", getPos(), 1, false);
}
try {
b = wrappedStream.read();
} catch (EOFException e) {
return -1;
} catch (SocketTimeoutException e) {
onReadFailure(e, 1, true);
onReadFailure(e, true);
throw e;
} catch (IOException e) {
onReadFailure(e, 1, false);
onReadFailure(e, false);
throw e;
}
return b;
@ -444,15 +450,12 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
}
/**
* Handle an IOE on a read by attempting to re-open the stream.
* Close the stream on read failure.
* The filesystem's readException count will be incremented.
* @param ioe exception caught.
* @param length length of data being attempted to read
* @throws IOException any exception thrown on the re-open attempt.
*/
@Retries.OnceTranslated
private void onReadFailure(IOException ioe, int length, boolean forceAbort)
throws IOException {
private void onReadFailure(IOException ioe, boolean forceAbort) {
if (LOG.isDebugEnabled()) {
LOG.debug("Got exception while trying to read from stream {}, " +
"client: {} object: {}, trying to recover: ",
@ -463,7 +466,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
uri, client, object);
}
streamStatistics.readException();
reopen("failure recovery", pos, length, forceAbort);
closeStream("failure recovery", contentRangeFinish, forceAbort);
}
/**
@ -506,16 +509,22 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
int bytesRead = invoker.retry("read", pathStr, true,
() -> {
int bytes;
// When exception happens before re-setting wrappedStream in "reopen" called
// by onReadFailure, then wrappedStream will be null. But the **retry** may
// re-execute this block and cause NPE if we don't check wrappedStream
if (wrappedStream == null) {
reopen("failure recovery", getPos(), 1, false);
}
try {
bytes = wrappedStream.read(buf, off, len);
} catch (EOFException e) {
// the base implementation swallows EOFs.
return -1;
} catch (SocketTimeoutException e) {
onReadFailure(e, len, true);
onReadFailure(e, true);
throw e;
} catch (IOException e) {
onReadFailure(e, len, false);
onReadFailure(e, false);
throw e;
}
return bytes;

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.net.SocketException;
import java.nio.charset.Charset;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
@ -120,10 +121,28 @@ public class TestS3AInputStreamRetry extends AbstractS3AMockTest {
return new S3AInputStream.InputStreamCallbacks() {
private final S3Object mockedS3Object = getMockedS3Object();
private Integer mockedS3ObjectIndex = 0;
@Override
public S3Object getObject(GetObjectRequest request) {
// Set s3 client to return mocked s3object with defined read behavior.
mockedS3ObjectIndex++;
// open() -> lazySeek() -> reopen()
// -> getObject (mockedS3ObjectIndex=1) -> getObjectContent(objectInputStreamBad1)
// read() -> objectInputStreamBad1 throws exception
// -> onReadFailure -> close wrappedStream
// -> retry(1) -> wrappedStream==null -> reopen -> getObject (mockedS3ObjectIndex=2)
// -> getObjectContent(objectInputStreamBad2)-> objectInputStreamBad2
// -> wrappedStream.read -> objectInputStreamBad2 throws exception
// -> onReadFailure -> close wrappedStream
// -> retry(2) -> wrappedStream==null -> reopen
// -> getObject (mockedS3ObjectIndex=3) throws exception
// -> retry(3) -> wrappedStream==null -> reopen -> getObject (mockedS3ObjectIndex=4)
// -> getObjectContent(objectInputStreamGood)-> objectInputStreamGood
// -> wrappedStream.read
if (mockedS3ObjectIndex == 3) {
throw new SdkClientException("Failed to get S3Object");
}
return mockedS3Object;
}