From 0d85f580351b4fe3ae367d006d47979092d76f51 Mon Sep 17 00:00:00 2001 From: Suresh Srinivas Date: Fri, 19 Jul 2013 22:40:26 +0000 Subject: [PATCH] HADOOP-9751. Merge r1505036 from trunk. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1505059 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 3 ++ .../java/org/apache/hadoop/ipc/Client.java | 24 +++++++++++++-- .../java/org/apache/hadoop/ipc/Server.java | 2 ++ .../src/main/proto/RpcHeader.proto | 4 ++- .../java/org/apache/hadoop/ipc/TestIPC.java | 29 ++++++++++++++----- 5 files changed, 50 insertions(+), 12 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 07e0707bd32..9a480d27de8 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -205,6 +205,9 @@ Release 2.1.0-beta - 2013-07-02 HADOOP-9717. Add retry attempt count to the RPC requests. (jing9) + HADOOP-9751. Add clientId and retryCount to RpcResponseHeaderProto. + (szetszwo) + OPTIMIZATIONS HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 675df9babcb..c1ee1173d86 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -33,6 +33,7 @@ import java.net.Socket; import java.net.SocketTimeoutException; import java.net.UnknownHostException; import java.security.PrivilegedExceptionAction; +import java.util.Arrays; import java.util.Hashtable; import java.util.Iterator; import java.util.Map.Entry; @@ -274,6 +275,24 @@ public class Client { return refCount==0; } + /** Check the rpc response header. */ + void checkResponse(RpcResponseHeaderProto header) throws IOException { + if (header == null) { + throw new IOException("Response is null."); + } + if (header.hasClientId()) { + // check client IDs + final byte[] id = header.getClientId().toByteArray(); + if (!Arrays.equals(id, RpcConstants.DUMMY_CLIENT_ID)) { + if (!Arrays.equals(id, clientId)) { + throw new IOException("Client IDs not matched: local ID=" + + StringUtils.byteToHexString(clientId) + ", ID in reponse=" + + StringUtils.byteToHexString(header.getClientId().toByteArray())); + } + } + } + } + Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) { return new Call(rpcKind, rpcRequest); } @@ -1052,9 +1071,8 @@ public class Client { int totalLen = in.readInt(); RpcResponseHeaderProto header = RpcResponseHeaderProto.parseDelimitedFrom(in); - if (header == null) { - throw new IOException("Response is null."); - } + checkResponse(header); + int headerLen = header.getSerializedSize(); headerLen += CodedOutputStream.computeRawVarint32Size(headerLen); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index fc81f05a0ac..8799011efa9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -2287,7 +2287,9 @@ public abstract class Server { DataOutputStream out = new DataOutputStream(responseBuf); RpcResponseHeaderProto.Builder headerBuilder = RpcResponseHeaderProto.newBuilder(); + headerBuilder.setClientId(ByteString.copyFrom(call.clientId)); headerBuilder.setCallId(call.callId); + headerBuilder.setRetryCount(call.retryCount); headerBuilder.setStatus(status); headerBuilder.setServerIpcVersionNum(CURRENT_VERSION); diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto index 6b013b59003..2f61d986829 100644 --- a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto +++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto @@ -130,6 +130,8 @@ message RpcResponseHeaderProto { optional string exceptionClassName = 4; // if request fails optional string errorMsg = 5; // if request fails, often contains strack trace optional RpcErrorCodeProto errorDetail = 6; // in case of error + optional bytes clientId = 7; // Globally unique client ID + optional sint32 retryCount = 8 [default = -1]; } message RpcSaslProto { @@ -153,4 +155,4 @@ message RpcSaslProto { required SaslState state = 2; optional bytes token = 3; repeated SaslAuth auths = 4; -} \ No newline at end of file +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index d8544e0466e..2b33f5210ff 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -60,6 +60,7 @@ import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.RPC.RpcKind; import org.apache.hadoop.ipc.Server.Connection; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.StringUtils; @@ -708,33 +709,45 @@ public class TestIPC { assertRetriesOnSocketTimeouts(conf, 4); } - private static class CallId { + private static class CallInfo { int id = RpcConstants.INVALID_CALL_ID; + int retry = RpcConstants.INVALID_RETRY_COUNT; } /** - * Test if the rpc server uses the call id generated by the rpc client. + * Test if + * (1) the rpc server uses the call id/retry provided by the rpc client, and + * (2) the rpc client receives the same call id/retry from the rpc server. */ @Test - public void testCallIds() throws Exception { - final CallId callId = new CallId(); + public void testCallIdAndRetry() throws Exception { + final CallInfo info = new CallInfo(); - // Override client to store the call id + // Override client to store the call info and check response final Client client = new Client(LongWritable.class, conf) { @Override Call createCall(RpcKind rpcKind, Writable rpcRequest) { final Call call = super.createCall(rpcKind, rpcRequest); - callId.id = call.id; + info.id = call.id; + info.retry = call.retry; return call; } + + @Override + void checkResponse(RpcResponseHeaderProto header) throws IOException { + super.checkResponse(header); + Assert.assertEquals(info.id, header.getCallId()); + Assert.assertEquals(info.retry, header.getRetryCount()); + } }; - // Attach a listener that tracks every call ID received by the server. + // Attach a listener that tracks every call received by the server. final TestServer server = new TestServer(1, false); server.callListener = new Runnable() { @Override public void run() { - Assert.assertEquals(callId.id, Server.getCallId()); + Assert.assertEquals(info.id, Server.getCallId()); + Assert.assertEquals(info.retry, Server.getCallRetryCount()); } };