HDFS-14435. [SBN Read] Enable ObserverReadProxyProvider to gracefully handle StandbyException when fetching HAServiceState. Contributed by Erik Krogen.
This commit is contained in:
parent
769b5a802b
commit
cc22373606
|
@ -44,6 +44,7 @@ import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
|
|||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.ipc.RpcInvocationHandler;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -67,7 +68,8 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
@InterfaceStability.Evolving
|
||||
public class ObserverReadProxyProvider<T extends ClientProtocol>
|
||||
extends AbstractNNFailoverProxyProvider<T> {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
@VisibleForTesting
|
||||
static final Logger LOG = LoggerFactory.getLogger(
|
||||
ObserverReadProxyProvider.class);
|
||||
|
||||
/** Configuration key for {@link #autoMsyncPeriodMs}. */
|
||||
|
@ -251,20 +253,38 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
|||
}
|
||||
currentIndex = (currentIndex + 1) % nameNodeProxies.size();
|
||||
currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
|
||||
try {
|
||||
HAServiceState state = currentProxy.proxy.getHAServiceState();
|
||||
currentProxy.setCachedState(state);
|
||||
} catch (IOException e) {
|
||||
LOG.info("Failed to connect to {}. Setting cached state to Standby",
|
||||
currentProxy.getAddress(), e);
|
||||
currentProxy.setCachedState(HAServiceState.STANDBY);
|
||||
}
|
||||
currentProxy.setCachedState(getHAServiceState(currentProxy));
|
||||
LOG.debug("Changed current proxy from {} to {}",
|
||||
initial == null ? "none" : initial.proxyInfo,
|
||||
currentProxy.proxyInfo);
|
||||
return currentProxy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch the service state from a proxy. If it is unable to be fetched,
|
||||
* assume it is in standby state, but log the exception.
|
||||
*/
|
||||
private HAServiceState getHAServiceState(NNProxyInfo<T> proxyInfo) {
|
||||
IOException ioe;
|
||||
try {
|
||||
return proxyInfo.proxy.getHAServiceState();
|
||||
} catch (RemoteException re) {
|
||||
// Though a Standby will allow a getHAServiceState call, it won't allow
|
||||
// delegation token lookup, so if DT is used it throws StandbyException
|
||||
if (re.unwrapRemoteException() instanceof StandbyException) {
|
||||
LOG.debug("NameNode {} threw StandbyException when fetching HAState",
|
||||
proxyInfo.getAddress());
|
||||
return HAServiceState.STANDBY;
|
||||
}
|
||||
ioe = re;
|
||||
} catch (IOException e) {
|
||||
ioe = e;
|
||||
}
|
||||
LOG.info("Failed to connect to {}. Assuming Standby state",
|
||||
proxyInfo.getAddress(), ioe);
|
||||
return HAServiceState.STANDBY;
|
||||
}
|
||||
|
||||
/**
|
||||
* This will call {@link ClientProtocol#msync()} on the active NameNode
|
||||
* (via the {@link #failoverProxy}) to initialize the state of this client.
|
||||
|
|
|
@ -265,24 +265,34 @@ public abstract class HATestUtil {
|
|||
/** Sets the required configurations for performing failover. */
|
||||
public static void setFailoverConfigurations(MiniDFSCluster cluster,
|
||||
Configuration conf, String logicalName, int nsIndex) {
|
||||
setFailoverConfigurations(cluster, conf, logicalName, nsIndex,
|
||||
ConfiguredFailoverProxyProvider.class);
|
||||
}
|
||||
|
||||
/** Sets the required configurations for performing failover. */
|
||||
public static <P extends FailoverProxyProvider<?>> void
|
||||
setFailoverConfigurations(MiniDFSCluster cluster, Configuration conf,
|
||||
String logicalName, int nsIndex, Class<P> classFPP) {
|
||||
MiniDFSCluster.NameNodeInfo[] nns = cluster.getNameNodeInfos(nsIndex);
|
||||
List<InetSocketAddress> nnAddresses = new ArrayList<InetSocketAddress>(3);
|
||||
for (MiniDFSCluster.NameNodeInfo nn : nns) {
|
||||
nnAddresses.add(nn.nameNode.getNameNodeAddress());
|
||||
}
|
||||
setFailoverConfigurations(conf, logicalName, nnAddresses);
|
||||
setFailoverConfigurations(conf, logicalName, nnAddresses, classFPP);
|
||||
}
|
||||
|
||||
public static void setFailoverConfigurations(Configuration conf, String logicalName,
|
||||
InetSocketAddress ... nnAddresses){
|
||||
setFailoverConfigurations(conf, logicalName, Arrays.asList(nnAddresses));
|
||||
setFailoverConfigurations(conf, logicalName, Arrays.asList(nnAddresses),
|
||||
ConfiguredFailoverProxyProvider.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the required configurations for performing failover
|
||||
*/
|
||||
public static void setFailoverConfigurations(Configuration conf,
|
||||
String logicalName, List<InetSocketAddress> nnAddresses) {
|
||||
public static <P extends FailoverProxyProvider<?>> void
|
||||
setFailoverConfigurations(Configuration conf, String logicalName,
|
||||
List<InetSocketAddress> nnAddresses, Class<P> classFPP) {
|
||||
setFailoverConfigurations(conf, logicalName,
|
||||
Iterables.transform(nnAddresses, new Function<InetSocketAddress, String>() {
|
||||
|
||||
|
@ -291,7 +301,7 @@ public abstract class HATestUtil {
|
|||
public String apply(InetSocketAddress addr) {
|
||||
return "hdfs://" + addr.getHostName() + ":" + addr.getPort();
|
||||
}
|
||||
}), ConfiguredFailoverProxyProvider.class);
|
||||
}), classFPP);
|
||||
}
|
||||
|
||||
public static <P extends FailoverProxyProvider<?>>
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.hadoop.fs.AbstractFileSystem;
|
|||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.hdfs.*;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
|
@ -47,6 +48,7 @@ import org.junit.After;
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
import org.slf4j.event.Level;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
|
@ -112,6 +114,50 @@ public class TestDelegationTokensWithHA {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that, when using ObserverReadProxyProvider with DT authentication,
|
||||
* the ORPP gracefully handles when the Standby NN throws a StandbyException.
|
||||
*/
|
||||
@Test(timeout = 300000)
|
||||
public void testObserverReadProxyProviderWithDT() throws Exception {
|
||||
// Make the first node standby, so that the ORPP will try it first
|
||||
// instead of just using and succeeding on the active
|
||||
cluster.transitionToStandby(0);
|
||||
cluster.transitionToActive(1);
|
||||
|
||||
HATestUtil.setFailoverConfigurations(cluster, conf,
|
||||
HATestUtil.getLogicalHostname(cluster), 0,
|
||||
ObserverReadProxyProvider.class);
|
||||
conf.setBoolean("fs.hdfs.impl.disable.cache", true);
|
||||
|
||||
dfs = (DistributedFileSystem) FileSystem.get(conf);
|
||||
final UserGroupInformation ugi = UserGroupInformation
|
||||
.createRemoteUser("JobTracker");
|
||||
final Token<DelegationTokenIdentifier> token =
|
||||
getDelegationToken(dfs, ugi.getShortUserName());
|
||||
ugi.addToken(token);
|
||||
// Recreate the DFS, this time authenticating using a DT
|
||||
dfs = ugi.doAs((PrivilegedExceptionAction<DistributedFileSystem>)
|
||||
() -> (DistributedFileSystem) FileSystem.get(conf));
|
||||
|
||||
GenericTestUtils.setLogLevel(ObserverReadProxyProvider.LOG, Level.DEBUG);
|
||||
GenericTestUtils.LogCapturer logCapture = GenericTestUtils.LogCapturer
|
||||
.captureLogs(ObserverReadProxyProvider.LOG);
|
||||
try {
|
||||
dfs.access(new Path("/"), FsAction.READ);
|
||||
assertTrue(logCapture.getOutput()
|
||||
.contains("threw StandbyException when fetching HAState"));
|
||||
HATestUtil.isSentToAnyOfNameNodes(dfs, cluster, 1);
|
||||
|
||||
cluster.shutdownNameNode(0);
|
||||
logCapture.clearOutput();
|
||||
dfs.access(new Path("/"), FsAction.READ);
|
||||
assertTrue(logCapture.getOutput().contains("Assuming Standby state"));
|
||||
} finally {
|
||||
logCapture.stopCapturing();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testDelegationTokenDFSApi() throws Exception {
|
||||
final Token<DelegationTokenIdentifier> token =
|
||||
|
|
Loading…
Reference in New Issue