From caf38532f3f3eafb4c874a6debddaad2fb2aa201 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 11 Jul 2018 14:55:11 +0100 Subject: [PATCH] HADOOP-15541. [s3a] Shouldn't try to drain stream before aborting connection in case of timeout. Contributed by Sean Mackrory. (cherry picked from commit d503f65b6689b19278ec2a0cf9da5a8762539de8) --- .../apache/hadoop/fs/s3a/S3AInputStream.java | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index c54d3e26217..91a2d9d1ac2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -36,6 +36,7 @@ import java.io.EOFException; import java.io.IOException; +import java.net.SocketTimeoutException; import static org.apache.commons.lang3.StringUtils.isNotEmpty; @@ -155,11 +156,11 @@ private void setInputPolicy(S3AInputPolicy inputPolicy) { * @throws IOException on any failure to open the object */ @Retries.OnceTranslated - private synchronized void reopen(String reason, long targetPos, long length) - throws IOException { + private synchronized void reopen(String reason, long targetPos, long length, + boolean forceAbort) throws IOException { if (wrappedStream != null) { - closeStream("reopen(" + reason + ")", contentRangeFinish, false); + closeStream("reopen(" + reason + ")", contentRangeFinish, forceAbort); } contentRangeFinish = calculateRequestLimit(inputPolicy, targetPos, @@ -324,7 +325,7 @@ private void lazySeek(long targetPos, long len) throws IOException { //re-open at specific location if needed if (wrappedStream == null) { - reopen("read from new offset", targetPos, len); + reopen("read from new offset", targetPos, len, false); } }); } @@ -367,8 +368,11 @@ public synchronized int read() throws IOException { b = wrappedStream.read(); } catch (EOFException e) { return -1; + } catch (SocketTimeoutException e) { + onReadFailure(e, 1, true); + b = wrappedStream.read(); } catch (IOException e) { - onReadFailure(e, 1); + onReadFailure(e, 1, false); b = wrappedStream.read(); } return b; @@ -393,12 +397,13 @@ public synchronized int read() throws IOException { * @throws IOException any exception thrown on the re-open attempt. */ @Retries.OnceTranslated - private void onReadFailure(IOException ioe, int length) throws IOException { + private void onReadFailure(IOException ioe, int length, boolean forceAbort) + throws IOException { LOG.info("Got exception while trying to read from stream {}" + " trying to recover: " + ioe, uri); streamStatistics.readException(); - reopen("failure recovery", pos, length); + reopen("failure recovery", pos, length, forceAbort); } /** @@ -446,8 +451,11 @@ public synchronized int read(byte[] buf, int off, int len) } catch (EOFException e) { // the base implementation swallows EOFs. return -1; + } catch (SocketTimeoutException e) { + onReadFailure(e, len, true); + bytes = wrappedStream.read(buf, off, len); } catch (IOException e) { - onReadFailure(e, len); + onReadFailure(e, len, false); bytes= wrappedStream.read(buf, off, len); } return bytes;