HDFS-16303. Improve handling of datanode lost while decommissioning (#3920)

Co-authored-by: Kevin Wikant <wikak@amazon.com>
Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
KevinWikant 2022-01-31 02:12:27 -05:00 committed by GitHub
parent b46ecb7330
commit 543ff58127
5 changed files with 390 additions and 10 deletions


@@ -21,17 +21,19 @@ import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.util.AbstractList;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -95,6 +97,14 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class DatanodeAdminManager {
private static final Logger LOG =
LoggerFactory.getLogger(DatanodeAdminManager.class);
/**
* Sort by lastUpdate time descending order, such that unhealthy
* nodes are de-prioritized given they cannot be decommissioned.
*/
static final Comparator<DatanodeDescriptor> PENDING_NODES_QUEUE_COMPARATOR =
(dn1, dn2) -> Long.compare(dn2.getLastUpdate(), dn1.getLastUpdate());
private final Namesystem namesystem;
private final BlockManager blockManager;
private final HeartbeatManager hbManager;
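
For reference, a minimal standalone sketch of the ordering this comparator imposes on the new pendingNodes priority queue (the Node class here is a hypothetical stand-in for DatanodeDescriptor, not part of this change): nodes with the most recent heartbeat are polled first, so datanodes that went dead while decommissioning sink to the back of the queue.

import java.util.Comparator;
import java.util.PriorityQueue;

public class PendingQueueOrderingSketch {
  // Stand-in for DatanodeDescriptor: only the lastUpdate timestamp matters here.
  static final class Node {
    final String id;
    final long lastUpdate;
    Node(String id, long lastUpdate) {
      this.id = id;
      this.lastUpdate = lastUpdate;
    }
  }

  public static void main(String[] args) {
    // Same shape as PENDING_NODES_QUEUE_COMPARATOR: lastUpdate descending.
    Comparator<Node> byLastUpdateDesc =
        (a, b) -> Long.compare(b.lastUpdate, a.lastUpdate);
    PriorityQueue<Node> pending = new PriorityQueue<>(byLastUpdateDesc);
    pending.add(new Node("recently-heartbeating", 1_000L));
    pending.add(new Node("dead-while-decommissioning", 10L));
    pending.add(new Node("also-healthy", 900L));
    while (!pending.isEmpty()) {
      // Prints: recently-heartbeating, also-healthy, dead-while-decommissioning
      System.out.println(pending.poll().id);
    }
  }
}
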
@@ -127,7 +137,7 @@ public class DatanodeAdminManager {
* limit the impact on NN memory consumption, we limit the number of nodes in
* outOfServiceNodeBlocks. Additional nodes wait in pendingNodes.
*/
private final Queue<DatanodeDescriptor> pendingNodes;
private final PriorityQueue<DatanodeDescriptor> pendingNodes;
private Monitor monitor = null;
DatanodeAdminManager(final Namesystem namesystem,
@@ -140,7 +150,7 @@ public class DatanodeAdminManager {
new ThreadFactoryBuilder().setNameFormat("DatanodeAdminMonitor-%d")
.setDaemon(true).build());
outOfServiceNodeBlocks = new TreeMap<>();
pendingNodes = new ArrayDeque<>();
pendingNodes = new PriorityQueue<>(PENDING_NODES_QUEUE_COMPARATOR);
}
/**
@@ -520,7 +530,7 @@ public class DatanodeAdminManager {
}
/**
* Pop datanodes off the pending list and into decomNodeBlocks,
* Pop datanodes off the pending priority queue and into decomNodeBlocks,
* subject to the maxConcurrentTrackedNodes limit.
*/
private void processPendingNodes() {
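
To make the pendingNodes / maxConcurrentTrackedNodes interplay described above concrete, here is a small self-contained sketch (illustrative names, not the DatanodeAdminManager API): nodes are popped off the pending priority queue into the tracked set until the limit is reached, and the rest wait in the queue.

import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class PendingToTrackedSketch {
  public static void main(String[] args) {
    final int maxConcurrentTrackedNodes = 2;
    // Pending queue ordered by lastUpdate descending, as in the change above:
    // element [0] = node id, element [1] = lastUpdate timestamp.
    PriorityQueue<long[]> pending = new PriorityQueue<>(
        (a, b) -> Long.compare(b[1], a[1]));
    pending.add(new long[] {1, 1_000L}); // recent heartbeat
    pending.add(new long[] {2, 10L});    // stale heartbeat (dead while decommissioning)
    pending.add(new long[] {3, 900L});   // recent heartbeat

    List<Long> tracked = new ArrayList<>(); // stand-in for outOfServiceNodeBlocks keys
    while (!pending.isEmpty() && tracked.size() < maxConcurrentTrackedNodes) {
      tracked.add(pending.poll()[0]);
    }
    System.out.println("tracked=" + tracked);        // tracked=[1, 3]
    System.out.println("pending=" + pending.size()); // pending=1 (the dead node waits)
  }
}
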
@@ -536,6 +546,7 @@
it = new CyclicIteration<>(outOfServiceNodeBlocks,
iterkey).iterator();
final List<DatanodeDescriptor> toRemove = new ArrayList<>();
final List<DatanodeDescriptor> unhealthyDns = new ArrayList<>();
while (it.hasNext() && !exceededNumBlocksPerCheck() && namesystem
.isRunning()) {
@@ -572,6 +583,10 @@
LOG.debug("Processing {} node {}", dn.getAdminState(), dn);
pruneReliableBlocks(dn, blocks);
}
final boolean isHealthy = blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
if (!isHealthy) {
unhealthyDns.add(dn);
}
if (blocks.size() == 0) {
if (!fullScan) {
// If we didn't just do a full scan, need to re-check with the
@@ -587,8 +602,6 @@
}
// If the full scan is clean AND the node liveness is okay,
// we can finally mark as DECOMMISSIONED or IN_MAINTENANCE.
final boolean isHealthy =
blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
if (blocks.size() == 0 && isHealthy) {
if (dn.isDecommissionInProgress()) {
setDecommissioned(dn);
@@ -627,6 +640,25 @@
iterkey = dn;
}
}
// Having more nodes decommissioning than can be tracked will impact decommissioning
// performance due to queueing delay
int numTrackedNodes = outOfServiceNodeBlocks.size() - toRemove.size();
int numQueuedNodes = getPendingNodes().size();
int numDecommissioningNodes = numTrackedNodes + numQueuedNodes;
if (numDecommissioningNodes > maxConcurrentTrackedNodes) {
LOG.warn(
"{} nodes are decommissioning but only {} nodes will be tracked at a time. "
+ "{} nodes are currently queued waiting to be decommissioned.",
numDecommissioningNodes, maxConcurrentTrackedNodes, numQueuedNodes);
// Re-queue unhealthy nodes to make space for decommissioning healthy nodes
getUnhealthyNodesToRequeue(unhealthyDns, numDecommissioningNodes).forEach(dNode -> {
pendingNodes.add(dNode);
outOfServiceNodeBlocks.remove(dNode);
});
}
// Remove the datanodes that are DECOMMISSIONED or in service after
// maintenance expiration.
for (DatanodeDescriptor dn : toRemove) {
@@ -794,6 +826,36 @@
lowRedundancyOpenFiles, lowRedundancyBlocks,
outOfServiceOnlyReplicas);
}
/**
* If node "is dead while in Decommission In Progress", it cannot be decommissioned
* until it becomes healthy again. If there are more pendingNodes than can be tracked
* & some unhealthy tracked nodes, then re-queue the unhealthy tracked nodes
* to avoid blocking decommissioning of healthy nodes.
*
* @param unhealthyDns The unhealthy datanodes which may be re-queued
* @param numDecommissioningNodes The total number of nodes being decommissioned
* @return Stream of unhealthy nodes to be re-queued
*/
private Stream<DatanodeDescriptor> getUnhealthyNodesToRequeue(
final List<DatanodeDescriptor> unhealthyDns, int numDecommissioningNodes) {
if (!unhealthyDns.isEmpty()) {
// Compute the number of unhealthy nodes to re-queue
final int numUnhealthyNodesToRequeue =
Math.min(numDecommissioningNodes - maxConcurrentTrackedNodes, unhealthyDns.size());
LOG.warn("{} limit has been reached, re-queueing {} "
+ "nodes which are dead while in Decommission In Progress.",
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
numUnhealthyNodesToRequeue);
// Order unhealthy nodes by lastUpdate ascending such that the nodes
// which have been unhealthy the longest are preferred to be re-queued
return unhealthyDns.stream().sorted(PENDING_NODES_QUEUE_COMPARATOR.reversed())
.limit(numUnhealthyNodesToRequeue);
}
return Stream.empty();
}
}
@VisibleForTesting
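
The re-queue path added above reduces to one piece of arithmetic plus a sort: with N nodes decommissioning, a tracked-node limit of M, and U unhealthy tracked nodes, min(N - M, U) nodes are pushed back onto the pending queue, oldest heartbeat first. A hedged, self-contained sketch of that calculation (the names are illustrative, not the actual DatanodeAdminManager fields):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class RequeueArithmeticSketch {
  public static void main(String[] args) {
    final int maxConcurrentTrackedNodes = 3;   // tracked-node limit (M)
    final int numDecommissioningNodes = 5;     // tracked + queued (N)
    // lastUpdate timestamps of the unhealthy tracked nodes (U = 4).
    List<Long> unhealthyLastUpdates = Arrays.asList(40L, 10L, 30L, 20L);

    // min(N - M, U) = min(2, 4) = 2 nodes get re-queued.
    int numToRequeue = Math.min(
        numDecommissioningNodes - maxConcurrentTrackedNodes,
        unhealthyLastUpdates.size());

    // Ascending lastUpdate = unhealthy the longest first (the reversed comparator).
    List<Long> requeued = unhealthyLastUpdates.stream()
        .sorted()
        .limit(numToRequeue)
        .collect(Collectors.toList());
    System.out.println(requeued); // [10, 20]
  }
}
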


@@ -449,7 +449,7 @@ public class AdminStatesBaseTest {
refreshNodes(conf);
}
static private DatanodeDescriptor getDatanodeDesriptor(
static DatanodeDescriptor getDatanodeDesriptor(
final FSNamesystem ns, final String datanodeUuid) {
return ns.getBlockManager().getDatanodeManager().getDatanode(datanodeUuid);
}


@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -26,6 +27,8 @@ import static org.junit.Assert.fail;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -39,6 +42,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import com.google.common.collect.Lists;
import org.apache.commons.text.TextStringBuilder;
import org.apache.hadoop.fs.BlockLocation;
@@ -1230,7 +1235,7 @@ public class TestDecommission extends AdminStatesBaseTest {
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY,
3);
// Disable the normal monitor runs
getConf().setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
getConf().setInt(MiniDFSCluster.DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY,
Integer.MAX_VALUE);
startCluster(1, 3);
final FileSystem fs = getCluster().getFileSystem();
@@ -1283,7 +1288,7 @@
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
1);
// Disable the normal monitor runs
getConf().setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
getConf().setInt(MiniDFSCluster.DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY,
Integer.MAX_VALUE);
startCluster(1, 2);
final DatanodeManager datanodeManager =
@@ -1332,7 +1337,7 @@
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
1);
// Disable the normal monitor runs
getConf().setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
getConf().setInt(MiniDFSCluster.DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY,
Integer.MAX_VALUE);
startCluster(1, 3);
final FileSystem fs = getCluster().getFileSystem();
@@ -1585,4 +1590,215 @@
cleanupFile(fileSys, file);
}
/**
* Test DatanodeAdminManager logic to re-queue unhealthy decommissioning nodes
* which are blocking the decommissioning of healthy nodes.
* Force the tracked nodes set to be filled with nodes lost while decommissioning,
* then decommission healthy nodes & validate they are decommissioned eventually.
*/
@Test(timeout = 120000)
public void testRequeueUnhealthyDecommissioningNodes() throws Exception {
// Create a MiniDFSCluster with 3 live datanodes in AdminState=NORMAL and
// 2 dead datanodes in AdminState=DECOMMISSION_INPROGRESS, plus a file
// with a replication factor of 5.
final int numLiveNodes = 3;
final int numDeadNodes = 2;
final int numNodes = numLiveNodes + numDeadNodes;
final List<DatanodeDescriptor> liveNodes = new ArrayList<>();
final Map<DatanodeDescriptor, MiniDFSCluster.DataNodeProperties> deadNodeProps =
new HashMap<>();
final ArrayList<DatanodeInfo> decommissionedNodes = new ArrayList<>();
final Path filePath = new Path("/tmp/test");
createClusterWithDeadNodesDecommissionInProgress(numLiveNodes, liveNodes, numDeadNodes,
deadNodeProps, decommissionedNodes, filePath);
final FSNamesystem namesystem = getCluster().getNamesystem();
final BlockManager blockManager = namesystem.getBlockManager();
final DatanodeManager datanodeManager = blockManager.getDatanodeManager();
final DatanodeAdminManager decomManager = datanodeManager.getDatanodeAdminManager();
// Validate the 2 "dead" nodes are not removed from the tracked nodes set
// after several seconds of operation
final Duration checkDuration = Duration.ofSeconds(5);
Instant checkUntil = Instant.now().plus(checkDuration);
while (Instant.now().isBefore(checkUntil)) {
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
assertEquals(
"Unexpected number of decommissioning nodes queued in DatanodeAdminManager.",
0, decomManager.getNumPendingNodes());
assertEquals(
"Unexpected number of decommissioning nodes tracked in DatanodeAdminManager.",
numDeadNodes, decomManager.getNumTrackedNodes());
assertTrue(
"Dead decommissioning nodes unexpectedly transitioned out of DECOMMISSION_INPROGRESS.",
deadNodeProps.keySet().stream()
.allMatch(node -> node.getAdminState().equals(AdminStates.DECOMMISSION_INPROGRESS)));
Thread.sleep(500);
}
// Delete the file so that it is no longer a factor blocking decommissioning of live nodes
// which have block replicas for that file
getCluster().getFileSystem().delete(filePath, true);
// Start decommissioning 2 "live" datanodes
int numLiveDecommNodes = 2;
final List<DatanodeDescriptor> liveDecommNodes = liveNodes.subList(0, numLiveDecommNodes);
for (final DatanodeDescriptor liveNode : liveDecommNodes) {
takeNodeOutofService(0, liveNode.getDatanodeUuid(), 0, decommissionedNodes,
AdminStates.DECOMMISSION_INPROGRESS);
decommissionedNodes.add(liveNode);
}
// Write a new file such that there are under-replicated blocks preventing decommissioning
// of dead nodes
writeFile(getCluster().getFileSystem(), filePath, numNodes, 10);
// Validate that the live datanodes are put into the pending decommissioning queue
GenericTestUtils.waitFor(() -> decomManager.getNumTrackedNodes() == numDeadNodes
&& decomManager.getNumPendingNodes() == numLiveDecommNodes
&& liveDecommNodes.stream().allMatch(
node -> node.getAdminState().equals(AdminStates.DECOMMISSION_INPROGRESS)),
500, 30000);
assertThat(liveDecommNodes)
.as("Check all live decommissioning nodes queued in DatanodeAdminManager")
.containsAll(decomManager.getPendingNodes());
// Run DatanodeAdminManager.Monitor, then validate that the dead nodes are re-queued and
// the live nodes are decommissioned. For TestDecommission, a single tick/execution of the
// DatanodeAdminDefaultMonitor will re-queue the dead nodes. A second tick is needed
// to de-queue the live nodes and decommission them.
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
assertEquals(
"DatanodeAdminDefaultMonitor did not re-queue dead decommissioning nodes as expected.",
4, decomManager.getNumPendingNodes());
assertEquals(
"DatanodeAdminDefaultMonitor did not re-queue dead decommissioning nodes as expected.",
0, decomManager.getNumTrackedNodes());
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
assertEquals(
"DatanodeAdminDefaultMonitor did not decommission live nodes as expected.",
2, decomManager.getNumPendingNodes());
assertEquals(
"DatanodeAdminDefaultMonitor did not decommission live nodes as expected.",
0, decomManager.getNumTrackedNodes());
assertTrue("Live nodes not DECOMMISSIONED as expected.", liveDecommNodes.stream()
.allMatch(node -> node.getAdminState().equals(AdminStates.DECOMMISSIONED)));
assertTrue("Dead nodes not DECOMMISSION_INPROGRESS as expected.",
deadNodeProps.keySet().stream()
.allMatch(node -> node.getAdminState().equals(AdminStates.DECOMMISSION_INPROGRESS)));
assertThat(deadNodeProps.keySet())
.as("Check all dead decommissioning nodes queued in DatanodeAdminManager")
.containsAll(decomManager.getPendingNodes());
// Validate the 2 "dead" nodes are not removed from the tracked nodes set
// after several seconds of operation
checkUntil = Instant.now().plus(checkDuration);
while (Instant.now().isBefore(checkUntil)) {
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
assertEquals(
"Unexpected number of decommissioning nodes queued in DatanodeAdminManager.",
0, decomManager.getNumPendingNodes());
assertEquals(
"Unexpected number of decommissioning nodes tracked in DatanodeAdminManager.",
numDeadNodes, decomManager.getNumTrackedNodes());
assertTrue(
"Dead decommissioning nodes unexpectedly transitioned out of DECOMMISSION_INPROGRESS.",
deadNodeProps.keySet().stream()
.allMatch(node -> node.getAdminState().equals(AdminStates.DECOMMISSION_INPROGRESS)));
Thread.sleep(500);
}
// Delete the file such that there are no more under-replicated blocks
// allowing the dead nodes to be decommissioned
getCluster().getFileSystem().delete(filePath, true);
// Validate the dead nodes are eventually decommissioned
GenericTestUtils.waitFor(() -> {
try {
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
} catch (ExecutionException | InterruptedException e) {
LOG.warn("Exception running DatanodeAdminMonitor", e);
return false;
}
return decomManager.getNumTrackedNodes() == 0 && decomManager.getNumPendingNodes() == 0
&& deadNodeProps.keySet().stream().allMatch(
node -> node.getAdminState().equals(AdminStates.DECOMMISSIONED));
}, 500, 30000);
}
/**
* Create a MiniDFSCluster with "numLiveNodes" live datanodes in AdminState=NORMAL and
* "numDeadNodes" dead datanodes in AdminState=DECOMMISSION_INPROGRESS. Create a file
* replicated to all datanodes.
*
* @param numLiveNodes - number of live nodes in cluster
* @param liveNodes - list which will be loaded with references to the live datanodes
* @param numDeadNodes - number of dead nodes in cluster
* @param deadNodeProps - map which will be loaded with references to the dead datanodes
* @param decommissionedNodes - list which will be loaded with references to decommissioning nodes
* @param filePath - path used to create HDFS file
*/
private void createClusterWithDeadNodesDecommissionInProgress(final int numLiveNodes,
final List<DatanodeDescriptor> liveNodes, final int numDeadNodes,
final Map<DatanodeDescriptor, MiniDFSCluster.DataNodeProperties> deadNodeProps,
final ArrayList<DatanodeInfo> decommissionedNodes, final Path filePath) throws Exception {
assertTrue("Must have numLiveNode > 0", numLiveNodes > 0);
assertTrue("Must have numDeadNode > 0", numDeadNodes > 0);
int numNodes = numLiveNodes + numDeadNodes;
// Allow "numDeadNodes" datanodes to be decommissioned at a time
getConf()
.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES, numDeadNodes);
// Disable the normal monitor runs
getConf()
.setInt(MiniDFSCluster.DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY, Integer.MAX_VALUE);
// Start cluster with "numNodes" datanodes
startCluster(1, numNodes);
final FSNamesystem namesystem = getCluster().getNamesystem();
final BlockManager blockManager = namesystem.getBlockManager();
final DatanodeManager datanodeManager = blockManager.getDatanodeManager();
final DatanodeAdminManager decomManager = datanodeManager.getDatanodeAdminManager();
assertEquals(numNodes, getCluster().getDataNodes().size());
getCluster().waitActive();
// "numLiveNodes" datanodes will remain "live"
for (final DataNode node : getCluster().getDataNodes().subList(0, numLiveNodes)) {
liveNodes.add(getDatanodeDesriptor(namesystem, node.getDatanodeUuid()));
}
assertEquals(numLiveNodes, liveNodes.size());
// "numDeadNodes" datanodes will be "dead" while decommissioning
final List<DatanodeDescriptor> deadNodes =
getCluster().getDataNodes().subList(numLiveNodes, numNodes).stream()
.map(dn -> getDatanodeDesriptor(namesystem, dn.getDatanodeUuid()))
.collect(Collectors.toList());
assertEquals(numDeadNodes, deadNodes.size());
// Create file with block replicas on all nodes
writeFile(getCluster().getFileSystem(), filePath, numNodes, 10);
// Cause the "dead" nodes to be lost while in state decommissioning
// and fill the tracked nodes set with those "dead" nodes
for (final DatanodeDescriptor deadNode : deadNodes) {
// Start decommissioning the node, it will not be able to complete due to the
// under-replicated file
takeNodeOutofService(0, deadNode.getDatanodeUuid(), 0, decommissionedNodes,
AdminStates.DECOMMISSION_INPROGRESS);
decommissionedNodes.add(deadNode);
// Stop the datanode so that it is lost while decommissioning
MiniDFSCluster.DataNodeProperties dn = getCluster().stopDataNode(deadNode.getXferAddr());
deadNodeProps.put(deadNode, dn);
deadNode.setLastUpdate(213); // Set last heartbeat to be in the past
}
assertEquals(numDeadNodes, deadNodeProps.size());
// Wait for the decommissioning nodes to become dead & to be added to "pendingNodes"
GenericTestUtils.waitFor(() -> decomManager.getNumTrackedNodes() == 0
&& decomManager.getNumPendingNodes() == numDeadNodes
&& deadNodes.stream().allMatch(node ->
!BlockManagerTestUtil.isNodeHealthyForDecommissionOrMaintenance(blockManager, node)
&& !node.isAlive()), 500, 20000);
}
}


@@ -375,6 +375,16 @@ public class BlockManagerTestUtil {
dm.getDatanodeAdminManager().runMonitorForTest();
}
/**
* Have BlockManager check isNodeHealthyForDecommissionOrMaintenance for a given datanode.
* @param blockManager the BlockManager to check against
* @param dn the datanode to check
*/
public static boolean isNodeHealthyForDecommissionOrMaintenance(BlockManager blockManager,
DatanodeDescriptor dn) {
return blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
}
/**
* add block to the replicateBlocks queue of the Datanode
*/


@@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;
import java.util.stream.Collectors;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
public class TestDatanodeAdminManager {
public static final Logger LOG = LoggerFactory.getLogger(TestDatanodeAdminManager.class);
// Sort by lastUpdate time descending order, such that unhealthy
// nodes are de-prioritized given they cannot be decommissioned.
private static final int NUM_DATANODE = 10;
private static final int[] UNORDERED_LAST_UPDATE_TIMES =
new int[] {0, 5, 2, 11, 0, 3, 1001, 5, 1, 103};
private static final int[] ORDERED_LAST_UPDATE_TIMES =
new int[] {1001, 103, 11, 5, 5, 3, 2, 1, 0, 0};
private static final int[] REVERSE_ORDER_LAST_UPDATE_TIMES =
new int[] {0, 0, 1, 2, 3, 5, 5, 11, 103, 1001};
private static final DatanodeDescriptor[] NODES;
static {
NODES = new DatanodeDescriptor[NUM_DATANODE];
for (int i = 0; i < NUM_DATANODE; i++) {
NODES[i] = new DatanodeDescriptor(DatanodeID.EMPTY_DATANODE_ID);
NODES[i].setLastUpdate(UNORDERED_LAST_UPDATE_TIMES[i]);
NODES[i].setLastUpdateMonotonic(UNORDERED_LAST_UPDATE_TIMES[i]);
}
}
/**
* Verify that DatanodeAdminManager pendingNodes priority queue
* correctly orders the nodes by lastUpdate time descending.
*/
@Test
public void testPendingNodesQueueOrdering() {
final PriorityQueue<DatanodeDescriptor> pendingNodes =
new PriorityQueue<>(DatanodeAdminManager.PENDING_NODES_QUEUE_COMPARATOR);
pendingNodes.addAll(Arrays.asList(NODES));
for (int i = 0; i < NUM_DATANODE; i++) {
final DatanodeDescriptor dn = pendingNodes.poll();
Assert.assertNotNull(dn);
Assert.assertEquals(ORDERED_LAST_UPDATE_TIMES[i], dn.getLastUpdate());
}
}
/**
* Verify that DatanodeAdminManager logic to sort unhealthy nodes
* correctly orders the nodes by lastUpdate time ascending.
*/
@Test
public void testPendingNodesQueueReverseOrdering() {
final List<DatanodeDescriptor> nodes = Arrays.asList(NODES);
final List<DatanodeDescriptor> reverseOrderNodes =
nodes.stream().sorted(DatanodeAdminManager.PENDING_NODES_QUEUE_COMPARATOR.reversed())
.collect(Collectors.toList());
Assert.assertEquals(NUM_DATANODE, reverseOrderNodes.size());
for (int i = 0; i < NUM_DATANODE; i++) {
Assert.assertEquals(REVERSE_ORDER_LAST_UPDATE_TIMES[i],
reverseOrderNodes.get(i).getLastUpdate());
}
}
}