diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index eaecbbb1894..cda4771136d 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -69,8 +69,6 @@ Release 2.1.0-beta - 2013-07-02 HADOOP-9421. [RPC v9] Convert SASL to use ProtoBuf and provide negotiation capabilities (daryn) - HADOOP-9688. Add globally unique Client ID to RPC requests. (suresh) - HADOOP-9683. [RPC v9] Wrap IpcConnectionContext in RPC headers (daryn) NEW FEATURES @@ -191,9 +189,6 @@ Release 2.1.0-beta - 2013-07-02 HADOOP-9416. Add new symlink resolution methods in FileSystem and FileSystemLinkResolver. (Andrew Wang via Colin Patrick McCabe) - HADOOP-9720. Rename Client#uuid to Client#clientId. (Arpit Agarwal via - suresh) - HADOOP-9734. Common protobuf definitions for GetUserMappingsProtocol, RefreshAuthorizationPolicyProtocol and RefreshUserMappingsProtocol (jlowe) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 51d50db65fa..94e111d5727 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -85,7 +85,6 @@ import org.apache.hadoop.security.token.TokenInfo; import org.apache.hadoop.security.token.TokenSelector; import org.apache.hadoop.util.ProtoUtil; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -115,7 +114,6 @@ public class Client { private final int connectionTimeout; private final boolean fallbackAllowed; - private final byte[] clientId; final static int PING_CALL_ID = -1; @@ -817,8 +815,8 @@ public class Client { throws IOException { DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream)); // Write out the header, version and authentication method - out.write(RpcConstants.HEADER.array()); - out.write(RpcConstants.CURRENT_VERSION); + out.write(Server.HEADER.array()); + out.write(Server.CURRENT_VERSION); out.write(serviceClass); final AuthProtocol authProtocol; switch (authMethod) { @@ -897,7 +895,7 @@ public class Client { if ( curTime - lastActivity.get() >= pingInterval) { lastActivity.set(curTime); synchronized (out) { - out.writeInt(RpcConstants.PING_CALL_ID); + out.writeInt(PING_CALL_ID); out.flush(); } } @@ -953,7 +951,7 @@ public class Client { // Items '1' and '2' are prepared here. final DataOutputBuffer d = new DataOutputBuffer(); RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader( - call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, clientId); + call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id); header.writeDelimitedTo(d); call.rpcRequest.write(d); @@ -1153,7 +1151,6 @@ public class Client { CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT); this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); - this.clientId = StringUtils.getUuidBytes(); this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance(); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java deleted file mode 100644 index 3f03ade2a7a..00000000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ipc; - -import java.nio.ByteBuffer; - -import org.apache.hadoop.classification.InterfaceAudience; - -@InterfaceAudience.Private -public class RpcConstants { - private RpcConstants() { - // Hidden Constructor - } - - public static final int PING_CALL_ID = -1; - - public static final byte[] DUMMY_CLIENT_ID = new byte[0]; - - - /** - * The first four bytes of Hadoop RPC connections - */ - public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes()); - - // 1 : Introduce ping and server does not throw away RPCs - // 3 : Introduce the protocol into the RPC connection header - // 4 : Introduced SASL security layer - // 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal} - // in ObjectWritable to efficiently transmit arrays of primitives - // 6 : Made RPC Request header explicit - // 7 : Changed Ipc Connection Header to use Protocol buffers - // 8 : SASL server always sends a final response - // 9 : Changes to protocol for HADOOP-8990 - public static final byte CURRENT_VERSION = 9; -} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 9889ad037a6..ee633b137fa 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -71,8 +71,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; -import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION; -import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper; import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper; import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.RPC.VersionMismatch; @@ -157,7 +155,11 @@ public abstract class Server { return terseExceptions.contains(t.toString()); } } - + + /** + * The first four bytes of Hadoop RPC connections + */ + public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes()); /** * If the user accidentally sends an HTTP GET to an IPC port, we detect this @@ -175,6 +177,17 @@ public abstract class Server { "Content-type: text/plain\r\n\r\n" + "It looks like you are making an HTTP request to a Hadoop IPC port. " + "This is not the correct port for the web interface on this daemon.\r\n"; + + // 1 : Introduce ping and server does not throw away RPCs + // 3 : Introduce the protocol into the RPC connection header + // 4 : Introduced SASL security layer + // 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal} + // in ObjectWritable to efficiently transmit arrays of primitives + // 6 : Made RPC Request header explicit + // 7 : Changed Ipc Connection Header to use Protocol buffers + // 8 : SASL server always sends a final response + // 9 : Changes to protocol for HADOOP-8990 + public static final byte CURRENT_VERSION = 9; /** * Initial and max size of response buffer @@ -278,15 +291,6 @@ public abstract class Server { } return null; } - - /** - * Returns the clientId from the current RPC request - */ - public static byte[] getClientId() { - Call call = CurCall.get(); - return call != null ? call.clientId : RpcConstants.DUMMY_CLIENT_ID; - } - /** Returns remote address as a string when invoked inside an RPC. * Returns null in case of an error. */ @@ -447,22 +451,17 @@ public abstract class Server { // time served when response is not null private ByteBuffer rpcResponse; // the response for this call private final RPC.RpcKind rpcKind; - private final byte[] clientId; public Call(int id, Writable param, Connection connection) { - this(id, param, connection, RPC.RpcKind.RPC_BUILTIN, - RpcConstants.DUMMY_CLIENT_ID); + this( id, param, connection, RPC.RpcKind.RPC_BUILTIN ); } - - public Call(int id, Writable param, Connection connection, - RPC.RpcKind kind, byte[] clientId) { + public Call(int id, Writable param, Connection connection, RPC.RpcKind kind) { this.callId = id; this.rpcRequest = param; this.connection = connection; this.timestamp = Time.now(); this.rpcResponse = null; this.rpcKind = kind; - this.clientId = clientId; } @Override @@ -1440,9 +1439,8 @@ public abstract class Server { setupHttpRequestOnIpcPortResponse(); return -1; } - - if (!RpcConstants.HEADER.equals(dataLengthBuffer) - || version != CURRENT_VERSION) { + + if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) { //Warning is ok since this is not supposed to happen. LOG.warn("Incorrect header or version mismatch from " + hostAddress + ":" + remotePort + @@ -1464,7 +1462,7 @@ public abstract class Server { if (data == null) { dataLengthBuffer.flip(); dataLength = dataLengthBuffer.getInt(); - if ((dataLength == RpcConstants.PING_CALL_ID) && (!useWrap)) { + if ((dataLength == Client.PING_CALL_ID) && (!useWrap)) { // covers the !useSasl too dataLengthBuffer.clear(); return 0; // ping message @@ -1704,7 +1702,7 @@ public abstract class Server { unwrappedDataLengthBuffer.flip(); int unwrappedDataLength = unwrappedDataLengthBuffer.getInt(); - if (unwrappedDataLength == RpcConstants.PING_CALL_ID) { + if (unwrappedDataLength == Client.PING_CALL_ID) { if (LOG.isDebugEnabled()) LOG.debug("Received ping message"); unwrappedDataLengthBuffer.clear(); @@ -1836,9 +1834,8 @@ public abstract class Server { RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err); } - Call call = new Call(header.getCallId(), rpcRequest, this, - ProtoUtil.convert(header.getRpcKind()), header.getClientId() - .toByteArray()); + Call call = new Call(header.getCallId(), rpcRequest, this, + ProtoUtil.convert(header.getRpcKind())); callQueue.put(call); // queue the call; maybe blocked here incRpcCount(); // Increment the rpc count } @@ -2261,7 +2258,7 @@ public abstract class Server { RpcResponseHeaderProto.newBuilder(); headerBuilder.setCallId(call.callId); headerBuilder.setStatus(status); - headerBuilder.setServerIpcVersionNum(CURRENT_VERSION); + headerBuilder.setServerIpcVersionNum(Server.CURRENT_VERSION); if (status == RpcStatusProto.SUCCESS) { RpcResponseHeaderProto header = headerBuilder.build(); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java index a72bfbfeabc..372e13b5a11 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java @@ -46,7 +46,6 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper; import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper; import org.apache.hadoop.ipc.RPC.RpcKind; import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.ipc.RpcConstants; import org.apache.hadoop.ipc.Server.AuthProtocol; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto; @@ -72,10 +71,9 @@ public class SaslRpcClient { private final AuthMethod authMethod; private final SaslClient saslClient; private final boolean fallbackAllowed; - private static final RpcRequestHeaderProto saslHeader = ProtoUtil - .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER, - OperationProto.RPC_FINAL_PACKET, AuthProtocol.SASL.callId, - RpcConstants.DUMMY_CLIENT_ID); + private static final RpcRequestHeaderProto saslHeader = + ProtoUtil.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER, + OperationProto.RPC_FINAL_PACKET, AuthProtocol.SASL.callId); private static final RpcSaslProto negotiateRequest = RpcSaslProto.newBuilder().setState(SaslState.NEGOTIATE).build(); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java index 7e32ffa27bf..ac6c572b346 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java @@ -28,8 +28,6 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.*; import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.UserGroupInformation; -import com.google.protobuf.ByteString; - public abstract class ProtoUtil { /** @@ -160,10 +158,9 @@ public abstract class ProtoUtil { } public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind, - RpcRequestHeaderProto.OperationProto operation, int callId, byte[] uuid) { + RpcRequestHeaderProto.OperationProto operation, int callId) { RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder(); - result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId) - .setClientId(ByteString.copyFrom(uuid)); + result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId); return result.build(); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java index 27096b441ae..f8b9e4efbe8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java @@ -22,7 +22,6 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.net.URI; import java.net.URISyntaxException; -import java.nio.ByteBuffer; import java.text.DateFormat; import java.util.ArrayList; import java.util.Arrays; @@ -33,7 +32,6 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.StringTokenizer; -import java.util.UUID; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -896,15 +894,4 @@ public class StringUtils { matcher.appendTail(sb); return sb.toString(); } - - /** - * Return a new UUID as byte[] - */ - public static byte[] getUuidBytes() { - UUID uuid = UUID.randomUUID(); - ByteBuffer buf = ByteBuffer.wrap(new byte[16]); - buf.putLong(uuid.getMostSignificantBits()); - buf.putLong(uuid.getLeastSignificantBits()); - return buf.array(); - } } diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto index 92f7fbe2ff1..872f29db730 100644 --- a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto +++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto @@ -62,9 +62,7 @@ message RpcRequestHeaderProto { // the header for the RpcRequest optional RpcKindProto rpcKind = 1; optional OperationProto rpcOp = 2; - required uint32 callId = 3; // a sequence number that is sent back in response - required bytes clientId = 4; // Globally unique client ID - // clientId + callId uniquely identifies a request + required uint32 callId = 3; // each rpc has a callId that is also used in response } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index d30e9145e5b..6078ae121f3 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -736,7 +736,7 @@ public class TestIPC { "6f 6e 67 00 00 00 00 00 00 00 0a ong..... ... \n"); final static String HADOOP0_18_ERROR_MSG = - "Server IPC version " + RpcConstants.CURRENT_VERSION + + "Server IPC version " + Server.CURRENT_VERSION + " cannot communicate with client version 2"; /** @@ -775,7 +775,7 @@ public class TestIPC { "00 14 .. \n"); final static String HADOOP0_20_ERROR_MSG = - "Server IPC version " + RpcConstants.CURRENT_VERSION + + "Server IPC version " + Server.CURRENT_VERSION + " cannot communicate with client version 3"; @@ -790,7 +790,7 @@ public class TestIPC { final static String HADOOP0_21_ERROR_MSG = - "Server IPC version " + RpcConstants.CURRENT_VERSION + + "Server IPC version " + Server.CURRENT_VERSION + " cannot communicate with client version 4"; final static byte[] HADOOP_0_21_0_RPC_DUMP = diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java index 547b7fe1101..2ec56eb5ea9 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java @@ -72,10 +72,6 @@ public class TestProtoBufRpc { @Override public EmptyResponseProto ping(RpcController unused, EmptyRequestProto request) throws ServiceException { - // Ensure clientId is received - byte[] clientId = Server.getClientId(); - Assert.assertNotNull(Server.getClientId()); - Assert.assertEquals(16, clientId.length); return EmptyResponseProto.newBuilder().build(); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java index 22200d6c1fe..89f67ee8b40 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java @@ -17,18 +17,13 @@ */ package org.apache.hadoop.util; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.IOException; -import java.util.Arrays; -import org.apache.hadoop.ipc.RPC.RpcKind; -import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; -import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto; import org.junit.Test; import com.google.protobuf.CodedOutputStream; @@ -74,12 +69,4 @@ public class TestProtoUtil { new ByteArrayInputStream(baos.toByteArray())); assertEquals(value, ProtoUtil.readRawVarint32(dis)); } - - @Test - public void testRpcClientId() { - byte[] uuid = StringUtils.getUuidBytes(); - RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader( - RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, 0, uuid); - assertTrue(Arrays.equals(uuid, header.getClientId().toByteArray())); - } }