HADOOP-12213. Interrupted exception can occur when Client#stop is called. Contributed by Kuhu Shukla.
(cherry picked from commit 0ebc658105
)
Conflicts:
hadoop-common-project/hadoop-common/CHANGES.txt
This commit is contained in:
parent
d09ddb7898
commit
76cfdb1aba
|
@ -41,6 +41,9 @@ Release 2.7.2 - UNRELEASED
|
|||
HADOOP-12006. Remove unimplemented option for `hadoop fs -ls` from
|
||||
document in branch-2.7. (Akira AJISAKA via ozawa)
|
||||
|
||||
HADOOP-12213. Interrupted exception can occur when Client#stop is called.
|
||||
(Kuhu Shukla via ozawa)
|
||||
|
||||
Release 2.7.1 - 2015-07-06
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -194,9 +194,10 @@ public class Client {
|
|||
clientExecutor.shutdownNow();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.error("Interrupted while waiting for clientExecutor" +
|
||||
"to stop", e);
|
||||
LOG.warn("Interrupted while waiting for clientExecutor" +
|
||||
" to stop");
|
||||
clientExecutor.shutdownNow();
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
clientExecutor = null;
|
||||
}
|
||||
|
@ -253,6 +254,10 @@ public class Client {
|
|||
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY, timeout);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static final ExecutorService getClientExecutor() {
|
||||
return Client.clientExcecutorFactory.clientExecutor;
|
||||
}
|
||||
/**
|
||||
* Increment this client's reference count
|
||||
*
|
||||
|
|
|
@ -278,6 +278,8 @@ public class TestIPC {
|
|||
String causeText=cause.getMessage();
|
||||
assertTrue("Did not find " + causeText + " in " + message,
|
||||
message.contains(causeText));
|
||||
} finally {
|
||||
client.stop();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -390,6 +392,7 @@ public class TestIPC {
|
|||
client.call(param, addr, null, null, 0, conf);
|
||||
|
||||
} finally {
|
||||
client.stop();
|
||||
server.stop();
|
||||
}
|
||||
}
|
||||
|
@ -505,6 +508,8 @@ public class TestIPC {
|
|||
fail("Expected an exception to have been thrown");
|
||||
} catch (IOException e) {
|
||||
assertTrue(e.getMessage().contains("Injected fault"));
|
||||
} finally {
|
||||
client.stop();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -530,11 +535,11 @@ public class TestIPC {
|
|||
}).when(spyFactory).createSocket();
|
||||
|
||||
Server server = new TestServer(1, true);
|
||||
Client client = new Client(LongWritable.class, conf, spyFactory);
|
||||
server.start();
|
||||
try {
|
||||
// Call should fail due to injected exception.
|
||||
InetSocketAddress address = NetUtils.getConnectAddress(server);
|
||||
Client client = new Client(LongWritable.class, conf, spyFactory);
|
||||
try {
|
||||
client.call(new LongWritable(RANDOM.nextLong()),
|
||||
address, null, null, 0, conf);
|
||||
|
@ -551,6 +556,7 @@ public class TestIPC {
|
|||
client.call(new LongWritable(RANDOM.nextLong()),
|
||||
address, null, null, 0, conf);
|
||||
} finally {
|
||||
client.stop();
|
||||
server.stop();
|
||||
}
|
||||
}
|
||||
|
@ -575,6 +581,7 @@ public class TestIPC {
|
|||
// set timeout to be bigger than 3*ping interval
|
||||
client.call(new LongWritable(RANDOM.nextLong()),
|
||||
addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME, conf);
|
||||
client.stop();
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
|
@ -595,6 +602,7 @@ public class TestIPC {
|
|||
} catch (SocketTimeoutException e) {
|
||||
LOG.info("Get a SocketTimeoutException ", e);
|
||||
}
|
||||
client.stop();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -825,6 +833,8 @@ public class TestIPC {
|
|||
} catch (IOException e) {
|
||||
LOG.error(e);
|
||||
} catch (InterruptedException e) {
|
||||
} finally {
|
||||
client.stop();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -926,6 +936,31 @@ public class TestIPC {
|
|||
endFds - startFds < 20);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if Client is interrupted after handling
|
||||
* InterruptedException during cleanup
|
||||
*/
|
||||
@Test(timeout=30000)
|
||||
public void testInterrupted() {
|
||||
Client client = new Client(LongWritable.class, conf);
|
||||
client.getClientExecutor().submit(new Runnable() {
|
||||
public void run() {
|
||||
while(true);
|
||||
}
|
||||
});
|
||||
Thread.currentThread().interrupt();
|
||||
client.stop();
|
||||
try {
|
||||
assertTrue(Thread.currentThread().isInterrupted());
|
||||
LOG.info("Expected thread interrupt during client cleanup");
|
||||
} catch (AssertionError e) {
|
||||
LOG.error("The Client did not interrupt after handling an Interrupted Exception");
|
||||
Assert.fail("The Client did not interrupt after handling an Interrupted Exception");
|
||||
}
|
||||
// Clear Thread interrupt
|
||||
Thread.currentThread().interrupted();
|
||||
}
|
||||
|
||||
private long countOpenFileDescriptors() {
|
||||
return FD_DIR.list().length;
|
||||
}
|
||||
|
@ -1255,6 +1290,7 @@ public class TestIPC {
|
|||
Mockito.verify(mockFactory, Mockito.times(maxTimeoutRetries))
|
||||
.createSocket();
|
||||
}
|
||||
client.stop();
|
||||
}
|
||||
|
||||
private void doIpcVersionTest(
|
||||
|
|
Loading…
Reference in New Issue