From d7e32b0451461a1197470d4b24985cd7c61347e0 Mon Sep 17 00:00:00 2001 From: Suresh Srinivas Date: Fri, 19 Jul 2013 22:20:09 +0000 Subject: [PATCH] HADOOP-9717. Merge r1504725 from trunk. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1505053 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 2 + .../io/retry/RetryInvocationHandler.java | 8 +- .../java/org/apache/hadoop/ipc/Client.java | 21 ++- .../org/apache/hadoop/ipc/RpcConstants.java | 2 + .../java/org/apache/hadoop/ipc/Server.java | 49 ++++-- .../apache/hadoop/security/SaslRpcClient.java | 2 +- .../org/apache/hadoop/util/ProtoUtil.java | 5 +- .../src/main/proto/RpcHeader.proto | 2 + .../java/org/apache/hadoop/ipc/TestIPC.java | 148 ++++++++++++++++++ .../org/apache/hadoop/util/TestProtoUtil.java | 4 +- 10 files changed, 217 insertions(+), 26 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 3ce450f0a70..07e0707bd32 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -203,6 +203,8 @@ Release 2.1.0-beta - 2013-07-02 HADOOP-9716. Rpc retries should use the same call ID as the original call. (szetszwo) + HADOOP-9717. Add retry attempt count to the RPC requests. (jing9) + OPTIMIZATIONS HADOOP-9150. 
Avoid unnecessary DNS resolution attempts for logical URIs diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java index e43c648a00c..58fc690a934 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java @@ -36,6 +36,8 @@ import org.apache.hadoop.ipc.RpcConstants; import org.apache.hadoop.ipc.RpcInvocationHandler; import org.apache.hadoop.util.ThreadUtil; +import com.google.common.base.Preconditions; + class RetryInvocationHandler implements RpcInvocationHandler { public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class); private final FailoverProxyProvider proxyProvider; @@ -87,7 +89,7 @@ class RetryInvocationHandler implements RpcInvocationHandler { } if (isRpc) { - Client.setCallId(callId); + Client.setCallIdAndRetryCount(callId, retries); } try { Object ret = invokeMethod(method, args); @@ -97,8 +99,8 @@ class RetryInvocationHandler implements RpcInvocationHandler { boolean isMethodIdempotent = proxyProvider.getInterface() .getMethod(method.getName(), method.getParameterTypes()) .isAnnotationPresent(Idempotent.class); - RetryAction action = policy.shouldRetry(e, retries++, invocationFailoverCount, - isMethodIdempotent); + RetryAction action = policy.shouldRetry(e, retries++, + invocationFailoverCount, isMethodIdempotent); if (action.action == RetryAction.RetryDecision.FAIL) { if (action.reason != null) { LOG.warn("Exception while invoking " + diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index ad44a3bd820..675df9babcb 100644 --- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -107,12 +107,16 @@ public class Client { private static final AtomicInteger callIdCounter = new AtomicInteger(); private static final ThreadLocal callId = new ThreadLocal(); + private static final ThreadLocal retryCount = new ThreadLocal(); - /** Set call id for the next call. */ - public static void setCallId(int cid) { + /** Set call id and retry count for the next call. */ + public static void setCallIdAndRetryCount(int cid, int rc) { Preconditions.checkArgument(cid != RpcConstants.INVALID_CALL_ID); Preconditions.checkState(callId.get() == null); + Preconditions.checkArgument(rc != RpcConstants.INVALID_RETRY_COUNT); + callId.set(cid); + retryCount.set(rc); } private Hashtable connections = @@ -279,6 +283,7 @@ public class Client { */ static class Call { final int id; // call id + final int retry; // retry count final Writable rpcRequest; // the serialized rpc request Writable rpcResponse; // null if rpc has error IOException error; // exception, null if success @@ -296,6 +301,13 @@ public class Client { callId.set(null); this.id = id; } + + final Integer rc = retryCount.get(); + if (rc == null) { + this.retry = 0; + } else { + this.retry = rc; + } } /** Indicate when the call is complete and the @@ -866,7 +878,7 @@ public class Client { RpcRequestHeaderProto connectionContextHeader = ProtoUtil .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID, - clientId); + RpcConstants.INVALID_RETRY_COUNT, clientId); RpcRequestMessageWrapper request = new RpcRequestMessageWrapper(connectionContextHeader, message); @@ -974,7 +986,8 @@ public class Client { // Items '1' and '2' are prepared here. 
final DataOutputBuffer d = new DataOutputBuffer(); RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader( - call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, clientId); + call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry, + clientId); header.writeDelimitedTo(d); call.rpcRequest.write(d); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java index 6fd4ac75063..a4d7327157b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java @@ -33,6 +33,8 @@ public class RpcConstants { public static final int INVALID_CALL_ID = -2; + public static final int INVALID_RETRY_COUNT = -1; + /** * The first four bytes of Hadoop RPC connections */ diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 061432f5cd6..fc81f05a0ac 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -279,6 +279,15 @@ public abstract class Server { Call call = CurCall.get(); return call != null ? call.callId : RpcConstants.INVALID_CALL_ID; } + + /** + * @return The current active RPC call's retry count. -1 indicates the retry + * cache is not supported in the client side. + */ + public static int getCallRetryCount() { + Call call = CurCall.get(); + return call != null ? call.retryCount : RpcConstants.INVALID_RETRY_COUNT; + } /** Returns the remote side ip address when invoked inside an RPC * Returns null incase of an error. @@ -453,6 +462,7 @@ public abstract class Server { /** A call queued for handling. 
*/ private static class Call { private final int callId; // the client's call id + private final int retryCount; // the retry count of the call private final Writable rpcRequest; // Serialized Rpc request from client private final Connection connection; // connection to client private long timestamp; // time received when response is null @@ -461,14 +471,16 @@ public abstract class Server { private final RPC.RpcKind rpcKind; private final byte[] clientId; - private Call(int id, Writable param, Connection connection) { - this(id, param, connection, RPC.RpcKind.RPC_BUILTIN, + private Call(int id, int retryCount, Writable param, + Connection connection) { + this(id, retryCount, param, connection, RPC.RpcKind.RPC_BUILTIN, RpcConstants.DUMMY_CLIENT_ID); } - private Call(int id, Writable param, Connection connection, + private Call(int id, int retryCount, Writable param, Connection connection, RPC.RpcKind kind, byte[] clientId) { this.callId = id; + this.retryCount = retryCount; this.rpcRequest = param; this.connection = connection; this.timestamp = Time.now(); @@ -479,7 +491,8 @@ public abstract class Server { @Override public String toString() { - return rpcRequest + " from " + connection + " Call#" + callId; + return rpcRequest + " from " + connection + " Call#" + callId + " Retry#" + + retryCount; } public void setResponse(ByteBuffer response) { @@ -1160,11 +1173,12 @@ public abstract class Server { private static final int AUTHORIZATION_FAILED_CALLID = -1; private static final int CONNECTION_CONTEXT_CALL_ID = -3; - private final Call authFailedCall = - new Call(AUTHORIZATION_FAILED_CALLID, null, this); + private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, + RpcConstants.INVALID_RETRY_COUNT, null, this); private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream(); - private final Call saslCall = new Call(AuthProtocol.SASL.callId, null, this); + private final Call saslCall = new Call(AuthProtocol.SASL.callId, + 
RpcConstants.INVALID_RETRY_COUNT, null, this); private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream(); private boolean sentNegotiate = false; @@ -1592,20 +1606,23 @@ public abstract class Server { if (clientVersion >= 9) { // Versions >>9 understand the normal response - Call fakeCall = new Call(-1, null, this); + Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null, + this); setupResponse(buffer, fakeCall, RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_VERSION_MISMATCH, null, VersionMismatch.class.getName(), errMsg); responder.doRespond(fakeCall); } else if (clientVersion >= 3) { - Call fakeCall = new Call(-1, null, this); + Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null, + this); // Versions 3 to 8 use older response setupResponseOldVersionFatal(buffer, fakeCall, null, VersionMismatch.class.getName(), errMsg); responder.doRespond(fakeCall); } else if (clientVersion == 2) { // Hadoop 0.18.3 - Call fakeCall = new Call(0, null, this); + Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null, + this); DataOutputStream out = new DataOutputStream(buffer); out.writeInt(0); // call ID out.writeBoolean(true); // error @@ -1618,7 +1635,7 @@ public abstract class Server { } private void setupHttpRequestOnIpcPortResponse() throws IOException { - Call fakeCall = new Call(0, null, this); + Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null, this); fakeCall.setResponse(ByteBuffer.wrap( RECEIVED_HTTP_REQ_RESPONSE.getBytes())); responder.doRespond(fakeCall); @@ -1750,12 +1767,14 @@ public abstract class Server { private void processOneRpc(byte[] buf) throws IOException, WrappedRpcServerException, InterruptedException { int callId = -1; + int retry = RpcConstants.INVALID_RETRY_COUNT; try { final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf)); final RpcRequestHeaderProto header = decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis); callId = 
header.getCallId(); + retry = header.getRetryCount(); if (LOG.isDebugEnabled()) { LOG.debug(" got #" + callId); } @@ -1772,7 +1791,7 @@ public abstract class Server { } } catch (WrappedRpcServerException wrse) { // inform client of error Throwable ioe = wrse.getCause(); - final Call call = new Call(callId, null, this); + final Call call = new Call(callId, retry, null, this); setupResponse(authFailedResponse, call, RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null, ioe.getClass().getName(), ioe.getMessage()); @@ -1846,9 +1865,9 @@ public abstract class Server { RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err); } - Call call = new Call(header.getCallId(), rpcRequest, this, - ProtoUtil.convert(header.getRpcKind()), header.getClientId() - .toByteArray()); + Call call = new Call(header.getCallId(), header.getRetryCount(), + rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header + .getClientId().toByteArray()); callQueue.put(call); // queue the call; maybe blocked here incRpcCount(); // Increment the rpc count } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java index a72bfbfeabc..aacd7927944 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java @@ -75,7 +75,7 @@ public class SaslRpcClient { private static final RpcRequestHeaderProto saslHeader = ProtoUtil .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, AuthProtocol.SASL.callId, - RpcConstants.DUMMY_CLIENT_ID); + RpcConstants.INVALID_RETRY_COUNT, RpcConstants.DUMMY_CLIENT_ID); private static final RpcSaslProto negotiateRequest = RpcSaslProto.newBuilder().setState(SaslState.NEGOTIATE).build(); diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java index 7e32ffa27bf..79f8692842d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java @@ -160,10 +160,11 @@ public abstract class ProtoUtil { } public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind, - RpcRequestHeaderProto.OperationProto operation, int callId, byte[] uuid) { + RpcRequestHeaderProto.OperationProto operation, int callId, + int retryCount, byte[] uuid) { RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder(); result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId) - .setClientId(ByteString.copyFrom(uuid)); + .setRetryCount(retryCount).setClientId(ByteString.copyFrom(uuid)); return result.build(); } } diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto index 92f7fbe2ff1..6b013b59003 100644 --- a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto +++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto @@ -65,6 +65,8 @@ message RpcRequestHeaderProto { // the header for the RpcRequest required uint32 callId = 3; // a sequence number that is sent back in response required bytes clientId = 4; // Globally unique client ID // clientId + callId uniquely identifies a request + // retry count, 1 means this is the first retry + optional sint32 retryCount = 5 [default = -1]; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index d74d285a678..d8544e0466e 100644 --- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -35,6 +35,8 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketTimeoutException; @@ -53,6 +55,9 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.RPC.RpcKind; import org.apache.hadoop.ipc.Server.Connection; import org.apache.hadoop.net.ConnectTimeoutException; @@ -171,6 +176,45 @@ public class TestIPC { } } + /** + * A RpcInvocationHandler instance for test. Its invoke function uses the same + * {@link Client} instance, and will fail the first totalRetry times (by + * throwing an IOException). 
+ */ + private static class TestInvocationHandler implements RpcInvocationHandler { + private static int retry = 0; + private final Client client; + private final Server server; + private final int total; + + TestInvocationHandler(Client client, Server server, int total) { + this.client = client; + this.server = server; + this.total = total; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) + throws Throwable { + LongWritable param = new LongWritable(RANDOM.nextLong()); + LongWritable value = (LongWritable) client.call(param, + NetUtils.getConnectAddress(server), null, null, 0, conf); + if (retry++ < total) { + throw new IOException("Fake IOException"); + } else { + return value; + } + } + + @Override + public void close() throws IOException {} + + @Override + public ConnectionId getConnectionId() { + return null; + } + } + @Test public void testSerial() throws Exception { testSerial(3, false, 2, 5, 100); @@ -705,6 +749,110 @@ public class TestIPC { server.stop(); } } + + /** A dummy protocol */ + private interface DummyProtocol { + public void dummyRun(); + } + + /** + * Test the retry count while used in a retry proxy. 
+ */ + @Test + public void testRetryProxy() throws Exception { + final Client client = new Client(LongWritable.class, conf); + + final TestServer server = new TestServer(1, false); + server.callListener = new Runnable() { + private int retryCount = 0; + @Override + public void run() { + Assert.assertEquals(retryCount++, Server.getCallRetryCount()); + } + }; + + final int totalRetry = 256; + DummyProtocol proxy = (DummyProtocol) Proxy.newProxyInstance( + DummyProtocol.class.getClassLoader(), + new Class[] { DummyProtocol.class }, new TestInvocationHandler(client, + server, totalRetry)); + DummyProtocol retryProxy = (DummyProtocol) RetryProxy.create( + DummyProtocol.class, proxy, RetryPolicies.RETRY_FOREVER); + + try { + server.start(); + retryProxy.dummyRun(); + Assert.assertEquals(TestInvocationHandler.retry, totalRetry + 1); + } finally { + Client.setCallIdAndRetryCount(0, 0); + client.stop(); + server.stop(); + } + } + + /** + * Test if the rpc server gets the default retry count (0) from client. + */ + @Test + public void testInitialCallRetryCount() throws Exception { + // Plain client; this test does not set a call id or retry count explicitly + final Client client = new Client(LongWritable.class, conf); + + // Attach a listener that checks the retry count the server sees for each call. + final TestServer server = new TestServer(1, false); + server.callListener = new Runnable() { + @Override + public void run() { + // we have not set the retry count for the client, thus on the server + // side we should see retry count as 0 + Assert.assertEquals(0, Server.getCallRetryCount()); + } + }; + + try { + InetSocketAddress addr = NetUtils.getConnectAddress(server); + server.start(); + final SerialCaller caller = new SerialCaller(client, addr, 10); + caller.run(); + assertFalse(caller.failed); + } finally { + client.stop(); + server.stop(); + } + } + + /** + * Test if the rpc server gets the retry count from client. 
+ */ + @Test + public void testCallRetryCount() throws Exception { + final int retryCount = 255; + // Plain client; the call id and retry count for its calls are set below + final Client client = new Client(LongWritable.class, conf); + Client.setCallIdAndRetryCount(Client.nextCallId(), 255); + + // Attach a listener that checks the retry count the server sees for each call. + final TestServer server = new TestServer(1, false); + server.callListener = new Runnable() { + @Override + public void run() { + // the retry count was set to 255 on the client side above, so the + // server side should see that same retry count + Assert.assertEquals(retryCount, Server.getCallRetryCount()); + } + }; + + try { + InetSocketAddress addr = NetUtils.getConnectAddress(server); + server.start(); + final SerialCaller caller = new SerialCaller(client, addr, 10); + caller.run(); + assertFalse(caller.failed); + } finally { + client.stop(); + server.stop(); + } + } /** * Tests that client generates a unique sequential call ID for each RPC call, diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java index 22200d6c1fe..5d55f89d7b6 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.util.Arrays; import org.apache.hadoop.ipc.RPC.RpcKind; +import org.apache.hadoop.ipc.RpcConstants; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto; import org.junit.Test; @@ -79,7 +80,8 @@ public class TestProtoUtil { public void testRpcClientId() { byte[] uuid = StringUtils.getUuidBytes(); RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader( - RpcKind.RPC_PROTOCOL_BUFFER, 
OperationProto.RPC_FINAL_PACKET, 0, uuid); + RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, 0, + RpcConstants.INVALID_RETRY_COUNT, uuid); assertTrue(Arrays.equals(uuid, header.getClientId().toByteArray())); } }