HADOOP-9688 merge r1500843 and r1500847, and HADOOP-9720 merge r1502301 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1505005 13f79535-47bb-0310-9956-ffa450edef68
Author: Suresh Srinivas  2013-07-19 20:41:10 +00:00
Commit: fa89218fc8
Parent: c2a02fb917
11 changed files with 136 additions and 38 deletions

hadoop-common-project/hadoop-common/CHANGES.txt

@@ -69,6 +69,8 @@ Release 2.1.0-beta - 2013-07-02
     HADOOP-9421. [RPC v9] Convert SASL to use ProtoBuf and provide
     negotiation capabilities (daryn)
+    HADOOP-9688. Add globally unique Client ID to RPC requests. (suresh)
     HADOOP-9683. [RPC v9] Wrap IpcConnectionContext in RPC headers (daryn)
   NEW FEATURES
@@ -189,6 +191,9 @@ Release 2.1.0-beta - 2013-07-02
     HADOOP-9416. Add new symlink resolution methods in FileSystem and
     FileSystemLinkResolver. (Andrew Wang via Colin Patrick McCabe)
+    HADOOP-9720. Rename Client#uuid to Client#clientId. (Arpit Agarwal via
+    suresh)
     HADOOP-9734. Common protobuf definitions for GetUserMappingsProtocol,
     RefreshAuthorizationPolicyProtocol and RefreshUserMappingsProtocol (jlowe)

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

@@ -85,6 +85,7 @@ import org.apache.hadoop.security.token.TokenInfo;
 import org.apache.hadoop.security.token.TokenSelector;
 import org.apache.hadoop.util.ProtoUtil;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -114,6 +115,7 @@ public class Client {
   private final int connectionTimeout;
   private final boolean fallbackAllowed;
+  private final byte[] clientId;
   final static int PING_CALL_ID = -1;
@@ -815,8 +817,8 @@
       throws IOException {
     DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
     // Write out the header, version and authentication method
-    out.write(Server.HEADER.array());
-    out.write(Server.CURRENT_VERSION);
+    out.write(RpcConstants.HEADER.array());
+    out.write(RpcConstants.CURRENT_VERSION);
     out.write(serviceClass);
     final AuthProtocol authProtocol;
     switch (authMethod) {
@@ -895,7 +897,7 @@
       if ( curTime - lastActivity.get() >= pingInterval) {
         lastActivity.set(curTime);
         synchronized (out) {
-          out.writeInt(PING_CALL_ID);
+          out.writeInt(RpcConstants.PING_CALL_ID);
           out.flush();
         }
       }
@@ -951,7 +953,7 @@
       // Items '1' and '2' are prepared here.
       final DataOutputBuffer d = new DataOutputBuffer();
       RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
-          call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id);
+          call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, clientId);
       header.writeDelimitedTo(d);
       call.rpcRequest.write(d);
@@ -1151,6 +1153,7 @@
         CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
     this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
         CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
+    this.clientId = StringUtils.getUuidBytes();
     this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
   }

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java (new file)

@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+import java.nio.ByteBuffer;
+import org.apache.hadoop.classification.InterfaceAudience;
+@InterfaceAudience.Private
+public class RpcConstants {
+  private RpcConstants() {
+    // Hidden Constructor
+  }
+  public static final int PING_CALL_ID = -1;
+  public static final byte[] DUMMY_CLIENT_ID = new byte[0];
+  /**
+   * The first four bytes of Hadoop RPC connections
+   */
+  public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
+  // 1 : Introduce ping and server does not throw away RPCs
+  // 3 : Introduce the protocol into the RPC connection header
+  // 4 : Introduced SASL security layer
+  // 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal}
+  //     in ObjectWritable to efficiently transmit arrays of primitives
+  // 6 : Made RPC Request header explicit
+  // 7 : Changed Ipc Connection Header to use Protocol buffers
+  // 8 : SASL server always sends a final response
+  // 9 : Changes to protocol for HADOOP-8990
+  public static final byte CURRENT_VERSION = 9;
+}
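Note (not part of the patch): a minimal sketch of how a caller could emit the fixed connection preamble these constants describe, mirroring the write sequence in the Client.java hunk above. The class, method, and parameter names (PreambleSketch, writePreamble, serviceClass, authProtocolByte) are illustrative assumptions.

// Sketch only: the preamble is the "hrpc" magic, the wire version, a service
// class byte, and an auth negotiation byte, as written by Client.java above.
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.ipc.RpcConstants;

public class PreambleSketch {
  // serviceClass and authProtocolByte are illustrative placeholders.
  public static void writePreamble(OutputStream raw, byte serviceClass,
      byte authProtocolByte) throws IOException {
    DataOutputStream out = new DataOutputStream(raw);
    out.write(RpcConstants.HEADER.array());    // 4-byte "hrpc" magic
    out.write(RpcConstants.CURRENT_VERSION);   // wire version, currently 9
    out.write(serviceClass);                   // service class byte
    out.write(authProtocolByte);               // auth negotiation byte (e.g. SASL)
    out.flush();
  }
}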

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -71,6 +71,8 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION;
+import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
 import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
@@ -156,10 +158,6 @@
     }
   }
-  /**
-   * The first four bytes of Hadoop RPC connections
-   */
-  public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
   /**
    * If the user accidentally sends an HTTP GET to an IPC port, we detect this
@@ -178,17 +176,6 @@
     "It looks like you are making an HTTP request to a Hadoop IPC port. " +
     "This is not the correct port for the web interface on this daemon.\r\n";
-  // 1 : Introduce ping and server does not throw away RPCs
-  // 3 : Introduce the protocol into the RPC connection header
-  // 4 : Introduced SASL security layer
-  // 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal}
-  //     in ObjectWritable to efficiently transmit arrays of primitives
-  // 6 : Made RPC Request header explicit
-  // 7 : Changed Ipc Connection Header to use Protocol buffers
-  // 8 : SASL server always sends a final response
-  // 9 : Changes to protocol for HADOOP-8990
-  public static final byte CURRENT_VERSION = 9;
   /**
    * Initial and max size of response buffer
    */
@@ -291,6 +278,15 @@
     }
     return null;
   }
+  /**
+   * Returns the clientId from the current RPC request
+   */
+  public static byte[] getClientId() {
+    Call call = CurCall.get();
+    return call != null ? call.clientId : RpcConstants.DUMMY_CLIENT_ID;
+  }
   /** Returns remote address as a string when invoked inside an RPC.
    *  Returns null in case of an error.
    */
@@ -451,17 +447,22 @@
                                        // time served when response is not null
     private ByteBuffer rpcResponse;    // the response for this call
     private final RPC.RpcKind rpcKind;
+    private final byte[] clientId;
     public Call(int id, Writable param, Connection connection) {
-      this( id, param, connection, RPC.RpcKind.RPC_BUILTIN );
+      this(id, param, connection, RPC.RpcKind.RPC_BUILTIN,
+          RpcConstants.DUMMY_CLIENT_ID);
     }
-    public Call(int id, Writable param, Connection connection, RPC.RpcKind kind) {
+    public Call(int id, Writable param, Connection connection,
+        RPC.RpcKind kind, byte[] clientId) {
       this.callId = id;
       this.rpcRequest = param;
       this.connection = connection;
       this.timestamp = Time.now();
       this.rpcResponse = null;
       this.rpcKind = kind;
+      this.clientId = clientId;
     }
     @Override
@@ -1440,7 +1441,8 @@
         return -1;
       }
-      if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
+      if (!RpcConstants.HEADER.equals(dataLengthBuffer)
+          || version != CURRENT_VERSION) {
         //Warning is ok since this is not supposed to happen.
         LOG.warn("Incorrect header or version mismatch from " +
                  hostAddress + ":" + remotePort +
@@ -1462,7 +1464,7 @@
       if (data == null) {
         dataLengthBuffer.flip();
         dataLength = dataLengthBuffer.getInt();
-        if ((dataLength == Client.PING_CALL_ID) && (!useWrap)) {
+        if ((dataLength == RpcConstants.PING_CALL_ID) && (!useWrap)) {
           // covers the !useSasl too
           dataLengthBuffer.clear();
           return 0; // ping message
@@ -1702,7 +1704,7 @@
       unwrappedDataLengthBuffer.flip();
       int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
-      if (unwrappedDataLength == Client.PING_CALL_ID) {
+      if (unwrappedDataLength == RpcConstants.PING_CALL_ID) {
         if (LOG.isDebugEnabled())
           LOG.debug("Received ping message");
         unwrappedDataLengthBuffer.clear();
@@ -1835,7 +1837,8 @@
       }
       Call call = new Call(header.getCallId(), rpcRequest, this,
-          ProtoUtil.convert(header.getRpcKind()));
+          ProtoUtil.convert(header.getRpcKind()), header.getClientId()
+          .toByteArray());
       callQueue.put(call);    // queue the call; maybe blocked here
       incRpcCount();          // Increment the rpc count
     }
@@ -2258,7 +2261,7 @@
           RpcResponseHeaderProto.newBuilder();
       headerBuilder.setCallId(call.callId);
       headerBuilder.setStatus(status);
-      headerBuilder.setServerIpcVersionNum(Server.CURRENT_VERSION);
+      headerBuilder.setServerIpcVersionNum(CURRENT_VERSION);
       if (status == RpcStatusProto.SUCCESS) {
         RpcResponseHeaderProto header = headerBuilder.build();
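Note (not part of the patch): a hedged sketch of how a protocol implementation running on an RPC handler thread could consume the new Server.getClientId() accessor; the class name and the logging are illustrative, not part of this change.

// Sketch only: Server.getClientId() is meaningful on a handler thread; outside
// an RPC it returns RpcConstants.DUMMY_CLIENT_ID, which is an empty array.
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.util.StringUtils;

public class ClientIdAwareHandlerSketch {
  private static final Log LOG =
      LogFactory.getLog(ClientIdAwareHandlerSketch.class);

  public void handleSomeRpc() {
    byte[] clientId = Server.getClientId();
    if (clientId.length == 16) {
      // e.g. correlate retries of the same logical request from one client
      LOG.info("call from client " + StringUtils.byteToHexString(clientId));
    }
  }
}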

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java

@@ -46,6 +46,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper;
 import org.apache.hadoop.ipc.RPC.RpcKind;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.RpcConstants;
 import org.apache.hadoop.ipc.Server.AuthProtocol;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
@@ -71,9 +72,10 @@
   private final AuthMethod authMethod;
   private final SaslClient saslClient;
   private final boolean fallbackAllowed;
-  private static final RpcRequestHeaderProto saslHeader =
-      ProtoUtil.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
-        OperationProto.RPC_FINAL_PACKET, AuthProtocol.SASL.callId);
+  private static final RpcRequestHeaderProto saslHeader = ProtoUtil
+      .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
+        OperationProto.RPC_FINAL_PACKET, AuthProtocol.SASL.callId,
+        RpcConstants.DUMMY_CLIENT_ID);
   private static final RpcSaslProto negotiateRequest =
       RpcSaslProto.newBuilder().setState(SaslState.NEGOTIATE).build();

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java

@@ -28,6 +28,8 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.*;
 import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
 import org.apache.hadoop.security.UserGroupInformation;
+import com.google.protobuf.ByteString;
 public abstract class ProtoUtil {
   /**
@@ -158,9 +160,10 @@
   }
   public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
-      RpcRequestHeaderProto.OperationProto operation, int callId) {
+      RpcRequestHeaderProto.OperationProto operation, int callId, byte[] uuid) {
     RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder();
-    result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId);
+    result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId)
+        .setClientId(ByteString.copyFrom(uuid));
     return result.build();
   }
 }

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java

@@ -22,6 +22,7 @@ import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
 import java.text.DateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -32,6 +33,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.StringTokenizer;
+import java.util.UUID;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -894,4 +896,15 @@ public class StringUtils {
     matcher.appendTail(sb);
     return sb.toString();
   }
+  /**
+   * Return a new UUID as byte[]
+   */
+  public static byte[] getUuidBytes() {
+    UUID uuid = UUID.randomUUID();
+    ByteBuffer buf = ByteBuffer.wrap(new byte[16]);
+    buf.putLong(uuid.getMostSignificantBits());
+    buf.putLong(uuid.getLeastSignificantBits());
+    return buf.array();
+  }
 }
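Note (not part of the patch): getUuidBytes() lays the UUID out as the big-endian most-significant long followed by the least-significant long; for illustration, a sketch of the inverse conversion using a hypothetical helper class.

// Sketch only: turns the 16 bytes produced by StringUtils#getUuidBytes back
// into a java.util.UUID.
import java.nio.ByteBuffer;
import java.util.UUID;

public class UuidBytesSketch {
  public static UUID fromBytes(byte[] bytes) {
    if (bytes == null || bytes.length != 16) {
      throw new IllegalArgumentException("expected 16 bytes");
    }
    ByteBuffer buf = ByteBuffer.wrap(bytes); // ByteBuffer is big-endian by default
    long msb = buf.getLong();                // most significant 64 bits first
    long lsb = buf.getLong();                // least significant 64 bits second
    return new UUID(msb, lsb);
  }
}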

hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto

@@ -62,7 +62,9 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
   optional RpcKindProto rpcKind = 1;
   optional OperationProto rpcOp = 2;
-  required uint32 callId = 3; // each rpc has a callId that is also used in response
+  required uint32 callId = 3; // a sequence number that is sent back in response
+  required bytes clientId = 4; // Globally unique client ID
+  // clientId + callId uniquely identifies a request
 }
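Note (not part of the patch): a hedged sketch of how the new clientId field combines with callId into a request identity, built through the ProtoUtil change above; the hex key format is an illustrative assumption.

// Sketch only: builds a request header and derives a (clientId, callId) key.
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.StringUtils;

public class RequestIdentitySketch {
  public static void main(String[] args) {
    byte[] clientId = StringUtils.getUuidBytes(); // one UUID per Client instance
    int callId = 0;                               // per-client call sequence number
    RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
        RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET,
        callId, clientId);
    // clientId + callId identifies the request, e.g. for recognizing a retried
    // call from the same client on the server side.
    String requestKey = StringUtils.byteToHexString(
        header.getClientId().toByteArray()) + ":" + header.getCallId();
    System.out.println("request key = " + requestKey);
  }
}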

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java

@@ -736,7 +736,7 @@ public class TestIPC {
       "6f 6e 67 00 00 00 00 00 00 00 0a ong..... ... \n");
   final static String HADOOP0_18_ERROR_MSG =
-      "Server IPC version " + Server.CURRENT_VERSION +
+      "Server IPC version " + RpcConstants.CURRENT_VERSION +
       " cannot communicate with client version 2";
   /**
@@ -775,7 +775,7 @@
       "00 14 .. \n");
   final static String HADOOP0_20_ERROR_MSG =
-      "Server IPC version " + Server.CURRENT_VERSION +
+      "Server IPC version " + RpcConstants.CURRENT_VERSION +
       " cannot communicate with client version 3";
@@ -790,7 +790,7 @@
   final static String HADOOP0_21_ERROR_MSG =
-      "Server IPC version " + Server.CURRENT_VERSION +
+      "Server IPC version " + RpcConstants.CURRENT_VERSION +
       " cannot communicate with client version 4";
   final static byte[] HADOOP_0_21_0_RPC_DUMP =

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java

@@ -72,6 +72,10 @@ public class TestProtoBufRpc {
     @Override
     public EmptyResponseProto ping(RpcController unused,
         EmptyRequestProto request) throws ServiceException {
+      // Ensure clientId is received
+      byte[] clientId = Server.getClientId();
+      Assert.assertNotNull(Server.getClientId());
+      Assert.assertEquals(16, clientId.length);
       return EmptyResponseProto.newBuilder().build();
     }

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java

@@ -17,13 +17,18 @@
  */
 package org.apache.hadoop.util;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.util.Arrays;
+import org.apache.hadoop.ipc.RPC.RpcKind;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
 import org.junit.Test;
 import com.google.protobuf.CodedOutputStream;
@@ -69,4 +74,12 @@ public class TestProtoUtil {
         new ByteArrayInputStream(baos.toByteArray()));
     assertEquals(value, ProtoUtil.readRawVarint32(dis));
   }
+  @Test
+  public void testRpcClientId() {
+    byte[] uuid = StringUtils.getUuidBytes();
+    RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
+        RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, 0, uuid);
+    assertTrue(Arrays.equals(uuid, header.getClientId().toByteArray()));
+  }
 }