HADOOP-17338. Intermittent S3AInputStream failures: Premature end of Content-Length delimited message body etc (#2497)

Yongjun Zhang <yongjunzhang@pinterest.com>

Change-Id: Ibbc6a39afb82de1208e6ed6a63ede224cc425466
This commit is contained in:
yzhangal 2020-12-18 11:08:10 -08:00 committed by Steve Loughran
parent be508718d8
commit adf6ca18b4
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
1 changed file with 42 additions and 15 deletions

View File

@ -87,6 +87,14 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
* set * set
*/ */
private volatile boolean closed; private volatile boolean closed;
/**
* wrappedStream is associated with an object (instance of S3Object). When
* the object is garbage collected, the associated wrappedStream will be
 * closed. Keep a reference to this object to prevent the wrappedStream
* still in use from being closed unexpectedly due to garbage collection.
* See HADOOP-17338 for details.
*/
private S3Object object;
private S3ObjectInputStream wrappedStream; private S3ObjectInputStream wrappedStream;
private final S3AReadOpContext context; private final S3AReadOpContext context;
private final AmazonS3 client; private final AmazonS3 client;
@ -202,7 +210,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
String text = String.format("%s %s at %d", String text = String.format("%s %s at %d",
operation, uri, targetPos); operation, uri, targetPos);
changeTracker.maybeApplyConstraint(request); changeTracker.maybeApplyConstraint(request);
S3Object object = Invoker.once(text, uri, object = Invoker.once(text, uri,
() -> client.getObject(request)); () -> client.getObject(request));
changeTracker.processResponse(object, operation, changeTracker.processResponse(object, operation,
@ -430,9 +438,15 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
@Retries.OnceTranslated @Retries.OnceTranslated
private void onReadFailure(IOException ioe, int length, boolean forceAbort) private void onReadFailure(IOException ioe, int length, boolean forceAbort)
throws IOException { throws IOException {
if (LOG.isDebugEnabled()) {
LOG.info("Got exception while trying to read from stream {}" + LOG.debug("Got exception while trying to read from stream {}, " +
" trying to recover: " + ioe, uri); "client: {} object: {}, trying to recover: ",
uri, client, object, ioe);
} else {
LOG.info("Got exception while trying to read from stream {}, " +
"client: {} object: {}, trying to recover: " + ioe,
uri, client, object);
}
streamStatistics.readException(); streamStatistics.readException();
reopen("failure recovery", pos, length, forceAbort); reopen("failure recovery", pos, length, forceAbort);
} }
@ -550,7 +564,10 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
*/ */
@Retries.OnceRaw @Retries.OnceRaw
private void closeStream(String reason, long length, boolean forceAbort) { private void closeStream(String reason, long length, boolean forceAbort) {
if (isObjectStreamOpen()) { if (!isObjectStreamOpen()) {
// stream is already closed
return;
}
// if the amount of data remaining in the current request is greater // if the amount of data remaining in the current request is greater
// than the readahead value: abort. // than the readahead value: abort.
@ -558,6 +575,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
LOG.debug("Closing stream {}: {}", reason, LOG.debug("Closing stream {}: {}", reason,
forceAbort ? "abort" : "soft"); forceAbort ? "abort" : "soft");
boolean shouldAbort = forceAbort || remaining > readahead; boolean shouldAbort = forceAbort || remaining > readahead;
try {
if (!shouldAbort) { if (!shouldAbort) {
try { try {
// clean close. This will read to the end of the stream, // clean close. This will read to the end of the stream,
@ -578,15 +597,21 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
streamStatistics.streamClose(false, drained); streamStatistics.streamClose(false, drained);
} catch (Exception e) { } catch (Exception e) {
// exception escalates to an abort // exception escalates to an abort
LOG.debug("When closing {} stream for {}", uri, reason, e); LOG.debug("When closing {} stream for {}, will abort the stream",
uri, reason, e);
shouldAbort = true; shouldAbort = true;
} }
} }
if (shouldAbort) { if (shouldAbort) {
// Abort, rather than just close, the underlying stream. Otherwise, the // Abort, rather than just close, the underlying stream. Otherwise, the
// remaining object payload is read from S3 while closing the stream. // remaining object payload is read from S3 while closing the stream.
LOG.debug("Aborting stream"); LOG.debug("Aborting stream {}", uri);
try {
wrappedStream.abort(); wrappedStream.abort();
} catch (Exception e) {
LOG.warn("When aborting {} stream after failing to close it for {}",
uri, reason, e);
}
streamStatistics.streamClose(true, remaining); streamStatistics.streamClose(true, remaining);
} }
LOG.debug("Stream {} {}: {}; remaining={} streamPos={}," LOG.debug("Stream {} {}: {}; remaining={} streamPos={},"
@ -596,7 +621,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
remaining, pos, nextReadPos, remaining, pos, nextReadPos,
contentRangeStart, contentRangeFinish, contentRangeStart, contentRangeFinish,
length); length);
} finally {
wrappedStream = null; wrappedStream = null;
object = null;
} }
} }