diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
index b4130d714b3..0df5e1e71d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
@@ -44,6 +44,7 @@
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RpcInvocationHandler;
+import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,7 +68,8 @@
 @InterfaceStability.Evolving
 public class ObserverReadProxyProvider<T extends ClientProtocol>
     extends AbstractNNFailoverProxyProvider<T> {
-  private static final Logger LOG = LoggerFactory.getLogger(
+  @VisibleForTesting
+  static final Logger LOG = LoggerFactory.getLogger(
       ObserverReadProxyProvider.class);
 
   /** Configuration key for {@link #autoMsyncPeriodMs}. */
@@ -251,20 +253,38 @@ private synchronized NNProxyInfo<T> changeProxy(NNProxyInfo<T> initial) {
     }
     currentIndex = (currentIndex + 1) % nameNodeProxies.size();
     currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
-    try {
-      HAServiceState state = currentProxy.proxy.getHAServiceState();
-      currentProxy.setCachedState(state);
-    } catch (IOException e) {
-      LOG.info("Failed to connect to {}. Setting cached state to Standby",
-          currentProxy.getAddress(), e);
-      currentProxy.setCachedState(HAServiceState.STANDBY);
-    }
+    currentProxy.setCachedState(getHAServiceState(currentProxy));
     LOG.debug("Changed current proxy from {} to {}",
         initial == null ? "none" : initial.proxyInfo,
         currentProxy.proxyInfo);
     return currentProxy;
   }
 
+  /**
+   * Fetch the service state from a proxy. If it is unable to be fetched,
+   * assume it is in standby state, but log the exception.
+   */
+  private HAServiceState getHAServiceState(NNProxyInfo<T> proxyInfo) {
+    IOException ioe;
+    try {
+      return proxyInfo.proxy.getHAServiceState();
+    } catch (RemoteException re) {
+      // Though a Standby will allow a getHAServiceState call, it won't allow
+      // delegation token lookup, so if DT is used it throws StandbyException
+      if (re.unwrapRemoteException() instanceof StandbyException) {
+        LOG.debug("NameNode {} threw StandbyException when fetching HAState",
+            proxyInfo.getAddress());
+        return HAServiceState.STANDBY;
+      }
+      ioe = re;
+    } catch (IOException e) {
+      ioe = e;
+    }
+    LOG.info("Failed to connect to {}. Assuming Standby state",
+        proxyInfo.getAddress(), ioe);
+    return HAServiceState.STANDBY;
+  }
+
   /**
    * This will call {@link ClientProtocol#msync()} on the active NameNode
    * (via the {@link #failoverProxy}) to initialize the state of this client.
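Context for the change above: when a client authenticates with a delegation token, a Standby NameNode rejects the token lookup before getHAServiceState() is even served, so the state probe surfaces as a RemoteException wrapping a StandbyException. The new getHAServiceState() helper unwraps that case, logs it at DEBUG, and caches STANDBY instead of reporting a connection failure. The provider itself is enabled purely through client configuration; the following is a minimal sketch of that wiring, where the nameservice "ns1", the NameNode IDs, and the hostnames are illustrative and not part of this patch:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class ObserverClientSetup {
      public static FileSystem connect() throws Exception {
        Configuration conf = new HdfsConfiguration();
        conf.set("dfs.nameservices", "ns1");
        conf.set("dfs.ha.namenodes.ns1", "nn1,nn2,nn3");
        conf.set("dfs.namenode.rpc-address.ns1.nn1", "nn1.example.com:8020");
        conf.set("dfs.namenode.rpc-address.ns1.nn2", "nn2.example.com:8020");
        conf.set("dfs.namenode.rpc-address.ns1.nn3", "nn3.example.com:8020");
        // Route reads through the observer-aware provider patched above.
        conf.set("dfs.client.failover.proxy.provider.ns1",
            "org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider");
        return FileSystem.get(URI.create("hdfs://ns1"), conf);
      }
    }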
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
index f4843ac1359..261bf8cf6af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
@@ -276,24 +276,34 @@ public static void setFailoverConfigurations(MiniDFSCluster cluster,
   /** Sets the required configurations for performing failover. */
   public static void setFailoverConfigurations(MiniDFSCluster cluster,
       Configuration conf, String logicalName, int nsIndex) {
+    setFailoverConfigurations(cluster, conf, logicalName, nsIndex,
+        ConfiguredFailoverProxyProvider.class);
+  }
+
+  /** Sets the required configurations for performing failover. */
+  public static <P extends FailoverProxyProvider<?>> void
+      setFailoverConfigurations(MiniDFSCluster cluster, Configuration conf,
+          String logicalName, int nsIndex, Class<P> classFPP) {
     MiniDFSCluster.NameNodeInfo[] nns = cluster.getNameNodeInfos(nsIndex);
     List<InetSocketAddress> nnAddresses = new ArrayList<InetSocketAddress>(3);
     for (MiniDFSCluster.NameNodeInfo nn : nns) {
       nnAddresses.add(nn.nameNode.getNameNodeAddress());
     }
-    setFailoverConfigurations(conf, logicalName, nnAddresses);
+    setFailoverConfigurations(conf, logicalName, nnAddresses, classFPP);
   }
 
   public static void setFailoverConfigurations(Configuration conf,
       String logicalName, InetSocketAddress ... nnAddresses){
-    setFailoverConfigurations(conf, logicalName, Arrays.asList(nnAddresses));
+    setFailoverConfigurations(conf, logicalName, Arrays.asList(nnAddresses),
+        ConfiguredFailoverProxyProvider.class);
   }
 
   /**
    * Sets the required configurations for performing failover
    */
-  public static void setFailoverConfigurations(Configuration conf,
-      String logicalName, List<InetSocketAddress> nnAddresses) {
+  public static <P extends FailoverProxyProvider<?>> void
+      setFailoverConfigurations(Configuration conf, String logicalName,
+          List<InetSocketAddress> nnAddresses, Class<P> classFPP) {
     setFailoverConfigurations(conf, logicalName,
         Iterables.transform(nnAddresses, new Function<InetSocketAddress, String>() {
 
@@ -302,7 +312,7 @@ public static void setFailoverConfigurations(Configuration conf,
           public String apply(InetSocketAddress addr) {
             return "hdfs://" + addr.getHostName() + ":" + addr.getPort();
           }
-        }), ConfiguredFailoverProxyProvider.class);
+        }), classFPP);
   }
 
   public static <P extends FailoverProxyProvider<?>>
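The HATestUtil change generifies the failover helpers over the proxy-provider class: the existing entry points keep their behavior by delegating with ConfiguredFailoverProxyProvider.class, while tests can now inject any FailoverProxyProvider subclass. A sketch of both call styles, assuming cluster and conf are a running MiniDFSCluster and its Configuration:

    // New overload: wire the cluster's failover config to a chosen provider.
    HATestUtil.setFailoverConfigurations(cluster, conf,
        HATestUtil.getLogicalHostname(cluster), 0,
        ObserverReadProxyProvider.class);

    // Old entry point: unchanged behavior, now delegating with
    // ConfiguredFailoverProxyProvider.class as the default.
    HATestUtil.setFailoverConfigurations(cluster, conf,
        HATestUtil.getLogicalHostname(cluster), 0);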
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
index 7076ec674bf..718d13f124b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -47,6 +48,7 @@
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.event.Level;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
@@ -112,6 +114,50 @@ public void shutdownCluster() throws IOException {
     }
   }
 
+  /**
+   * Test that, when using ObserverReadProxyProvider with DT authentication,
+   * the ORPP gracefully handles when the Standby NN throws a StandbyException.
+   */
+  @Test(timeout = 300000)
+  public void testObserverReadProxyProviderWithDT() throws Exception {
+    // Make the first node standby, so that the ORPP will try it first
+    // instead of just using and succeeding on the active
+    cluster.transitionToStandby(0);
+    cluster.transitionToActive(1);
+
+    HATestUtil.setFailoverConfigurations(cluster, conf,
+        HATestUtil.getLogicalHostname(cluster), 0,
+        ObserverReadProxyProvider.class);
+    conf.setBoolean("fs.hdfs.impl.disable.cache", true);
+
+    dfs = (DistributedFileSystem) FileSystem.get(conf);
+    final UserGroupInformation ugi = UserGroupInformation
+        .createRemoteUser("JobTracker");
+    final Token<DelegationTokenIdentifier> token =
+        getDelegationToken(dfs, ugi.getShortUserName());
+    ugi.addToken(token);
+    // Recreate the DFS, this time authenticating using a DT
+    dfs = ugi.doAs((PrivilegedExceptionAction<DistributedFileSystem>)
+        () -> (DistributedFileSystem) FileSystem.get(conf));
+
+    GenericTestUtils.setLogLevel(ObserverReadProxyProvider.LOG, Level.DEBUG);
+    GenericTestUtils.LogCapturer logCapture = GenericTestUtils.LogCapturer
+        .captureLogs(ObserverReadProxyProvider.LOG);
+    try {
+      dfs.access(new Path("/"), FsAction.READ);
+      assertTrue(logCapture.getOutput()
+          .contains("threw StandbyException when fetching HAState"));
+      HATestUtil.isSentToAnyOfNameNodes(dfs, cluster, 1);
+
+      cluster.shutdownNameNode(0);
+      logCapture.clearOutput();
+      dfs.access(new Path("/"), FsAction.READ);
+      assertTrue(logCapture.getOutput().contains("Assuming Standby state"));
+    } finally {
+      logCapture.stopCapturing();
+    }
+  }
+
   @Test(timeout = 300000)
   public void testDelegationTokenDFSApi() throws Exception {
     final Token<DelegationTokenIdentifier> token =
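The new test exercises exactly the failure mode the provider change covers: it obtains a delegation token, attaches it to a fresh UGI, and recreates the DistributedFileSystem inside doAs() so every subsequent RPC authenticates with the token; disabling the FileSystem cache is what forces FileSystem.get(conf) to build a new, token-authenticated client rather than returning the Kerberos-authenticated one. Outside the MiniDFSCluster harness, the same pattern looks roughly like this sketch (the renewer name "JobTracker" and the accessed path are illustrative):

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.token.Token;

    public final class DtReauthExample {
      // Re-create an HDFS client that authenticates with a freshly issued
      // delegation token instead of the original credentials.
      static FileSystem reconnectWithToken(
          DistributedFileSystem dfs, Configuration conf) throws Exception {
        Token<DelegationTokenIdentifier> token =
            dfs.getDelegationToken("JobTracker");
        UserGroupInformation ugi =
            UserGroupInformation.createRemoteUser("JobTracker");
        ugi.addToken(token);
        // Force FileSystem.get() to build a new, token-authenticated client.
        conf.setBoolean("fs.hdfs.impl.disable.cache", true);
        return ugi.doAs(
            (PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(conf));
      }
    }

Any read issued through the returned FileSystem now takes the delegation-token path that previously made the Standby's HA-state probe fail noisily.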