HADOOP-16677. Recalculate the remaining timeout millis correctly while throwing an InterupptedException in SocketIOWithTimeout. (#1687)

This commit is contained in:
Xudong Cao 2019-11-13 06:19:39 +08:00 committed by Wei-Chiu Chuang
parent 97ec34e117
commit df6b3162c1
2 changed files with 51 additions and 11 deletions

View File

@ -326,34 +326,36 @@ abstract class SocketIOWithTimeout {
SelectionKey key = null; SelectionKey key = null;
int ret = 0; int ret = 0;
long timeoutLeft = timeout;
try { try {
while (true) { while (true) {
long start = (timeout == 0) ? 0 : Time.now(); long start = (timeout == 0) ? 0 : Time.now();
key = channel.register(info.selector, ops); key = channel.register(info.selector, ops);
ret = info.selector.select(timeout); ret = info.selector.select(timeoutLeft);
if (ret != 0) { if (ret != 0) {
return ret; return ret;
} }
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedIOException("Interrupted while waiting for "
+ "IO on channel " + channel + ". " + timeout
+ " millis timeout left.");
}
/* Sometimes select() returns 0 much before timeout for /* Sometimes select() returns 0 much before timeout for
* unknown reasons. So select again if required. * unknown reasons. So select again if required.
*/ */
if (timeout > 0) { if (timeout > 0) {
timeout -= Time.now() - start; timeoutLeft -= Time.now() - start;
if (timeout <= 0) { timeoutLeft = Math.max(0, timeoutLeft);
return 0;
}
} }
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedIOException("Interrupted while waiting for "
+ "IO on channel " + channel + ". Total timeout mills is "
+ timeout + ", " + timeoutLeft + " millis timeout left.");
}
if (timeoutLeft == 0) {
return 0;
}
} }
} finally { } finally {
if (key != null) { if (key != null) {

View File

@ -185,4 +185,42 @@ public class TestSocketIOWithTimeout {
} }
} }
} }
@Test
public void testSocketIOWithTimeoutInterrupted() throws Exception {
Pipe pipe = Pipe.open();
final int timeout = TIMEOUT * 10;
try (Pipe.SourceChannel source = pipe.source();
InputStream in = new SocketInputStream(source, timeout)) {
TestingThread thread = new TestingThread(ctx) {
@Override
public void doWork() throws Exception {
try {
in.read();
fail("Did not fail with interrupt");
} catch (InterruptedIOException ste) {
String detail = ste.getMessage();
String totalString = "Total timeout mills is " + timeout;
String leftString = "millis timeout left";
assertTrue(detail.contains(totalString));
assertTrue(detail.contains(leftString));
}
}
};
ctx.addThread(thread);
ctx.startThreads();
// If the thread is interrupted before it calls read()
// then it throws ClosedByInterruptException due to
// some Java quirk. Waiting for it to call read()
// gets it into select(), so we get the expected
// InterruptedIOException.
Thread.sleep(1000);
thread.interrupt();
ctx.stop();
}
}
} }