HADOOP-12213. Interrupted exception can occur when Client#stop is called. Contributed by Kuhu Shukla.

(cherry picked from commit 0ebc658105)
This commit is contained in:
Tsuyoshi Ozawa 2015-09-03 23:32:42 +09:00
parent 7f1abf0b27
commit 538a4ddc7e
3 changed files with 47 additions and 3 deletions

View File

@ -635,6 +635,9 @@ Release 2.7.2 - UNRELEASED
HADOOP-10365. BufferedOutputStream in FileUtil#unpackEntries() should be HADOOP-10365. BufferedOutputStream in FileUtil#unpackEntries() should be
closed in finally block. (Kiran Kumar M R and Sanghyun Yun via ozawa) closed in finally block. (Kiran Kumar M R and Sanghyun Yun via ozawa)
HADOOP-12213. Interrupted exception can occur when Client#stop is called.
(Kuhu Shukla via ozawa)
Release 2.7.1 - 2015-07-06 Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -197,9 +197,10 @@ public class Client {
clientExecutor.shutdownNow(); clientExecutor.shutdownNow();
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.error("Interrupted while waiting for clientExecutor" + LOG.warn("Interrupted while waiting for clientExecutor" +
"to stop", e); " to stop");
clientExecutor.shutdownNow(); clientExecutor.shutdownNow();
Thread.currentThread().interrupt();
} }
clientExecutor = null; clientExecutor = null;
} }
@ -256,6 +257,10 @@ public class Client {
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY, timeout); conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY, timeout);
} }
@VisibleForTesting
public static final ExecutorService getClientExecutor() {
return Client.clientExcecutorFactory.clientExecutor;
}
/** /**
* Increment this client's reference count * Increment this client's reference count
* *

View File

@ -304,6 +304,8 @@ public class TestIPC {
String causeText=cause.getMessage(); String causeText=cause.getMessage();
assertTrue("Did not find " + causeText + " in " + message, assertTrue("Did not find " + causeText + " in " + message,
message.contains(causeText)); message.contains(causeText));
} finally {
client.stop();
} }
} }
@ -416,6 +418,7 @@ public class TestIPC {
client.call(param, addr, null, null, 0, conf); client.call(param, addr, null, null, 0, conf);
} finally { } finally {
client.stop();
server.stop(); server.stop();
} }
} }
@ -531,6 +534,8 @@ public class TestIPC {
fail("Expected an exception to have been thrown"); fail("Expected an exception to have been thrown");
} catch (IOException e) { } catch (IOException e) {
assertTrue(e.getMessage().contains("Injected fault")); assertTrue(e.getMessage().contains("Injected fault"));
} finally {
client.stop();
} }
} }
@ -556,11 +561,11 @@ public class TestIPC {
}).when(spyFactory).createSocket(); }).when(spyFactory).createSocket();
Server server = new TestServer(1, true); Server server = new TestServer(1, true);
Client client = new Client(LongWritable.class, conf, spyFactory);
server.start(); server.start();
try { try {
// Call should fail due to injected exception. // Call should fail due to injected exception.
InetSocketAddress address = NetUtils.getConnectAddress(server); InetSocketAddress address = NetUtils.getConnectAddress(server);
Client client = new Client(LongWritable.class, conf, spyFactory);
try { try {
client.call(new LongWritable(RANDOM.nextLong()), client.call(new LongWritable(RANDOM.nextLong()),
address, null, null, 0, conf); address, null, null, 0, conf);
@ -577,6 +582,7 @@ public class TestIPC {
client.call(new LongWritable(RANDOM.nextLong()), client.call(new LongWritable(RANDOM.nextLong()),
address, null, null, 0, conf); address, null, null, 0, conf);
} finally { } finally {
client.stop();
server.stop(); server.stop();
} }
} }
@ -601,6 +607,7 @@ public class TestIPC {
// set timeout to be bigger than 3*ping interval // set timeout to be bigger than 3*ping interval
client.call(new LongWritable(RANDOM.nextLong()), client.call(new LongWritable(RANDOM.nextLong()),
addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME, conf); addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME, conf);
client.stop();
} }
@Test(timeout=60000) @Test(timeout=60000)
@ -621,6 +628,7 @@ public class TestIPC {
} catch (SocketTimeoutException e) { } catch (SocketTimeoutException e) {
LOG.info("Get a SocketTimeoutException ", e); LOG.info("Get a SocketTimeoutException ", e);
} }
client.stop();
} }
/** /**
@ -851,6 +859,8 @@ public class TestIPC {
} catch (IOException e) { } catch (IOException e) {
LOG.error(e); LOG.error(e);
} catch (InterruptedException e) { } catch (InterruptedException e) {
} finally {
client.stop();
} }
} }
}); });
@ -952,6 +962,31 @@ public class TestIPC {
endFds - startFds < 20); endFds - startFds < 20);
} }
/**
* Check if Client is interrupted after handling
* InterruptedException during cleanup
*/
@Test(timeout=30000)
public void testInterrupted() {
Client client = new Client(LongWritable.class, conf);
client.getClientExecutor().submit(new Runnable() {
public void run() {
while(true);
}
});
Thread.currentThread().interrupt();
client.stop();
try {
assertTrue(Thread.currentThread().isInterrupted());
LOG.info("Expected thread interrupt during client cleanup");
} catch (AssertionError e) {
LOG.error("The Client did not interrupt after handling an Interrupted Exception");
Assert.fail("The Client did not interrupt after handling an Interrupted Exception");
}
// Clear Thread interrupt
Thread.currentThread().interrupted();
}
private long countOpenFileDescriptors() { private long countOpenFileDescriptors() {
return FD_DIR.list().length; return FD_DIR.list().length;
} }
@ -1315,6 +1350,7 @@ public class TestIPC {
Mockito.verify(mockFactory, Mockito.times(maxTimeoutRetries)) Mockito.verify(mockFactory, Mockito.times(maxTimeoutRetries))
.createSocket(); .createSocket();
} }
client.stop();
} }
private void doIpcVersionTest( private void doIpcVersionTest(