HADOOP-9717. Merge r1504725 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1505053 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2013-07-19 22:20:09 +00:00
parent 73832071ea
commit d7e32b0451
10 changed files with 217 additions and 26 deletions

View File

@ -203,6 +203,8 @@ Release 2.1.0-beta - 2013-07-02
HADOOP-9716. Rpc retries should use the same call ID as the original call. HADOOP-9716. Rpc retries should use the same call ID as the original call.
(szetszwo) (szetszwo)
HADOOP-9717. Add retry attempt count to the RPC requests. (jing9)
OPTIMIZATIONS OPTIMIZATIONS
HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs

View File

@ -36,6 +36,8 @@
import org.apache.hadoop.ipc.RpcInvocationHandler; import org.apache.hadoop.ipc.RpcInvocationHandler;
import org.apache.hadoop.util.ThreadUtil; import org.apache.hadoop.util.ThreadUtil;
import com.google.common.base.Preconditions;
class RetryInvocationHandler implements RpcInvocationHandler { class RetryInvocationHandler implements RpcInvocationHandler {
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class); public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
private final FailoverProxyProvider proxyProvider; private final FailoverProxyProvider proxyProvider;
@ -87,7 +89,7 @@ public Object invoke(Object proxy, Method method, Object[] args)
} }
if (isRpc) { if (isRpc) {
Client.setCallId(callId); Client.setCallIdAndRetryCount(callId, retries);
} }
try { try {
Object ret = invokeMethod(method, args); Object ret = invokeMethod(method, args);
@ -97,8 +99,8 @@ public Object invoke(Object proxy, Method method, Object[] args)
boolean isMethodIdempotent = proxyProvider.getInterface() boolean isMethodIdempotent = proxyProvider.getInterface()
.getMethod(method.getName(), method.getParameterTypes()) .getMethod(method.getName(), method.getParameterTypes())
.isAnnotationPresent(Idempotent.class); .isAnnotationPresent(Idempotent.class);
RetryAction action = policy.shouldRetry(e, retries++, invocationFailoverCount, RetryAction action = policy.shouldRetry(e, retries++,
isMethodIdempotent); invocationFailoverCount, isMethodIdempotent);
if (action.action == RetryAction.RetryDecision.FAIL) { if (action.action == RetryAction.RetryDecision.FAIL) {
if (action.reason != null) { if (action.reason != null) {
LOG.warn("Exception while invoking " + LOG.warn("Exception while invoking " +

View File

@ -107,12 +107,16 @@ public class Client {
private static final AtomicInteger callIdCounter = new AtomicInteger(); private static final AtomicInteger callIdCounter = new AtomicInteger();
private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>(); private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
/** Set call id for the next call. */ /** Set call id and retry count for the next call. */
public static void setCallId(int cid) { public static void setCallIdAndRetryCount(int cid, int rc) {
Preconditions.checkArgument(cid != RpcConstants.INVALID_CALL_ID); Preconditions.checkArgument(cid != RpcConstants.INVALID_CALL_ID);
Preconditions.checkState(callId.get() == null); Preconditions.checkState(callId.get() == null);
Preconditions.checkArgument(rc != RpcConstants.INVALID_RETRY_COUNT);
callId.set(cid); callId.set(cid);
retryCount.set(rc);
} }
private Hashtable<ConnectionId, Connection> connections = private Hashtable<ConnectionId, Connection> connections =
@ -279,6 +283,7 @@ Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) {
*/ */
static class Call { static class Call {
final int id; // call id final int id; // call id
final int retry; // retry count
final Writable rpcRequest; // the serialized rpc request final Writable rpcRequest; // the serialized rpc request
Writable rpcResponse; // null if rpc has error Writable rpcResponse; // null if rpc has error
IOException error; // exception, null if success IOException error; // exception, null if success
@ -296,6 +301,13 @@ private Call(RPC.RpcKind rpcKind, Writable param) {
callId.set(null); callId.set(null);
this.id = id; this.id = id;
} }
final Integer rc = retryCount.get();
if (rc == null) {
this.retry = 0;
} else {
this.retry = rc;
}
} }
/** Indicate when the call is complete and the /** Indicate when the call is complete and the
@ -866,7 +878,7 @@ private void writeConnectionContext(ConnectionId remoteId,
RpcRequestHeaderProto connectionContextHeader = ProtoUtil RpcRequestHeaderProto connectionContextHeader = ProtoUtil
.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER, .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID, OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID,
clientId); RpcConstants.INVALID_RETRY_COUNT, clientId);
RpcRequestMessageWrapper request = RpcRequestMessageWrapper request =
new RpcRequestMessageWrapper(connectionContextHeader, message); new RpcRequestMessageWrapper(connectionContextHeader, message);
@ -974,7 +986,8 @@ public void sendRpcRequest(final Call call)
// Items '1' and '2' are prepared here. // Items '1' and '2' are prepared here.
final DataOutputBuffer d = new DataOutputBuffer(); final DataOutputBuffer d = new DataOutputBuffer();
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader( RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, clientId); call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
clientId);
header.writeDelimitedTo(d); header.writeDelimitedTo(d);
call.rpcRequest.write(d); call.rpcRequest.write(d);

View File

@ -33,6 +33,8 @@ private RpcConstants() {
public static final int INVALID_CALL_ID = -2; public static final int INVALID_CALL_ID = -2;
public static final int INVALID_RETRY_COUNT = -1;
/** /**
* The first four bytes of Hadoop RPC connections * The first four bytes of Hadoop RPC connections
*/ */

View File

@ -279,6 +279,15 @@ static int getCallId() {
Call call = CurCall.get(); Call call = CurCall.get();
return call != null ? call.callId : RpcConstants.INVALID_CALL_ID; return call != null ? call.callId : RpcConstants.INVALID_CALL_ID;
} }
/**
* @return The current active RPC call's retry count. -1 indicates the retry
* cache is not supported in the client side.
*/
public static int getCallRetryCount() {
Call call = CurCall.get();
return call != null ? call.retryCount : RpcConstants.INVALID_RETRY_COUNT;
}
/** Returns the remote side ip address when invoked inside an RPC /** Returns the remote side ip address when invoked inside an RPC
* Returns null incase of an error. * Returns null incase of an error.
@ -453,6 +462,7 @@ public ServiceAuthorizationManager getServiceAuthorizationManager() {
/** A call queued for handling. */ /** A call queued for handling. */
private static class Call { private static class Call {
private final int callId; // the client's call id private final int callId; // the client's call id
private final int retryCount; // the retry count of the call
private final Writable rpcRequest; // Serialized Rpc request from client private final Writable rpcRequest; // Serialized Rpc request from client
private final Connection connection; // connection to client private final Connection connection; // connection to client
private long timestamp; // time received when response is null private long timestamp; // time received when response is null
@ -461,14 +471,16 @@ private static class Call {
private final RPC.RpcKind rpcKind; private final RPC.RpcKind rpcKind;
private final byte[] clientId; private final byte[] clientId;
private Call(int id, Writable param, Connection connection) { private Call(int id, int retryCount, Writable param,
this(id, param, connection, RPC.RpcKind.RPC_BUILTIN, Connection connection) {
this(id, retryCount, param, connection, RPC.RpcKind.RPC_BUILTIN,
RpcConstants.DUMMY_CLIENT_ID); RpcConstants.DUMMY_CLIENT_ID);
} }
private Call(int id, Writable param, Connection connection, private Call(int id, int retryCount, Writable param, Connection connection,
RPC.RpcKind kind, byte[] clientId) { RPC.RpcKind kind, byte[] clientId) {
this.callId = id; this.callId = id;
this.retryCount = retryCount;
this.rpcRequest = param; this.rpcRequest = param;
this.connection = connection; this.connection = connection;
this.timestamp = Time.now(); this.timestamp = Time.now();
@ -479,7 +491,8 @@ private Call(int id, Writable param, Connection connection,
@Override @Override
public String toString() { public String toString() {
return rpcRequest + " from " + connection + " Call#" + callId; return rpcRequest + " from " + connection + " Call#" + callId + " Retry#"
+ retryCount;
} }
public void setResponse(ByteBuffer response) { public void setResponse(ByteBuffer response) {
@ -1160,11 +1173,12 @@ public class Connection {
private static final int AUTHORIZATION_FAILED_CALLID = -1; private static final int AUTHORIZATION_FAILED_CALLID = -1;
private static final int CONNECTION_CONTEXT_CALL_ID = -3; private static final int CONNECTION_CONTEXT_CALL_ID = -3;
private final Call authFailedCall = private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID,
new Call(AUTHORIZATION_FAILED_CALLID, null, this); RpcConstants.INVALID_RETRY_COUNT, null, this);
private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream(); private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
private final Call saslCall = new Call(AuthProtocol.SASL.callId, null, this); private final Call saslCall = new Call(AuthProtocol.SASL.callId,
RpcConstants.INVALID_RETRY_COUNT, null, this);
private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream(); private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream();
private boolean sentNegotiate = false; private boolean sentNegotiate = false;
@ -1592,20 +1606,23 @@ private void setupBadVersionResponse(int clientVersion) throws IOException {
if (clientVersion >= 9) { if (clientVersion >= 9) {
// Versions >>9 understand the normal response // Versions >>9 understand the normal response
Call fakeCall = new Call(-1, null, this); Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,
this);
setupResponse(buffer, fakeCall, setupResponse(buffer, fakeCall,
RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_VERSION_MISMATCH, RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_VERSION_MISMATCH,
null, VersionMismatch.class.getName(), errMsg); null, VersionMismatch.class.getName(), errMsg);
responder.doRespond(fakeCall); responder.doRespond(fakeCall);
} else if (clientVersion >= 3) { } else if (clientVersion >= 3) {
Call fakeCall = new Call(-1, null, this); Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,
this);
// Versions 3 to 8 use older response // Versions 3 to 8 use older response
setupResponseOldVersionFatal(buffer, fakeCall, setupResponseOldVersionFatal(buffer, fakeCall,
null, VersionMismatch.class.getName(), errMsg); null, VersionMismatch.class.getName(), errMsg);
responder.doRespond(fakeCall); responder.doRespond(fakeCall);
} else if (clientVersion == 2) { // Hadoop 0.18.3 } else if (clientVersion == 2) { // Hadoop 0.18.3
Call fakeCall = new Call(0, null, this); Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null,
this);
DataOutputStream out = new DataOutputStream(buffer); DataOutputStream out = new DataOutputStream(buffer);
out.writeInt(0); // call ID out.writeInt(0); // call ID
out.writeBoolean(true); // error out.writeBoolean(true); // error
@ -1618,7 +1635,7 @@ private void setupBadVersionResponse(int clientVersion) throws IOException {
} }
private void setupHttpRequestOnIpcPortResponse() throws IOException { private void setupHttpRequestOnIpcPortResponse() throws IOException {
Call fakeCall = new Call(0, null, this); Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null, this);
fakeCall.setResponse(ByteBuffer.wrap( fakeCall.setResponse(ByteBuffer.wrap(
RECEIVED_HTTP_REQ_RESPONSE.getBytes())); RECEIVED_HTTP_REQ_RESPONSE.getBytes()));
responder.doRespond(fakeCall); responder.doRespond(fakeCall);
@ -1750,12 +1767,14 @@ private void unwrapPacketAndProcessRpcs(byte[] inBuf) throws IOException,
private void processOneRpc(byte[] buf) private void processOneRpc(byte[] buf)
throws IOException, WrappedRpcServerException, InterruptedException { throws IOException, WrappedRpcServerException, InterruptedException {
int callId = -1; int callId = -1;
int retry = RpcConstants.INVALID_RETRY_COUNT;
try { try {
final DataInputStream dis = final DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf)); new DataInputStream(new ByteArrayInputStream(buf));
final RpcRequestHeaderProto header = final RpcRequestHeaderProto header =
decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis); decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis);
callId = header.getCallId(); callId = header.getCallId();
retry = header.getRetryCount();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(" got #" + callId); LOG.debug(" got #" + callId);
} }
@ -1772,7 +1791,7 @@ private void processOneRpc(byte[] buf)
} }
} catch (WrappedRpcServerException wrse) { // inform client of error } catch (WrappedRpcServerException wrse) { // inform client of error
Throwable ioe = wrse.getCause(); Throwable ioe = wrse.getCause();
final Call call = new Call(callId, null, this); final Call call = new Call(callId, retry, null, this);
setupResponse(authFailedResponse, call, setupResponse(authFailedResponse, call,
RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null, RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null,
ioe.getClass().getName(), ioe.getMessage()); ioe.getClass().getName(), ioe.getMessage());
@ -1846,9 +1865,9 @@ private void processRpcRequest(RpcRequestHeaderProto header,
RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err); RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err);
} }
Call call = new Call(header.getCallId(), rpcRequest, this, Call call = new Call(header.getCallId(), header.getRetryCount(),
ProtoUtil.convert(header.getRpcKind()), header.getClientId() rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header
.toByteArray()); .getClientId().toByteArray());
callQueue.put(call); // queue the call; maybe blocked here callQueue.put(call); // queue the call; maybe blocked here
incRpcCount(); // Increment the rpc count incRpcCount(); // Increment the rpc count
} }

View File

@ -75,7 +75,7 @@ public class SaslRpcClient {
private static final RpcRequestHeaderProto saslHeader = ProtoUtil private static final RpcRequestHeaderProto saslHeader = ProtoUtil
.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER, .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
OperationProto.RPC_FINAL_PACKET, AuthProtocol.SASL.callId, OperationProto.RPC_FINAL_PACKET, AuthProtocol.SASL.callId,
RpcConstants.DUMMY_CLIENT_ID); RpcConstants.INVALID_RETRY_COUNT, RpcConstants.DUMMY_CLIENT_ID);
private static final RpcSaslProto negotiateRequest = private static final RpcSaslProto negotiateRequest =
RpcSaslProto.newBuilder().setState(SaslState.NEGOTIATE).build(); RpcSaslProto.newBuilder().setState(SaslState.NEGOTIATE).build();

View File

@ -160,10 +160,11 @@ public static RPC.RpcKind convert( RpcKindProto kind) {
} }
public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind, public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
RpcRequestHeaderProto.OperationProto operation, int callId, byte[] uuid) { RpcRequestHeaderProto.OperationProto operation, int callId,
int retryCount, byte[] uuid) {
RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder(); RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder();
result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId) result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId)
.setClientId(ByteString.copyFrom(uuid)); .setRetryCount(retryCount).setClientId(ByteString.copyFrom(uuid));
return result.build(); return result.build();
} }
} }

View File

@ -65,6 +65,8 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
required uint32 callId = 3; // a sequence number that is sent back in response required uint32 callId = 3; // a sequence number that is sent back in response
required bytes clientId = 4; // Globally unique client ID required bytes clientId = 4; // Globally unique client ID
// clientId + callId uniquely identifies a request // clientId + callId uniquely identifies a request
// retry count, 1 means this is the first retry
optional sint32 retryCount = 5 [default = -1];
} }

View File

@ -35,6 +35,8 @@
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
@ -53,6 +55,9 @@
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcKind; import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.Server.Connection; import org.apache.hadoop.ipc.Server.Connection;
import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.ConnectTimeoutException;
@ -171,6 +176,45 @@ public void run() {
} }
} }
/**
* A RpcInvocationHandler instance for test. Its invoke function uses the same
* {@link Client} instance, and will fail the first totalRetry times (by
* throwing an IOException).
*/
private static class TestInvocationHandler implements RpcInvocationHandler {
private static int retry = 0;
private final Client client;
private final Server server;
private final int total;
TestInvocationHandler(Client client, Server server, int total) {
this.client = client;
this.server = server;
this.total = total;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
LongWritable param = new LongWritable(RANDOM.nextLong());
LongWritable value = (LongWritable) client.call(param,
NetUtils.getConnectAddress(server), null, null, 0, conf);
if (retry++ < total) {
throw new IOException("Fake IOException");
} else {
return value;
}
}
@Override
public void close() throws IOException {}
@Override
public ConnectionId getConnectionId() {
return null;
}
}
@Test @Test
public void testSerial() throws Exception { public void testSerial() throws Exception {
testSerial(3, false, 2, 5, 100); testSerial(3, false, 2, 5, 100);
@ -705,6 +749,110 @@ public void run() {
server.stop(); server.stop();
} }
} }
/** A dummy protocol */
private interface DummyProtocol {
public void dummyRun();
}
/**
* Test the retry count while used in a retry proxy.
*/
@Test
public void testRetryProxy() throws Exception {
final Client client = new Client(LongWritable.class, conf);
final TestServer server = new TestServer(1, false);
server.callListener = new Runnable() {
private int retryCount = 0;
@Override
public void run() {
Assert.assertEquals(retryCount++, Server.getCallRetryCount());
}
};
final int totalRetry = 256;
DummyProtocol proxy = (DummyProtocol) Proxy.newProxyInstance(
DummyProtocol.class.getClassLoader(),
new Class[] { DummyProtocol.class }, new TestInvocationHandler(client,
server, totalRetry));
DummyProtocol retryProxy = (DummyProtocol) RetryProxy.create(
DummyProtocol.class, proxy, RetryPolicies.RETRY_FOREVER);
try {
server.start();
retryProxy.dummyRun();
Assert.assertEquals(TestInvocationHandler.retry, totalRetry + 1);
} finally {
Client.setCallIdAndRetryCount(0, 0);
client.stop();
server.stop();
}
}
/**
* Test if the rpc server gets the default retry count (0) from client.
*/
@Test
public void testInitialCallRetryCount() throws Exception {
// Override client to store the call id
final Client client = new Client(LongWritable.class, conf);
// Attach a listener that tracks every call ID received by the server.
final TestServer server = new TestServer(1, false);
server.callListener = new Runnable() {
@Override
public void run() {
// we have not set the retry count for the client, thus on the server
// side we should see retry count as 0
Assert.assertEquals(0, Server.getCallRetryCount());
}
};
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
final SerialCaller caller = new SerialCaller(client, addr, 10);
caller.run();
assertFalse(caller.failed);
} finally {
client.stop();
server.stop();
}
}
/**
* Test if the rpc server gets the retry count from client.
*/
@Test
public void testCallRetryCount() throws Exception {
final int retryCount = 255;
// Override client to store the call id
final Client client = new Client(LongWritable.class, conf);
Client.setCallIdAndRetryCount(Client.nextCallId(), 255);
// Attach a listener that tracks every call ID received by the server.
final TestServer server = new TestServer(1, false);
server.callListener = new Runnable() {
@Override
public void run() {
// we have not set the retry count for the client, thus on the server
// side we should see retry count as 0
Assert.assertEquals(retryCount, Server.getCallRetryCount());
}
};
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
final SerialCaller caller = new SerialCaller(client, addr, 10);
caller.run();
assertFalse(caller.failed);
} finally {
client.stop();
server.stop();
}
}
/** /**
* Tests that client generates a unique sequential call ID for each RPC call, * Tests that client generates a unique sequential call ID for each RPC call,

View File

@ -27,6 +27,7 @@
import java.util.Arrays; import java.util.Arrays;
import org.apache.hadoop.ipc.RPC.RpcKind; import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.RpcConstants;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
import org.junit.Test; import org.junit.Test;
@ -79,7 +80,8 @@ private void doVarIntTest(int value) throws IOException {
public void testRpcClientId() { public void testRpcClientId() {
byte[] uuid = StringUtils.getUuidBytes(); byte[] uuid = StringUtils.getUuidBytes();
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader( RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, 0, uuid); RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, 0,
RpcConstants.INVALID_RETRY_COUNT, uuid);
assertTrue(Arrays.equals(uuid, header.getClientId().toByteArray())); assertTrue(Arrays.equals(uuid, header.getClientId().toByteArray()));
} }
} }