HADOOP-12213. Interrupted exception can occur when Client#stop is called. Contributed by Kuhu Shukla.
This commit is contained in:
parent
355eaaa33d
commit
0ebc658105
|
@ -1132,6 +1132,9 @@ Release 2.7.2 - UNRELEASED
|
||||||
HADOOP-10365. BufferedOutputStream in FileUtil#unpackEntries() should be
|
HADOOP-10365. BufferedOutputStream in FileUtil#unpackEntries() should be
|
||||||
closed in finally block. (Kiran Kumar M R and Sanghyun Yun via ozawa)
|
closed in finally block. (Kiran Kumar M R and Sanghyun Yun via ozawa)
|
||||||
|
|
||||||
|
HADOOP-12213. Interrupted exception can occur when Client#stop is called.
|
||||||
|
(Kuhu Shukla via ozawa)
|
||||||
|
|
||||||
Release 2.7.1 - 2015-07-06
|
Release 2.7.1 - 2015-07-06
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -197,9 +197,10 @@ public class Client {
|
||||||
clientExecutor.shutdownNow();
|
clientExecutor.shutdownNow();
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOG.error("Interrupted while waiting for clientExecutor" +
|
LOG.warn("Interrupted while waiting for clientExecutor" +
|
||||||
"to stop", e);
|
" to stop");
|
||||||
clientExecutor.shutdownNow();
|
clientExecutor.shutdownNow();
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
clientExecutor = null;
|
clientExecutor = null;
|
||||||
}
|
}
|
||||||
|
@ -256,6 +257,10 @@ public class Client {
|
||||||
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY, timeout);
|
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY, timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public static final ExecutorService getClientExecutor() {
|
||||||
|
return Client.clientExcecutorFactory.clientExecutor;
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Increment this client's reference count
|
* Increment this client's reference count
|
||||||
*
|
*
|
||||||
|
|
|
@ -304,6 +304,8 @@ public class TestIPC {
|
||||||
String causeText=cause.getMessage();
|
String causeText=cause.getMessage();
|
||||||
assertTrue("Did not find " + causeText + " in " + message,
|
assertTrue("Did not find " + causeText + " in " + message,
|
||||||
message.contains(causeText));
|
message.contains(causeText));
|
||||||
|
} finally {
|
||||||
|
client.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -416,6 +418,7 @@ public class TestIPC {
|
||||||
client.call(param, addr, null, null, 0, conf);
|
client.call(param, addr, null, null, 0, conf);
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
|
client.stop();
|
||||||
server.stop();
|
server.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -531,6 +534,8 @@ public class TestIPC {
|
||||||
fail("Expected an exception to have been thrown");
|
fail("Expected an exception to have been thrown");
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
assertTrue(e.getMessage().contains("Injected fault"));
|
assertTrue(e.getMessage().contains("Injected fault"));
|
||||||
|
} finally {
|
||||||
|
client.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -556,11 +561,11 @@ public class TestIPC {
|
||||||
}).when(spyFactory).createSocket();
|
}).when(spyFactory).createSocket();
|
||||||
|
|
||||||
Server server = new TestServer(1, true);
|
Server server = new TestServer(1, true);
|
||||||
|
Client client = new Client(LongWritable.class, conf, spyFactory);
|
||||||
server.start();
|
server.start();
|
||||||
try {
|
try {
|
||||||
// Call should fail due to injected exception.
|
// Call should fail due to injected exception.
|
||||||
InetSocketAddress address = NetUtils.getConnectAddress(server);
|
InetSocketAddress address = NetUtils.getConnectAddress(server);
|
||||||
Client client = new Client(LongWritable.class, conf, spyFactory);
|
|
||||||
try {
|
try {
|
||||||
client.call(new LongWritable(RANDOM.nextLong()),
|
client.call(new LongWritable(RANDOM.nextLong()),
|
||||||
address, null, null, 0, conf);
|
address, null, null, 0, conf);
|
||||||
|
@ -577,6 +582,7 @@ public class TestIPC {
|
||||||
client.call(new LongWritable(RANDOM.nextLong()),
|
client.call(new LongWritable(RANDOM.nextLong()),
|
||||||
address, null, null, 0, conf);
|
address, null, null, 0, conf);
|
||||||
} finally {
|
} finally {
|
||||||
|
client.stop();
|
||||||
server.stop();
|
server.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -601,6 +607,7 @@ public class TestIPC {
|
||||||
// set timeout to be bigger than 3*ping interval
|
// set timeout to be bigger than 3*ping interval
|
||||||
client.call(new LongWritable(RANDOM.nextLong()),
|
client.call(new LongWritable(RANDOM.nextLong()),
|
||||||
addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME, conf);
|
addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME, conf);
|
||||||
|
client.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=60000)
|
@Test(timeout=60000)
|
||||||
|
@ -621,6 +628,7 @@ public class TestIPC {
|
||||||
} catch (SocketTimeoutException e) {
|
} catch (SocketTimeoutException e) {
|
||||||
LOG.info("Get a SocketTimeoutException ", e);
|
LOG.info("Get a SocketTimeoutException ", e);
|
||||||
}
|
}
|
||||||
|
client.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -851,6 +859,8 @@ public class TestIPC {
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error(e);
|
LOG.error(e);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
} finally {
|
||||||
|
client.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -952,6 +962,31 @@ public class TestIPC {
|
||||||
endFds - startFds < 20);
|
endFds - startFds < 20);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if Client is interrupted after handling
|
||||||
|
* InterruptedException during cleanup
|
||||||
|
*/
|
||||||
|
@Test(timeout=30000)
|
||||||
|
public void testInterrupted() {
|
||||||
|
Client client = new Client(LongWritable.class, conf);
|
||||||
|
client.getClientExecutor().submit(new Runnable() {
|
||||||
|
public void run() {
|
||||||
|
while(true);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
client.stop();
|
||||||
|
try {
|
||||||
|
assertTrue(Thread.currentThread().isInterrupted());
|
||||||
|
LOG.info("Expected thread interrupt during client cleanup");
|
||||||
|
} catch (AssertionError e) {
|
||||||
|
LOG.error("The Client did not interrupt after handling an Interrupted Exception");
|
||||||
|
Assert.fail("The Client did not interrupt after handling an Interrupted Exception");
|
||||||
|
}
|
||||||
|
// Clear Thread interrupt
|
||||||
|
Thread.currentThread().interrupted();
|
||||||
|
}
|
||||||
|
|
||||||
private long countOpenFileDescriptors() {
|
private long countOpenFileDescriptors() {
|
||||||
return FD_DIR.list().length;
|
return FD_DIR.list().length;
|
||||||
}
|
}
|
||||||
|
@ -1315,6 +1350,7 @@ public class TestIPC {
|
||||||
Mockito.verify(mockFactory, Mockito.times(maxTimeoutRetries))
|
Mockito.verify(mockFactory, Mockito.times(maxTimeoutRetries))
|
||||||
.createSocket();
|
.createSocket();
|
||||||
}
|
}
|
||||||
|
client.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doIpcVersionTest(
|
private void doIpcVersionTest(
|
||||||
|
|
Loading…
Reference in New Issue