HDFS-16303. Improve handling of datanode lost while decommissioning (#3921)
Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
This commit is contained in:
parent
110104da38
commit
5e2eac6c41
|
@ -34,6 +34,7 @@ import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.ArrayDeque;
|
import java.util.ArrayDeque;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class implements the logic to track decommissioning and entering
|
* This class implements the logic to track decommissioning and entering
|
||||||
|
@ -149,7 +150,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void stopTrackingNode(DatanodeDescriptor dn) {
|
public void stopTrackingNode(DatanodeDescriptor dn) {
|
||||||
pendingNodes.remove(dn);
|
getPendingNodes().remove(dn);
|
||||||
cancelledNodes.add(dn);
|
cancelledNodes.add(dn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -189,6 +190,29 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
|
||||||
* node will be removed from tracking by the pending cancel.
|
* node will be removed from tracking by the pending cancel.
|
||||||
*/
|
*/
|
||||||
processCancelledNodes();
|
processCancelledNodes();
|
||||||
|
|
||||||
|
// Having more nodes decommissioning than can be tracked will impact decommissioning
|
||||||
|
// performance due to queueing delay
|
||||||
|
int numTrackedNodes = outOfServiceNodeBlocks.size();
|
||||||
|
int numQueuedNodes = getPendingNodes().size();
|
||||||
|
int numDecommissioningNodes = numTrackedNodes + numQueuedNodes;
|
||||||
|
if (numDecommissioningNodes > maxConcurrentTrackedNodes) {
|
||||||
|
LOG.warn(
|
||||||
|
"{} nodes are decommissioning but only {} nodes will be tracked at a time. "
|
||||||
|
+ "{} nodes are currently queued waiting to be decommissioned.",
|
||||||
|
numDecommissioningNodes, maxConcurrentTrackedNodes, numQueuedNodes);
|
||||||
|
|
||||||
|
// Re-queue unhealthy nodes to make space for decommissioning healthy nodes
|
||||||
|
final List<DatanodeDescriptor> unhealthyDns = outOfServiceNodeBlocks.keySet().stream()
|
||||||
|
.filter(dn -> !blockManager.isNodeHealthyForDecommissionOrMaintenance(dn))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
getUnhealthyNodesToRequeue(unhealthyDns, numDecommissioningNodes).forEach(dn -> {
|
||||||
|
getPendingNodes().add(dn);
|
||||||
|
outOfServiceNodeBlocks.remove(dn);
|
||||||
|
pendingRep.remove(dn);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
processPendingNodes();
|
processPendingNodes();
|
||||||
} finally {
|
} finally {
|
||||||
namesystem.writeUnlock();
|
namesystem.writeUnlock();
|
||||||
|
@ -207,7 +231,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
|
||||||
LOG.info("Checked {} blocks this tick. {} nodes are now " +
|
LOG.info("Checked {} blocks this tick. {} nodes are now " +
|
||||||
"in maintenance or transitioning state. {} nodes pending. {} " +
|
"in maintenance or transitioning state. {} nodes pending. {} " +
|
||||||
"nodes waiting to be cancelled.",
|
"nodes waiting to be cancelled.",
|
||||||
numBlocksChecked, outOfServiceNodeBlocks.size(), pendingNodes.size(),
|
numBlocksChecked, outOfServiceNodeBlocks.size(), getPendingNodes().size(),
|
||||||
cancelledNodes.size());
|
cancelledNodes.size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -220,10 +244,10 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
|
||||||
* the pendingNodes list from being modified externally.
|
* the pendingNodes list from being modified externally.
|
||||||
*/
|
*/
|
||||||
private void processPendingNodes() {
|
private void processPendingNodes() {
|
||||||
while (!pendingNodes.isEmpty() &&
|
while (!getPendingNodes().isEmpty() &&
|
||||||
(maxConcurrentTrackedNodes == 0 ||
|
(maxConcurrentTrackedNodes == 0 ||
|
||||||
outOfServiceNodeBlocks.size() < maxConcurrentTrackedNodes)) {
|
outOfServiceNodeBlocks.size() < maxConcurrentTrackedNodes)) {
|
||||||
outOfServiceNodeBlocks.put(pendingNodes.poll(), null);
|
outOfServiceNodeBlocks.put(getPendingNodes().poll(), null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -123,7 +123,7 @@ public class DatanodeAdminDefaultMonitor extends DatanodeAdminMonitorBase
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stopTrackingNode(DatanodeDescriptor dn) {
|
public void stopTrackingNode(DatanodeDescriptor dn) {
|
||||||
pendingNodes.remove(dn);
|
getPendingNodes().remove(dn);
|
||||||
outOfServiceNodeBlocks.remove(dn);
|
outOfServiceNodeBlocks.remove(dn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,19 +164,19 @@ public class DatanodeAdminDefaultMonitor extends DatanodeAdminMonitorBase
|
||||||
LOG.info("Checked {} blocks and {} nodes this tick. {} nodes are now " +
|
LOG.info("Checked {} blocks and {} nodes this tick. {} nodes are now " +
|
||||||
"in maintenance or transitioning state. {} nodes pending.",
|
"in maintenance or transitioning state. {} nodes pending.",
|
||||||
numBlocksChecked, numNodesChecked, outOfServiceNodeBlocks.size(),
|
numBlocksChecked, numNodesChecked, outOfServiceNodeBlocks.size(),
|
||||||
pendingNodes.size());
|
getPendingNodes().size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pop datanodes off the pending list and into decomNodeBlocks,
|
* Pop datanodes off the pending priority queue and into decomNodeBlocks,
|
||||||
* subject to the maxConcurrentTrackedNodes limit.
|
* subject to the maxConcurrentTrackedNodes limit.
|
||||||
*/
|
*/
|
||||||
private void processPendingNodes() {
|
private void processPendingNodes() {
|
||||||
while (!pendingNodes.isEmpty() &&
|
while (!getPendingNodes().isEmpty() &&
|
||||||
(maxConcurrentTrackedNodes == 0 ||
|
(maxConcurrentTrackedNodes == 0 ||
|
||||||
outOfServiceNodeBlocks.size() < maxConcurrentTrackedNodes)) {
|
outOfServiceNodeBlocks.size() < maxConcurrentTrackedNodes)) {
|
||||||
outOfServiceNodeBlocks.put(pendingNodes.poll(), null);
|
outOfServiceNodeBlocks.put(getPendingNodes().poll(), null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,6 +185,7 @@ public class DatanodeAdminDefaultMonitor extends DatanodeAdminMonitorBase
|
||||||
it = new CyclicIteration<>(outOfServiceNodeBlocks,
|
it = new CyclicIteration<>(outOfServiceNodeBlocks,
|
||||||
iterkey).iterator();
|
iterkey).iterator();
|
||||||
final List<DatanodeDescriptor> toRemove = new ArrayList<>();
|
final List<DatanodeDescriptor> toRemove = new ArrayList<>();
|
||||||
|
final List<DatanodeDescriptor> unhealthyDns = new ArrayList<>();
|
||||||
|
|
||||||
while (it.hasNext() && !exceededNumBlocksPerCheck() && namesystem
|
while (it.hasNext() && !exceededNumBlocksPerCheck() && namesystem
|
||||||
.isRunning()) {
|
.isRunning()) {
|
||||||
|
@ -221,6 +222,10 @@ public class DatanodeAdminDefaultMonitor extends DatanodeAdminMonitorBase
|
||||||
LOG.debug("Processing {} node {}", dn.getAdminState(), dn);
|
LOG.debug("Processing {} node {}", dn.getAdminState(), dn);
|
||||||
pruneReliableBlocks(dn, blocks);
|
pruneReliableBlocks(dn, blocks);
|
||||||
}
|
}
|
||||||
|
final boolean isHealthy = blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
|
||||||
|
if (!isHealthy) {
|
||||||
|
unhealthyDns.add(dn);
|
||||||
|
}
|
||||||
if (blocks.size() == 0) {
|
if (blocks.size() == 0) {
|
||||||
if (!fullScan) {
|
if (!fullScan) {
|
||||||
// If we didn't just do a full scan, need to re-check with the
|
// If we didn't just do a full scan, need to re-check with the
|
||||||
|
@ -236,8 +241,6 @@ public class DatanodeAdminDefaultMonitor extends DatanodeAdminMonitorBase
|
||||||
}
|
}
|
||||||
// If the full scan is clean AND the node liveness is okay,
|
// If the full scan is clean AND the node liveness is okay,
|
||||||
// we can finally mark as DECOMMISSIONED or IN_MAINTENANCE.
|
// we can finally mark as DECOMMISSIONED or IN_MAINTENANCE.
|
||||||
final boolean isHealthy =
|
|
||||||
blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
|
|
||||||
if (blocks.size() == 0 && isHealthy) {
|
if (blocks.size() == 0 && isHealthy) {
|
||||||
if (dn.isDecommissionInProgress()) {
|
if (dn.isDecommissionInProgress()) {
|
||||||
dnAdmin.setDecommissioned(dn);
|
dnAdmin.setDecommissioned(dn);
|
||||||
|
@ -270,12 +273,31 @@ public class DatanodeAdminDefaultMonitor extends DatanodeAdminMonitorBase
|
||||||
// an invalid state.
|
// an invalid state.
|
||||||
LOG.warn("DatanodeAdminMonitor caught exception when processing node "
|
LOG.warn("DatanodeAdminMonitor caught exception when processing node "
|
||||||
+ "{}.", dn, e);
|
+ "{}.", dn, e);
|
||||||
pendingNodes.add(dn);
|
getPendingNodes().add(dn);
|
||||||
toRemove.add(dn);
|
toRemove.add(dn);
|
||||||
} finally {
|
} finally {
|
||||||
iterkey = dn;
|
iterkey = dn;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Having more nodes decommissioning than can be tracked will impact decommissioning
|
||||||
|
// performance due to queueing delay
|
||||||
|
int numTrackedNodes = outOfServiceNodeBlocks.size() - toRemove.size();
|
||||||
|
int numQueuedNodes = getPendingNodes().size();
|
||||||
|
int numDecommissioningNodes = numTrackedNodes + numQueuedNodes;
|
||||||
|
if (numDecommissioningNodes > maxConcurrentTrackedNodes) {
|
||||||
|
LOG.warn(
|
||||||
|
"{} nodes are decommissioning but only {} nodes will be tracked at a time. "
|
||||||
|
+ "{} nodes are currently queued waiting to be decommissioned.",
|
||||||
|
numDecommissioningNodes, maxConcurrentTrackedNodes, numQueuedNodes);
|
||||||
|
|
||||||
|
// Re-queue unhealthy nodes to make space for decommissioning healthy nodes
|
||||||
|
getUnhealthyNodesToRequeue(unhealthyDns, numDecommissioningNodes).forEach(dn -> {
|
||||||
|
getPendingNodes().add(dn);
|
||||||
|
outOfServiceNodeBlocks.remove(dn);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// Remove the datanodes that are DECOMMISSIONED or in service after
|
// Remove the datanodes that are DECOMMISSIONED or in service after
|
||||||
// maintenance expiration.
|
// maintenance expiration.
|
||||||
for (DatanodeDescriptor dn : toRemove) {
|
for (DatanodeDescriptor dn : toRemove) {
|
||||||
|
|
|
@ -24,8 +24,11 @@ import org.apache.hadoop.hdfs.server.namenode.Namesystem;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.ArrayDeque;
|
import java.util.Comparator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.PriorityQueue;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This abstract class provides some base methods which are inherited by
|
* This abstract class provides some base methods which are inherited by
|
||||||
|
@ -35,12 +38,20 @@ import java.util.Queue;
|
||||||
public abstract class DatanodeAdminMonitorBase
|
public abstract class DatanodeAdminMonitorBase
|
||||||
implements DatanodeAdminMonitorInterface, Configurable {
|
implements DatanodeAdminMonitorInterface, Configurable {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sort by lastUpdate time descending order, such that unhealthy
|
||||||
|
* nodes are de-prioritized given they cannot be decommissioned.
|
||||||
|
*/
|
||||||
|
static final Comparator<DatanodeDescriptor> PENDING_NODES_QUEUE_COMPARATOR =
|
||||||
|
(dn1, dn2) -> Long.compare(dn2.getLastUpdate(), dn1.getLastUpdate());
|
||||||
|
|
||||||
protected BlockManager blockManager;
|
protected BlockManager blockManager;
|
||||||
protected Namesystem namesystem;
|
protected Namesystem namesystem;
|
||||||
protected DatanodeAdminManager dnAdmin;
|
protected DatanodeAdminManager dnAdmin;
|
||||||
protected Configuration conf;
|
protected Configuration conf;
|
||||||
|
|
||||||
protected final Queue<DatanodeDescriptor> pendingNodes = new ArrayDeque<>();
|
private final PriorityQueue<DatanodeDescriptor> pendingNodes = new PriorityQueue<>(
|
||||||
|
PENDING_NODES_QUEUE_COMPARATOR);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The maximum number of nodes to track in outOfServiceNodeBlocks.
|
* The maximum number of nodes to track in outOfServiceNodeBlocks.
|
||||||
|
@ -151,4 +162,34 @@ public abstract class DatanodeAdminMonitorBase
|
||||||
public Queue<DatanodeDescriptor> getPendingNodes() {
|
public Queue<DatanodeDescriptor> getPendingNodes() {
|
||||||
return pendingNodes;
|
return pendingNodes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If node "is dead while in Decommission In Progress", it cannot be decommissioned
|
||||||
|
* until it becomes healthy again. If there are more pendingNodes than can be tracked
|
||||||
|
* & some unhealthy tracked nodes, then re-queue the unhealthy tracked nodes
|
||||||
|
* to avoid blocking decommissioning of healthy nodes.
|
||||||
|
*
|
||||||
|
* @param unhealthyDns The unhealthy datanodes which may be re-queued
|
||||||
|
* @param numDecommissioningNodes The total number of nodes being decommissioned
|
||||||
|
* @return Stream of unhealthy nodes to be re-queued
|
||||||
|
*/
|
||||||
|
Stream<DatanodeDescriptor> getUnhealthyNodesToRequeue(
|
||||||
|
final List<DatanodeDescriptor> unhealthyDns, int numDecommissioningNodes) {
|
||||||
|
if (!unhealthyDns.isEmpty()) {
|
||||||
|
// Compute the number of unhealthy nodes to re-queue
|
||||||
|
final int numUnhealthyNodesToRequeue =
|
||||||
|
Math.min(numDecommissioningNodes - maxConcurrentTrackedNodes, unhealthyDns.size());
|
||||||
|
|
||||||
|
LOG.warn("{} limit has been reached, re-queueing {} "
|
||||||
|
+ "nodes which are dead while in Decommission In Progress.",
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
|
||||||
|
numUnhealthyNodesToRequeue);
|
||||||
|
|
||||||
|
// Order unhealthy nodes by lastUpdate descending such that nodes
|
||||||
|
// which have been unhealthy the longest are preferred to be re-queued
|
||||||
|
return unhealthyDns.stream().sorted(PENDING_NODES_QUEUE_COMPARATOR.reversed())
|
||||||
|
.limit(numUnhealthyNodesToRequeue);
|
||||||
|
}
|
||||||
|
return Stream.empty();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -449,7 +449,7 @@ public class AdminStatesBaseTest {
|
||||||
refreshNodes(conf);
|
refreshNodes(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
static private DatanodeDescriptor getDatanodeDesriptor(
|
static DatanodeDescriptor getDatanodeDesriptor(
|
||||||
final FSNamesystem ns, final String datanodeUuid) {
|
final FSNamesystem ns, final String datanodeUuid) {
|
||||||
return ns.getBlockManager().getDatanodeManager().getDatanode(datanodeUuid);
|
return ns.getBlockManager().getDatanodeManager().getDatanode(datanodeUuid);
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
@ -26,6 +27,8 @@ import static org.junit.Assert.fail;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.time.Instant;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -40,6 +43,8 @@ import java.util.regex.Pattern;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
|
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
|
||||||
import org.apache.commons.text.TextStringBuilder;
|
import org.apache.commons.text.TextStringBuilder;
|
||||||
import org.apache.hadoop.fs.BlockLocation;
|
import org.apache.hadoop.fs.BlockLocation;
|
||||||
|
@ -1297,7 +1302,7 @@ public class TestDecommission extends AdminStatesBaseTest {
|
||||||
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY,
|
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY,
|
||||||
3);
|
3);
|
||||||
// Disable the normal monitor runs
|
// Disable the normal monitor runs
|
||||||
getConf().setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
|
getConf().setInt(MiniDFSCluster.DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY,
|
||||||
Integer.MAX_VALUE);
|
Integer.MAX_VALUE);
|
||||||
startCluster(1, 3);
|
startCluster(1, 3);
|
||||||
final FileSystem fs = getCluster().getFileSystem();
|
final FileSystem fs = getCluster().getFileSystem();
|
||||||
|
@ -1350,7 +1355,7 @@ public class TestDecommission extends AdminStatesBaseTest {
|
||||||
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
|
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
|
||||||
1);
|
1);
|
||||||
// Disable the normal monitor runs
|
// Disable the normal monitor runs
|
||||||
getConf().setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
|
getConf().setInt(MiniDFSCluster.DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY,
|
||||||
Integer.MAX_VALUE);
|
Integer.MAX_VALUE);
|
||||||
startCluster(1, 2);
|
startCluster(1, 2);
|
||||||
final DatanodeManager datanodeManager =
|
final DatanodeManager datanodeManager =
|
||||||
|
@ -1399,7 +1404,7 @@ public class TestDecommission extends AdminStatesBaseTest {
|
||||||
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
|
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
|
||||||
1);
|
1);
|
||||||
// Disable the normal monitor runs
|
// Disable the normal monitor runs
|
||||||
getConf().setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
|
getConf().setInt(MiniDFSCluster.DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY,
|
||||||
Integer.MAX_VALUE);
|
Integer.MAX_VALUE);
|
||||||
startCluster(1, 3);
|
startCluster(1, 3);
|
||||||
final FileSystem fs = getCluster().getFileSystem();
|
final FileSystem fs = getCluster().getFileSystem();
|
||||||
|
@ -1652,4 +1657,229 @@ public class TestDecommission extends AdminStatesBaseTest {
|
||||||
|
|
||||||
cleanupFile(fileSys, file);
|
cleanupFile(fileSys, file);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test DatanodeAdminManager logic to re-queue unhealthy decommissioning nodes
|
||||||
|
* which are blocking the decommissioning of healthy nodes.
|
||||||
|
* Force the tracked nodes set to be filled with nodes lost while decommissioning,
|
||||||
|
* then decommission healthy nodes & validate they are decommissioned eventually.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 120000)
|
||||||
|
public void testRequeueUnhealthyDecommissioningNodes() throws Exception {
|
||||||
|
// Create a MiniDFSCluster with 3 live datanode in AdminState=NORMAL and
|
||||||
|
// 2 dead datanodes in AdminState=DECOMMISSION_INPROGRESS and a file
|
||||||
|
// with replication factor of 5.
|
||||||
|
final int numLiveNodes = 3;
|
||||||
|
final int numDeadNodes = 2;
|
||||||
|
final int numNodes = numLiveNodes + numDeadNodes;
|
||||||
|
final List<DatanodeDescriptor> liveNodes = new ArrayList<>();
|
||||||
|
final Map<DatanodeDescriptor, MiniDFSCluster.DataNodeProperties> deadNodeProps =
|
||||||
|
new HashMap<>();
|
||||||
|
final ArrayList<DatanodeInfo> decommissionedNodes = new ArrayList<>();
|
||||||
|
final Path filePath = new Path("/tmp/test");
|
||||||
|
createClusterWithDeadNodesDecommissionInProgress(numLiveNodes, liveNodes, numDeadNodes,
|
||||||
|
deadNodeProps, decommissionedNodes, filePath);
|
||||||
|
final FSNamesystem namesystem = getCluster().getNamesystem();
|
||||||
|
final BlockManager blockManager = namesystem.getBlockManager();
|
||||||
|
final DatanodeManager datanodeManager = blockManager.getDatanodeManager();
|
||||||
|
final DatanodeAdminManager decomManager = datanodeManager.getDatanodeAdminManager();
|
||||||
|
|
||||||
|
// Validate the 2 "dead" nodes are not removed from the tracked nodes set
|
||||||
|
// after several seconds of operation
|
||||||
|
final Duration checkDuration = Duration.ofSeconds(5);
|
||||||
|
Instant checkUntil = Instant.now().plus(checkDuration);
|
||||||
|
while (Instant.now().isBefore(checkUntil)) {
|
||||||
|
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
|
||||||
|
assertEquals(
|
||||||
|
"Unexpected number of decommissioning nodes queued in DatanodeAdminManager.",
|
||||||
|
0, decomManager.getNumPendingNodes());
|
||||||
|
assertEquals(
|
||||||
|
"Unexpected number of decommissioning nodes tracked in DatanodeAdminManager.",
|
||||||
|
numDeadNodes, decomManager.getNumTrackedNodes());
|
||||||
|
assertTrue(
|
||||||
|
"Dead decommissioning nodes unexpectedly transitioned out of DECOMMISSION_INPROGRESS.",
|
||||||
|
deadNodeProps.keySet().stream()
|
||||||
|
.allMatch(node -> node.getAdminState().equals(AdminStates.DECOMMISSION_INPROGRESS)));
|
||||||
|
Thread.sleep(500);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the file such that its no longer a factor blocking decommissioning of live nodes
|
||||||
|
// which have block replicas for that file
|
||||||
|
getCluster().getFileSystem().delete(filePath, true);
|
||||||
|
|
||||||
|
// Start decommissioning 2 "live" datanodes
|
||||||
|
int numLiveDecommNodes = 2;
|
||||||
|
final List<DatanodeDescriptor> liveDecommNodes = liveNodes.subList(0, numLiveDecommNodes);
|
||||||
|
for (final DatanodeDescriptor liveNode : liveDecommNodes) {
|
||||||
|
takeNodeOutofService(0, liveNode.getDatanodeUuid(), 0, decommissionedNodes,
|
||||||
|
AdminStates.DECOMMISSION_INPROGRESS);
|
||||||
|
decommissionedNodes.add(liveNode);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write a new file such that there are under-replicated blocks preventing decommissioning
|
||||||
|
// of dead nodes
|
||||||
|
writeFile(getCluster().getFileSystem(), filePath, numNodes, 10);
|
||||||
|
|
||||||
|
// Validate that the live datanodes are put into the pending decommissioning queue
|
||||||
|
GenericTestUtils.waitFor(() -> decomManager.getNumTrackedNodes() == numDeadNodes
|
||||||
|
&& decomManager.getNumPendingNodes() == numLiveDecommNodes
|
||||||
|
&& liveDecommNodes.stream().allMatch(
|
||||||
|
node -> node.getAdminState().equals(AdminStates.DECOMMISSION_INPROGRESS)),
|
||||||
|
500, 30000);
|
||||||
|
assertThat(liveDecommNodes)
|
||||||
|
.as("Check all live decommissioning nodes queued in DatanodeAdminManager")
|
||||||
|
.containsAll(decomManager.getPendingNodes());
|
||||||
|
|
||||||
|
// Run DatanodeAdminManager.Monitor, then validate the dead nodes are re-queued & the
|
||||||
|
// live nodes are decommissioned
|
||||||
|
if (this instanceof TestDecommissionWithBackoffMonitor) {
|
||||||
|
// For TestDecommissionWithBackoffMonitor a single tick/execution of the
|
||||||
|
// DatanodeAdminBackoffMonitor will re-queue the dead nodes, then call
|
||||||
|
// "processPendingNodes" to de-queue the live nodes & decommission them
|
||||||
|
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
|
||||||
|
assertEquals(
|
||||||
|
"DatanodeAdminBackoffMonitor did not re-queue dead decommissioning nodes as expected.",
|
||||||
|
2, decomManager.getNumPendingNodes());
|
||||||
|
assertEquals(
|
||||||
|
"DatanodeAdminBackoffMonitor did not re-queue dead decommissioning nodes as expected.",
|
||||||
|
0, decomManager.getNumTrackedNodes());
|
||||||
|
} else {
|
||||||
|
// For TestDecommission a single tick/execution of the DatanodeAdminDefaultMonitor
|
||||||
|
// will re-queue the dead nodes. A seconds tick is needed to de-queue the live nodes
|
||||||
|
// & decommission them
|
||||||
|
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
|
||||||
|
assertEquals(
|
||||||
|
"DatanodeAdminDefaultMonitor did not re-queue dead decommissioning nodes as expected.",
|
||||||
|
4, decomManager.getNumPendingNodes());
|
||||||
|
assertEquals(
|
||||||
|
"DatanodeAdminDefaultMonitor did not re-queue dead decommissioning nodes as expected.",
|
||||||
|
0, decomManager.getNumTrackedNodes());
|
||||||
|
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
|
||||||
|
assertEquals(
|
||||||
|
"DatanodeAdminDefaultMonitor did not decommission live nodes as expected.",
|
||||||
|
2, decomManager.getNumPendingNodes());
|
||||||
|
assertEquals(
|
||||||
|
"DatanodeAdminDefaultMonitor did not decommission live nodes as expected.",
|
||||||
|
0, decomManager.getNumTrackedNodes());
|
||||||
|
}
|
||||||
|
assertTrue("Live nodes not DECOMMISSIONED as expected.", liveDecommNodes.stream()
|
||||||
|
.allMatch(node -> node.getAdminState().equals(AdminStates.DECOMMISSIONED)));
|
||||||
|
assertTrue("Dead nodes not DECOMMISSION_INPROGRESS as expected.",
|
||||||
|
deadNodeProps.keySet().stream()
|
||||||
|
.allMatch(node -> node.getAdminState().equals(AdminStates.DECOMMISSION_INPROGRESS)));
|
||||||
|
assertThat(deadNodeProps.keySet())
|
||||||
|
.as("Check all dead decommissioning nodes queued in DatanodeAdminManager")
|
||||||
|
.containsAll(decomManager.getPendingNodes());
|
||||||
|
|
||||||
|
// Validate the 2 "dead" nodes are not removed from the tracked nodes set
|
||||||
|
// after several seconds of operation
|
||||||
|
checkUntil = Instant.now().plus(checkDuration);
|
||||||
|
while (Instant.now().isBefore(checkUntil)) {
|
||||||
|
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
|
||||||
|
assertEquals(
|
||||||
|
"Unexpected number of decommissioning nodes queued in DatanodeAdminManager.",
|
||||||
|
0, decomManager.getNumPendingNodes());
|
||||||
|
assertEquals(
|
||||||
|
"Unexpected number of decommissioning nodes tracked in DatanodeAdminManager.",
|
||||||
|
numDeadNodes, decomManager.getNumTrackedNodes());
|
||||||
|
assertTrue(
|
||||||
|
"Dead decommissioning nodes unexpectedly transitioned out of DECOMMISSION_INPROGRESS.",
|
||||||
|
deadNodeProps.keySet().stream()
|
||||||
|
.allMatch(node -> node.getAdminState().equals(AdminStates.DECOMMISSION_INPROGRESS)));
|
||||||
|
Thread.sleep(500);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the file such that there are no more under-replicated blocks
|
||||||
|
// allowing the dead nodes to be decommissioned
|
||||||
|
getCluster().getFileSystem().delete(filePath, true);
|
||||||
|
|
||||||
|
// Validate the dead nodes are eventually decommissioned
|
||||||
|
GenericTestUtils.waitFor(() -> {
|
||||||
|
try {
|
||||||
|
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
|
||||||
|
} catch (ExecutionException | InterruptedException e) {
|
||||||
|
LOG.warn("Exception running DatanodeAdminMonitor", e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return decomManager.getNumTrackedNodes() == 0 && decomManager.getNumPendingNodes() == 0
|
||||||
|
&& deadNodeProps.keySet().stream().allMatch(
|
||||||
|
node -> node.getAdminState().equals(AdminStates.DECOMMISSIONED));
|
||||||
|
}, 500, 30000);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a MiniDFSCluster with "numLiveNodes" live datanodes in AdminState=NORMAL and
|
||||||
|
* "numDeadNodes" dead datanodes in AdminState=DECOMMISSION_INPROGRESS. Create a file
|
||||||
|
* replicated to all datanodes.
|
||||||
|
*
|
||||||
|
* @param numLiveNodes - number of live nodes in cluster
|
||||||
|
* @param liveNodes - list which will be loaded with references to 3 live datanodes
|
||||||
|
* @param numDeadNodes - number of live nodes in cluster
|
||||||
|
* @param deadNodeProps - map which will be loaded with references to 2 dead datanodes
|
||||||
|
* @param decommissionedNodes - list which will be loaded with references to decommissioning nodes
|
||||||
|
* @param filePath - path used to create HDFS file
|
||||||
|
*/
|
||||||
|
private void createClusterWithDeadNodesDecommissionInProgress(final int numLiveNodes,
|
||||||
|
final List<DatanodeDescriptor> liveNodes, final int numDeadNodes,
|
||||||
|
final Map<DatanodeDescriptor, MiniDFSCluster.DataNodeProperties> deadNodeProps,
|
||||||
|
final ArrayList<DatanodeInfo> decommissionedNodes, final Path filePath) throws Exception {
|
||||||
|
assertTrue("Must have numLiveNode > 0", numLiveNodes > 0);
|
||||||
|
assertTrue("Must have numDeadNode > 0", numDeadNodes > 0);
|
||||||
|
int numNodes = numLiveNodes + numDeadNodes;
|
||||||
|
|
||||||
|
// Allow "numDeadNodes" datanodes to be decommissioned at a time
|
||||||
|
getConf()
|
||||||
|
.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES, numDeadNodes);
|
||||||
|
// Disable the normal monitor runs
|
||||||
|
getConf()
|
||||||
|
.setInt(MiniDFSCluster.DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY, Integer.MAX_VALUE);
|
||||||
|
|
||||||
|
// Start cluster with "numNodes" datanodes
|
||||||
|
startCluster(1, numNodes);
|
||||||
|
final FSNamesystem namesystem = getCluster().getNamesystem();
|
||||||
|
final BlockManager blockManager = namesystem.getBlockManager();
|
||||||
|
final DatanodeManager datanodeManager = blockManager.getDatanodeManager();
|
||||||
|
final DatanodeAdminManager decomManager = datanodeManager.getDatanodeAdminManager();
|
||||||
|
assertEquals(numNodes, getCluster().getDataNodes().size());
|
||||||
|
getCluster().waitActive();
|
||||||
|
|
||||||
|
// "numLiveNodes" datanodes will remain "live"
|
||||||
|
for (final DataNode node : getCluster().getDataNodes().subList(0, numLiveNodes)) {
|
||||||
|
liveNodes.add(getDatanodeDesriptor(namesystem, node.getDatanodeUuid()));
|
||||||
|
}
|
||||||
|
assertEquals(numLiveNodes, liveNodes.size());
|
||||||
|
|
||||||
|
// "numDeadNodes" datanodes will be "dead" while decommissioning
|
||||||
|
final List<DatanodeDescriptor> deadNodes =
|
||||||
|
getCluster().getDataNodes().subList(numLiveNodes, numNodes).stream()
|
||||||
|
.map(dn -> getDatanodeDesriptor(namesystem, dn.getDatanodeUuid()))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
assertEquals(numDeadNodes, deadNodes.size());
|
||||||
|
|
||||||
|
// Create file with block replicas on all nodes
|
||||||
|
writeFile(getCluster().getFileSystem(), filePath, numNodes, 10);
|
||||||
|
|
||||||
|
// Cause the "dead" nodes to be lost while in state decommissioning
|
||||||
|
// and fill the tracked nodes set with those "dead" nodes
|
||||||
|
for (final DatanodeDescriptor deadNode : deadNodes) {
|
||||||
|
// Start decommissioning the node, it will not be able to complete due to the
|
||||||
|
// under-replicated file
|
||||||
|
takeNodeOutofService(0, deadNode.getDatanodeUuid(), 0, decommissionedNodes,
|
||||||
|
AdminStates.DECOMMISSION_INPROGRESS);
|
||||||
|
decommissionedNodes.add(deadNode);
|
||||||
|
|
||||||
|
// Stop the datanode so that it is lost while decommissioning
|
||||||
|
MiniDFSCluster.DataNodeProperties dn = getCluster().stopDataNode(deadNode.getXferAddr());
|
||||||
|
deadNodeProps.put(deadNode, dn);
|
||||||
|
deadNode.setLastUpdate(213); // Set last heartbeat to be in the past
|
||||||
|
}
|
||||||
|
assertEquals(numDeadNodes, deadNodeProps.size());
|
||||||
|
|
||||||
|
// Wait for the decommissioning nodes to become dead & to be added to "pendingNodes"
|
||||||
|
GenericTestUtils.waitFor(() -> decomManager.getNumTrackedNodes() == 0
|
||||||
|
&& decomManager.getNumPendingNodes() == numDeadNodes
|
||||||
|
&& deadNodes.stream().allMatch(node ->
|
||||||
|
!BlockManagerTestUtil.isNodeHealthyForDecommissionOrMaintenance(blockManager, node)
|
||||||
|
&& !node.isAlive()), 500, 20000);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -383,6 +383,16 @@ public class BlockManagerTestUtil {
|
||||||
dm.getDatanodeAdminManager().runMonitorForTest();
|
dm.getDatanodeAdminManager().runMonitorForTest();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Have BlockManager check isNodeHealthyForDecommissionOrMaintenance for a given datanode.
|
||||||
|
* @param blockManager the BlockManager to check against
|
||||||
|
* @param dn the datanode to check
|
||||||
|
*/
|
||||||
|
public static boolean isNodeHealthyForDecommissionOrMaintenance(BlockManager blockManager,
|
||||||
|
DatanodeDescriptor dn) {
|
||||||
|
return blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* add block to the replicateBlocks queue of the Datanode
|
* add block to the replicateBlocks queue of the Datanode
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -0,0 +1,92 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.PriorityQueue;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
|
|
||||||
|
public class TestDatanodeAdminMonitorBase {
|
||||||
|
public static final Logger LOG = LoggerFactory.getLogger(TestDatanodeAdminMonitorBase.class);
|
||||||
|
|
||||||
|
// Sort by lastUpdate time descending order, such that unhealthy
|
||||||
|
// nodes are de-prioritized given they cannot be decommissioned.
|
||||||
|
private static final int NUM_DATANODE = 10;
|
||||||
|
private static final int[] UNORDERED_LAST_UPDATE_TIMES =
|
||||||
|
new int[] {0, 5, 2, 11, 0, 3, 1001, 5, 1, 103};
|
||||||
|
private static final int[] ORDERED_LAST_UPDATE_TIMES =
|
||||||
|
new int[] {1001, 103, 11, 5, 5, 3, 2, 1, 0, 0};
|
||||||
|
private static final int[] REVERSE_ORDER_LAST_UPDATE_TIMES =
|
||||||
|
new int[] {0, 0, 1, 2, 3, 5, 5, 11, 103, 1001};
|
||||||
|
|
||||||
|
private static final DatanodeDescriptor[] NODES;
|
||||||
|
|
||||||
|
static {
|
||||||
|
NODES = new DatanodeDescriptor[NUM_DATANODE];
|
||||||
|
for (int i = 0; i < NUM_DATANODE; i++) {
|
||||||
|
NODES[i] = new DatanodeDescriptor(DatanodeID.EMPTY_DATANODE_ID);
|
||||||
|
NODES[i].setLastUpdate(UNORDERED_LAST_UPDATE_TIMES[i]);
|
||||||
|
NODES[i].setLastUpdateMonotonic(UNORDERED_LAST_UPDATE_TIMES[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify that DatanodeAdminManager pendingNodes priority queue
|
||||||
|
* correctly orders the nodes by lastUpdate time descending.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testPendingNodesQueueOrdering() {
|
||||||
|
final PriorityQueue<DatanodeDescriptor> pendingNodes =
|
||||||
|
new PriorityQueue<>(DatanodeAdminMonitorBase.PENDING_NODES_QUEUE_COMPARATOR);
|
||||||
|
|
||||||
|
pendingNodes.addAll(Arrays.asList(NODES));
|
||||||
|
|
||||||
|
for (int i = 0; i < NUM_DATANODE; i++) {
|
||||||
|
final DatanodeDescriptor dn = pendingNodes.poll();
|
||||||
|
Assert.assertNotNull(dn);
|
||||||
|
Assert.assertEquals(ORDERED_LAST_UPDATE_TIMES[i], dn.getLastUpdate());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify that DatanodeAdminManager logic to sort unhealthy nodes
|
||||||
|
* correctly orders the nodes by lastUpdate time ascending.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testPendingNodesQueueReverseOrdering() {
|
||||||
|
final List<DatanodeDescriptor> nodes = Arrays.asList(NODES);
|
||||||
|
final List<DatanodeDescriptor> reverseOrderNodes =
|
||||||
|
nodes.stream().sorted(DatanodeAdminMonitorBase.PENDING_NODES_QUEUE_COMPARATOR.reversed())
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
Assert.assertEquals(NUM_DATANODE, reverseOrderNodes.size());
|
||||||
|
for (int i = 0; i < NUM_DATANODE; i++) {
|
||||||
|
Assert.assertEquals(REVERSE_ORDER_LAST_UPDATE_TIMES[i],
|
||||||
|
reverseOrderNodes.get(i).getLastUpdate());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue