HADOOP-15684. triggerActiveLogRoll stuck on dead name node, when ConnectTimeoutException happens. Contributed by Rong Tang.

(cherry picked from commit 7f9a89e1b54a9712af50ffef70bed7cfb91ed34e)
This commit is contained in:
Inigo Goiri 2018-09-19 12:58:31 -07:00
parent e1af3c9bb7
commit 2f7222a27b
3 changed files with 93 additions and 56 deletions

View File

@ -53,8 +53,6 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.SecurityUtil;
import static org.apache.hadoop.util.Time.monotonicNow;
@ -382,15 +380,6 @@ public class EditLogTailer {
future.get(rollEditsTimeoutMs, TimeUnit.MILLISECONDS);
lastRollTriggerTxId = lastLoadedTxnId;
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof RemoteException) {
IOException ioe = ((RemoteException) cause).unwrapRemoteException();
if (ioe instanceof StandbyException) {
LOG.info("Skipping log roll. Remote node is not in Active state: " +
ioe.getMessage().split("\n")[0]);
return;
}
}
LOG.warn("Unable to trigger a roll of the active NN", e);
} catch (TimeoutException e) {
if (future != null) {
@ -497,7 +486,8 @@ public class EditLogTailer {
* This mechanism is <b>very bad</b> for cases where we care about being <i>fast</i>; it just
* blindly goes and tries namenodes.
*/
private abstract class MultipleNameNodeProxy<T> implements Callable<T> {
@VisibleForTesting
abstract class MultipleNameNodeProxy<T> implements Callable<T> {
/**
* Do the actual work to the remote namenode via the {@link #cachedActiveProxy}.
@ -513,19 +503,13 @@ public class EditLogTailer {
try {
T ret = doWork();
return ret;
} catch (RemoteException e) {
Throwable cause = e.unwrapRemoteException(StandbyException.class);
// if its not a standby exception, then we need to re-throw it, something bad has happened
if (cause == e) {
throw e;
} else {
// it is a standby exception, so we try the other NN
LOG.warn("Failed to reach remote node: " + currentNN
+ ", retrying with remaining remote NNs");
cachedActiveProxy = null;
// this NN isn't responding to requests, try the next one
nnLoopCount++;
}
} catch (IOException e) {
LOG.warn("Exception from remote name node " + currentNN
+ ", try next.", e);
// Try next name node if exception happens.
cachedActiveProxy = null;
nnLoopCount++;
}
}
throw new IOException("Cannot find any valid remote NN to service request!");

View File

@ -72,6 +72,23 @@ public class MiniDFSNNTopology {
return topology;
}
/**
* Set up an HA topology with a single HA nameservice.
* @param nnCount of namenodes to use with the nameservice
* @param basePort for IPC and Http ports of namenodes.
*/
public static MiniDFSNNTopology simpleHATopology(int nnCount, int basePort) {
MiniDFSNNTopology.NSConf ns = new MiniDFSNNTopology.NSConf("minidfs-ns");
for (int i = 0; i < nnCount; i++) {
ns.addNN(new MiniDFSNNTopology.NNConf("nn" + i)
.setIpcPort(basePort++)
.setHttpPort(basePort++));
}
MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(ns);
return topology;
}
/**
* Set up federated cluster with the given number of nameservices, each
* of which has only a single NameNode.

View File

@ -28,6 +28,7 @@ import java.net.BindException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@ -46,7 +47,6 @@ import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.net.ServerSocketUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Test;
@ -177,21 +177,7 @@ public class TestEditLogTailer {
MiniDFSCluster cluster = null;
for (int i = 0; i < 5; i++) {
try {
// Have to specify IPC ports so the NNs can talk to each other.
int[] ports = ServerSocketUtil.getPorts(3);
MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf("ns1")
.addNN(new MiniDFSNNTopology.NNConf("nn1")
.setIpcPort(ports[0]))
.addNN(new MiniDFSNNTopology.NNConf("nn2")
.setIpcPort(ports[1]))
.addNN(new MiniDFSNNTopology.NNConf("nn3")
.setIpcPort(ports[2])));
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(topology)
.numDataNodes(0)
.build();
cluster = createMiniDFSCluster(conf, 3);
break;
} catch (BindException e) {
// retry if race on ports given by ServerSocketUtil#getPorts
@ -222,21 +208,9 @@ public class TestEditLogTailer {
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY, 100);
// Have to specify IPC ports so the NNs can talk to each other.
MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf("ns1")
.addNN(new MiniDFSNNTopology.NNConf("nn1")
.setIpcPort(ServerSocketUtil.getPort(0, 100)))
.addNN(new MiniDFSNNTopology.NNConf("nn2")
.setIpcPort(ServerSocketUtil.getPort(0, 100)))
.addNN(new MiniDFSNNTopology.NNConf("nn3")
.setIpcPort(ServerSocketUtil.getPort(0, 100))));
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(topology)
.numDataNodes(0)
.build();
MiniDFSCluster cluster = null;
try {
cluster = createMiniDFSCluster(conf, 3);
cluster.transitionToStandby(0);
cluster.transitionToStandby(1);
cluster.transitionToStandby(2);
@ -249,7 +223,9 @@ public class TestEditLogTailer {
cluster.transitionToActive(0);
waitForLogRollInSharedDir(cluster, 3);
} finally {
cluster.shutdown();
if (cluster != null) {
cluster.shutdown();
}
}
}
@ -316,4 +292,64 @@ public class TestEditLogTailer {
cluster.shutdown();
}
}
@Test
public void testRollEditLogIOExceptionForRemoteNN() throws IOException {
Configuration conf = getConf();
// Roll every 1s
conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
MiniDFSCluster cluster = null;
try {
cluster = createMiniDFSCluster(conf, 3);
cluster.transitionToActive(0);
EditLogTailer tailer = Mockito.spy(
cluster.getNamesystem(1).getEditLogTailer());
final AtomicInteger invokedTimes = new AtomicInteger(0);
// It should go on to next name node when IOException happens.
when(tailer.getNameNodeProxy()).thenReturn(
tailer.new MultipleNameNodeProxy<Void>() {
@Override
protected Void doWork() throws IOException {
invokedTimes.getAndIncrement();
throw new IOException("It is an IO Exception.");
}
}
);
tailer.triggerActiveLogRoll();
// MultipleNameNodeProxy uses Round-robin to look for active NN
// to do RollEditLog. If doWork() fails, then IOException throws,
// it continues to try next NN. triggerActiveLogRoll finishes
// either due to success, or using up retries.
// In this test case, there are 2 remote name nodes, default retry is 3.
// For test purpose, doWork() always returns IOException,
// so the total invoked times will be default retry 3 * remote NNs 2 = 6
assertEquals(6, invokedTimes.get());
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
private static MiniDFSCluster createMiniDFSCluster(Configuration conf,
int nnCount) throws IOException {
int basePort = 10060 + new Random().nextInt(100) * 2;
// By passing in basePort, name node will have IPC port set,
// which is needed for enabling roll log.
MiniDFSNNTopology topology =
MiniDFSNNTopology.simpleHATopology(nnCount, basePort);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(topology)
.numDataNodes(0)
.build();
return cluster;
}
}