HADOOP-11295. RPC Server Reader thread can't shutdown if RPCCallQueue is full. Contributed by Ming Ma.
This commit is contained in:
parent
6dc8812a95
commit
685af8a3d0
|
@ -953,6 +953,9 @@ Release 2.7.0 - UNRELEASED
|
|||
HADOOP-11570. S3AInputStream.close() downloads the remaining bytes of
|
||||
the object from S3. (Dan Hecht via stevel).
|
||||
|
||||
HADOOP-11295. RPC Server Reader thread can't shutdown if RPCCallQueue is
|
||||
full. (Ming Ma via kihwal)
|
||||
|
||||
Release 2.6.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -666,6 +666,7 @@ public abstract class Server {
|
|||
assert !running;
|
||||
readSelector.wakeup();
|
||||
try {
|
||||
super.interrupt();
|
||||
super.join();
|
||||
} catch (InterruptedException ie) {
|
||||
Thread.currentThread().interrupt();
|
||||
|
|
|
@ -38,9 +38,16 @@ import java.lang.reflect.Proxy;
|
|||
import java.net.ConnectException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
|
@ -1013,6 +1020,67 @@ public class TestRPC {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify the RPC server can shutdown properly when callQueue is full.
|
||||
*/
|
||||
@Test (timeout=30000)
|
||||
public void testRPCServerShutdown() throws Exception {
|
||||
final int numClients = 3;
|
||||
final List<Future<Void>> res = new ArrayList<Future<Void>>();
|
||||
final ExecutorService executorService =
|
||||
Executors.newFixedThreadPool(numClients);
|
||||
final Configuration conf = new Configuration();
|
||||
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
|
||||
final Server server = new RPC.Builder(conf)
|
||||
.setProtocol(TestProtocol.class).setInstance(new TestImpl())
|
||||
.setBindAddress(ADDRESS).setPort(0)
|
||||
.setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true)
|
||||
.build();
|
||||
server.start();
|
||||
|
||||
final TestProtocol proxy =
|
||||
RPC.getProxy(TestProtocol.class, TestProtocol.versionID,
|
||||
NetUtils.getConnectAddress(server), conf);
|
||||
try {
|
||||
// start a sleep RPC call to consume the only handler thread.
|
||||
// Start another sleep RPC call to make callQueue full.
|
||||
// Start another sleep RPC call to make reader thread block on CallQueue.
|
||||
for (int i = 0; i < numClients; i++) {
|
||||
res.add(executorService.submit(
|
||||
new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws IOException, InterruptedException {
|
||||
proxy.sleep(100000);
|
||||
return null;
|
||||
}
|
||||
}));
|
||||
}
|
||||
while (server.getCallQueueLen() != 1
|
||||
&& countThreads(CallQueueManager.class.getName()) != 1
|
||||
&& countThreads(TestProtocol.class.getName()) != 1) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
server.stop();
|
||||
assertEquals("Not enough clients", numClients, res.size());
|
||||
for (Future<Void> f : res) {
|
||||
try {
|
||||
f.get();
|
||||
fail("Future get should not return");
|
||||
} catch (ExecutionException e) {
|
||||
assertTrue("Unexpected exception: " + e,
|
||||
e.getCause() instanceof IOException);
|
||||
LOG.info("Expected exception", e.getCause());
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
RPC.stopProxy(proxy);
|
||||
executorService.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
new TestRPC().testCallsInternal(conf);
|
||||
|
||||
|
|
Loading…
Reference in New Issue