From fa89218fc89c036ed2f134266dfe1b164fe2ace0 Mon Sep 17 00:00:00 2001 From: Suresh Srinivas Date: Fri, 19 Jul 2013 20:41:10 +0000 Subject: [PATCH] HADOOP-9688 merge r1500843 and r1500847, and HADOOP-9720 merge r1502301 from trunk git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1505005 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 5 ++ .../java/org/apache/hadoop/ipc/Client.java | 11 ++-- .../org/apache/hadoop/ipc/RpcConstants.java | 50 +++++++++++++++++ .../java/org/apache/hadoop/ipc/Server.java | 53 ++++++++++--------- .../apache/hadoop/security/SaslRpcClient.java | 8 +-- .../org/apache/hadoop/util/ProtoUtil.java | 7 ++- .../org/apache/hadoop/util/StringUtils.java | 13 +++++ .../src/main/proto/RpcHeader.proto | 4 +- .../java/org/apache/hadoop/ipc/TestIPC.java | 6 +-- .../apache/hadoop/ipc/TestProtoBufRpc.java | 4 ++ .../org/apache/hadoop/util/TestProtoUtil.java | 13 +++++ 11 files changed, 136 insertions(+), 38 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index cda4771136d..eaecbbb1894 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -69,6 +69,8 @@ Release 2.1.0-beta - 2013-07-02 HADOOP-9421. [RPC v9] Convert SASL to use ProtoBuf and provide negotiation capabilities (daryn) + HADOOP-9688. Add globally unique Client ID to RPC requests. (suresh) + HADOOP-9683. [RPC v9] Wrap IpcConnectionContext in RPC headers (daryn) NEW FEATURES @@ -189,6 +191,9 @@ Release 2.1.0-beta - 2013-07-02 HADOOP-9416. Add new symlink resolution methods in FileSystem and FileSystemLinkResolver. (Andrew Wang via Colin Patrick McCabe) + HADOOP-9720. Rename Client#uuid to Client#clientId. (Arpit Agarwal via + suresh) + HADOOP-9734. Common protobuf definitions for GetUserMappingsProtocol, RefreshAuthorizationPolicyProtocol and RefreshUserMappingsProtocol (jlowe) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 94e111d5727..51d50db65fa 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -85,6 +85,7 @@ import org.apache.hadoop.security.token.TokenSelector; import org.apache.hadoop.util.ProtoUtil; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -114,6 +115,7 @@ public class Client { private final int connectionTimeout; private final boolean fallbackAllowed; + private final byte[] clientId; final static int PING_CALL_ID = -1; @@ -815,8 +817,8 @@ private void writeConnectionHeader(OutputStream outStream) throws IOException { DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream)); // Write out the header, version and authentication method - out.write(Server.HEADER.array()); - out.write(Server.CURRENT_VERSION); + out.write(RpcConstants.HEADER.array()); + out.write(RpcConstants.CURRENT_VERSION); out.write(serviceClass); final AuthProtocol authProtocol; switch (authMethod) { @@ -895,7 +897,7 @@ private synchronized void sendPing() throws IOException { if ( curTime - lastActivity.get() >= pingInterval) { lastActivity.set(curTime); synchronized (out) { - out.writeInt(PING_CALL_ID); + out.writeInt(RpcConstants.PING_CALL_ID); out.flush(); } } @@ -951,7 +953,7 @@ public void sendRpcRequest(final Call call) // Items '1' and '2' are prepared here. final DataOutputBuffer d = new DataOutputBuffer(); RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader( - call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id); + call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, clientId); header.writeDelimitedTo(d); call.rpcRequest.write(d); @@ -1151,6 +1153,7 @@ public Client(Class valueClass, Configuration conf, CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT); this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); + this.clientId = StringUtils.getUuidBytes(); this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance(); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java new file mode 100644 index 00000000000..3f03ade2a7a --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ipc; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience; + +@InterfaceAudience.Private +public class RpcConstants { + private RpcConstants() { + // Hidden Constructor + } + + public static final int PING_CALL_ID = -1; + + public static final byte[] DUMMY_CLIENT_ID = new byte[0]; + + + /** + * The first four bytes of Hadoop RPC connections + */ + public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes()); + + // 1 : Introduce ping and server does not throw away RPCs + // 3 : Introduce the protocol into the RPC connection header + // 4 : Introduced SASL security layer + // 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal} + // in ObjectWritable to efficiently transmit arrays of primitives + // 6 : Made RPC Request header explicit + // 7 : Changed Ipc Connection Header to use Protocol buffers + // 8 : SASL server always sends a final response + // 9 : Changes to protocol for HADOOP-8990 + public static final byte CURRENT_VERSION = 9; +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index ee633b137fa..9889ad037a6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -71,6 +71,8 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; +import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION; +import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper; import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper; import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.RPC.VersionMismatch; @@ -155,11 +157,7 @@ boolean isTerse(Class t) { return terseExceptions.contains(t.toString()); } } - - /** - * The first four bytes of Hadoop RPC connections - */ - public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes()); + /** * If the user accidentally sends an HTTP GET to an IPC port, we detect this @@ -177,17 +175,6 @@ boolean isTerse(Class t) { "Content-type: text/plain\r\n\r\n" + "It looks like you are making an HTTP request to a Hadoop IPC port. " + "This is not the correct port for the web interface on this daemon.\r\n"; - - // 1 : Introduce ping and server does not throw away RPCs - // 3 : Introduce the protocol into the RPC connection header - // 4 : Introduced SASL security layer - // 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal} - // in ObjectWritable to efficiently transmit arrays of primitives - // 6 : Made RPC Request header explicit - // 7 : Changed Ipc Connection Header to use Protocol buffers - // 8 : SASL server always sends a final response - // 9 : Changes to protocol for HADOOP-8990 - public static final byte CURRENT_VERSION = 9; /** * Initial and max size of response buffer @@ -291,6 +278,15 @@ public static InetAddress getRemoteIp() { } return null; } + + /** + * Returns the clientId from the current RPC request + */ + public static byte[] getClientId() { + Call call = CurCall.get(); + return call != null ? call.clientId : RpcConstants.DUMMY_CLIENT_ID; + } + /** Returns remote address as a string when invoked inside an RPC. * Returns null in case of an error. */ @@ -451,17 +447,22 @@ private static class Call { // time served when response is not null private ByteBuffer rpcResponse; // the response for this call private final RPC.RpcKind rpcKind; + private final byte[] clientId; public Call(int id, Writable param, Connection connection) { - this( id, param, connection, RPC.RpcKind.RPC_BUILTIN ); + this(id, param, connection, RPC.RpcKind.RPC_BUILTIN, + RpcConstants.DUMMY_CLIENT_ID); } - public Call(int id, Writable param, Connection connection, RPC.RpcKind kind) { + + public Call(int id, Writable param, Connection connection, + RPC.RpcKind kind, byte[] clientId) { this.callId = id; this.rpcRequest = param; this.connection = connection; this.timestamp = Time.now(); this.rpcResponse = null; this.rpcKind = kind; + this.clientId = clientId; } @Override @@ -1439,8 +1440,9 @@ public int readAndProcess() throws IOException, InterruptedException { setupHttpRequestOnIpcPortResponse(); return -1; } - - if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) { + + if (!RpcConstants.HEADER.equals(dataLengthBuffer) + || version != CURRENT_VERSION) { //Warning is ok since this is not supposed to happen. LOG.warn("Incorrect header or version mismatch from " + hostAddress + ":" + remotePort + @@ -1462,7 +1464,7 @@ public int readAndProcess() throws IOException, InterruptedException { if (data == null) { dataLengthBuffer.flip(); dataLength = dataLengthBuffer.getInt(); - if ((dataLength == Client.PING_CALL_ID) && (!useWrap)) { + if ((dataLength == RpcConstants.PING_CALL_ID) && (!useWrap)) { // covers the !useSasl too dataLengthBuffer.clear(); return 0; // ping message @@ -1702,7 +1704,7 @@ private void unwrapPacketAndProcessRpcs(byte[] inBuf) throws IOException, unwrappedDataLengthBuffer.flip(); int unwrappedDataLength = unwrappedDataLengthBuffer.getInt(); - if (unwrappedDataLength == Client.PING_CALL_ID) { + if (unwrappedDataLength == RpcConstants.PING_CALL_ID) { if (LOG.isDebugEnabled()) LOG.debug("Received ping message"); unwrappedDataLengthBuffer.clear(); @@ -1834,8 +1836,9 @@ private void processRpcRequest(RpcRequestHeaderProto header, RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err); } - Call call = new Call(header.getCallId(), rpcRequest, this, - ProtoUtil.convert(header.getRpcKind())); + Call call = new Call(header.getCallId(), rpcRequest, this, + ProtoUtil.convert(header.getRpcKind()), header.getClientId() + .toByteArray()); callQueue.put(call); // queue the call; maybe blocked here incRpcCount(); // Increment the rpc count } @@ -2258,7 +2261,7 @@ private void setupResponse(ByteArrayOutputStream responseBuf, RpcResponseHeaderProto.newBuilder(); headerBuilder.setCallId(call.callId); headerBuilder.setStatus(status); - headerBuilder.setServerIpcVersionNum(Server.CURRENT_VERSION); + headerBuilder.setServerIpcVersionNum(CURRENT_VERSION); if (status == RpcStatusProto.SUCCESS) { RpcResponseHeaderProto header = headerBuilder.build(); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java index 372e13b5a11..a72bfbfeabc 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java @@ -46,6 +46,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper; import org.apache.hadoop.ipc.RPC.RpcKind; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.RpcConstants; import org.apache.hadoop.ipc.Server.AuthProtocol; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto; @@ -71,9 +72,10 @@ public class SaslRpcClient { private final AuthMethod authMethod; private final SaslClient saslClient; private final boolean fallbackAllowed; - private static final RpcRequestHeaderProto saslHeader = - ProtoUtil.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER, - OperationProto.RPC_FINAL_PACKET, AuthProtocol.SASL.callId); + private static final RpcRequestHeaderProto saslHeader = ProtoUtil + .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER, + OperationProto.RPC_FINAL_PACKET, AuthProtocol.SASL.callId, + RpcConstants.DUMMY_CLIENT_ID); private static final RpcSaslProto negotiateRequest = RpcSaslProto.newBuilder().setState(SaslState.NEGOTIATE).build(); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java index ac6c572b346..7e32ffa27bf 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java @@ -28,6 +28,8 @@ import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.UserGroupInformation; +import com.google.protobuf.ByteString; + public abstract class ProtoUtil { /** @@ -158,9 +160,10 @@ public static RPC.RpcKind convert( RpcKindProto kind) { } public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind, - RpcRequestHeaderProto.OperationProto operation, int callId) { + RpcRequestHeaderProto.OperationProto operation, int callId, byte[] uuid) { RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder(); - result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId); + result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId) + .setClientId(ByteString.copyFrom(uuid)); return result.build(); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java index f8b9e4efbe8..27096b441ae 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java @@ -22,6 +22,7 @@ import java.io.StringWriter; import java.net.URI; import java.net.URISyntaxException; +import java.nio.ByteBuffer; import java.text.DateFormat; import java.util.ArrayList; import java.util.Arrays; @@ -32,6 +33,7 @@ import java.util.Locale; import java.util.Map; import java.util.StringTokenizer; +import java.util.UUID; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -894,4 +896,15 @@ public static String replaceTokens(String template, Pattern pattern, matcher.appendTail(sb); return sb.toString(); } + + /** + * Return a new UUID as byte[] + */ + public static byte[] getUuidBytes() { + UUID uuid = UUID.randomUUID(); + ByteBuffer buf = ByteBuffer.wrap(new byte[16]); + buf.putLong(uuid.getMostSignificantBits()); + buf.putLong(uuid.getLeastSignificantBits()); + return buf.array(); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto index 872f29db730..92f7fbe2ff1 100644 --- a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto +++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto @@ -62,7 +62,9 @@ message RpcRequestHeaderProto { // the header for the RpcRequest optional RpcKindProto rpcKind = 1; optional OperationProto rpcOp = 2; - required uint32 callId = 3; // each rpc has a callId that is also used in response + required uint32 callId = 3; // a sequence number that is sent back in response + required bytes clientId = 4; // Globally unique client ID + // clientId + callId uniquely identifies a request } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index 6078ae121f3..d30e9145e5b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -736,7 +736,7 @@ private static abstract class NetworkTraces { "6f 6e 67 00 00 00 00 00 00 00 0a ong..... ... \n"); final static String HADOOP0_18_ERROR_MSG = - "Server IPC version " + Server.CURRENT_VERSION + + "Server IPC version " + RpcConstants.CURRENT_VERSION + " cannot communicate with client version 2"; /** @@ -775,7 +775,7 @@ private static abstract class NetworkTraces { "00 14 .. \n"); final static String HADOOP0_20_ERROR_MSG = - "Server IPC version " + Server.CURRENT_VERSION + + "Server IPC version " + RpcConstants.CURRENT_VERSION + " cannot communicate with client version 3"; @@ -790,7 +790,7 @@ private static abstract class NetworkTraces { final static String HADOOP0_21_ERROR_MSG = - "Server IPC version " + Server.CURRENT_VERSION + + "Server IPC version " + RpcConstants.CURRENT_VERSION + " cannot communicate with client version 4"; final static byte[] HADOOP_0_21_0_RPC_DUMP = diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java index 2ec56eb5ea9..547b7fe1101 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java @@ -72,6 +72,10 @@ public static class PBServerImpl implements TestRpcService { @Override public EmptyResponseProto ping(RpcController unused, EmptyRequestProto request) throws ServiceException { + // Ensure clientId is received + byte[] clientId = Server.getClientId(); + Assert.assertNotNull(Server.getClientId()); + Assert.assertEquals(16, clientId.length); return EmptyResponseProto.newBuilder().build(); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java index 89f67ee8b40..22200d6c1fe 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java @@ -17,13 +17,18 @@ */ package org.apache.hadoop.util; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.IOException; +import java.util.Arrays; +import org.apache.hadoop.ipc.RPC.RpcKind; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto; import org.junit.Test; import com.google.protobuf.CodedOutputStream; @@ -69,4 +74,12 @@ private void doVarIntTest(int value) throws IOException { new ByteArrayInputStream(baos.toByteArray())); assertEquals(value, ProtoUtil.readRawVarint32(dis)); } + + @Test + public void testRpcClientId() { + byte[] uuid = StringUtils.getUuidBytes(); + RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader( + RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, 0, uuid); + assertTrue(Arrays.equals(uuid, header.getClientId().toByteArray())); + } }