HDFS-14435. [SBN Read] Enable ObserverReadProxyProvider to gracefully handle StandbyException when fetching HAServiceState. Contributed by Erik Krogen.

This commit is contained in:
Erik Krogen 2019-04-17 12:41:48 -07:00
parent 5321235fe8
commit 174b7d3126
3 changed files with 90 additions and 14 deletions

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcInvocationHandler; import org.apache.hadoop.ipc.RpcInvocationHandler;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -67,7 +68,8 @@
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class ObserverReadProxyProvider<T extends ClientProtocol> public class ObserverReadProxyProvider<T extends ClientProtocol>
extends AbstractNNFailoverProxyProvider<T> { extends AbstractNNFailoverProxyProvider<T> {
private static final Logger LOG = LoggerFactory.getLogger( @VisibleForTesting
static final Logger LOG = LoggerFactory.getLogger(
ObserverReadProxyProvider.class); ObserverReadProxyProvider.class);
/** Configuration key for {@link #autoMsyncPeriodMs}. */ /** Configuration key for {@link #autoMsyncPeriodMs}. */
@ -251,20 +253,38 @@ private synchronized NNProxyInfo<T> changeProxy(NNProxyInfo<T> initial) {
} }
currentIndex = (currentIndex + 1) % nameNodeProxies.size(); currentIndex = (currentIndex + 1) % nameNodeProxies.size();
currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex)); currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
try { currentProxy.setCachedState(getHAServiceState(currentProxy));
HAServiceState state = currentProxy.proxy.getHAServiceState();
currentProxy.setCachedState(state);
} catch (IOException e) {
LOG.info("Failed to connect to {}. Setting cached state to Standby",
currentProxy.getAddress(), e);
currentProxy.setCachedState(HAServiceState.STANDBY);
}
LOG.debug("Changed current proxy from {} to {}", LOG.debug("Changed current proxy from {} to {}",
initial == null ? "none" : initial.proxyInfo, initial == null ? "none" : initial.proxyInfo,
currentProxy.proxyInfo); currentProxy.proxyInfo);
return currentProxy; return currentProxy;
} }
/**
* Fetch the service state from a proxy. If it is unable to be fetched,
* assume it is in standby state, but log the exception.
*/
private HAServiceState getHAServiceState(NNProxyInfo<T> proxyInfo) {
IOException ioe;
try {
return proxyInfo.proxy.getHAServiceState();
} catch (RemoteException re) {
// Though a Standby will allow a getHAServiceState call, it won't allow
// delegation token lookup, so if DT is used it throws StandbyException
if (re.unwrapRemoteException() instanceof StandbyException) {
LOG.debug("NameNode {} threw StandbyException when fetching HAState",
proxyInfo.getAddress());
return HAServiceState.STANDBY;
}
ioe = re;
} catch (IOException e) {
ioe = e;
}
LOG.info("Failed to connect to {}. Assuming Standby state",
proxyInfo.getAddress(), ioe);
return HAServiceState.STANDBY;
}
/** /**
* This will call {@link ClientProtocol#msync()} on the active NameNode * This will call {@link ClientProtocol#msync()} on the active NameNode
* (via the {@link #failoverProxy}) to initialize the state of this client. * (via the {@link #failoverProxy}) to initialize the state of this client.

View File

@ -276,24 +276,34 @@ public static void setFailoverConfigurations(MiniDFSCluster cluster,
/** Sets the required configurations for performing failover. */ /** Sets the required configurations for performing failover. */
public static void setFailoverConfigurations(MiniDFSCluster cluster, public static void setFailoverConfigurations(MiniDFSCluster cluster,
Configuration conf, String logicalName, int nsIndex) { Configuration conf, String logicalName, int nsIndex) {
setFailoverConfigurations(cluster, conf, logicalName, nsIndex,
ConfiguredFailoverProxyProvider.class);
}
/** Sets the required configurations for performing failover. */
public static <P extends FailoverProxyProvider<?>> void
setFailoverConfigurations(MiniDFSCluster cluster, Configuration conf,
String logicalName, int nsIndex, Class<P> classFPP) {
MiniDFSCluster.NameNodeInfo[] nns = cluster.getNameNodeInfos(nsIndex); MiniDFSCluster.NameNodeInfo[] nns = cluster.getNameNodeInfos(nsIndex);
List<InetSocketAddress> nnAddresses = new ArrayList<InetSocketAddress>(3); List<InetSocketAddress> nnAddresses = new ArrayList<InetSocketAddress>(3);
for (MiniDFSCluster.NameNodeInfo nn : nns) { for (MiniDFSCluster.NameNodeInfo nn : nns) {
nnAddresses.add(nn.nameNode.getNameNodeAddress()); nnAddresses.add(nn.nameNode.getNameNodeAddress());
} }
setFailoverConfigurations(conf, logicalName, nnAddresses); setFailoverConfigurations(conf, logicalName, nnAddresses, classFPP);
} }
public static void setFailoverConfigurations(Configuration conf, String logicalName, public static void setFailoverConfigurations(Configuration conf, String logicalName,
InetSocketAddress ... nnAddresses){ InetSocketAddress ... nnAddresses){
setFailoverConfigurations(conf, logicalName, Arrays.asList(nnAddresses)); setFailoverConfigurations(conf, logicalName, Arrays.asList(nnAddresses),
ConfiguredFailoverProxyProvider.class);
} }
/** /**
* Sets the required configurations for performing failover * Sets the required configurations for performing failover
*/ */
public static void setFailoverConfigurations(Configuration conf, public static <P extends FailoverProxyProvider<?>> void
String logicalName, List<InetSocketAddress> nnAddresses) { setFailoverConfigurations(Configuration conf, String logicalName,
List<InetSocketAddress> nnAddresses, Class<P> classFPP) {
setFailoverConfigurations(conf, logicalName, setFailoverConfigurations(conf, logicalName,
Iterables.transform(nnAddresses, new Function<InetSocketAddress, String>() { Iterables.transform(nnAddresses, new Function<InetSocketAddress, String>() {
@ -302,7 +312,7 @@ public static void setFailoverConfigurations(Configuration conf,
public String apply(InetSocketAddress addr) { public String apply(InetSocketAddress addr) {
return "hdfs://" + addr.getHostName() + ":" + addr.getPort(); return "hdfs://" + addr.getHostName() + ":" + addr.getPort();
} }
}), ConfiguredFailoverProxyProvider.class); }), classFPP);
} }
public static <P extends FailoverProxyProvider<?>> public static <P extends FailoverProxyProvider<?>>

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.*; import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -47,6 +48,7 @@
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.slf4j.event.Level;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
@ -112,6 +114,50 @@ public void shutdownCluster() throws IOException {
} }
} }
/**
* Test that, when using ObserverReadProxyProvider with DT authentication,
* the ORPP gracefully handles when the Standby NN throws a StandbyException.
*/
@Test(timeout = 300000)
public void testObserverReadProxyProviderWithDT() throws Exception {
// Make the first node standby, so that the ORPP will try it first
// instead of just using and succeeding on the active
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
HATestUtil.setFailoverConfigurations(cluster, conf,
HATestUtil.getLogicalHostname(cluster), 0,
ObserverReadProxyProvider.class);
conf.setBoolean("fs.hdfs.impl.disable.cache", true);
dfs = (DistributedFileSystem) FileSystem.get(conf);
final UserGroupInformation ugi = UserGroupInformation
.createRemoteUser("JobTracker");
final Token<DelegationTokenIdentifier> token =
getDelegationToken(dfs, ugi.getShortUserName());
ugi.addToken(token);
// Recreate the DFS, this time authenticating using a DT
dfs = ugi.doAs((PrivilegedExceptionAction<DistributedFileSystem>)
() -> (DistributedFileSystem) FileSystem.get(conf));
GenericTestUtils.setLogLevel(ObserverReadProxyProvider.LOG, Level.DEBUG);
GenericTestUtils.LogCapturer logCapture = GenericTestUtils.LogCapturer
.captureLogs(ObserverReadProxyProvider.LOG);
try {
dfs.access(new Path("/"), FsAction.READ);
assertTrue(logCapture.getOutput()
.contains("threw StandbyException when fetching HAState"));
HATestUtil.isSentToAnyOfNameNodes(dfs, cluster, 1);
cluster.shutdownNameNode(0);
logCapture.clearOutput();
dfs.access(new Path("/"), FsAction.READ);
assertTrue(logCapture.getOutput().contains("Assuming Standby state"));
} finally {
logCapture.stopCapturing();
}
}
@Test(timeout = 300000) @Test(timeout = 300000)
public void testDelegationTokenDFSApi() throws Exception { public void testDelegationTokenDFSApi() throws Exception {
final Token<DelegationTokenIdentifier> token = final Token<DelegationTokenIdentifier> token =