diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java index 3d309235fe8..8d43fd74a84 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java @@ -46,7 +46,7 @@ public interface AlignmentContext { void updateResponseState(RpcResponseHeaderProto.Builder header); /** - * This is the intended client method call to implement to recieve state info + * This is the intended client method call to implement to receive state info * during RPC response processing. * * @param header The RPC response header. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 9e622a8f20c..ec624cc3b73 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -925,7 +925,7 @@ public static class Call implements Schedulable, private volatile String detailedMetricsName = ""; final int callId; // the client's call id final int retryCount; // the retry count of the call - long timestampNanos; // time the call was received + private final long timestampNanos; // time the call was received long responseTimestampNanos; // time the call was served private AtomicInteger responseWaitCount = new AtomicInteger(1); final RPC.RpcKind rpcKind; @@ -1107,6 +1107,10 @@ public void setDeferredResponse(Writable response) { public void setDeferredError(Throwable t) { } + + public long getTimestampNanos() { + return timestampNanos; + } } /** A RPC extended call queued for handling. */ @@ -1188,7 +1192,7 @@ public Void run() throws Exception { try { value = call( - rpcKind, connection.protocolName, rpcRequest, timestampNanos); + rpcKind, connection.protocolName, rpcRequest, getTimestampNanos()); } catch (Throwable e) { populateResponseParamsOnError(e, responseParams); } diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto index 042928c2aee..d9becf722e9 100644 --- a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto +++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto @@ -91,6 +91,10 @@ message RpcRequestHeaderProto { // the header for the RpcRequest optional RPCTraceInfoProto traceInfo = 6; // tracing info optional RPCCallerContextProto callerContext = 7; // call context optional int64 stateId = 8; // The last seen Global State ID + // Alignment context info for use with routers. + // The client should not interpret these bytes, but only forward bytes + // received from RpcResponseHeaderProto.routerFederatedState. + optional bytes routerFederatedState = 9; } @@ -157,6 +161,10 @@ message RpcResponseHeaderProto { optional bytes clientId = 7; // Globally unique client ID optional sint32 retryCount = 8 [default = -1]; optional int64 stateId = 9; // The last written Global State ID + // Alignment context info for use with routers. + // The client should not interpret these bytes, but only + // forward them to the router using RpcRequestHeaderProto.routerFederatedState. + optional bytes routerFederatedState = 10; } message RpcSaslProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto index 336130e419a..7f61d80fe1a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto @@ -312,3 +312,16 @@ message GetDisabledNameservicesRequestProto { message GetDisabledNameservicesResponseProto { repeated string nameServiceIds = 1; } + +///////////////////////////////////////////////// +// Alignment state for namespaces. +///////////////////////////////////////////////// + +/** + * Clients should receive this message in RPC responses and forward it + * in RPC requests without interpreting it. It should be encoded + * as an obscure byte array when being sent to clients. + */ +message RouterFederatedStateProto { + map namespaceStateIds = 1; // Last seen state IDs for multiple namespaces. +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java new file mode 100644 index 00000000000..60ab4b2c0bc --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.apache.hadoop.ipc.AlignmentContext; +import org.apache.hadoop.ipc.ClientId; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RpcConstants; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterFederatedStateProto; +import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.util.ProtoUtil; +import org.junit.Test; + +import static org.junit.Assert.*; + + +public class TestRouterFederatedState { + + @Test + public void testRpcRouterFederatedState() throws InvalidProtocolBufferException { + byte[] uuid = ClientId.getClientId(); + Map expectedStateIds = new HashMap() {{ + put("namespace1", 11L ); + put("namespace2", 22L); + }}; + + AlignmentContext alignmentContext = new AlignmentContextWithRouterState(expectedStateIds); + + RpcHeaderProtos.RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader( + RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcHeaderProtos.RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET, 0, + RpcConstants.INVALID_RETRY_COUNT, uuid, alignmentContext); + + Map stateIdsFromHeader = + RouterFederatedStateProto.parseFrom( + header.getRouterFederatedState().toByteArray() + ).getNamespaceStateIdsMap(); + + assertEquals(expectedStateIds, stateIdsFromHeader); + } + + private static class AlignmentContextWithRouterState implements AlignmentContext { + + Map routerFederatedState; + + public AlignmentContextWithRouterState(Map namespaceStates) { + this.routerFederatedState = namespaceStates; + } + + @Override + public void updateRequestState(RpcHeaderProtos.RpcRequestHeaderProto.Builder header) { + RouterFederatedStateProto fedState = RouterFederatedStateProto + .newBuilder() + .putAllNamespaceStateIds(routerFederatedState) + .build(); + + header.setRouterFederatedState(fedState.toByteString()); + } + + @Override + public void updateResponseState(RpcHeaderProtos.RpcResponseHeaderProto.Builder header) {} + + @Override + public void receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto header) {} + + @Override + public long receiveRequestState(RpcHeaderProtos.RpcRequestHeaderProto header, long threshold) throws IOException { + return 0; + } + + @Override + public long getLastSeenStateId() { + return 0; + } + + @Override + public boolean isCoordinatedCall(String protocolName, String method) { + return false; + } + } +} \ No newline at end of file