HDFS-12703. Exceptions are fatal to decommissioning monitor. Contributed by He Xiaoqiao.

(cherry picked from commit 3d396786cf6eaab49c1c9b8b2a4652c2e440b9e3)
(cherry picked from commit 950aa74d5f)
Authored by Inigo Goiri on 2019-07-10 11:04:48 -07:00; committed by Wei-Chiu Chuang
parent 42f10712a5
commit fe40fbbd4d
2 changed files with 136 additions and 67 deletions
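
The monitor runs as a periodic task on a ScheduledExecutorService, and the JDK contract for scheduleAtFixedRate is that an uncaught exception from any execution suppresses all subsequent executions, so a single node in a bad state could silently stop decommissioning progress cluster-wide. The patch therefore catches exceptions both around the whole monitor pass and per node, requeueing the offending node instead of letting the scheduled task die. Below is a minimal, standalone sketch (not Hadoop code; class and variable names are illustrative only) of that failure mode and the per-task guard:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MonitorCrashDemo {
  public static void main(String[] args) throws InterruptedException {
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

    // Unguarded task: the exception thrown on the first run suppresses every
    // later run (see the ScheduledExecutorService#scheduleAtFixedRate javadoc).
    AtomicInteger unguardedRuns = new AtomicInteger();
    executor.scheduleAtFixedRate(() -> {
      unguardedRuns.incrementAndGet();
      throw new IllegalStateException("node in an invalid state");
    }, 0, 100, TimeUnit.MILLISECONDS);

    // Guarded task: the same failure is caught and logged inside the task,
    // so the schedule stays alive, mirroring the catch added in this patch.
    AtomicInteger guardedRuns = new AtomicInteger();
    executor.scheduleAtFixedRate(() -> {
      try {
        guardedRuns.incrementAndGet();
        throw new IllegalStateException("node in an invalid state");
      } catch (Exception e) {
        System.err.println("monitor caught exception, will retry: " + e);
      }
    }, 0, 100, TimeUnit.MILLISECONDS);

    Thread.sleep(1000);
    System.out.println("unguarded runs: " + unguardedRuns.get()); // stays at 1
    System.out.println("guarded runs:   " + guardedRuns.get());   // keeps growing
    executor.shutdownNow();
  }
}
```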

DatanodeAdminManager.java

@@ -485,6 +485,7 @@ public class DatanodeAdminManager {
     @Override
     public void run() {
+      LOG.debug("DatanodeAdminMonitor is running.");
       if (!namesystem.isRunning()) {
         LOG.info("Namesystem is not running, skipping " +
             "decommissioning/maintenance checks.");
@@ -499,6 +500,9 @@ public class DatanodeAdminManager {
       try {
         processPendingNodes();
         check();
+      } catch (Exception e) {
+        LOG.warn("DatanodeAdminMonitor caught exception when processing node.",
+            e);
       } finally {
         namesystem.writeUnlock();
       }
@@ -532,83 +536,96 @@ public class DatanodeAdminManager {
         final Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>
             entry = it.next();
         final DatanodeDescriptor dn = entry.getKey();
-        AbstractList<BlockInfo> blocks = entry.getValue();
-        boolean fullScan = false;
-        if (dn.isMaintenance() && dn.maintenanceExpired()) {
-          // If maintenance expires, stop tracking it.
-          stopMaintenance(dn);
-          toRemove.add(dn);
-          continue;
-        }
-        if (dn.isInMaintenance()) {
-          // The dn is IN_MAINTENANCE and the maintenance hasn't expired yet.
-          continue;
-        }
-        if (blocks == null) {
-          // This is a newly added datanode, run through its list to schedule
-          // under-replicated blocks for replication and collect the blocks
-          // that are insufficiently replicated for further tracking
-          LOG.debug("Newly-added node {}, doing full scan to find " +
-              "insufficiently-replicated blocks.", dn);
-          blocks = handleInsufficientlyStored(dn);
-          outOfServiceNodeBlocks.put(dn, blocks);
-          fullScan = true;
-        } else {
-          // This is a known datanode, check if its # of insufficiently
-          // replicated blocks has dropped to zero and if it can move
-          // to the next state.
-          LOG.debug("Processing {} node {}", dn.getAdminState(), dn);
-          pruneReliableBlocks(dn, blocks);
-        }
-        if (blocks.size() == 0) {
-          if (!fullScan) {
-            // If we didn't just do a full scan, need to re-check with the
-            // full block map.
-            //
-            // We've replicated all the known insufficiently replicated
-            // blocks. Re-check with the full block map before finally
-            // marking the datanode as DECOMMISSIONED or IN_MAINTENANCE.
-            LOG.debug("Node {} has finished replicating current set of "
-                + "blocks, checking with the full block map.", dn);
-            blocks = handleInsufficientlyStored(dn);
-            outOfServiceNodeBlocks.put(dn, blocks);
-          }
-          // If the full scan is clean AND the node liveness is okay,
-          // we can finally mark as DECOMMISSIONED or IN_MAINTENANCE.
-          final boolean isHealthy =
-              blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
-          if (blocks.size() == 0 && isHealthy) {
-            if (dn.isDecommissionInProgress()) {
-              setDecommissioned(dn);
-              toRemove.add(dn);
-            } else if (dn.isEnteringMaintenance()) {
-              // IN_MAINTENANCE node remains in the outOfServiceNodeBlocks to
-              // to track maintenance expiration.
-              setInMaintenance(dn);
-            } else {
-              Preconditions.checkState(false,
-                  "A node is in an invalid state!");
-            }
-            LOG.debug("Node {} is sufficiently replicated and healthy, "
-                + "marked as {}.", dn, dn.getAdminState());
-          } else {
-            LOG.debug("Node {} {} healthy."
-                + " It needs to replicate {} more blocks."
-                + " {} is still in progress.", dn,
-                isHealthy ? "is": "isn't", blocks.size(), dn.getAdminState());
-          }
-        } else {
-          LOG.debug("Node {} still has {} blocks to replicate "
-              + "before it is a candidate to finish {}.",
-              dn, blocks.size(), dn.getAdminState());
-        }
-        iterkey = dn;
+        try {
+          AbstractList<BlockInfo> blocks = entry.getValue();
+          boolean fullScan = false;
+          if (dn.isMaintenance() && dn.maintenanceExpired()) {
+            // If maintenance expires, stop tracking it.
+            stopMaintenance(dn);
+            toRemove.add(dn);
+            continue;
+          }
+          if (dn.isInMaintenance()) {
+            // The dn is IN_MAINTENANCE and the maintenance hasn't expired yet.
+            continue;
+          }
+          if (blocks == null) {
+            // This is a newly added datanode, run through its list to schedule
+            // under-replicated blocks for replication and collect the blocks
+            // that are insufficiently replicated for further tracking
+            LOG.debug("Newly-added node {}, doing full scan to find " +
+                "insufficiently-replicated blocks.", dn);
+            blocks = handleInsufficientlyStored(dn);
+            outOfServiceNodeBlocks.put(dn, blocks);
+            fullScan = true;
+          } else {
+            // This is a known datanode, check if its # of insufficiently
+            // replicated blocks has dropped to zero and if it can move
+            // to the next state.
+            LOG.debug("Processing {} node {}", dn.getAdminState(), dn);
+            pruneReliableBlocks(dn, blocks);
+          }
+          if (blocks.size() == 0) {
+            if (!fullScan) {
+              // If we didn't just do a full scan, need to re-check with the
+              // full block map.
+              //
+              // We've replicated all the known insufficiently replicated
+              // blocks. Re-check with the full block map before finally
+              // marking the datanode as DECOMMISSIONED or IN_MAINTENANCE.
+              LOG.debug("Node {} has finished replicating current set of "
+                  + "blocks, checking with the full block map.", dn);
+              blocks = handleInsufficientlyStored(dn);
+              outOfServiceNodeBlocks.put(dn, blocks);
+            }
+            // If the full scan is clean AND the node liveness is okay,
+            // we can finally mark as DECOMMISSIONED or IN_MAINTENANCE.
+            final boolean isHealthy =
+                blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
+            if (blocks.size() == 0 && isHealthy) {
+              if (dn.isDecommissionInProgress()) {
+                setDecommissioned(dn);
+                toRemove.add(dn);
+              } else if (dn.isEnteringMaintenance()) {
+                // IN_MAINTENANCE node remains in the outOfServiceNodeBlocks to
+                // to track maintenance expiration.
+                setInMaintenance(dn);
+              } else {
+                Preconditions.checkState(false,
+                    "Node %s is in an invalid state! "
+                        + "Invalid state: %s %s blocks are on this dn.",
+                    dn, dn.getAdminState(), blocks.size());
+              }
+              LOG.debug("Node {} is sufficiently replicated and healthy, "
+                  + "marked as {}.", dn, dn.getAdminState());
+            } else {
+              LOG.debug("Node {} {} healthy."
+                  + " It needs to replicate {} more blocks."
+                  + " {} is still in progress.", dn,
+                  isHealthy ? "is": "isn't", blocks.size(), dn.getAdminState());
+            }
+          } else {
+            LOG.debug("Node {} still has {} blocks to replicate "
+                + "before it is a candidate to finish {}.",
+                dn, blocks.size(), dn.getAdminState());
+          }
+        } catch (Exception e) {
+          // Log and postpone to process node when meet exception since it is in
+          // an invalid state.
+          LOG.warn("DatanodeAdminMonitor caught exception when processing node "
+              + "{}.", dn, e);
+          pendingNodes.add(dn);
+          toRemove.add(dn);
+        } finally {
+          iterkey = dn;
+        }
       }
       // Remove the datanodes that are DECOMMISSIONED or in service after
       // maintenance expiration.
       for (DatanodeDescriptor dn : toRemove) {
         Preconditions.checkState(dn.isDecommissioned() || dn.isInService(),
-            "Removing a node that is not yet decommissioned or in service!");
+            "Removing node %s that is not yet decommissioned or in service!",
+            dn);
         outOfServiceNodeBlocks.remove(dn);
       }
     }

TestDecommission.java

@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;

 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -35,6 +36,7 @@ import java.util.Map;
 import java.util.Scanner;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;

 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
@@ -1181,6 +1183,56 @@ public class TestDecommission extends AdminStatesBaseTest {
     }
   }

+  /**
+   * Test DatanodeAdminManager#monitor can swallow any exceptions by default.
+   */
+  @Test(timeout=120000)
+  public void testPendingNodeButDecommissioned() throws Exception {
+    // Only allow one node to be decom'd at a time
+    getConf().setInt(
+        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
+        1);
+    // Disable the normal monitor runs
+    getConf().setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
+        Integer.MAX_VALUE);
+    startCluster(1, 2);
+
+    final DatanodeManager datanodeManager =
+        getCluster().getNamesystem().getBlockManager().getDatanodeManager();
+    final DatanodeAdminManager decomManager =
+        datanodeManager.getDatanodeAdminManager();
+    ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList();
+    List<DataNode> dns = getCluster().getDataNodes();
+
+    // Try to decommission 2 datanodes
+    for (int i = 0; i < 2; i++) {
+      DataNode d = dns.get(i);
+      DatanodeInfo dn = takeNodeOutofService(0, d.getDatanodeUuid(), 0,
+          decommissionedNodes, AdminStates.DECOMMISSION_INPROGRESS);
+      decommissionedNodes.add(dn);
+    }
+    assertEquals(2, decomManager.getNumPendingNodes());
+
+    // Set one datanode state to Decommissioned after decommission ops.
+    DatanodeDescriptor dn = datanodeManager.getDatanode(dns.get(0)
+        .getDatanodeId());
+    dn.setDecommissioned();
+
+    try {
+      // Trigger DatanodeAdminManager#monitor
+      BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
+      // Wait for OutOfServiceNodeBlocks to be 0
+      GenericTestUtils.waitFor(() -> decomManager.getNumTrackedNodes() == 0,
+          500, 30000);
+      assertTrue(GenericTestUtils.anyThreadMatching(
+          Pattern.compile("DatanodeAdminMonitor-.*")));
+    } catch (ExecutionException e) {
+      GenericTestUtils.assertExceptionContains("in an invalid state!", e);
+      fail("DatanodeAdminManager#monitor does not swallow exceptions.");
+    }
+  }
+
   @Test(timeout=120000)
   public void testPendingNodes() throws Exception {
     org.apache.log4j.Logger.getLogger(DatanodeAdminManager.class)