HADOOP-11295. RPC Server Reader thread can't shutdown if RPCCallQueue is full. Contributed by Ming Ma.

(cherry picked from commit 685af8a3d0)
This commit is contained in:
Kihwal Lee 2015-02-17 17:14:58 -06:00 committed by Tsuyoshi Ozawa
parent 9523b52da5
commit 6c01e58619
3 changed files with 72 additions and 0 deletions

View File

@ -550,6 +550,9 @@ Release 2.7.0 - UNRELEASED
HADOOP-11570. S3AInputStream.close() downloads the remaining bytes of
the object from S3. (Dan Hecht via stevel).
HADOOP-11295. RPC Server Reader thread can't shutdown if RPCCallQueue is
full. (Ming Ma via kihwal)
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -663,6 +663,7 @@ public abstract class Server {
assert !running;
readSelector.wakeup();
try {
super.interrupt();
super.join();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();

View File

@ -38,9 +38,16 @@ import java.lang.reflect.Proxy;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@ -1009,6 +1016,67 @@ public class TestRPC {
}
}
/**
* Verify the RPC server can shutdown properly when callQueue is full.
*/
@Test (timeout=30000)
public void testRPCServerShutdown() throws Exception {
final int numClients = 3;
final List<Future<Void>> res = new ArrayList<Future<Void>>();
final ExecutorService executorService =
Executors.newFixedThreadPool(numClients);
final Configuration conf = new Configuration();
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
final Server server = new RPC.Builder(conf)
.setProtocol(TestProtocol.class).setInstance(new TestImpl())
.setBindAddress(ADDRESS).setPort(0)
.setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true)
.build();
server.start();
final TestProtocol proxy =
RPC.getProxy(TestProtocol.class, TestProtocol.versionID,
NetUtils.getConnectAddress(server), conf);
try {
// start a sleep RPC call to consume the only handler thread.
// Start another sleep RPC call to make callQueue full.
// Start another sleep RPC call to make reader thread block on CallQueue.
for (int i = 0; i < numClients; i++) {
res.add(executorService.submit(
new Callable<Void>() {
@Override
public Void call() throws IOException, InterruptedException {
proxy.sleep(100000);
return null;
}
}));
}
while (server.getCallQueueLen() != 1
&& countThreads(CallQueueManager.class.getName()) != 1
&& countThreads(TestProtocol.class.getName()) != 1) {
Thread.sleep(100);
}
} finally {
try {
server.stop();
assertEquals("Not enough clients", numClients, res.size());
for (Future<Void> f : res) {
try {
f.get();
fail("Future get should not return");
} catch (ExecutionException e) {
assertTrue("Unexpected exception: " + e,
e.getCause() instanceof IOException);
LOG.info("Expected exception", e.getCause());
}
}
} finally {
RPC.stopProxy(proxy);
executorService.shutdown();
}
}
}
public static void main(String[] args) throws IOException {
new TestRPC().testCallsInternal(conf);