HDFS-13873. [SBN read] ObserverNode should reject read requests when it is too far behind. Contributed by Konstantin Shvachko.

This commit is contained in:
Erik Krogen 2018-12-13 14:31:41 -08:00 committed by Chen Liang
parent 589d2c5922
commit c1c061d767
6 changed files with 111 additions and 21 deletions

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.ipc;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
@ -64,9 +66,15 @@ public interface AlignmentContext {
* client state info during RPC response header processing.
*
* @param header The RPC request header.
* @return state id of in the request header.
* @param threshold a parameter to verify a condition when server
* should reject client request due to its state being too far
* misaligned with the client state.
* See implementation for more details.
* @return state id required for the server to execute the call.
* @throws IOException
*/
long receiveRequestState(RpcRequestHeaderProto header);
long receiveRequestState(RpcRequestHeaderProto header, long threshold)
throws IOException;
/**
* Returns the last seen state id of the alignment context instance.

View File

@ -2536,6 +2536,7 @@ public abstract class Server {
// Save the priority level assignment by the scheduler
call.setPriorityLevel(callQueue.getPriorityLevel(call));
call.markCallCoordinated(false);
if(alignmentContext != null && call.rpcRequest != null &&
(call.rpcRequest instanceof ProtobufRpcEngine.RpcProtobufRequest)) {
// if call.rpcRequest is not RpcProtobufRequest, will skip the following
@ -2544,23 +2545,21 @@ public abstract class Server {
// coordinated.
String methodName;
String protoName;
ProtobufRpcEngine.RpcProtobufRequest req =
(ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest;
try {
ProtobufRpcEngine.RpcProtobufRequest req =
(ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest;
methodName = req.getRequestHeader().getMethodName();
protoName = req.getRequestHeader().getDeclaringClassProtocolName();
if (alignmentContext.isCoordinatedCall(protoName, methodName)) {
call.markCallCoordinated(true);
long stateId;
stateId = alignmentContext.receiveRequestState(
header, getMaxIdleTime());
call.setClientStateId(stateId);
}
} catch (IOException ioe) {
throw new RpcServerException("Rpc request header check fail", ioe);
throw new RpcServerException("Processing RPC request caught ", ioe);
}
if (!alignmentContext.isCoordinatedCall(protoName, methodName)) {
call.markCallCoordinated(false);
} else {
call.markCallCoordinated(true);
long stateId = alignmentContext.receiveRequestState(header);
call.setClientStateId(stateId);
}
} else {
call.markCallCoordinated(false);
}
try {
@ -3613,6 +3612,10 @@ public abstract class Server {
}
}
protected int getMaxIdleTime() {
return connectionManager.maxIdleTime;
}
public String getServerName() {
return serverName;
}

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import java.io.IOException;
import java.util.concurrent.atomic.LongAccumulator;
/**
@ -60,7 +61,8 @@ public class ClientGSIContext implements AlignmentContext {
}
/**
* Client side implementation for receiving state alignment info in responses.
* Client side implementation for receiving state alignment info
* in responses.
*/
@Override
public void receiveResponseState(RpcResponseHeaderProto header) {
@ -80,7 +82,8 @@ public class ClientGSIContext implements AlignmentContext {
* Client does not receive RPC requests therefore this does nothing.
*/
@Override
public long receiveRequestState(RpcRequestHeaderProto header) {
public long receiveRequestState(RpcRequestHeaderProto header, long threshold)
throws IOException {
// Do nothing.
return 0;
}

View File

@ -20,12 +20,15 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
@ -36,8 +39,23 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
@InterfaceAudience.Private
@InterfaceStability.Stable
class GlobalStateIdContext implements AlignmentContext {
private final FSNamesystem namesystem;
/**
* Estimated number of journal transactions a typical NameNode can execute
* per second. The number is used to estimate how long a client's
* RPC request will wait in the call queue before the Observer catches up
* with its state id.
*/
private static final long ESTIMATED_TRANSACTIONS_PER_SECOND = 10000L;
/**
* The client wait time on an RPC request is composed of
* the server execution time plus the communication time.
* This is an expected fraction of the total wait time spent on
* server execution.
*/
private static final float ESTIMATED_SERVER_TIME_MULTIPLIER = 0.8f;
private final FSNamesystem namesystem;
private final HashSet<String> coordinatedMethods;
/**
@ -88,17 +106,41 @@ class GlobalStateIdContext implements AlignmentContext {
}
/**
* Server side implementation for processing state alignment info in requests.
* Server-side implementation for processing state alignment info in
* requests.
* For Observer it compares the client and the server states and determines
* if it makes sense to wait until the server catches up with the client
* state. If not the server throws RetriableException so that the client
* could retry the call according to the retry policy with another Observer
* or the Active NameNode.
*
* @param header The RPC request header.
* @param clientWaitTime time in milliseconds indicating how long client
* waits for the server response. It is used to verify if the client's
* state is too far ahead of the server's
* @return the minimum of the state ids of the client or the server.
* @throws RetriableException if Observer is too far behind.
*/
@Override
public long receiveRequestState(RpcRequestHeaderProto header) {
public long receiveRequestState(RpcRequestHeaderProto header,
long clientWaitTime) throws RetriableException {
long serverStateId =
namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId();
long clientStateId = header.getStateId();
if (clientStateId > serverStateId &&
HAServiceProtocol.HAServiceState.ACTIVE.equals(namesystem.getState())) {
HAServiceState.ACTIVE.equals(namesystem.getState())) {
FSNamesystem.LOG.warn("A client sent stateId: " + clientStateId +
", but server state is: " + serverStateId);
return serverStateId;
}
if (HAServiceState.OBSERVER.equals(namesystem.getState()) &&
clientStateId - serverStateId >
ESTIMATED_TRANSACTIONS_PER_SECOND
* TimeUnit.MILLISECONDS.toSeconds(clientWaitTime)
* ESTIMATED_SERVER_TIME_MULTIPLIER) {
throw new RetriableException(
"Observer Node is too far behind: serverStateId = "
+ serverStateId + " clientStateId = " + clientStateId);
}
return clientStateId;
}

View File

@ -25,6 +25,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSUtil.createUri;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.URI;
@ -34,6 +35,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.LongAccumulator;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
@ -43,6 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.ClientGSIContext;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@ -334,4 +337,21 @@ public abstract class HATestUtil {
}
}
}
/**
* Customize stateId of the client AlignmentContext for testing.
*/
public static long setACStateId(DistributedFileSystem dfs,
long stateId) throws Exception {
ObserverReadProxyProvider<?> provider = (ObserverReadProxyProvider<?>)
((RetryInvocationHandler<?>) Proxy.getInvocationHandler(
dfs.getClient().getNamenode())).getProxyProvider();
ClientGSIContext ac = (ClientGSIContext)(provider.getAlignmentContext());
Field f = ac.getClass().getDeclaredField("lastSeenStateId");
f.setAccessible(true);
LongAccumulator lastSeenStateId = (LongAccumulator)f.get(ac);
long currentStateId = lastSeenStateId.getThenReset();
lastSeenStateId.accumulate(stateId);
return currentStateId;
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
@ -138,6 +139,19 @@ public class TestMultiObserverNode {
dfsCluster.transitionToObserver(3);
}
@Test
public void testObserverFallBehind() throws Exception {
dfs.mkdir(testPath, FsPermission.getDefault());
assertSentTo(0);
// Set large state Id on the client
long realStateId = HATestUtil.setACStateId(dfs, 500000);
dfs.getFileStatus(testPath);
// Should end up on ANN
assertSentTo(0);
HATestUtil.setACStateId(dfs, realStateId);
}
private void assertSentTo(int... nnIndices) throws IOException {
assertTrue("Request was not sent to any of the expected namenodes.",
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIndices));