From 975f669141add885d631685d693e81c44c190751 Mon Sep 17 00:00:00 2001 From: Xudong Cao Date: Wed, 13 Nov 2019 06:19:39 +0800 Subject: [PATCH] HADOOP-16677. Recalculate the remaining timeout millis correctly while throwing an InterupptedException in SocketIOWithTimeout. (#1687) (cherry picked from commit df6b3162c11987ba5299c69cb251332228dacf36) --- .../hadoop/net/SocketIOWithTimeout.java | 24 ++++++------ .../hadoop/net/TestSocketIOWithTimeout.java | 38 +++++++++++++++++++ 2 files changed, 51 insertions(+), 11 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java index f489581843f..312a481f25a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java @@ -326,34 +326,36 @@ abstract class SocketIOWithTimeout { SelectionKey key = null; int ret = 0; + long timeoutLeft = timeout; try { while (true) { long start = (timeout == 0) ? 0 : Time.now(); key = channel.register(info.selector, ops); - ret = info.selector.select(timeout); + ret = info.selector.select(timeoutLeft); if (ret != 0) { return ret; } - if (Thread.currentThread().isInterrupted()) { - throw new InterruptedIOException("Interrupted while waiting for " - + "IO on channel " + channel + ". " + timeout - + " millis timeout left."); - } - /* Sometimes select() returns 0 much before timeout for * unknown reasons. So select again if required. */ if (timeout > 0) { - timeout -= Time.now() - start; - if (timeout <= 0) { - return 0; - } + timeoutLeft -= Time.now() - start; + timeoutLeft = Math.max(0, timeoutLeft); } + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedIOException("Interrupted while waiting for " + + "IO on channel " + channel + ". Total timeout mills is " + + timeout + ", " + timeoutLeft + " millis timeout left."); + } + + if (timeoutLeft == 0) { + return 0; + } } } finally { if (key != null) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSocketIOWithTimeout.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSocketIOWithTimeout.java index f1c03cf5df4..272eae70bb6 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSocketIOWithTimeout.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSocketIOWithTimeout.java @@ -185,4 +185,42 @@ public class TestSocketIOWithTimeout { } } } + + @Test + public void testSocketIOWithTimeoutInterrupted() throws Exception { + Pipe pipe = Pipe.open(); + final int timeout = TIMEOUT * 10; + + try (Pipe.SourceChannel source = pipe.source(); + InputStream in = new SocketInputStream(source, timeout)) { + + TestingThread thread = new TestingThread(ctx) { + @Override + public void doWork() throws Exception { + try { + in.read(); + fail("Did not fail with interrupt"); + } catch (InterruptedIOException ste) { + String detail = ste.getMessage(); + String totalString = "Total timeout mills is " + timeout; + String leftString = "millis timeout left"; + + assertTrue(detail.contains(totalString)); + assertTrue(detail.contains(leftString)); + } + } + }; + + ctx.addThread(thread); + ctx.startThreads(); + // If the thread is interrupted before it calls read() + // then it throws ClosedByInterruptException due to + // some Java quirk. Waiting for it to call read() + // gets it into select(), so we get the expected + // InterruptedIOException. + Thread.sleep(1000); + thread.interrupt(); + ctx.stop(); + } + } }