diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java index 932074857b5..ca1c813bc78 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.namenode; +import java.io.IOException; import java.lang.reflect.Method; import java.util.HashSet; import java.util.concurrent.TimeUnit; @@ -26,9 +27,11 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider; import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly; import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.ipc.RetriableException; +import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; @@ -123,7 +126,18 @@ class GlobalStateIdContext implements AlignmentContext { */ @Override public long receiveRequestState(RpcRequestHeaderProto header, - long clientWaitTime) throws RetriableException { + long clientWaitTime) throws IOException { + if (!header.hasStateId() && + HAServiceState.OBSERVER.equals(namesystem.getState())) { + // This could happen if client configured with non-observer proxy provider + // (e.g., ConfiguredFailoverProxyProvider) is accessing a cluster with + // observers. In this case, we should let the client failover to the + // active node, rather than potentially serving stale result (client + // stateId is 0 if not set). + throw new StandbyException("Observer Node received request without " + + "stateId. This mostly likely is because client is not configured " + + "with " + ObserverReadProxyProvider.class.getSimpleName()); + } long serverStateId = getLastSeenStateId(); long clientStateId = header.getStateId(); FSNamesystem.LOG.trace("Client State ID= {} and Server State ID= {}", diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java index 2f8f1156434..96067858c9d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import java.util.Collections; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -34,12 +35,15 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RpcScheduler; import org.apache.hadoop.ipc.Schedulable; +import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; import org.junit.After; @@ -346,6 +350,39 @@ public class TestConsistentReadsObserver { reader.interrupt(); } + @Test + public void testRequestFromNonObserverProxyProvider() throws Exception { + // Create another HDFS client using ConfiguredFailoverProvider + Configuration conf2 = new Configuration(conf); + + // Populate the above configuration with only a single observer in the + // namenode list. Also reduce retries to make test finish faster. + HATestUtil.setFailoverConfigurations( + conf2, + HATestUtil.getLogicalHostname(dfsCluster), + Collections.singletonList( + dfsCluster.getNameNode(2).getNameNodeAddress()), + ConfiguredFailoverProxyProvider.class); + conf2.setBoolean("fs.hdfs.impl.disable.cache", true); + conf2.setInt(HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY, 1); + conf2.setInt(HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY, 1); + FileSystem dfs2 = FileSystem.get(conf2); + + dfs.mkdir(testPath, FsPermission.getDefault()); + dfsCluster.rollEditLogAndTail(0); + + try { + // Request should be rejected by observer and throw StandbyException + dfs2.listStatus(testPath); + fail("listStatus should have thrown exception"); + } catch (RemoteException re) { + IOException e = re.unwrapRemoteException(); + assertTrue("should have thrown StandbyException but got " + + e.getClass().getSimpleName(), + e instanceof StandbyException); + } + } + private void assertSentTo(int nnIdx) throws IOException { assertTrue("Request was not sent to the expected namenode " + nnIdx, HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));