HADOOP-17338 Intermittent S3AInputStream failures: Premature end of Content-Length delimited message body etc (#2692)

Backported from commit ID 3d2193cd64

Contributed by Yongjun Zhang
This commit is contained in:
yzhangal 2021-02-09 04:04:14 -08:00 committed by GitHub
parent 7b4034cd88
commit bac234b0a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 42 additions and 14 deletions

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a;
import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream; import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.services.s3.model.SSECustomerKey; import com.amazonaws.services.s3.model.SSECustomerKey;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -72,6 +73,14 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
* set * set
*/ */
private volatile boolean closed; private volatile boolean closed;
/**
* wrappedStream is associated with an object (instance of S3Object). When
* the object is garbage collected, the associated wrappedStream will be
* closed. Keep a reference to this object to prevent the wrappedStream
* still in use from being closed unexpectedly due to garbage collection.
* See HADOOP-17338 for details.
*/
private S3Object object;
private S3ObjectInputStream wrappedStream; private S3ObjectInputStream wrappedStream;
private final FileSystem.Statistics stats; private final FileSystem.Statistics stats;
private final AmazonS3 client; private final AmazonS3 client;
@ -157,7 +166,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
StringUtils.isNotBlank(serverSideEncryptionKey)){ StringUtils.isNotBlank(serverSideEncryptionKey)){
request.setSSECustomerKey(new SSECustomerKey(serverSideEncryptionKey)); request.setSSECustomerKey(new SSECustomerKey(serverSideEncryptionKey));
} }
wrappedStream = client.getObject(request).getObjectContent(); object = client.getObject(request);
wrappedStream = object.getObjectContent();
contentRangeStart = targetPos; contentRangeStart = targetPos;
if (wrappedStream == null) { if (wrappedStream == null) {
throw new IOException("Null IO stream from reopen of (" + reason + ") " throw new IOException("Null IO stream from reopen of (" + reason + ") "
@ -345,9 +355,15 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
* @throws IOException any exception thrown on the re-open attempt. * @throws IOException any exception thrown on the re-open attempt.
*/ */
private void onReadFailure(IOException ioe, int length) throws IOException { private void onReadFailure(IOException ioe, int length) throws IOException {
LOG.info("Got exception while trying to read from stream {}" if (LOG.isDebugEnabled()) {
+ " trying to recover: "+ ioe, uri); LOG.debug("Got exception while trying to read from stream {}, " +
LOG.debug("While trying to read from stream {}", uri, ioe); "client: {} object: {}, trying to recover: ",
uri, client, object, ioe);
} else {
LOG.info("Got exception while trying to read from stream {}, " +
"client: {} object: {}, trying to recover: " + ioe,
uri, client, object);
}
streamStatistics.readException(); streamStatistics.readException();
reopen("failure recovery", pos, length); reopen("failure recovery", pos, length);
} }
@ -450,7 +466,10 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
* @param forceAbort force an abort; used if explicitly requested. * @param forceAbort force an abort; used if explicitly requested.
*/ */
private void closeStream(String reason, long length, boolean forceAbort) { private void closeStream(String reason, long length, boolean forceAbort) {
if (wrappedStream != null) { if (wrappedStream == null) {
// stream is already closed
return;
}
// if the amount of data remaining in the current request is greater // if the amount of data remaining in the current request is greater
// than the readahead value: abort. // than the readahead value: abort.
@ -458,6 +477,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
LOG.debug("Closing stream {}: {}", reason, LOG.debug("Closing stream {}: {}", reason,
forceAbort ? "abort" : "soft"); forceAbort ? "abort" : "soft");
boolean shouldAbort = forceAbort || remaining > readahead; boolean shouldAbort = forceAbort || remaining > readahead;
try {
if (!shouldAbort) { if (!shouldAbort) {
try { try {
// clean close. This will read to the end of the stream, // clean close. This will read to the end of the stream,
@ -485,8 +506,13 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
if (shouldAbort) { if (shouldAbort) {
// Abort, rather than just close, the underlying stream. Otherwise, the // Abort, rather than just close, the underlying stream. Otherwise, the
// remaining object payload is read from S3 while closing the stream. // remaining object payload is read from S3 while closing the stream.
LOG.debug("Aborting stream"); LOG.debug("Aborting stream {}", uri);
try {
wrappedStream.abort(); wrappedStream.abort();
} catch (Exception e) {
LOG.warn("When aborting {} stream after failing to close it for {}",
uri, reason, e);
}
streamStatistics.streamClose(true, remaining); streamStatistics.streamClose(true, remaining);
} }
LOG.debug("Stream {} {}: {}; remaining={} streamPos={}," LOG.debug("Stream {} {}: {}; remaining={} streamPos={},"
@ -496,7 +522,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
remaining, pos, nextReadPos, remaining, pos, nextReadPos,
contentRangeStart, contentRangeFinish, contentRangeStart, contentRangeFinish,
length); length);
} finally {
wrappedStream = null; wrappedStream = null;
object = null;
} }
} }