Revert the merge r1505005
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1505009 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fa89218fc8
commit
3f901c051d
@ -69,8 +69,6 @@ Release 2.1.0-beta - 2013-07-02
|
|||||||
HADOOP-9421. [RPC v9] Convert SASL to use ProtoBuf and provide
|
HADOOP-9421. [RPC v9] Convert SASL to use ProtoBuf and provide
|
||||||
negotiation capabilities (daryn)
|
negotiation capabilities (daryn)
|
||||||
|
|
||||||
HADOOP-9688. Add globally unique Client ID to RPC requests. (suresh)
|
|
||||||
|
|
||||||
HADOOP-9683. [RPC v9] Wrap IpcConnectionContext in RPC headers (daryn)
|
HADOOP-9683. [RPC v9] Wrap IpcConnectionContext in RPC headers (daryn)
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
@ -191,9 +189,6 @@ Release 2.1.0-beta - 2013-07-02
|
|||||||
HADOOP-9416. Add new symlink resolution methods in FileSystem and
|
HADOOP-9416. Add new symlink resolution methods in FileSystem and
|
||||||
FileSystemLinkResolver. (Andrew Wang via Colin Patrick McCabe)
|
FileSystemLinkResolver. (Andrew Wang via Colin Patrick McCabe)
|
||||||
|
|
||||||
HADOOP-9720. Rename Client#uuid to Client#clientId. (Arpit Agarwal via
|
|
||||||
suresh)
|
|
||||||
|
|
||||||
HADOOP-9734. Common protobuf definitions for GetUserMappingsProtocol,
|
HADOOP-9734. Common protobuf definitions for GetUserMappingsProtocol,
|
||||||
RefreshAuthorizationPolicyProtocol and RefreshUserMappingsProtocol (jlowe)
|
RefreshAuthorizationPolicyProtocol and RefreshUserMappingsProtocol (jlowe)
|
||||||
|
|
||||||
|
@ -85,7 +85,6 @@
|
|||||||
import org.apache.hadoop.security.token.TokenSelector;
|
import org.apache.hadoop.security.token.TokenSelector;
|
||||||
import org.apache.hadoop.util.ProtoUtil;
|
import org.apache.hadoop.util.ProtoUtil;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
|
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
@ -115,7 +114,6 @@ public class Client {
|
|||||||
private final int connectionTimeout;
|
private final int connectionTimeout;
|
||||||
|
|
||||||
private final boolean fallbackAllowed;
|
private final boolean fallbackAllowed;
|
||||||
private final byte[] clientId;
|
|
||||||
|
|
||||||
final static int PING_CALL_ID = -1;
|
final static int PING_CALL_ID = -1;
|
||||||
|
|
||||||
@ -817,8 +815,8 @@ private void writeConnectionHeader(OutputStream outStream)
|
|||||||
throws IOException {
|
throws IOException {
|
||||||
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
|
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
|
||||||
// Write out the header, version and authentication method
|
// Write out the header, version and authentication method
|
||||||
out.write(RpcConstants.HEADER.array());
|
out.write(Server.HEADER.array());
|
||||||
out.write(RpcConstants.CURRENT_VERSION);
|
out.write(Server.CURRENT_VERSION);
|
||||||
out.write(serviceClass);
|
out.write(serviceClass);
|
||||||
final AuthProtocol authProtocol;
|
final AuthProtocol authProtocol;
|
||||||
switch (authMethod) {
|
switch (authMethod) {
|
||||||
@ -897,7 +895,7 @@ private synchronized void sendPing() throws IOException {
|
|||||||
if ( curTime - lastActivity.get() >= pingInterval) {
|
if ( curTime - lastActivity.get() >= pingInterval) {
|
||||||
lastActivity.set(curTime);
|
lastActivity.set(curTime);
|
||||||
synchronized (out) {
|
synchronized (out) {
|
||||||
out.writeInt(RpcConstants.PING_CALL_ID);
|
out.writeInt(PING_CALL_ID);
|
||||||
out.flush();
|
out.flush();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -953,7 +951,7 @@ public void sendRpcRequest(final Call call)
|
|||||||
// Items '1' and '2' are prepared here.
|
// Items '1' and '2' are prepared here.
|
||||||
final DataOutputBuffer d = new DataOutputBuffer();
|
final DataOutputBuffer d = new DataOutputBuffer();
|
||||||
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
|
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
|
||||||
call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, clientId);
|
call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id);
|
||||||
header.writeDelimitedTo(d);
|
header.writeDelimitedTo(d);
|
||||||
call.rpcRequest.write(d);
|
call.rpcRequest.write(d);
|
||||||
|
|
||||||
@ -1153,7 +1151,6 @@ public Client(Class<? extends Writable> valueClass, Configuration conf,
|
|||||||
CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
|
CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
|
||||||
this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
|
this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
|
||||||
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
|
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
|
||||||
this.clientId = StringUtils.getUuidBytes();
|
|
||||||
this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
|
this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,50 +0,0 @@
|
|||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.ipc;
|
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public class RpcConstants {
|
|
||||||
private RpcConstants() {
|
|
||||||
// Hidden Constructor
|
|
||||||
}
|
|
||||||
|
|
||||||
public static final int PING_CALL_ID = -1;
|
|
||||||
|
|
||||||
public static final byte[] DUMMY_CLIENT_ID = new byte[0];
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The first four bytes of Hadoop RPC connections
|
|
||||||
*/
|
|
||||||
public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
|
|
||||||
|
|
||||||
// 1 : Introduce ping and server does not throw away RPCs
|
|
||||||
// 3 : Introduce the protocol into the RPC connection header
|
|
||||||
// 4 : Introduced SASL security layer
|
|
||||||
// 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal}
|
|
||||||
// in ObjectWritable to efficiently transmit arrays of primitives
|
|
||||||
// 6 : Made RPC Request header explicit
|
|
||||||
// 7 : Changed Ipc Connection Header to use Protocol buffers
|
|
||||||
// 8 : SASL server always sends a final response
|
|
||||||
// 9 : Changes to protocol for HADOOP-8990
|
|
||||||
public static final byte CURRENT_VERSION = 9;
|
|
||||||
}
|
|
@ -71,8 +71,6 @@
|
|||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
import org.apache.hadoop.io.Writable;
|
import org.apache.hadoop.io.Writable;
|
||||||
import org.apache.hadoop.io.WritableUtils;
|
import org.apache.hadoop.io.WritableUtils;
|
||||||
import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION;
|
|
||||||
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper;
|
|
||||||
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
|
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
|
||||||
import org.apache.hadoop.ipc.RPC.RpcInvoker;
|
import org.apache.hadoop.ipc.RPC.RpcInvoker;
|
||||||
import org.apache.hadoop.ipc.RPC.VersionMismatch;
|
import org.apache.hadoop.ipc.RPC.VersionMismatch;
|
||||||
@ -158,6 +156,10 @@ boolean isTerse(Class<?> t) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The first four bytes of Hadoop RPC connections
|
||||||
|
*/
|
||||||
|
public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If the user accidentally sends an HTTP GET to an IPC port, we detect this
|
* If the user accidentally sends an HTTP GET to an IPC port, we detect this
|
||||||
@ -176,6 +178,17 @@ boolean isTerse(Class<?> t) {
|
|||||||
"It looks like you are making an HTTP request to a Hadoop IPC port. " +
|
"It looks like you are making an HTTP request to a Hadoop IPC port. " +
|
||||||
"This is not the correct port for the web interface on this daemon.\r\n";
|
"This is not the correct port for the web interface on this daemon.\r\n";
|
||||||
|
|
||||||
|
// 1 : Introduce ping and server does not throw away RPCs
|
||||||
|
// 3 : Introduce the protocol into the RPC connection header
|
||||||
|
// 4 : Introduced SASL security layer
|
||||||
|
// 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal}
|
||||||
|
// in ObjectWritable to efficiently transmit arrays of primitives
|
||||||
|
// 6 : Made RPC Request header explicit
|
||||||
|
// 7 : Changed Ipc Connection Header to use Protocol buffers
|
||||||
|
// 8 : SASL server always sends a final response
|
||||||
|
// 9 : Changes to protocol for HADOOP-8990
|
||||||
|
public static final byte CURRENT_VERSION = 9;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initial and max size of response buffer
|
* Initial and max size of response buffer
|
||||||
*/
|
*/
|
||||||
@ -278,15 +291,6 @@ public static InetAddress getRemoteIp() {
|
|||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the clientId from the current RPC request
|
|
||||||
*/
|
|
||||||
public static byte[] getClientId() {
|
|
||||||
Call call = CurCall.get();
|
|
||||||
return call != null ? call.clientId : RpcConstants.DUMMY_CLIENT_ID;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Returns remote address as a string when invoked inside an RPC.
|
/** Returns remote address as a string when invoked inside an RPC.
|
||||||
* Returns null in case of an error.
|
* Returns null in case of an error.
|
||||||
*/
|
*/
|
||||||
@ -447,22 +451,17 @@ private static class Call {
|
|||||||
// time served when response is not null
|
// time served when response is not null
|
||||||
private ByteBuffer rpcResponse; // the response for this call
|
private ByteBuffer rpcResponse; // the response for this call
|
||||||
private final RPC.RpcKind rpcKind;
|
private final RPC.RpcKind rpcKind;
|
||||||
private final byte[] clientId;
|
|
||||||
|
|
||||||
public Call(int id, Writable param, Connection connection) {
|
public Call(int id, Writable param, Connection connection) {
|
||||||
this(id, param, connection, RPC.RpcKind.RPC_BUILTIN,
|
this( id, param, connection, RPC.RpcKind.RPC_BUILTIN );
|
||||||
RpcConstants.DUMMY_CLIENT_ID);
|
|
||||||
}
|
}
|
||||||
|
public Call(int id, Writable param, Connection connection, RPC.RpcKind kind) {
|
||||||
public Call(int id, Writable param, Connection connection,
|
|
||||||
RPC.RpcKind kind, byte[] clientId) {
|
|
||||||
this.callId = id;
|
this.callId = id;
|
||||||
this.rpcRequest = param;
|
this.rpcRequest = param;
|
||||||
this.connection = connection;
|
this.connection = connection;
|
||||||
this.timestamp = Time.now();
|
this.timestamp = Time.now();
|
||||||
this.rpcResponse = null;
|
this.rpcResponse = null;
|
||||||
this.rpcKind = kind;
|
this.rpcKind = kind;
|
||||||
this.clientId = clientId;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -1441,8 +1440,7 @@ public int readAndProcess() throws IOException, InterruptedException {
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!RpcConstants.HEADER.equals(dataLengthBuffer)
|
if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
|
||||||
|| version != CURRENT_VERSION) {
|
|
||||||
//Warning is ok since this is not supposed to happen.
|
//Warning is ok since this is not supposed to happen.
|
||||||
LOG.warn("Incorrect header or version mismatch from " +
|
LOG.warn("Incorrect header or version mismatch from " +
|
||||||
hostAddress + ":" + remotePort +
|
hostAddress + ":" + remotePort +
|
||||||
@ -1464,7 +1462,7 @@ public int readAndProcess() throws IOException, InterruptedException {
|
|||||||
if (data == null) {
|
if (data == null) {
|
||||||
dataLengthBuffer.flip();
|
dataLengthBuffer.flip();
|
||||||
dataLength = dataLengthBuffer.getInt();
|
dataLength = dataLengthBuffer.getInt();
|
||||||
if ((dataLength == RpcConstants.PING_CALL_ID) && (!useWrap)) {
|
if ((dataLength == Client.PING_CALL_ID) && (!useWrap)) {
|
||||||
// covers the !useSasl too
|
// covers the !useSasl too
|
||||||
dataLengthBuffer.clear();
|
dataLengthBuffer.clear();
|
||||||
return 0; // ping message
|
return 0; // ping message
|
||||||
@ -1704,7 +1702,7 @@ private void unwrapPacketAndProcessRpcs(byte[] inBuf) throws IOException,
|
|||||||
unwrappedDataLengthBuffer.flip();
|
unwrappedDataLengthBuffer.flip();
|
||||||
int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
|
int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
|
||||||
|
|
||||||
if (unwrappedDataLength == RpcConstants.PING_CALL_ID) {
|
if (unwrappedDataLength == Client.PING_CALL_ID) {
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("Received ping message");
|
LOG.debug("Received ping message");
|
||||||
unwrappedDataLengthBuffer.clear();
|
unwrappedDataLengthBuffer.clear();
|
||||||
@ -1837,8 +1835,7 @@ private void processRpcRequest(RpcRequestHeaderProto header,
|
|||||||
}
|
}
|
||||||
|
|
||||||
Call call = new Call(header.getCallId(), rpcRequest, this,
|
Call call = new Call(header.getCallId(), rpcRequest, this,
|
||||||
ProtoUtil.convert(header.getRpcKind()), header.getClientId()
|
ProtoUtil.convert(header.getRpcKind()));
|
||||||
.toByteArray());
|
|
||||||
callQueue.put(call); // queue the call; maybe blocked here
|
callQueue.put(call); // queue the call; maybe blocked here
|
||||||
incRpcCount(); // Increment the rpc count
|
incRpcCount(); // Increment the rpc count
|
||||||
}
|
}
|
||||||
@ -2261,7 +2258,7 @@ private void setupResponse(ByteArrayOutputStream responseBuf,
|
|||||||
RpcResponseHeaderProto.newBuilder();
|
RpcResponseHeaderProto.newBuilder();
|
||||||
headerBuilder.setCallId(call.callId);
|
headerBuilder.setCallId(call.callId);
|
||||||
headerBuilder.setStatus(status);
|
headerBuilder.setStatus(status);
|
||||||
headerBuilder.setServerIpcVersionNum(CURRENT_VERSION);
|
headerBuilder.setServerIpcVersionNum(Server.CURRENT_VERSION);
|
||||||
|
|
||||||
if (status == RpcStatusProto.SUCCESS) {
|
if (status == RpcStatusProto.SUCCESS) {
|
||||||
RpcResponseHeaderProto header = headerBuilder.build();
|
RpcResponseHeaderProto header = headerBuilder.build();
|
||||||
|
@ -46,7 +46,6 @@
|
|||||||
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper;
|
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper;
|
||||||
import org.apache.hadoop.ipc.RPC.RpcKind;
|
import org.apache.hadoop.ipc.RPC.RpcKind;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.ipc.RpcConstants;
|
|
||||||
import org.apache.hadoop.ipc.Server.AuthProtocol;
|
import org.apache.hadoop.ipc.Server.AuthProtocol;
|
||||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
|
||||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
|
||||||
@ -72,10 +71,9 @@ public class SaslRpcClient {
|
|||||||
private final AuthMethod authMethod;
|
private final AuthMethod authMethod;
|
||||||
private final SaslClient saslClient;
|
private final SaslClient saslClient;
|
||||||
private final boolean fallbackAllowed;
|
private final boolean fallbackAllowed;
|
||||||
private static final RpcRequestHeaderProto saslHeader = ProtoUtil
|
private static final RpcRequestHeaderProto saslHeader =
|
||||||
.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
|
ProtoUtil.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
|
||||||
OperationProto.RPC_FINAL_PACKET, AuthProtocol.SASL.callId,
|
OperationProto.RPC_FINAL_PACKET, AuthProtocol.SASL.callId);
|
||||||
RpcConstants.DUMMY_CLIENT_ID);
|
|
||||||
private static final RpcSaslProto negotiateRequest =
|
private static final RpcSaslProto negotiateRequest =
|
||||||
RpcSaslProto.newBuilder().setState(SaslState.NEGOTIATE).build();
|
RpcSaslProto.newBuilder().setState(SaslState.NEGOTIATE).build();
|
||||||
|
|
||||||
|
@ -28,8 +28,6 @@
|
|||||||
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
|
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
|
||||||
import com.google.protobuf.ByteString;
|
|
||||||
|
|
||||||
public abstract class ProtoUtil {
|
public abstract class ProtoUtil {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -160,10 +158,9 @@ public static RPC.RpcKind convert( RpcKindProto kind) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
|
public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
|
||||||
RpcRequestHeaderProto.OperationProto operation, int callId, byte[] uuid) {
|
RpcRequestHeaderProto.OperationProto operation, int callId) {
|
||||||
RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder();
|
RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder();
|
||||||
result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId)
|
result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId);
|
||||||
.setClientId(ByteString.copyFrom(uuid));
|
|
||||||
return result.build();
|
return result.build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -22,7 +22,6 @@
|
|||||||
import java.io.StringWriter;
|
import java.io.StringWriter;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.text.DateFormat;
|
import java.text.DateFormat;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
@ -33,7 +32,6 @@
|
|||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.StringTokenizer;
|
import java.util.StringTokenizer;
|
||||||
import java.util.UUID;
|
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
@ -896,15 +894,4 @@ public static String replaceTokens(String template, Pattern pattern,
|
|||||||
matcher.appendTail(sb);
|
matcher.appendTail(sb);
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Return a new UUID as byte[]
|
|
||||||
*/
|
|
||||||
public static byte[] getUuidBytes() {
|
|
||||||
UUID uuid = UUID.randomUUID();
|
|
||||||
ByteBuffer buf = ByteBuffer.wrap(new byte[16]);
|
|
||||||
buf.putLong(uuid.getMostSignificantBits());
|
|
||||||
buf.putLong(uuid.getLeastSignificantBits());
|
|
||||||
return buf.array();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -62,9 +62,7 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
|
|||||||
|
|
||||||
optional RpcKindProto rpcKind = 1;
|
optional RpcKindProto rpcKind = 1;
|
||||||
optional OperationProto rpcOp = 2;
|
optional OperationProto rpcOp = 2;
|
||||||
required uint32 callId = 3; // a sequence number that is sent back in response
|
required uint32 callId = 3; // each rpc has a callId that is also used in response
|
||||||
required bytes clientId = 4; // Globally unique client ID
|
|
||||||
// clientId + callId uniquely identifies a request
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -736,7 +736,7 @@ private static abstract class NetworkTraces {
|
|||||||
"6f 6e 67 00 00 00 00 00 00 00 0a ong..... ... \n");
|
"6f 6e 67 00 00 00 00 00 00 00 0a ong..... ... \n");
|
||||||
|
|
||||||
final static String HADOOP0_18_ERROR_MSG =
|
final static String HADOOP0_18_ERROR_MSG =
|
||||||
"Server IPC version " + RpcConstants.CURRENT_VERSION +
|
"Server IPC version " + Server.CURRENT_VERSION +
|
||||||
" cannot communicate with client version 2";
|
" cannot communicate with client version 2";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -775,7 +775,7 @@ private static abstract class NetworkTraces {
|
|||||||
"00 14 .. \n");
|
"00 14 .. \n");
|
||||||
|
|
||||||
final static String HADOOP0_20_ERROR_MSG =
|
final static String HADOOP0_20_ERROR_MSG =
|
||||||
"Server IPC version " + RpcConstants.CURRENT_VERSION +
|
"Server IPC version " + Server.CURRENT_VERSION +
|
||||||
" cannot communicate with client version 3";
|
" cannot communicate with client version 3";
|
||||||
|
|
||||||
|
|
||||||
@ -790,7 +790,7 @@ private static abstract class NetworkTraces {
|
|||||||
|
|
||||||
|
|
||||||
final static String HADOOP0_21_ERROR_MSG =
|
final static String HADOOP0_21_ERROR_MSG =
|
||||||
"Server IPC version " + RpcConstants.CURRENT_VERSION +
|
"Server IPC version " + Server.CURRENT_VERSION +
|
||||||
" cannot communicate with client version 4";
|
" cannot communicate with client version 4";
|
||||||
|
|
||||||
final static byte[] HADOOP_0_21_0_RPC_DUMP =
|
final static byte[] HADOOP_0_21_0_RPC_DUMP =
|
||||||
|
@ -72,10 +72,6 @@ public static class PBServerImpl implements TestRpcService {
|
|||||||
@Override
|
@Override
|
||||||
public EmptyResponseProto ping(RpcController unused,
|
public EmptyResponseProto ping(RpcController unused,
|
||||||
EmptyRequestProto request) throws ServiceException {
|
EmptyRequestProto request) throws ServiceException {
|
||||||
// Ensure clientId is received
|
|
||||||
byte[] clientId = Server.getClientId();
|
|
||||||
Assert.assertNotNull(Server.getClientId());
|
|
||||||
Assert.assertEquals(16, clientId.length);
|
|
||||||
return EmptyResponseProto.newBuilder().build();
|
return EmptyResponseProto.newBuilder().build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,18 +17,13 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.util;
|
package org.apache.hadoop.util;
|
||||||
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
|
||||||
|
|
||||||
import org.apache.hadoop.ipc.RPC.RpcKind;
|
|
||||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
|
|
||||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import com.google.protobuf.CodedOutputStream;
|
import com.google.protobuf.CodedOutputStream;
|
||||||
@ -74,12 +69,4 @@ private void doVarIntTest(int value) throws IOException {
|
|||||||
new ByteArrayInputStream(baos.toByteArray()));
|
new ByteArrayInputStream(baos.toByteArray()));
|
||||||
assertEquals(value, ProtoUtil.readRawVarint32(dis));
|
assertEquals(value, ProtoUtil.readRawVarint32(dis));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRpcClientId() {
|
|
||||||
byte[] uuid = StringUtils.getUuidBytes();
|
|
||||||
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
|
|
||||||
RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, 0, uuid);
|
|
||||||
assertTrue(Arrays.equals(uuid, header.getClientId().toByteArray()));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user