HADOOP-6762. Exception while doing RPC I/O closes channel. Contributed by Sam Rash and Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1419782 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
cb9f516756
commit
7ba12a628a
|
@ -473,6 +473,9 @@ Release 2.0.3-alpha - Unreleased
|
|||
|
||||
HADOOP-9070. Kerberos SASL server cannot find kerberos key. (daryn via atm)
|
||||
|
||||
HADOOP-6762. Exception while doing RPC I/O closes channel
|
||||
(Sam Rash and todd via todd)
|
||||
|
||||
Release 2.0.2-alpha - 2012-09-07
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -38,6 +38,11 @@ import java.util.Iterator;
|
|||
import java.util.Map.Entry;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
@ -78,6 +83,8 @@ import org.apache.hadoop.util.ProtoUtil;
|
|||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
|
||||
* parameter, and return a {@link Writable} as their value. A service runs on
|
||||
* a port and is defined by a parameter class and a value class.
|
||||
|
@ -103,6 +110,19 @@ public class Client {
|
|||
|
||||
final static int PING_CALL_ID = -1;
|
||||
|
||||
/**
|
||||
* Executor on which IPC calls' parameters are sent. Deferring
|
||||
* the sending of parameters to a separate thread isolates them
|
||||
* from thread interruptions in the calling code.
|
||||
*/
|
||||
private static final ExecutorService SEND_PARAMS_EXECUTOR =
|
||||
Executors.newCachedThreadPool(
|
||||
new ThreadFactoryBuilder()
|
||||
.setDaemon(true)
|
||||
.setNameFormat("IPC Parameter Sending Thread #%d")
|
||||
.build());
|
||||
|
||||
|
||||
/**
|
||||
* set the ping interval value in configuration
|
||||
*
|
||||
|
@ -246,6 +266,8 @@ public class Client {
|
|||
private AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed
|
||||
private IOException closeException; // close reason
|
||||
|
||||
private final Object sendParamsLock = new Object();
|
||||
|
||||
public Connection(ConnectionId remoteId) throws IOException {
|
||||
this.remoteId = remoteId;
|
||||
this.server = remoteId.getAddress();
|
||||
|
@ -831,36 +853,52 @@ public class Client {
|
|||
* Note: this is not called from the Connection thread, but by other
|
||||
* threads.
|
||||
*/
|
||||
public void sendParam(Call call) {
|
||||
public void sendParam(final Call call)
|
||||
throws InterruptedException, IOException {
|
||||
if (shouldCloseConnection.get()) {
|
||||
return;
|
||||
}
|
||||
|
||||
DataOutputBuffer d=null;
|
||||
try {
|
||||
synchronized (this.out) {
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug(getName() + " sending #" + call.id);
|
||||
|
||||
// Serializing the data to be written.
|
||||
// Format:
|
||||
// Serialize the call to be sent. This is done from the actual
|
||||
// caller thread, rather than the SEND_PARAMS_EXECUTOR thread,
|
||||
// so that if the serialization throws an error, it is reported
|
||||
// properly. This also parallelizes the serialization.
|
||||
//
|
||||
// Format of a call on the wire:
|
||||
// 0) Length of rest below (1 + 2)
|
||||
// 1) PayloadHeader - is serialized Delimited hence contains length
|
||||
// 2) the Payload - the RpcRequest
|
||||
//
|
||||
d = new DataOutputBuffer();
|
||||
// Items '1' and '2' are prepared here.
|
||||
final DataOutputBuffer d = new DataOutputBuffer();
|
||||
RpcPayloadHeaderProto header = ProtoUtil.makeRpcPayloadHeader(
|
||||
call.rpcKind, RpcPayloadOperationProto.RPC_FINAL_PAYLOAD, call.id);
|
||||
header.writeDelimitedTo(d);
|
||||
call.rpcRequest.write(d);
|
||||
byte[] data = d.getData();
|
||||
|
||||
synchronized (sendParamsLock) {
|
||||
Future<?> senderFuture = SEND_PARAMS_EXECUTOR.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
synchronized (Connection.this.out) {
|
||||
if (shouldCloseConnection.get()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug(getName() + " sending #" + call.id);
|
||||
|
||||
byte[] data = d.getData();
|
||||
int totalLength = d.getLength();
|
||||
out.writeInt(totalLength); // Total Length
|
||||
out.write(data, 0, totalLength);//PayloadHeader + RpcRequest
|
||||
out.flush();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// exception at this point would leave the connection in an
|
||||
// unrecoverable state (eg half a call left on the wire).
|
||||
// So, close the connection, killing any outstanding calls
|
||||
markClosed(e);
|
||||
} finally {
|
||||
//the buffer is just an in-memory buffer, but it is still polite to
|
||||
|
@ -868,6 +906,23 @@ public class Client {
|
|||
IOUtils.closeStream(d);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
senderFuture.get();
|
||||
} catch (ExecutionException e) {
|
||||
Throwable cause = e.getCause();
|
||||
|
||||
// cause should only be a RuntimeException as the Runnable above
|
||||
// catches IOException
|
||||
if (cause instanceof RuntimeException) {
|
||||
throw (RuntimeException) cause;
|
||||
} else {
|
||||
throw new RuntimeException("unexpected checked exception", cause);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Receive a response.
|
||||
* Because only one receiver, so no synchronization on in.
|
||||
|
@ -1138,7 +1193,16 @@ public class Client {
|
|||
ConnectionId remoteId) throws InterruptedException, IOException {
|
||||
Call call = new Call(rpcKind, rpcRequest);
|
||||
Connection connection = getConnection(remoteId, call);
|
||||
try {
|
||||
connection.sendParam(call); // send the parameter
|
||||
} catch (RejectedExecutionException e) {
|
||||
throw new IOException("connection has been closed", e);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
LOG.warn("interrupted waiting to send params to server", e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
|
||||
boolean interrupted = false;
|
||||
synchronized (call) {
|
||||
while (!call.done) {
|
||||
|
|
|
@ -68,6 +68,7 @@ public class TestIPC {
|
|||
* of the various writables.
|
||||
**/
|
||||
static boolean WRITABLE_FAULTS_ENABLED = true;
|
||||
static int WRITABLE_FAULTS_SLEEP = 0;
|
||||
|
||||
static {
|
||||
Client.setPingInterval(conf, PING_INTERVAL);
|
||||
|
@ -206,16 +207,27 @@ public class TestIPC {
|
|||
|
||||
static void maybeThrowIOE() throws IOException {
|
||||
if (WRITABLE_FAULTS_ENABLED) {
|
||||
maybeSleep();
|
||||
throw new IOException("Injected fault");
|
||||
}
|
||||
}
|
||||
|
||||
static void maybeThrowRTE() {
|
||||
if (WRITABLE_FAULTS_ENABLED) {
|
||||
maybeSleep();
|
||||
throw new RuntimeException("Injected fault");
|
||||
}
|
||||
}
|
||||
|
||||
private static void maybeSleep() {
|
||||
if (WRITABLE_FAULTS_SLEEP > 0) {
|
||||
try {
|
||||
Thread.sleep(WRITABLE_FAULTS_SLEEP);
|
||||
} catch (InterruptedException ie) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
private static class IOEOnReadWritable extends LongWritable {
|
||||
public IOEOnReadWritable() {}
|
||||
|
@ -370,6 +382,27 @@ public class TestIPC {
|
|||
RTEOnReadWritable.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test case that fails a write, but only after taking enough time
|
||||
* that a ping should have been sent. This is a reproducer for a
|
||||
* deadlock seen in one iteration of HADOOP-6762.
|
||||
*/
|
||||
@Test
|
||||
public void testIOEOnWriteAfterPingClient() throws Exception {
|
||||
// start server
|
||||
Client.setPingInterval(conf, 100);
|
||||
|
||||
try {
|
||||
WRITABLE_FAULTS_SLEEP = 1000;
|
||||
doErrorTest(IOEOnWriteWritable.class,
|
||||
LongWritable.class,
|
||||
LongWritable.class,
|
||||
LongWritable.class);
|
||||
} finally {
|
||||
WRITABLE_FAULTS_SLEEP = 0;
|
||||
}
|
||||
}
|
||||
|
||||
private static void assertExceptionContains(
|
||||
Throwable t, String substring) {
|
||||
String msg = StringUtils.stringifyException(t);
|
||||
|
|
|
@ -38,6 +38,10 @@ import java.net.ConnectException;
|
|||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import javax.net.SocketFactory;
|
||||
|
||||
|
@ -823,6 +827,96 @@ public class TestRPC {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout=90000)
|
||||
public void testRPCInterruptedSimple() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
Server server = RPC.getServer(
|
||||
TestProtocol.class, new TestImpl(), ADDRESS, 0, 5, true, conf, null
|
||||
);
|
||||
server.start();
|
||||
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||
|
||||
final TestProtocol proxy = (TestProtocol) RPC.getProxy(
|
||||
TestProtocol.class, TestProtocol.versionID, addr, conf);
|
||||
// Connect to the server
|
||||
proxy.ping();
|
||||
// Interrupt self, try another call
|
||||
Thread.currentThread().interrupt();
|
||||
try {
|
||||
proxy.ping();
|
||||
fail("Interruption did not cause IPC to fail");
|
||||
} catch (IOException ioe) {
|
||||
if (!ioe.toString().contains("InterruptedException")) {
|
||||
throw ioe;
|
||||
}
|
||||
// clear interrupt status for future tests
|
||||
Thread.interrupted();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=30000)
|
||||
public void testRPCInterrupted() throws IOException, InterruptedException {
|
||||
final Configuration conf = new Configuration();
|
||||
Server server = RPC.getServer(
|
||||
TestProtocol.class, new TestImpl(), ADDRESS, 0, 5, true, conf, null
|
||||
);
|
||||
|
||||
server.start();
|
||||
|
||||
int numConcurrentRPC = 200;
|
||||
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||
final CyclicBarrier barrier = new CyclicBarrier(numConcurrentRPC);
|
||||
final CountDownLatch latch = new CountDownLatch(numConcurrentRPC);
|
||||
final AtomicBoolean leaderRunning = new AtomicBoolean(true);
|
||||
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
|
||||
Thread leaderThread = null;
|
||||
|
||||
for (int i = 0; i < numConcurrentRPC; i++) {
|
||||
final int num = i;
|
||||
final TestProtocol proxy = (TestProtocol) RPC.getProxy(
|
||||
TestProtocol.class, TestProtocol.versionID, addr, conf);
|
||||
Thread rpcThread = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
barrier.await();
|
||||
while (num == 0 || leaderRunning.get()) {
|
||||
proxy.slowPing(false);
|
||||
}
|
||||
|
||||
proxy.slowPing(false);
|
||||
} catch (Exception e) {
|
||||
if (num == 0) {
|
||||
leaderRunning.set(false);
|
||||
} else {
|
||||
error.set(e);
|
||||
}
|
||||
|
||||
LOG.error(e);
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
rpcThread.start();
|
||||
|
||||
if (leaderThread == null) {
|
||||
leaderThread = rpcThread;
|
||||
}
|
||||
}
|
||||
// let threads get past the barrier
|
||||
Thread.sleep(1000);
|
||||
// stop a single thread
|
||||
while (leaderRunning.get()) {
|
||||
leaderThread.interrupt();
|
||||
}
|
||||
|
||||
latch.await();
|
||||
|
||||
// should not cause any other thread to get an error
|
||||
assertTrue("rpc got exception " + error.get(), error.get() == null);
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
new TestRPC().testCallsInternal(conf);
|
||||
|
||||
|
|
Loading…
Reference in New Issue