diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index d21c8073cf1..57f0b7c2149 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -18,10 +18,10 @@ package org.apache.hadoop.ipc; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability; @@ -149,73 +149,6 @@ public class Client implements AutoCloseable { private final int maxAsyncCalls; private final AtomicInteger asyncCallCounter = new AtomicInteger(0); - /** - * Executor on which IPC calls' parameters are sent. - * Deferring the sending of parameters to a separate - * thread isolates them from thread interruptions in the - * calling code. - */ - private final ExecutorService sendParamsExecutor; - private final static ClientExecutorServiceFactory clientExcecutorFactory = - new ClientExecutorServiceFactory(); - - private static class ClientExecutorServiceFactory { - private int executorRefCount = 0; - private ExecutorService clientExecutor = null; - - /** - * Get Executor on which IPC calls' parameters are sent. - * If the internal reference counter is zero, this method - * creates the instance of Executor. If not, this method - * just returns the reference of clientExecutor. - * - * @return An ExecutorService instance - */ - synchronized ExecutorService refAndGetInstance() { - if (executorRefCount == 0) { - clientExecutor = Executors.newCachedThreadPool( - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("IPC Parameter Sending Thread #%d") - .build()); - } - executorRefCount++; - - return clientExecutor; - } - - /** - * Cleanup Executor on which IPC calls' parameters are sent. - * If reference counter is zero, this method discards the - * instance of the Executor. If not, this method - * just decrements the internal reference counter. - * - * @return An ExecutorService instance if it exists. - * Null is returned if not. - */ - synchronized ExecutorService unrefAndCleanup() { - executorRefCount--; - assert(executorRefCount >= 0); - - if (executorRefCount == 0) { - clientExecutor.shutdown(); - try { - if (!clientExecutor.awaitTermination(1, TimeUnit.MINUTES)) { - clientExecutor.shutdownNow(); - } - } catch (InterruptedException e) { - LOG.warn("Interrupted while waiting for clientExecutor" + - " to stop"); - clientExecutor.shutdownNow(); - Thread.currentThread().interrupt(); - } - clientExecutor = null; - } - - return clientExecutor; - } - } - /** * set the ping interval value in configuration * @@ -284,11 +217,6 @@ public class Client implements AutoCloseable { conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY, timeout); } - @VisibleForTesting - public static final ExecutorService getClientExecutor() { - return Client.clientExcecutorFactory.clientExecutor; - } - /** * Increment this client's reference count */ @@ -445,8 +373,10 @@ public class Client implements AutoCloseable { private AtomicLong lastActivity = new AtomicLong();// last I/O activity time private AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed private IOException closeException; // close reason - - private final Object sendRpcRequestLock = new Object(); + + private final Thread rpcRequestThread; + private final SynchronousQueue> rpcRequestQueue = + new SynchronousQueue<>(true); private AtomicReference connectingThread = new AtomicReference<>(); private final Consumer removeMethod; @@ -455,6 +385,9 @@ public class Client implements AutoCloseable { Consumer removeMethod) { this.remoteId = remoteId; this.server = remoteId.getAddress(); + this.rpcRequestThread = new Thread(new RpcRequestSender(), + "IPC Parameter Sending Thread for " + remoteId); + this.rpcRequestThread.setDaemon(true); this.maxResponseLength = remoteId.conf.getInt( CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH, @@ -1133,6 +1066,10 @@ public class Client implements AutoCloseable { @Override public void run() { + // Don't start the ipc parameter sending thread until we start this + // thread, because the shutdown logic only gets triggered if this + // thread is started. + rpcRequestThread.start(); if (LOG.isDebugEnabled()) LOG.debug(getName() + ": starting, having connections " + connections.size()); @@ -1156,9 +1093,52 @@ public class Client implements AutoCloseable { + connections.size()); } + /** + * A thread to write rpc requests to the socket. + */ + private class RpcRequestSender implements Runnable { + @Override + public void run() { + while (!shouldCloseConnection.get()) { + ResponseBuffer buf = null; + try { + Pair pair = + rpcRequestQueue.poll(maxIdleTime, TimeUnit.MILLISECONDS); + if (pair == null || shouldCloseConnection.get()) { + continue; + } + buf = pair.getRight(); + synchronized (ipcStreams.out) { + if (LOG.isDebugEnabled()) { + Call call = pair.getLeft(); + LOG.debug(getName() + "{} sending #{} {}", getName(), call.id, + call.rpcRequest); + } + // RpcRequestHeader + RpcRequest + ipcStreams.sendRequest(buf.toByteArray()); + ipcStreams.flush(); + } + } catch (InterruptedException ie) { + // stop this thread + return; + } catch (IOException e) { + // exception at this point would leave the connection in an + // unrecoverable state (eg half a call left on the wire). + // So, close the connection, killing any outstanding calls + markClosed(e); + } finally { + //the buffer is just an in-memory buffer, but it is still polite to + // close early + IOUtils.closeStream(buf); + } + } + } + } + /** Initiates a rpc call by sending the rpc request to the remote server. - * Note: this is not called from the Connection thread, but by other - * threads. + * Note: this is not called from the current thread, but by another + * thread, so that if the current thread is interrupted that the socket + * state isn't corrupted with a partially written message. * @param call - the rpc request */ public void sendRpcRequest(final Call call) @@ -1168,8 +1148,7 @@ public class Client implements AutoCloseable { } // Serialize the call to be sent. This is done from the actual - // caller thread, rather than the sendParamsExecutor thread, - + // caller thread, rather than the rpcRequestThread in the connection, // so that if the serialization throws an error, it is reported // properly. This also parallelizes the serialization. // @@ -1186,51 +1165,7 @@ public class Client implements AutoCloseable { final ResponseBuffer buf = new ResponseBuffer(); header.writeDelimitedTo(buf); RpcWritable.wrap(call.rpcRequest).writeTo(buf); - - synchronized (sendRpcRequestLock) { - Future senderFuture = sendParamsExecutor.submit(new Runnable() { - @Override - public void run() { - try { - synchronized (ipcStreams.out) { - if (shouldCloseConnection.get()) { - return; - } - if (LOG.isDebugEnabled()) { - LOG.debug(getName() + " sending #" + call.id - + " " + call.rpcRequest); - } - // RpcRequestHeader + RpcRequest - ipcStreams.sendRequest(buf.toByteArray()); - ipcStreams.flush(); - } - } catch (IOException e) { - // exception at this point would leave the connection in an - // unrecoverable state (eg half a call left on the wire). - // So, close the connection, killing any outstanding calls - markClosed(e); - } finally { - //the buffer is just an in-memory buffer, but it is still polite to - // close early - IOUtils.closeStream(buf); - } - } - }); - - try { - senderFuture.get(); - } catch (ExecutionException e) { - Throwable cause = e.getCause(); - - // cause should only be a RuntimeException as the Runnable above - // catches IOException - if (cause instanceof RuntimeException) { - throw (RuntimeException) cause; - } else { - throw new RuntimeException("unexpected checked exception", cause); - } - } - } + rpcRequestQueue.put(Pair.of(call, buf)); } /* Receive a response. @@ -1379,7 +1314,6 @@ public class Client implements AutoCloseable { CommonConfigurationKeys.IPC_CLIENT_BIND_WILDCARD_ADDR_DEFAULT); this.clientId = ClientId.getClientId(); - this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance(); this.maxAsyncCalls = conf.getInt( CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT); @@ -1423,6 +1357,7 @@ public class Client implements AutoCloseable { // wake up all connections for (Connection conn : connections.values()) { conn.interrupt(); + conn.rpcRequestThread.interrupt(); conn.interruptConnectingThread(); } @@ -1439,7 +1374,6 @@ public class Client implements AutoCloseable { } } } - clientExcecutorFactory.unrefAndCleanup(); } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index 05ef09ae433..b9afa31b448 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -1218,11 +1218,6 @@ public class TestIPC { @Test(timeout=30000) public void testInterrupted() { Client client = new Client(LongWritable.class, conf); - Client.getClientExecutor().submit(new Runnable() { - public void run() { - while(true); - } - }); Thread.currentThread().interrupt(); client.stop(); try { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index 85b0a7b6c84..362b8e3105e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -57,6 +57,7 @@ import org.apache.hadoop.test.Whitebox; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.event.Level; @@ -64,13 +65,16 @@ import org.slf4j.event.Level; import javax.net.SocketFactory; import java.io.Closeable; import java.io.IOException; +import java.io.InputStream; import java.io.InterruptedIOException; +import java.io.OutputStream; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.ConnectException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.Socket; import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.security.PrivilegedAction; @@ -91,6 +95,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import static org.assertj.core.api.Assertions.assertThat; import static org.apache.hadoop.test.MetricsAsserts.assertCounter; @@ -995,6 +1000,196 @@ public class TestRPC extends TestRpcBase { } } + /** + * This tests the case where the server isn't receiving new data and + * multiple threads queue up to send rpc requests. Only one of the requests + * should be written and all of the calling threads should be interrupted. + * + * We use a mock SocketFactory so that we can control when the input and + * output streams are frozen. + */ + @Test(timeout=30000) + public void testSlowConnection() throws Exception { + SocketFactory mockFactory = Mockito.mock(SocketFactory.class); + Socket mockSocket = Mockito.mock(Socket.class); + Mockito.when(mockFactory.createSocket()).thenReturn(mockSocket); + Mockito.when(mockSocket.getPort()).thenReturn(1234); + Mockito.when(mockSocket.getLocalPort()).thenReturn(2345); + MockOutputStream mockOutputStream = new MockOutputStream(); + Mockito.when(mockSocket.getOutputStream()).thenReturn(mockOutputStream); + // Use an input stream that always blocks + Mockito.when(mockSocket.getInputStream()).thenReturn(new InputStream() { + @Override + public int read() throws IOException { + // wait forever + while (true) { + try { + Thread.sleep(TimeUnit.DAYS.toMillis(1)); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new InterruptedIOException("test"); + } + } + } + }); + Configuration clientConf = new Configuration(); + // disable ping & timeout to minimize traffic + clientConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, false); + clientConf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 0); + RPC.setProtocolEngine(clientConf, TestRpcService.class, ProtobufRpcEngine.class); + // set async mode so that we don't need to implement the input stream + final boolean wasAsync = Client.isAsynchronousMode(); + TestRpcService client = null; + try { + Client.setAsynchronousMode(true); + client = RPC.getProtocolProxy( + TestRpcService.class, + 0, + new InetSocketAddress("localhost", 1234), + UserGroupInformation.getCurrentUser(), + clientConf, + mockFactory).getProxy(); + // The connection isn't actually made until the first call. + client.ping(null, newEmptyRequest()); + mockOutputStream.waitForFlush(1); + final long headerAndFirst = mockOutputStream.getBytesWritten(); + client.ping(null, newEmptyRequest()); + mockOutputStream.waitForFlush(2); + final long second = mockOutputStream.getBytesWritten() - headerAndFirst; + // pause the writer thread + mockOutputStream.pause(); + // create a set of threads to create calls that will back up + ExecutorService pool = Executors.newCachedThreadPool(); + Future[] futures = new Future[numThreads]; + final AtomicInteger doneThreads = new AtomicInteger(0); + for(int thread = 0; thread < numThreads; ++thread) { + final TestRpcService finalClient = client; + futures[thread] = pool.submit(new Callable() { + @Override + public Void call() throws Exception { + finalClient.ping(null, newEmptyRequest()); + doneThreads.incrementAndGet(); + return null; + } + }); + } + // wait until the threads have started writing + mockOutputStream.waitForWriters(); + // interrupt all the threads + for(int thread=0; thread < numThreads; ++thread) { + assertTrue("cancel thread " + thread, + futures[thread].cancel(true)); + } + // wait until all the writers are cancelled + pool.shutdown(); + pool.awaitTermination(10, TimeUnit.SECONDS); + mockOutputStream.resume(); + // wait for the in flight rpc request to be flushed + mockOutputStream.waitForFlush(3); + // All the threads should have been interrupted + assertEquals(0, doneThreads.get()); + // make sure that only one additional rpc request was sent + assertEquals(headerAndFirst + second * 2, + mockOutputStream.getBytesWritten()); + } finally { + Client.setAsynchronousMode(wasAsync); + if (client != null) { + RPC.stopProxy(client); + } + } + } + + private static final class MockOutputStream extends OutputStream { + private long bytesWritten = 0; + private AtomicInteger flushCount = new AtomicInteger(0); + private ReentrantLock lock = new ReentrantLock(true); + + @Override + public synchronized void write(int b) throws IOException { + lock.lock(); + bytesWritten += 1; + lock.unlock(); + } + + @Override + public void flush() { + flushCount.incrementAndGet(); + } + + public synchronized long getBytesWritten() { + return bytesWritten; + } + + public void pause() { + lock.lock(); + } + + public void resume() { + lock.unlock(); + } + + private static final int DELAY_MS = 250; + + /** + * Wait for the Nth flush, which we assume will happen exactly when the + * Nth RPC request is sent. + * @param flush the total flush count to wait for + * @throws InterruptedException + */ + public void waitForFlush(int flush) throws InterruptedException { + while (flushCount.get() < flush) { + Thread.sleep(DELAY_MS); + } + } + + public void waitForWriters() throws InterruptedException { + while (!lock.hasQueuedThreads()) { + Thread.sleep(DELAY_MS); + } + } + } + + /** + * This test causes an exception in the RPC connection setup to make + * sure that threads aren't leaked. + */ + @Test(timeout=30000) + public void testBadSetup() throws Exception { + SocketFactory mockFactory = Mockito.mock(SocketFactory.class); + Mockito.when(mockFactory.createSocket()) + .thenThrow(new IOException("can't connect")); + Configuration clientConf = new Configuration(); + // Set an illegal value to cause an exception in the constructor + clientConf.set(CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH, + "xxx"); + RPC.setProtocolEngine(clientConf, TestRpcService.class, + ProtobufRpcEngine.class); + TestRpcService client = null; + int threadCount = Thread.getAllStackTraces().size(); + try { + try { + client = RPC.getProtocolProxy( + TestRpcService.class, + 0, + new InetSocketAddress("localhost", 1234), + UserGroupInformation.getCurrentUser(), + clientConf, + mockFactory).getProxy(); + client.ping(null, newEmptyRequest()); + assertTrue("Didn't throw exception!", false); + } catch (ServiceException nfe) { + // ensure no extra threads are running. + assertEquals(threadCount, Thread.getAllStackTraces().size()); + } catch (Throwable t) { + assertTrue("wrong exception: " + t, false); + } + } finally { + if (client != null) { + RPC.stopProxy(client); + } + } + } + @Test public void testConnectionPing() throws Exception { Server server;