HDFS-14660. [SBN Read] ObserverNameNode should throw StandbyException for requests not from ObserverProxyProvider. Contributed by Chao Sun.
commit 7ca418c2c0
parent 8ee3a8768d
GlobalStateIdContext.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.HashSet;
 import java.util.concurrent.TimeUnit;
 
@@ -26,9 +27,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
 import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly;
 import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 
@@ -123,7 +126,18 @@ class GlobalStateIdContext implements AlignmentContext {
    */
   @Override
   public long receiveRequestState(RpcRequestHeaderProto header,
-      long clientWaitTime) throws RetriableException {
+      long clientWaitTime) throws IOException {
+    if (!header.hasStateId() &&
+        HAServiceState.OBSERVER.equals(namesystem.getState())) {
+      // This could happen if client configured with non-observer proxy provider
+      // (e.g., ConfiguredFailoverProxyProvider) is accessing a cluster with
+      // observers. In this case, we should let the client failover to the
+      // active node, rather than potentially serving stale result (client
+      // stateId is 0 if not set).
+      throw new StandbyException("Observer Node received request without "
+          + "stateId. This mostly likely is because client is not configured "
+          + "with " + ObserverReadProxyProvider.class.getSimpleName());
+    }
     long serverStateId = getLastSeenStateId();
     long clientStateId = header.getStateId();
     FSNamesystem.LOG.trace("Client State ID= {} and Server State ID= {}",
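The hunk above is the crux of the commit: an observer NameNode now refuses any call whose RPC header carries no stateId, rather than risk serving a stale read. On the client side, the corresponding fix is to select ObserverReadProxyProvider, which stamps each request with the client's last-seen stateId. Below is a minimal sketch of that client configuration; the nameservice name ns1 and the example host names are placeholders, not values taken from this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;

public class ObserverReadClient {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical HA nameservice with an active, a standby, and an observer.
    conf.set("fs.defaultFS", "hdfs://ns1");
    conf.set("dfs.nameservices", "ns1");
    conf.set("dfs.ha.namenodes.ns1", "nn1,nn2,nn3");
    conf.set("dfs.namenode.rpc-address.ns1.nn1", "nn1.example.com:8020");
    conf.set("dfs.namenode.rpc-address.ns1.nn2", "nn2.example.com:8020");
    conf.set("dfs.namenode.rpc-address.ns1.nn3", "nn3.example.com:8020");
    // The setting this commit effectively makes mandatory for observer reads:
    // with ConfiguredFailoverProxyProvider here instead, the client sends no
    // stateId and the observer now rejects the call with StandbyException.
    conf.set("dfs.client.failover.proxy.provider.ns1",
        ObserverReadProxyProvider.class.getName());
    try (FileSystem fs = FileSystem.get(conf)) {
      fs.listStatus(new Path("/"));  // eligible to be served by an observer
    }
  }
}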
TestConsistentReadsObserver.java

@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -34,12 +35,15 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RpcScheduler;
 import org.apache.hadoop.ipc.Schedulable;
+import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
@@ -346,6 +350,39 @@ public class TestConsistentReadsObserver {
     reader.interrupt();
   }
 
+  @Test
+  public void testRequestFromNonObserverProxyProvider() throws Exception {
+    // Create another HDFS client using ConfiguredFailoverProvider
+    Configuration conf2 = new Configuration(conf);
+
+    // Populate the above configuration with only a single observer in the
+    // namenode list. Also reduce retries to make test finish faster.
+    HATestUtil.setFailoverConfigurations(
+        conf2,
+        HATestUtil.getLogicalHostname(dfsCluster),
+        Collections.singletonList(
+            dfsCluster.getNameNode(2).getNameNodeAddress()),
+        ConfiguredFailoverProxyProvider.class);
+    conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
+    conf2.setInt(HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY, 1);
+    conf2.setInt(HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY, 1);
+    FileSystem dfs2 = FileSystem.get(conf2);
+
+    dfs.mkdir(testPath, FsPermission.getDefault());
+    dfsCluster.rollEditLogAndTail(0);
+
+    try {
+      // Request should be rejected by observer and throw StandbyException
+      dfs2.listStatus(testPath);
+      fail("listStatus should have thrown exception");
+    } catch (RemoteException re) {
+      IOException e = re.unwrapRemoteException();
+      assertTrue("should have thrown StandbyException but got "
+          + e.getClass().getSimpleName(),
+          e instanceof StandbyException);
+    }
+  }
+
   private void assertSentTo(int nnIdx) throws IOException {
     assertTrue("Request was not sent to the expected namenode " + nnIdx,
         HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
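The new test drives the rejection end to end: a client built on ConfiguredFailoverProxyProvider and pointed only at the observer must see a StandbyException. Outside the test harness, the same failure mode looks roughly like the sketch below; the observer address and path are hypothetical, and the unwrapping mirrors what the test asserts, namely that the StandbyException crosses the wire wrapped in a RemoteException.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;

public class StaleReadProbe {
  public static void main(String[] args) throws IOException {
    // Hypothetical direct (non-HA) connection to an observer NameNode;
    // no failover proxy provider is involved, so no stateId is sent.
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://observer.example.com:8020");
    try (FileSystem fs = FileSystem.get(conf)) {
      fs.listStatus(new Path("/tmp"));
    } catch (RemoteException re) {
      // After this commit the observer answers with StandbyException,
      // which the RPC layer delivers wrapped in a RemoteException.
      IOException cause = re.unwrapRemoteException();
      if (!(cause instanceof StandbyException)) {
        throw re;
      }
      System.err.println("Rejected by observer: " + cause.getMessage());
    }
  }
}

In an HA deployment this is exactly the signal failover-capable clients rely on: the retry machinery sees the unwrapped StandbyException and moves on to the next NameNode, so a misconfigured client fails over to the active instead of reading stale data.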