diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java index a435ff6c4e6..bcddfbf1e26 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java @@ -18,6 +18,8 @@ package org.apache.hadoop.ipc; +import java.io.IOException; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; @@ -64,9 +66,15 @@ public interface AlignmentContext { * client state info during RPC response header processing. * * @param header The RPC request header. - * @return state id of in the request header. + * @param threshold a parameter to verify a condition when server + * should reject client request due to its state being too far + * misaligned with the client state. + * See implementation for more details. + * @return state id required for the server to execute the call. + * @throws IOException */ - long receiveRequestState(RpcRequestHeaderProto header); + long receiveRequestState(RpcRequestHeaderProto header, long threshold) + throws IOException; /** * Returns the last seen state id of the alignment context instance. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 20fee612f99..4f11541b86c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -2536,6 +2536,7 @@ public abstract class Server { // Save the priority level assignment by the scheduler call.setPriorityLevel(callQueue.getPriorityLevel(call)); + call.markCallCoordinated(false); if(alignmentContext != null && call.rpcRequest != null && (call.rpcRequest instanceof ProtobufRpcEngine.RpcProtobufRequest)) { // if call.rpcRequest is not RpcProtobufRequest, will skip the following @@ -2544,23 +2545,21 @@ public abstract class Server { // coordinated. String methodName; String protoName; + ProtobufRpcEngine.RpcProtobufRequest req = + (ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest; try { - ProtobufRpcEngine.RpcProtobufRequest req = - (ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest; methodName = req.getRequestHeader().getMethodName(); protoName = req.getRequestHeader().getDeclaringClassProtocolName(); + if (alignmentContext.isCoordinatedCall(protoName, methodName)) { + call.markCallCoordinated(true); + long stateId; + stateId = alignmentContext.receiveRequestState( + header, getMaxIdleTime()); + call.setClientStateId(stateId); + } } catch (IOException ioe) { - throw new RpcServerException("Rpc request header check fail", ioe); + throw new RpcServerException("Processing RPC request caught ", ioe); } - if (!alignmentContext.isCoordinatedCall(protoName, methodName)) { - call.markCallCoordinated(false); - } else { - call.markCallCoordinated(true); - long stateId = alignmentContext.receiveRequestState(header); - call.setClientStateId(stateId); - } - } else { - call.markCallCoordinated(false); } try { @@ -3613,6 +3612,10 @@ public abstract class Server { } } + protected int getMaxIdleTime() { + return connectionManager.maxIdleTime; + } + public String getServerName() { return serverName; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java index 6d366a63c34..a7bdd141359 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java @@ -24,6 +24,7 @@ import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; +import java.io.IOException; import java.util.concurrent.atomic.LongAccumulator; /** @@ -60,7 +61,8 @@ public class ClientGSIContext implements AlignmentContext { } /** - * Client side implementation for receiving state alignment info in responses. + * Client side implementation for receiving state alignment info + * in responses. */ @Override public void receiveResponseState(RpcResponseHeaderProto header) { @@ -80,7 +82,8 @@ public class ClientGSIContext implements AlignmentContext { * Client does not receive RPC requests therefore this does nothing. */ @Override - public long receiveRequestState(RpcRequestHeaderProto header) { + public long receiveRequestState(RpcRequestHeaderProto header, long threshold) + throws IOException { // Do nothing. return 0; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java index ecb9fd36247..2e486541b69 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java @@ -20,12 +20,15 @@ package org.apache.hadoop.hdfs.server.namenode; import java.lang.reflect.Method; import java.util.HashSet; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly; import org.apache.hadoop.ipc.AlignmentContext; +import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; @@ -36,8 +39,23 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; @InterfaceAudience.Private @InterfaceStability.Stable class GlobalStateIdContext implements AlignmentContext { - private final FSNamesystem namesystem; + /** + * Estimated number of journal transactions a typical NameNode can execute + * per second. The number is used to estimate how long a client's + * RPC request will wait in the call queue before the Observer catches up + * with its state id. + */ + private static final long ESTIMATED_TRANSACTIONS_PER_SECOND = 10000L; + /** + * The client wait time on an RPC request is composed of + * the server execution time plus the communication time. + * This is an expected fraction of the total wait time spent on + * server execution. + */ + private static final float ESTIMATED_SERVER_TIME_MULTIPLIER = 0.8f; + + private final FSNamesystem namesystem; private final HashSet coordinatedMethods; /** @@ -88,17 +106,41 @@ class GlobalStateIdContext implements AlignmentContext { } /** - * Server side implementation for processing state alignment info in requests. + * Server-side implementation for processing state alignment info in + * requests. + * For Observer it compares the client and the server states and determines + * if it makes sense to wait until the server catches up with the client + * state. If not the server throws RetriableException so that the client + * could retry the call according to the retry policy with another Observer + * or the Active NameNode. + * + * @param header The RPC request header. + * @param clientWaitTime time in milliseconds indicating how long client + * waits for the server response. It is used to verify if the client's + * state is too far ahead of the server's + * @return the minimum of the state ids of the client or the server. + * @throws RetriableException if Observer is too far behind. */ @Override - public long receiveRequestState(RpcRequestHeaderProto header) { + public long receiveRequestState(RpcRequestHeaderProto header, + long clientWaitTime) throws RetriableException { long serverStateId = namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId(); long clientStateId = header.getStateId(); if (clientStateId > serverStateId && - HAServiceProtocol.HAServiceState.ACTIVE.equals(namesystem.getState())) { + HAServiceState.ACTIVE.equals(namesystem.getState())) { FSNamesystem.LOG.warn("A client sent stateId: " + clientStateId + ", but server state is: " + serverStateId); + return serverStateId; + } + if (HAServiceState.OBSERVER.equals(namesystem.getState()) && + clientStateId - serverStateId > + ESTIMATED_TRANSACTIONS_PER_SECOND + * TimeUnit.MILLISECONDS.toSeconds(clientWaitTime) + * ESTIMATED_SERVER_TIME_MULTIPLIER) { + throw new RetriableException( + "Observer Node is too far behind: serverStateId = " + + serverStateId + " clientStateId = " + clientStateId); } return clientStateId; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java index d1095ad2d6a..9e83fc1283c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java @@ -25,6 +25,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSUtil.createUri; import java.io.IOException; +import java.lang.reflect.Field; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.net.URI; @@ -34,6 +35,7 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.LongAccumulator; import com.google.common.base.Function; import com.google.common.base.Joiner; @@ -43,6 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.ClientGSIContext; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -334,4 +337,21 @@ public abstract class HATestUtil { } } } + + /** + * Customize stateId of the client AlignmentContext for testing. + */ + public static long setACStateId(DistributedFileSystem dfs, + long stateId) throws Exception { + ObserverReadProxyProvider provider = (ObserverReadProxyProvider) + ((RetryInvocationHandler) Proxy.getInvocationHandler( + dfs.getClient().getNamenode())).getProxyProvider(); + ClientGSIContext ac = (ClientGSIContext)(provider.getAlignmentContext()); + Field f = ac.getClass().getDeclaredField("lastSeenStateId"); + f.setAccessible(true); + LongAccumulator lastSeenStateId = (LongAccumulator)f.get(ac); + long currentStateId = lastSeenStateId.getThenReset(); + lastSeenStateId.accumulate(stateId); + return currentStateId; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java index 4aa3133a0f7..a8e124568d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha; import static org.junit.Assert.assertTrue; import java.io.IOException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; @@ -138,6 +139,19 @@ public class TestMultiObserverNode { dfsCluster.transitionToObserver(3); } + @Test + public void testObserverFallBehind() throws Exception { + dfs.mkdir(testPath, FsPermission.getDefault()); + assertSentTo(0); + + // Set large state Id on the client + long realStateId = HATestUtil.setACStateId(dfs, 500000); + dfs.getFileStatus(testPath); + // Should end up on ANN + assertSentTo(0); + HATestUtil.setACStateId(dfs, realStateId); + } + private void assertSentTo(int... nnIndices) throws IOException { assertTrue("Request was not sent to any of the expected namenodes.", HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIndices));