HDFS-13873. [SBN read] ObserverNode should reject read requests when it is too far behind. Contributed by Konstantin Shvachko.

This commit is contained in:
Erik Krogen 2018-12-13 14:31:41 -08:00 committed by Chen Liang
parent 82f68a47c4
commit 7b1e3c49d1
6 changed files with 115 additions and 21 deletions

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.ipc; package org.apache.hadoop.ipc;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
@ -64,9 +66,15 @@ public interface AlignmentContext {
* client state info during RPC response header processing. * client state info during RPC response header processing.
* *
* @param header The RPC request header. * @param header The RPC request header.
* @return state id of in the request header. * @param threshold a parameter to verify a condition when server
* should reject client request due to its state being too far
* misaligned with the client state.
* See implementation for more details.
* @return state id required for the server to execute the call.
* @throws IOException
*/ */
long receiveRequestState(RpcRequestHeaderProto header); long receiveRequestState(RpcRequestHeaderProto header, long threshold)
throws IOException;
/** /**
* Returns the last seen state id of the alignment context instance. * Returns the last seen state id of the alignment context instance.

View File

@ -2531,6 +2531,7 @@ public abstract class Server {
// Save the priority level assignment by the scheduler // Save the priority level assignment by the scheduler
call.setPriorityLevel(callQueue.getPriorityLevel(call)); call.setPriorityLevel(callQueue.getPriorityLevel(call));
call.markCallCoordinated(false);
if(alignmentContext != null && call.rpcRequest != null && if(alignmentContext != null && call.rpcRequest != null &&
(call.rpcRequest instanceof ProtobufRpcEngine.RpcProtobufRequest)) { (call.rpcRequest instanceof ProtobufRpcEngine.RpcProtobufRequest)) {
// if call.rpcRequest is not RpcProtobufRequest, will skip the following // if call.rpcRequest is not RpcProtobufRequest, will skip the following
@ -2539,23 +2540,21 @@ public abstract class Server {
// coordinated. // coordinated.
String methodName; String methodName;
String protoName; String protoName;
ProtobufRpcEngine.RpcProtobufRequest req =
(ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest;
try { try {
ProtobufRpcEngine.RpcProtobufRequest req =
(ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest;
methodName = req.getRequestHeader().getMethodName(); methodName = req.getRequestHeader().getMethodName();
protoName = req.getRequestHeader().getDeclaringClassProtocolName(); protoName = req.getRequestHeader().getDeclaringClassProtocolName();
if (alignmentContext.isCoordinatedCall(protoName, methodName)) {
call.markCallCoordinated(true);
long stateId;
stateId = alignmentContext.receiveRequestState(
header, getMaxIdleTime());
call.setClientStateId(stateId);
}
} catch (IOException ioe) { } catch (IOException ioe) {
throw new RpcServerException("Rpc request header check fail", ioe); throw new RpcServerException("Processing RPC request caught ", ioe);
} }
if (!alignmentContext.isCoordinatedCall(protoName, methodName)) {
call.markCallCoordinated(false);
} else {
call.markCallCoordinated(true);
long stateId = alignmentContext.receiveRequestState(header);
call.setClientStateId(stateId);
}
} else {
call.markCallCoordinated(false);
} }
try { try {
@ -3607,4 +3606,12 @@ public abstract class Server {
idleScanTimer.schedule(idleScanTask, idleScanInterval); idleScanTimer.schedule(idleScanTask, idleScanInterval);
} }
} }
protected int getMaxIdleTime() {
return connectionManager.maxIdleTime;
}
public String getServerName() {
return serverName;
}
} }

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import java.io.IOException;
import java.util.concurrent.atomic.LongAccumulator; import java.util.concurrent.atomic.LongAccumulator;
/** /**
@ -60,7 +61,8 @@ public class ClientGSIContext implements AlignmentContext {
} }
/** /**
* Client side implementation for receiving state alignment info in responses. * Client side implementation for receiving state alignment info
* in responses.
*/ */
@Override @Override
public void receiveResponseState(RpcResponseHeaderProto header) { public void receiveResponseState(RpcResponseHeaderProto header) {
@ -80,7 +82,8 @@ public class ClientGSIContext implements AlignmentContext {
* Client does not receive RPC requests therefore this does nothing. * Client does not receive RPC requests therefore this does nothing.
*/ */
@Override @Override
public long receiveRequestState(RpcRequestHeaderProto header) { public long receiveRequestState(RpcRequestHeaderProto header, long threshold)
throws IOException {
// Do nothing. // Do nothing.
return 0; return 0;
} }

View File

@ -20,12 +20,15 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.HashSet; import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly; import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly;
import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
@ -36,8 +39,23 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Stable @InterfaceStability.Stable
class GlobalStateIdContext implements AlignmentContext { class GlobalStateIdContext implements AlignmentContext {
private final FSNamesystem namesystem; /**
* Estimated number of journal transactions a typical NameNode can execute
* per second. The number is used to estimate how long a client's
* RPC request will wait in the call queue before the Observer catches up
* with its state id.
*/
private static final long ESTIMATED_TRANSACTIONS_PER_SECOND = 10000L;
/**
* The client wait time on an RPC request is composed of
* the server execution time plus the communication time.
* This is an expected fraction of the total wait time spent on
* server execution.
*/
private static final float ESTIMATED_SERVER_TIME_MULTIPLIER = 0.8f;
private final FSNamesystem namesystem;
private final HashSet<String> coordinatedMethods; private final HashSet<String> coordinatedMethods;
/** /**
@ -88,17 +106,41 @@ class GlobalStateIdContext implements AlignmentContext {
} }
/** /**
* Server side implementation for processing state alignment info in requests. * Server-side implementation for processing state alignment info in
* requests.
* For Observer it compares the client and the server states and determines
* if it makes sense to wait until the server catches up with the client
* state. If not the server throws RetriableException so that the client
* could retry the call according to the retry policy with another Observer
* or the Active NameNode.
*
* @param header The RPC request header.
* @param clientWaitTime time in milliseconds indicating how long client
* waits for the server response. It is used to verify if the client's
* state is too far ahead of the server's
* @return the minimum of the state ids of the client or the server.
* @throws RetriableException if Observer is too far behind.
*/ */
@Override @Override
public long receiveRequestState(RpcRequestHeaderProto header) { public long receiveRequestState(RpcRequestHeaderProto header,
long clientWaitTime) throws RetriableException {
long serverStateId = long serverStateId =
namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId(); namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId();
long clientStateId = header.getStateId(); long clientStateId = header.getStateId();
if (clientStateId > serverStateId && if (clientStateId > serverStateId &&
HAServiceProtocol.HAServiceState.ACTIVE.equals(namesystem.getState())) { HAServiceState.ACTIVE.equals(namesystem.getState())) {
FSNamesystem.LOG.warn("A client sent stateId: " + clientStateId + FSNamesystem.LOG.warn("A client sent stateId: " + clientStateId +
", but server state is: " + serverStateId); ", but server state is: " + serverStateId);
return serverStateId;
}
if (HAServiceState.OBSERVER.equals(namesystem.getState()) &&
clientStateId - serverStateId >
ESTIMATED_TRANSACTIONS_PER_SECOND
* TimeUnit.MILLISECONDS.toSeconds(clientWaitTime)
* ESTIMATED_SERVER_TIME_MULTIPLIER) {
throw new RetriableException(
"Observer Node is too far behind: serverStateId = "
+ serverStateId + " clientStateId = " + clientStateId);
} }
return clientStateId; return clientStateId;
} }

View File

@ -25,6 +25,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSUtil.createUri; import static org.apache.hadoop.hdfs.DFSUtil.createUri;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Proxy; import java.lang.reflect.Proxy;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
@ -34,6 +35,7 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.LongAccumulator;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
@ -42,6 +44,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.ClientGSIContext;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
@ -333,4 +336,21 @@ public abstract class HATestUtil {
} }
} }
} }
/**
* Customize stateId of the client AlignmentContext for testing.
*/
public static long setACStateId(DistributedFileSystem dfs,
long stateId) throws Exception {
ObserverReadProxyProvider<?> provider = (ObserverReadProxyProvider<?>)
((RetryInvocationHandler<?>) Proxy.getInvocationHandler(
dfs.getClient().getNamenode())).getProxyProvider();
ClientGSIContext ac = (ClientGSIContext)(provider.getAlignmentContext());
Field f = ac.getClass().getDeclaredField("lastSeenStateId");
f.setAccessible(true);
LongAccumulator lastSeenStateId = (LongAccumulator)f.get(ac);
long currentStateId = lastSeenStateId.getThenReset();
lastSeenStateId.accumulate(stateId);
return currentStateId;
}
} }

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
@ -138,6 +139,19 @@ public class TestMultiObserverNode {
dfsCluster.transitionToObserver(3); dfsCluster.transitionToObserver(3);
} }
@Test
public void testObserverFallBehind() throws Exception {
dfs.mkdir(testPath, FsPermission.getDefault());
assertSentTo(0);
// Set large state Id on the client
long realStateId = HATestUtil.setACStateId(dfs, 500000);
dfs.getFileStatus(testPath);
// Should end up on ANN
assertSentTo(0);
HATestUtil.setACStateId(dfs, realStateId);
}
private void assertSentTo(int... nnIndices) throws IOException { private void assertSentTo(int... nnIndices) throws IOException {
assertTrue("Request was not sent to any of the expected namenodes.", assertTrue("Request was not sent to any of the expected namenodes.",
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIndices)); HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIndices));