diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index bcd337a2e77..14f0c6db07e 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -473,6 +473,9 @@ Release 2.0.3-alpha - Unreleased HADOOP-9070. Kerberos SASL server cannot find kerberos key. (daryn via atm) + HADOOP-6762. Exception while doing RPC I/O closes channel + (Sam Rash and todd via todd) + Release 2.0.2-alpha - 2012-09-07 INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 9bea3db4ef1..29c30bc5b43 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -38,6 +38,11 @@ import java.util.Map.Entry; import java.util.Random; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -78,6 +83,8 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** A client for an IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. A service runs on * a port and is defined by a parameter class and a value class. @@ -103,6 +110,19 @@ public class Client { final static int PING_CALL_ID = -1; + /** + * Executor on which IPC calls' parameters are sent. Deferring + * the sending of parameters to a separate thread isolates them + * from thread interruptions in the calling code. + */ + private static final ExecutorService SEND_PARAMS_EXECUTOR = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("IPC Parameter Sending Thread #%d") + .build()); + + /** * set the ping interval value in configuration * @@ -245,6 +265,8 @@ private class Connection extends Thread { private AtomicLong lastActivity = new AtomicLong();// last I/O activity time private AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed private IOException closeException; // close reason + + private final Object sendParamsLock = new Object(); public Connection(ConnectionId remoteId) throws IOException { this.remoteId = remoteId; @@ -831,43 +853,76 @@ public void run() { * Note: this is not called from the Connection thread, but by other * threads. */ - public void sendParam(Call call) { + public void sendParam(final Call call) + throws InterruptedException, IOException { if (shouldCloseConnection.get()) { return; } - DataOutputBuffer d=null; - try { - synchronized (this.out) { - if (LOG.isDebugEnabled()) - LOG.debug(getName() + " sending #" + call.id); + // Serialize the call to be sent. This is done from the actual + // caller thread, rather than the SEND_PARAMS_EXECUTOR thread, + // so that if the serialization throws an error, it is reported + // properly. This also parallelizes the serialization. + // + // Format of a call on the wire: + // 0) Length of rest below (1 + 2) + // 1) PayloadHeader - is serialized Delimited hence contains length + // 2) the Payload - the RpcRequest + // + // Items '1' and '2' are prepared here. + final DataOutputBuffer d = new DataOutputBuffer(); + RpcPayloadHeaderProto header = ProtoUtil.makeRpcPayloadHeader( + call.rpcKind, RpcPayloadOperationProto.RPC_FINAL_PAYLOAD, call.id); + header.writeDelimitedTo(d); + call.rpcRequest.write(d); + + synchronized (sendParamsLock) { + Future senderFuture = SEND_PARAMS_EXECUTOR.submit(new Runnable() { + @Override + public void run() { + try { + synchronized (Connection.this.out) { + if (shouldCloseConnection.get()) { + return; + } + + if (LOG.isDebugEnabled()) + LOG.debug(getName() + " sending #" + call.id); + + byte[] data = d.getData(); + int totalLength = d.getLength(); + out.writeInt(totalLength); // Total Length + out.write(data, 0, totalLength);//PayloadHeader + RpcRequest + out.flush(); + } + } catch (IOException e) { + // exception at this point would leave the connection in an + // unrecoverable state (eg half a call left on the wire). + // So, close the connection, killing any outstanding calls + markClosed(e); + } finally { + //the buffer is just an in-memory buffer, but it is still polite to + // close early + IOUtils.closeStream(d); + } + } + }); + + try { + senderFuture.get(); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); - // Serializing the data to be written. - // Format: - // 0) Length of rest below (1 + 2) - // 1) PayloadHeader - is serialized Delimited hence contains length - // 2) the Payload - the RpcRequest - // - d = new DataOutputBuffer(); - RpcPayloadHeaderProto header = ProtoUtil.makeRpcPayloadHeader( - call.rpcKind, RpcPayloadOperationProto.RPC_FINAL_PAYLOAD, call.id); - header.writeDelimitedTo(d); - call.rpcRequest.write(d); - byte[] data = d.getData(); - - int totalLength = d.getLength(); - out.writeInt(totalLength); // Total Length - out.write(data, 0, totalLength);//PayloadHeader + RpcRequest - out.flush(); + // cause should only be a RuntimeException as the Runnable above + // catches IOException + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else { + throw new RuntimeException("unexpected checked exception", cause); + } } - } catch(IOException e) { - markClosed(e); - } finally { - //the buffer is just an in-memory buffer, but it is still polite to - // close early - IOUtils.closeStream(d); } - } + } /* Receive a response. * Because only one receiver, so no synchronization on in. @@ -1138,7 +1193,16 @@ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, ConnectionId remoteId) throws InterruptedException, IOException { Call call = new Call(rpcKind, rpcRequest); Connection connection = getConnection(remoteId, call); - connection.sendParam(call); // send the parameter + try { + connection.sendParam(call); // send the parameter + } catch (RejectedExecutionException e) { + throw new IOException("connection has been closed", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("interrupted waiting to send params to server", e); + throw new IOException(e); + } + boolean interrupted = false; synchronized (call) { while (!call.done) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index a0d6de0e9a8..acbf32b021a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -68,6 +68,7 @@ public class TestIPC { * of the various writables. **/ static boolean WRITABLE_FAULTS_ENABLED = true; + static int WRITABLE_FAULTS_SLEEP = 0; static { Client.setPingInterval(conf, PING_INTERVAL); @@ -206,16 +207,27 @@ public void testStandAloneClient() throws Exception { static void maybeThrowIOE() throws IOException { if (WRITABLE_FAULTS_ENABLED) { + maybeSleep(); throw new IOException("Injected fault"); } } static void maybeThrowRTE() { if (WRITABLE_FAULTS_ENABLED) { + maybeSleep(); throw new RuntimeException("Injected fault"); } } + private static void maybeSleep() { + if (WRITABLE_FAULTS_SLEEP > 0) { + try { + Thread.sleep(WRITABLE_FAULTS_SLEEP); + } catch (InterruptedException ie) { + } + } + } + @SuppressWarnings("unused") private static class IOEOnReadWritable extends LongWritable { public IOEOnReadWritable() {} @@ -370,6 +382,27 @@ public void testRTEOnClientReadResponse() throws Exception { RTEOnReadWritable.class); } + /** + * Test case that fails a write, but only after taking enough time + * that a ping should have been sent. This is a reproducer for a + * deadlock seen in one iteration of HADOOP-6762. + */ + @Test + public void testIOEOnWriteAfterPingClient() throws Exception { + // start server + Client.setPingInterval(conf, 100); + + try { + WRITABLE_FAULTS_SLEEP = 1000; + doErrorTest(IOEOnWriteWritable.class, + LongWritable.class, + LongWritable.class, + LongWritable.class); + } finally { + WRITABLE_FAULTS_SLEEP = 0; + } + } + private static void assertExceptionContains( Throwable t, String substring) { String msg = StringUtils.stringifyException(t); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index 745eb792842..a4e915a30a3 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -38,6 +38,10 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import javax.net.SocketFactory; @@ -823,6 +827,96 @@ public void testRPCBuilder() throws Exception { } } + @Test(timeout=90000) + public void testRPCInterruptedSimple() throws Exception { + final Configuration conf = new Configuration(); + Server server = RPC.getServer( + TestProtocol.class, new TestImpl(), ADDRESS, 0, 5, true, conf, null + ); + server.start(); + InetSocketAddress addr = NetUtils.getConnectAddress(server); + + final TestProtocol proxy = (TestProtocol) RPC.getProxy( + TestProtocol.class, TestProtocol.versionID, addr, conf); + // Connect to the server + proxy.ping(); + // Interrupt self, try another call + Thread.currentThread().interrupt(); + try { + proxy.ping(); + fail("Interruption did not cause IPC to fail"); + } catch (IOException ioe) { + if (!ioe.toString().contains("InterruptedException")) { + throw ioe; + } + // clear interrupt status for future tests + Thread.interrupted(); + } + } + + @Test(timeout=30000) + public void testRPCInterrupted() throws IOException, InterruptedException { + final Configuration conf = new Configuration(); + Server server = RPC.getServer( + TestProtocol.class, new TestImpl(), ADDRESS, 0, 5, true, conf, null + ); + + server.start(); + + int numConcurrentRPC = 200; + InetSocketAddress addr = NetUtils.getConnectAddress(server); + final CyclicBarrier barrier = new CyclicBarrier(numConcurrentRPC); + final CountDownLatch latch = new CountDownLatch(numConcurrentRPC); + final AtomicBoolean leaderRunning = new AtomicBoolean(true); + final AtomicReference error = new AtomicReference(); + Thread leaderThread = null; + + for (int i = 0; i < numConcurrentRPC; i++) { + final int num = i; + final TestProtocol proxy = (TestProtocol) RPC.getProxy( + TestProtocol.class, TestProtocol.versionID, addr, conf); + Thread rpcThread = new Thread(new Runnable() { + @Override + public void run() { + try { + barrier.await(); + while (num == 0 || leaderRunning.get()) { + proxy.slowPing(false); + } + + proxy.slowPing(false); + } catch (Exception e) { + if (num == 0) { + leaderRunning.set(false); + } else { + error.set(e); + } + + LOG.error(e); + } finally { + latch.countDown(); + } + } + }); + rpcThread.start(); + + if (leaderThread == null) { + leaderThread = rpcThread; + } + } + // let threads get past the barrier + Thread.sleep(1000); + // stop a single thread + while (leaderRunning.get()) { + leaderThread.interrupt(); + } + + latch.await(); + + // should not cause any other thread to get an error + assertTrue("rpc got exception " + error.get(), error.get() == null); + } + public static void main(String[] args) throws Exception { new TestRPC().testCallsInternal(conf);