HADOOP-9751. Merge r1505036 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1505059 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2013-07-19 22:40:26 +00:00
parent d7e32b0451
commit 0d85f58035
5 changed files with 50 additions and 12 deletions

View File

@ -205,6 +205,9 @@ Release 2.1.0-beta - 2013-07-02
HADOOP-9717. Add retry attempt count to the RPC requests. (jing9) HADOOP-9717. Add retry attempt count to the RPC requests. (jing9)
HADOOP-9751. Add clientId and retryCount to RpcResponseHeaderProto.
(szetszwo)
OPTIMIZATIONS OPTIMIZATIONS
HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs

View File

@ -33,6 +33,7 @@ import java.net.Socket;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Hashtable; import java.util.Hashtable;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map.Entry; import java.util.Map.Entry;
@ -274,6 +275,24 @@ public class Client {
return refCount==0; return refCount==0;
} }
/** Check the rpc response header. */
void checkResponse(RpcResponseHeaderProto header) throws IOException {
if (header == null) {
throw new IOException("Response is null.");
}
if (header.hasClientId()) {
// check client IDs
final byte[] id = header.getClientId().toByteArray();
if (!Arrays.equals(id, RpcConstants.DUMMY_CLIENT_ID)) {
if (!Arrays.equals(id, clientId)) {
throw new IOException("Client IDs not matched: local ID="
+ StringUtils.byteToHexString(clientId) + ", ID in reponse="
+ StringUtils.byteToHexString(header.getClientId().toByteArray()));
}
}
}
}
Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) { Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) {
return new Call(rpcKind, rpcRequest); return new Call(rpcKind, rpcRequest);
} }
@ -1052,9 +1071,8 @@ public class Client {
int totalLen = in.readInt(); int totalLen = in.readInt();
RpcResponseHeaderProto header = RpcResponseHeaderProto header =
RpcResponseHeaderProto.parseDelimitedFrom(in); RpcResponseHeaderProto.parseDelimitedFrom(in);
if (header == null) { checkResponse(header);
throw new IOException("Response is null.");
}
int headerLen = header.getSerializedSize(); int headerLen = header.getSerializedSize();
headerLen += CodedOutputStream.computeRawVarint32Size(headerLen); headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);

View File

@ -2287,7 +2287,9 @@ public abstract class Server {
DataOutputStream out = new DataOutputStream(responseBuf); DataOutputStream out = new DataOutputStream(responseBuf);
RpcResponseHeaderProto.Builder headerBuilder = RpcResponseHeaderProto.Builder headerBuilder =
RpcResponseHeaderProto.newBuilder(); RpcResponseHeaderProto.newBuilder();
headerBuilder.setClientId(ByteString.copyFrom(call.clientId));
headerBuilder.setCallId(call.callId); headerBuilder.setCallId(call.callId);
headerBuilder.setRetryCount(call.retryCount);
headerBuilder.setStatus(status); headerBuilder.setStatus(status);
headerBuilder.setServerIpcVersionNum(CURRENT_VERSION); headerBuilder.setServerIpcVersionNum(CURRENT_VERSION);

View File

@ -130,6 +130,8 @@ message RpcResponseHeaderProto {
optional string exceptionClassName = 4; // if request fails optional string exceptionClassName = 4; // if request fails
optional string errorMsg = 5; // if request fails, often contains strack trace optional string errorMsg = 5; // if request fails, often contains strack trace
optional RpcErrorCodeProto errorDetail = 6; // in case of error optional RpcErrorCodeProto errorDetail = 6; // in case of error
optional bytes clientId = 7; // Globally unique client ID
optional sint32 retryCount = 8 [default = -1];
} }
message RpcSaslProto { message RpcSaslProto {
@ -153,4 +155,4 @@ message RpcSaslProto {
required SaslState state = 2; required SaslState state = 2;
optional bytes token = 3; optional bytes token = 3;
repeated SaslAuth auths = 4; repeated SaslAuth auths = 4;
} }

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcKind; import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.Server.Connection; import org.apache.hadoop.ipc.Server.Connection;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -708,33 +709,45 @@ public class TestIPC {
assertRetriesOnSocketTimeouts(conf, 4); assertRetriesOnSocketTimeouts(conf, 4);
} }
private static class CallId { private static class CallInfo {
int id = RpcConstants.INVALID_CALL_ID; int id = RpcConstants.INVALID_CALL_ID;
int retry = RpcConstants.INVALID_RETRY_COUNT;
} }
/** /**
* Test if the rpc server uses the call id generated by the rpc client. * Test if
* (1) the rpc server uses the call id/retry provided by the rpc client, and
* (2) the rpc client receives the same call id/retry from the rpc server.
*/ */
@Test @Test
public void testCallIds() throws Exception { public void testCallIdAndRetry() throws Exception {
final CallId callId = new CallId(); final CallInfo info = new CallInfo();
// Override client to store the call id // Override client to store the call info and check response
final Client client = new Client(LongWritable.class, conf) { final Client client = new Client(LongWritable.class, conf) {
@Override @Override
Call createCall(RpcKind rpcKind, Writable rpcRequest) { Call createCall(RpcKind rpcKind, Writable rpcRequest) {
final Call call = super.createCall(rpcKind, rpcRequest); final Call call = super.createCall(rpcKind, rpcRequest);
callId.id = call.id; info.id = call.id;
info.retry = call.retry;
return call; return call;
} }
@Override
void checkResponse(RpcResponseHeaderProto header) throws IOException {
super.checkResponse(header);
Assert.assertEquals(info.id, header.getCallId());
Assert.assertEquals(info.retry, header.getRetryCount());
}
}; };
// Attach a listener that tracks every call ID received by the server. // Attach a listener that tracks every call received by the server.
final TestServer server = new TestServer(1, false); final TestServer server = new TestServer(1, false);
server.callListener = new Runnable() { server.callListener = new Runnable() {
@Override @Override
public void run() { public void run() {
Assert.assertEquals(callId.id, Server.getCallId()); Assert.assertEquals(info.id, Server.getCallId());
Assert.assertEquals(info.retry, Server.getCallRetryCount());
} }
}; };