HDFS-14660. [SBN Read] ObserverNameNode should throw StandbyException for requests not from ObserverProxyProvider. Contributed by Chao Sun.

This commit is contained in:
Ayush Saxena 2019-07-28 08:41:42 +05:30 committed by Konstantin V Shvachko
parent 3186afa24c
commit ebd7a0f155
2 changed files with 52 additions and 1 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
@ -26,9 +27,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
@ -123,7 +126,18 @@ class GlobalStateIdContext implements AlignmentContext {
*/
@Override
public long receiveRequestState(RpcRequestHeaderProto header,
long clientWaitTime) throws RetriableException {
long clientWaitTime) throws IOException {
if (!header.hasStateId() &&
HAServiceState.OBSERVER.equals(namesystem.getState())) {
// This could happen if client configured with non-observer proxy provider
// (e.g., ConfiguredFailoverProxyProvider) is accessing a cluster with
// observers. In this case, we should let the client failover to the
// active node, rather than potentially serving stale result (client
// stateId is 0 if not set).
throw new StandbyException("Observer Node received request without "
+ "stateId. This mostly likely is because client is not configured "
+ "with " + ObserverReadProxyProvider.class.getSimpleName());
}
long serverStateId = getLastSeenStateId();
long clientStateId = header.getStateId();
FSNamesystem.LOG.trace("Client State ID= {} and Server State ID= {}",

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@ -34,12 +35,15 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcScheduler;
import org.apache.hadoop.ipc.Schedulable;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.After;
@ -346,6 +350,39 @@ public class TestConsistentReadsObserver {
reader.interrupt();
}
@Test
public void testRequestFromNonObserverProxyProvider() throws Exception {
// Create another HDFS client using ConfiguredFailoverProvider
Configuration conf2 = new Configuration(conf);
// Populate the above configuration with only a single observer in the
// namenode list. Also reduce retries to make test finish faster.
HATestUtil.setFailoverConfigurations(
conf2,
HATestUtil.getLogicalHostname(dfsCluster),
Collections.singletonList(
dfsCluster.getNameNode(2).getNameNodeAddress()),
ConfiguredFailoverProxyProvider.class);
conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
conf2.setInt(HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY, 1);
conf2.setInt(HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY, 1);
FileSystem dfs2 = FileSystem.get(conf2);
dfs.mkdir(testPath, FsPermission.getDefault());
dfsCluster.rollEditLogAndTail(0);
try {
// Request should be rejected by observer and throw StandbyException
dfs2.listStatus(testPath);
fail("listStatus should have thrown exception");
} catch (RemoteException re) {
IOException e = re.unwrapRemoteException();
assertTrue("should have thrown StandbyException but got "
+ e.getClass().getSimpleName(),
e instanceof StandbyException);
}
}
private void assertSentTo(int nnIdx) throws IOException {
assertTrue("Request was not sent to the expected namenode " + nnIdx,
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));