HDFS-9392. Admins support for maintenance state. Contributed by Ming Ma.

This commit is contained in:
Ming Ma 2016-08-30 14:00:13 -07:00
parent c4ee6915a1
commit 9dcbdbdb5a
19 changed files with 1164 additions and 471 deletions

View File

@ -33,6 +33,7 @@ public class DatanodeAdminProperties {
private int port;
private String upgradeDomain;
private AdminStates adminState = AdminStates.NORMAL;
private long maintenanceExpireTimeInMS = Long.MAX_VALUE;
/**
* Return the host name of the datanode.
@ -97,4 +98,22 @@ public class DatanodeAdminProperties {
public void setAdminState(final AdminStates adminState) {
this.adminState = adminState;
}
/**
* Get the maintenance expiration time in milliseconds.
* @return the maintenance expiration time in milliseconds.
*/
public long getMaintenanceExpireTimeInMS() {
return this.maintenanceExpireTimeInMS;
}
/**
* Get the maintenance expiration time in milliseconds.
* @param maintenanceExpireTimeInMS
* the maintenance expiration time in milliseconds.
*/
public void setMaintenanceExpireTimeInMS(
final long maintenanceExpireTimeInMS) {
this.maintenanceExpireTimeInMS = maintenanceExpireTimeInMS;
}
}

View File

@ -83,6 +83,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
}
protected AdminStates adminState;
private long maintenanceExpireTimeInMS;
public DatanodeInfo(DatanodeInfo from) {
super(from);
@ -499,17 +500,28 @@ public class DatanodeInfo extends DatanodeID implements Node {
}
/**
* Put a node to maintenance mode.
* Start the maintenance operation.
*/
public void startMaintenance() {
adminState = AdminStates.ENTERING_MAINTENANCE;
this.adminState = AdminStates.ENTERING_MAINTENANCE;
}
/**
* Put a node to maintenance mode.
* Put a node directly to maintenance mode.
*/
public void setInMaintenance() {
adminState = AdminStates.IN_MAINTENANCE;
this.adminState = AdminStates.IN_MAINTENANCE;
}
/**
* @param maintenanceExpireTimeInMS the time that the DataNode is in the
* maintenance mode until in the unit of milliseconds. */
public void setMaintenanceExpireTimeInMS(long maintenanceExpireTimeInMS) {
this.maintenanceExpireTimeInMS = maintenanceExpireTimeInMS;
}
public long getMaintenanceExpireTimeInMS() {
return this.maintenanceExpireTimeInMS;
}
/**
@ -519,6 +531,9 @@ public class DatanodeInfo extends DatanodeID implements Node {
adminState = null;
}
public static boolean maintenanceNotExpired(long maintenanceExpireTimeInMS) {
return Time.monotonicNow() < maintenanceExpireTimeInMS;
}
/**
* Returns true if the node is is entering_maintenance
*/
@ -541,6 +556,10 @@ public class DatanodeInfo extends DatanodeID implements Node {
adminState == AdminStates.IN_MAINTENANCE);
}
public boolean maintenanceExpired() {
return !maintenanceNotExpired(this.maintenanceExpireTimeInMS);
}
public boolean isInService() {
return getAdminState() == AdminStates.NORMAL;
}

View File

@ -141,7 +141,7 @@ public final class HdfsConstants {
// type of the datanode report
public enum DatanodeReportType {
ALL, LIVE, DEAD, DECOMMISSIONING
ALL, LIVE, DEAD, DECOMMISSIONING, ENTERING_MAINTENANCE
}
public static final byte RS_6_3_POLICY_ID = 0;

View File

@ -148,6 +148,24 @@ public class CombinedHostFileManager extends HostConfigManager {
};
}
synchronized long getMaintenanceExpireTimeInMS(
final InetSocketAddress address) {
Iterable<DatanodeAdminProperties> datanode = Iterables.filter(
allDNs.get(address.getAddress()),
new Predicate<DatanodeAdminProperties>() {
public boolean apply(DatanodeAdminProperties input) {
return input.getAdminState().equals(
AdminStates.IN_MAINTENANCE) &&
(input.getPort() == 0 ||
input.getPort() == address.getPort());
}
});
// if DN isn't set to maintenance state, ignore MaintenanceExpireTimeInMS
// set in the config.
return datanode.iterator().hasNext() ?
datanode.iterator().next().getMaintenanceExpireTimeInMS() : 0;
}
static class HostIterator extends UnmodifiableIterator<InetSocketAddress> {
private final Iterator<Map.Entry<InetAddress,
DatanodeAdminProperties>> it;
@ -236,6 +254,11 @@ public class CombinedHostFileManager extends HostConfigManager {
return hostProperties.getUpgradeDomain(dn.getResolvedAddress());
}
@Override
public long getMaintenanceExpirationTimeInMS(DatanodeID dn) {
return hostProperties.getMaintenanceExpireTimeInMS(dn.getResolvedAddress());
}
/**
* Set the properties lists by the new instances. The
* old instance is discarded.

View File

@ -552,7 +552,7 @@ public class DatanodeManager {
/** Get a datanode descriptor given corresponding DatanodeUUID */
DatanodeDescriptor getDatanode(final String datanodeUuid) {
public DatanodeDescriptor getDatanode(final String datanodeUuid) {
if (datanodeUuid == null) {
return null;
}
@ -902,10 +902,14 @@ public class DatanodeManager {
*
* @param nodeReg datanode
*/
void startDecommissioningIfExcluded(DatanodeDescriptor nodeReg) {
void startAdminOperationIfNecessary(DatanodeDescriptor nodeReg) {
long maintenanceExpireTimeInMS =
hostConfigManager.getMaintenanceExpirationTimeInMS(nodeReg);
// If the registered node is in exclude list, then decommission it
if (getHostConfigManager().isExcluded(nodeReg)) {
decomManager.startDecommission(nodeReg);
} else if (nodeReg.maintenanceNotExpired(maintenanceExpireTimeInMS)) {
decomManager.startMaintenance(nodeReg, maintenanceExpireTimeInMS);
}
}
@ -1017,7 +1021,7 @@ public class DatanodeManager {
// also treat the registration message as a heartbeat
heartbeatManager.register(nodeS);
incrementVersionCount(nodeS.getSoftwareVersion());
startDecommissioningIfExcluded(nodeS);
startAdminOperationIfNecessary(nodeS);
success = true;
} finally {
if (!success) {
@ -1056,7 +1060,7 @@ public class DatanodeManager {
heartbeatManager.addDatanode(nodeDescr);
heartbeatManager.updateDnStat(nodeDescr);
incrementVersionCount(nodeReg.getSoftwareVersion());
startDecommissioningIfExcluded(nodeDescr);
startAdminOperationIfNecessary(nodeDescr);
success = true;
} finally {
if (!success) {
@ -1122,9 +1126,14 @@ public class DatanodeManager {
if (!hostConfigManager.isIncluded(node)) {
node.setDisallowed(true); // case 2.
} else {
if (hostConfigManager.isExcluded(node)) {
long maintenanceExpireTimeInMS =
hostConfigManager.getMaintenanceExpirationTimeInMS(node);
if (node.maintenanceNotExpired(maintenanceExpireTimeInMS)) {
decomManager.startMaintenance(node, maintenanceExpireTimeInMS);
} else if (hostConfigManager.isExcluded(node)) {
decomManager.startDecommission(node); // case 3.
} else {
decomManager.stopMaintenance(node);
decomManager.stopDecommission(node); // case 4.
}
}
@ -1157,7 +1166,12 @@ public class DatanodeManager {
// A decommissioning DN may be "alive" or "dead".
return getDatanodeListForReport(DatanodeReportType.DECOMMISSIONING);
}
/** @return list of datanodes that are entering maintenance. */
public List<DatanodeDescriptor> getEnteringMaintenanceNodes() {
return getDatanodeListForReport(DatanodeReportType.ENTERING_MAINTENANCE);
}
/* Getter and Setter for stale DataNodes related attributes */
/**
@ -1342,6 +1356,9 @@ public class DatanodeManager {
final boolean listDecommissioningNodes =
type == DatanodeReportType.ALL ||
type == DatanodeReportType.DECOMMISSIONING;
final boolean listEnteringMaintenanceNodes =
type == DatanodeReportType.ALL ||
type == DatanodeReportType.ENTERING_MAINTENANCE;
ArrayList<DatanodeDescriptor> nodes;
final HostSet foundNodes = new HostSet();
@ -1353,10 +1370,12 @@ public class DatanodeManager {
for (DatanodeDescriptor dn : datanodeMap.values()) {
final boolean isDead = isDatanodeDead(dn);
final boolean isDecommissioning = dn.isDecommissionInProgress();
final boolean isEnteringMaintenance = dn.isEnteringMaintenance();
if (((listLiveNodes && !isDead) ||
(listDeadNodes && isDead) ||
(listDecommissioningNodes && isDecommissioning)) &&
(listDecommissioningNodes && isDecommissioning) ||
(listEnteringMaintenanceNodes && isEnteringMaintenance)) &&
hostConfigManager.isIncluded(dn)) {
nodes.add(dn);
}

View File

@ -47,7 +47,7 @@ class DatanodeStats {
synchronized void add(final DatanodeDescriptor node) {
xceiverCount += node.getXceiverCount();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
if (node.isInService()) {
capacityUsed += node.getDfsUsed();
blockPoolUsed += node.getBlockPoolUsed();
nodesInService++;
@ -56,7 +56,8 @@ class DatanodeStats {
capacityRemaining += node.getRemaining();
cacheCapacity += node.getCacheCapacity();
cacheUsed += node.getCacheUsed();
} else if (!node.isDecommissioned()) {
} else if (node.isDecommissionInProgress() ||
node.isEnteringMaintenance()) {
cacheCapacity += node.getCacheCapacity();
cacheUsed += node.getCacheUsed();
}
@ -74,7 +75,7 @@ class DatanodeStats {
synchronized void subtract(final DatanodeDescriptor node) {
xceiverCount -= node.getXceiverCount();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
if (node.isInService()) {
capacityUsed -= node.getDfsUsed();
blockPoolUsed -= node.getBlockPoolUsed();
nodesInService--;
@ -83,7 +84,8 @@ class DatanodeStats {
capacityRemaining -= node.getRemaining();
cacheCapacity -= node.getCacheCapacity();
cacheUsed -= node.getCacheUsed();
} else if (!node.isDecommissioned()) {
} else if (node.isDecommissionInProgress() ||
node.isEnteringMaintenance()) {
cacheCapacity -= node.getCacheCapacity();
cacheUsed -= node.getCacheUsed();
}

View File

@ -86,8 +86,11 @@ public class DecommissionManager {
private final ScheduledExecutorService executor;
/**
* Map containing the decommission-in-progress datanodes that are being
* tracked so they can be be marked as decommissioned.
* Map containing the DECOMMISSION_INPROGRESS or ENTERING_MAINTENANCE
* datanodes that are being tracked so they can be be marked as
* DECOMMISSIONED or IN_MAINTENANCE. Even after the node is marked as
* IN_MAINTENANCE, the node remains in the map until
* maintenance expires checked during a monitor tick.
* <p/>
* This holds a set of references to the under-replicated blocks on the DN at
* the time the DN is added to the map, i.e. the blocks that are preventing
@ -102,12 +105,12 @@ public class DecommissionManager {
* another check is done with the actual block map.
*/
private final TreeMap<DatanodeDescriptor, AbstractList<BlockInfo>>
decomNodeBlocks;
outOfServiceNodeBlocks;
/**
* Tracking a node in decomNodeBlocks consumes additional memory. To limit
* the impact on NN memory consumption, we limit the number of nodes in
* decomNodeBlocks. Additional nodes wait in pendingNodes.
* Tracking a node in outOfServiceNodeBlocks consumes additional memory. To
* limit the impact on NN memory consumption, we limit the number of nodes in
* outOfServiceNodeBlocks. Additional nodes wait in pendingNodes.
*/
private final Queue<DatanodeDescriptor> pendingNodes;
@ -122,7 +125,7 @@ public class DecommissionManager {
executor = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setNameFormat("DecommissionMonitor-%d")
.setDaemon(true).build());
decomNodeBlocks = new TreeMap<>();
outOfServiceNodeBlocks = new TreeMap<>();
pendingNodes = new LinkedList<>();
}
@ -222,13 +225,56 @@ public class DecommissionManager {
}
// Remove from tracking in DecommissionManager
pendingNodes.remove(node);
decomNodeBlocks.remove(node);
outOfServiceNodeBlocks.remove(node);
} else {
LOG.trace("stopDecommission: Node {} in {}, nothing to do." +
node, node.getAdminState());
}
}
/**
* Start maintenance of the specified datanode.
* @param node
*/
@VisibleForTesting
public void startMaintenance(DatanodeDescriptor node,
long maintenanceExpireTimeInMS) {
// Even if the node is already in maintenance, we still need to adjust
// the expiration time.
node.setMaintenanceExpireTimeInMS(maintenanceExpireTimeInMS);
if (!node.isMaintenance()) {
// Update DN stats maintained by HeartbeatManager
hbManager.startMaintenance(node);
pendingNodes.add(node);
} else {
LOG.trace("startMaintenance: Node {} in {}, nothing to do." +
node, node.getAdminState());
}
}
/**
* Stop maintenance of the specified datanode.
* @param node
*/
@VisibleForTesting
public void stopMaintenance(DatanodeDescriptor node) {
if (node.isMaintenance()) {
// Update DN stats maintained by HeartbeatManager
hbManager.stopMaintenance(node);
// TODO HDFS-9390 remove replicas from block maps
// or handle over replicated blocks.
// Remove from tracking in DecommissionManager
pendingNodes.remove(node);
outOfServiceNodeBlocks.remove(node);
} else {
LOG.trace("stopMaintenance: Node {} in {}, nothing to do." +
node, node.getAdminState());
}
}
private void setDecommissioned(DatanodeDescriptor dn) {
dn.setDecommissioned();
LOG.info("Decommissioning complete for node {}", dn);
@ -313,7 +359,7 @@ public class DecommissionManager {
@VisibleForTesting
public int getNumTrackedNodes() {
return decomNodeBlocks.size();
return outOfServiceNodeBlocks.size();
}
@VisibleForTesting
@ -333,8 +379,8 @@ public class DecommissionManager {
*/
private final int numBlocksPerCheck;
/**
* The maximum number of nodes to track in decomNodeBlocks. A value of 0
* means no limit.
* The maximum number of nodes to track in outOfServiceNodeBlocks.
* A value of 0 means no limit.
*/
private final int maxConcurrentTrackedNodes;
/**
@ -347,7 +393,7 @@ public class DecommissionManager {
*/
private int numNodesChecked = 0;
/**
* The last datanode in decomNodeBlocks that we've processed
* The last datanode in outOfServiceNodeBlocks that we've processed
*/
private DatanodeDescriptor iterkey = new DatanodeDescriptor(new
DatanodeID("", "", "", 0, 0, 0, 0));
@ -393,14 +439,15 @@ public class DecommissionManager {
private void processPendingNodes() {
while (!pendingNodes.isEmpty() &&
(maxConcurrentTrackedNodes == 0 ||
decomNodeBlocks.size() < maxConcurrentTrackedNodes)) {
decomNodeBlocks.put(pendingNodes.poll(), null);
outOfServiceNodeBlocks.size() < maxConcurrentTrackedNodes)) {
outOfServiceNodeBlocks.put(pendingNodes.poll(), null);
}
}
private void check() {
final Iterator<Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>>
it = new CyclicIteration<>(decomNodeBlocks, iterkey).iterator();
it = new CyclicIteration<>(outOfServiceNodeBlocks,
iterkey).iterator();
final LinkedList<DatanodeDescriptor> toRemove = new LinkedList<>();
while (it.hasNext() && !exceededNumBlocksPerCheck()) {
@ -410,6 +457,17 @@ public class DecommissionManager {
final DatanodeDescriptor dn = entry.getKey();
AbstractList<BlockInfo> blocks = entry.getValue();
boolean fullScan = false;
if (dn.isMaintenance()) {
// TODO HDFS-9390 make sure blocks are minimally replicated
// before transitioning the node to IN_MAINTENANCE state.
// If maintenance expires, stop tracking it.
if (dn.maintenanceExpired()) {
stopMaintenance(dn);
toRemove.add(dn);
}
continue;
}
if (blocks == null) {
// This is a newly added datanode, run through its list to schedule
// under-replicated blocks for replication and collect the blocks
@ -417,7 +475,7 @@ public class DecommissionManager {
LOG.debug("Newly-added node {}, doing full scan to find " +
"insufficiently-replicated blocks.", dn);
blocks = handleInsufficientlyStored(dn);
decomNodeBlocks.put(dn, blocks);
outOfServiceNodeBlocks.put(dn, blocks);
fullScan = true;
} else {
// This is a known datanode, check if its # of insufficiently
@ -436,7 +494,7 @@ public class DecommissionManager {
LOG.debug("Node {} has finished replicating current set of "
+ "blocks, checking with the full block map.", dn);
blocks = handleInsufficientlyStored(dn);
decomNodeBlocks.put(dn, blocks);
outOfServiceNodeBlocks.put(dn, blocks);
}
// If the full scan is clean AND the node liveness is okay,
// we can finally mark as decommissioned.
@ -460,11 +518,12 @@ public class DecommissionManager {
}
iterkey = dn;
}
// Remove the datanodes that are decommissioned
// Remove the datanodes that are decommissioned or in service after
// maintenance expiration.
for (DatanodeDescriptor dn : toRemove) {
Preconditions.checkState(dn.isDecommissioned(),
"Removing a node that is not yet decommissioned!");
decomNodeBlocks.remove(dn);
Preconditions.checkState(dn.isDecommissioned() || dn.isInService(),
"Removing a node that is not yet decommissioned or in service!");
outOfServiceNodeBlocks.remove(dn);
}
}

View File

@ -265,6 +265,33 @@ class HeartbeatManager implements DatanodeStatistics {
}
}
synchronized void startMaintenance(final DatanodeDescriptor node) {
if (!node.isAlive()) {
LOG.info("Dead node {} is put in maintenance state immediately.", node);
node.setInMaintenance();
} else if (node.isDecommissioned()) {
LOG.info("Decommissioned node " + node + " is put in maintenance state"
+ " immediately.");
node.setInMaintenance();
} else {
stats.subtract(node);
node.startMaintenance();
stats.add(node);
}
}
synchronized void stopMaintenance(final DatanodeDescriptor node) {
LOG.info("Stopping maintenance of {} node {}",
node.isAlive() ? "live" : "dead", node);
if (!node.isAlive()) {
node.stopMaintenance();
} else {
stats.subtract(node);
node.stopMaintenance();
stats.add(node);
}
}
synchronized void stopDecommission(final DatanodeDescriptor node) {
LOG.info("Stopping decommissioning of {} node {}",
node.isAlive() ? "live" : "dead", node);

View File

@ -77,4 +77,11 @@ public abstract class HostConfigManager implements Configurable {
* @return the upgrade domain of dn.
*/
public abstract String getUpgradeDomain(DatanodeID dn);
/**
* Get the maintenance expiration time in milli seconds.
* @param dn the DatanodeID of the datanode
* @return the maintenance expiration time of dn.
*/
public abstract long getMaintenanceExpirationTimeInMS(DatanodeID dn);
}

View File

@ -138,6 +138,12 @@ public class HostFileManager extends HostConfigManager {
return null;
}
@Override
public long getMaintenanceExpirationTimeInMS(DatanodeID dn) {
// The include/exclude files based config doesn't support maintenance mode.
return 0;
}
/**
* Read the includes and excludes lists from the named files. Any previous
* includes and excludes lists are discarded.

View File

@ -7079,5 +7079,34 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return blockManager.getBytesInFuture();
}
@Override // FSNamesystemMBean
public int getNumInMaintenanceLiveDataNodes() {
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
getBlockManager().getDatanodeManager().fetchDatanodes(live, null, true);
int liveInMaintenance = 0;
for (DatanodeDescriptor node : live) {
liveInMaintenance += node.isInMaintenance() ? 1 : 0;
}
return liveInMaintenance;
}
@Override // FSNamesystemMBean
public int getNumInMaintenanceDeadDataNodes() {
final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
getBlockManager().getDatanodeManager().fetchDatanodes(null, dead, true);
int deadInMaintenance = 0;
for (DatanodeDescriptor node : dead) {
deadInMaintenance += node.isInMaintenance() ? 1 : 0;
}
return deadInMaintenance;
}
@Override // FSNamesystemMBean
public int getNumEnteringMaintenanceDataNodes() {
return getBlockManager().getDatanodeManager().getEnteringMaintenanceNodes()
.size();
}
}

View File

@ -208,4 +208,19 @@ public interface FSNamesystemMBean {
* Return total time spent doing sync operations on FSEditLog.
*/
String getTotalSyncTimes();
/**
* @return Number of IN_MAINTENANCE live data nodes
*/
int getNumInMaintenanceLiveDataNodes();
/**
* @return Number of IN_MAINTENANCE dead data nodes
*/
int getNumInMaintenanceDeadDataNodes();
/**
* @return Number of ENTERING_MAINTENANCE data nodes
*/
int getNumEnteringMaintenanceDataNodes();
}

View File

@ -0,0 +1,375 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.util.HostsFileWriter;
import org.junit.After;
import org.junit.Before;
/**
* This class provide utilities for testing of the admin operations of nodes.
*/
public class AdminStatesBaseTest {
public static final Log LOG = LogFactory.getLog(AdminStatesBaseTest.class);
static final long seed = 0xDEADBEEFL;
static final int blockSize = 8192;
static final int fileSize = 16384;
static final int HEARTBEAT_INTERVAL = 1; // heartbeat interval in seconds
static final int BLOCKREPORT_INTERVAL_MSEC = 1000; //block report in msec
static final int NAMENODE_REPLICATION_INTERVAL = 1; //replication interval
final private Random myrand = new Random();
private HostsFileWriter hostsFileWriter;
private Configuration conf;
private MiniDFSCluster cluster = null;
private boolean useCombinedHostFileManager = false;
protected void setUseCombinedHostFileManager() {
useCombinedHostFileManager = true;
}
protected Configuration getConf() {
return conf;
}
protected MiniDFSCluster getCluster() {
return cluster;
}
@Before
public void setup() throws IOException {
// Set up the hosts/exclude files.
hostsFileWriter = new HostsFileWriter();
conf = new HdfsConfiguration();
if (useCombinedHostFileManager) {
conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
CombinedHostFileManager.class, HostConfigManager.class);
}
// Setup conf
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
false);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
200);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
BLOCKREPORT_INTERVAL_MSEC);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
NAMENODE_REPLICATION_INTERVAL);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
hostsFileWriter.initialize(conf, "temp/admin");
}
@After
public void teardown() throws IOException {
hostsFileWriter.cleanup();
shutdownCluster();
}
protected void writeFile(FileSystem fileSys, Path name, int repl)
throws IOException {
writeFile(fileSys, name, repl, 2);
}
protected void writeFile(FileSystem fileSys, Path name, int repl,
int numOfBlocks) throws IOException {
writeFile(fileSys, name, repl, numOfBlocks, true);
}
protected FSDataOutputStream writeFile(FileSystem fileSys, Path name,
int repl, int numOfBlocks, boolean completeFile)
throws IOException {
// create and write a file that contains two blocks of data
FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf()
.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
(short) repl, blockSize);
byte[] buffer = new byte[blockSize*numOfBlocks];
Random rand = new Random(seed);
rand.nextBytes(buffer);
stm.write(buffer);
LOG.info("Created file " + name + " with " + repl + " replicas.");
if (completeFile) {
stm.close();
return null;
} else {
// Do not close stream, return it
// so that it is not garbage collected
return stm;
}
}
/*
* decommission the DN or put the DN into maintenance for datanodeUuid or one
* random node if datanodeUuid is null.
* And wait for the node to reach the given {@code waitForState}.
*/
protected DatanodeInfo takeNodeOutofService(int nnIndex,
String datanodeUuid, long maintenanceExpirationInMS,
ArrayList<DatanodeInfo> decommissionedNodes,
AdminStates waitForState) throws IOException {
return takeNodeOutofService(nnIndex, datanodeUuid,
maintenanceExpirationInMS, decommissionedNodes, null, waitForState);
}
/*
* decommission the DN or put the DN to maintenance set by datanodeUuid
* Pick randome node if datanodeUuid == null
* wait for the node to reach the given {@code waitForState}.
*/
protected DatanodeInfo takeNodeOutofService(int nnIndex,
String datanodeUuid, long maintenanceExpirationInMS,
List<DatanodeInfo> decommissionedNodes,
Map<DatanodeInfo, Long> inMaintenanceNodes, AdminStates waitForState)
throws IOException {
DFSClient client = getDfsClient(nnIndex);
DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.ALL);
boolean isDecommissionRequest =
waitForState == AdminStates.DECOMMISSION_INPROGRESS ||
waitForState == AdminStates.DECOMMISSIONED;
//
// pick one datanode randomly unless the caller specifies one.
//
int index = 0;
if (datanodeUuid == null) {
boolean found = false;
while (!found) {
index = myrand.nextInt(info.length);
if ((isDecommissionRequest && !info[index].isDecommissioned()) ||
(!isDecommissionRequest && !info[index].isInMaintenance())) {
found = true;
}
}
} else {
// The caller specifies a DN
for (; index < info.length; index++) {
if (info[index].getDatanodeUuid().equals(datanodeUuid)) {
break;
}
}
if (index == info.length) {
throw new IOException("invalid datanodeUuid " + datanodeUuid);
}
}
String nodename = info[index].getXferAddr();
LOG.info("Taking node: " + nodename + " out of service");
ArrayList<String> decommissionNodes = new ArrayList<String>();
if (decommissionedNodes != null) {
for (DatanodeInfo dn : decommissionedNodes) {
decommissionNodes.add(dn.getName());
}
}
Map<String, Long> maintenanceNodes = new HashMap<>();
if (inMaintenanceNodes != null) {
for (Map.Entry<DatanodeInfo, Long> dn :
inMaintenanceNodes.entrySet()) {
maintenanceNodes.put(dn.getKey().getName(), dn.getValue());
}
}
if (isDecommissionRequest) {
decommissionNodes.add(nodename);
} else {
maintenanceNodes.put(nodename, maintenanceExpirationInMS);
}
// write node names into the json host file.
hostsFileWriter.initOutOfServiceHosts(decommissionNodes, maintenanceNodes);
refreshNodes(nnIndex);
DatanodeInfo ret = NameNodeAdapter.getDatanode(
cluster.getNamesystem(nnIndex), info[index]);
waitNodeState(ret, waitForState);
return ret;
}
/* Ask a specific NN to put the datanode in service and wait for it
* to reach the NORMAL state.
*/
protected void putNodeInService(int nnIndex,
DatanodeInfo outOfServiceNode) throws IOException {
LOG.info("Putting node: " + outOfServiceNode + " in service");
ArrayList<String> decommissionNodes = new ArrayList<>();
Map<String, Long> maintenanceNodes = new HashMap<>();
DatanodeManager dm =
cluster.getNamesystem(nnIndex).getBlockManager().getDatanodeManager();
List<DatanodeDescriptor> nodes =
dm.getDatanodeListForReport(DatanodeReportType.ALL);
for (DatanodeDescriptor node : nodes) {
if (node.isMaintenance()) {
maintenanceNodes.put(node.getName(),
node.getMaintenanceExpireTimeInMS());
} else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
decommissionNodes.add(node.getName());
}
}
decommissionNodes.remove(outOfServiceNode.getName());
maintenanceNodes.remove(outOfServiceNode.getName());
hostsFileWriter.initOutOfServiceHosts(decommissionNodes, maintenanceNodes);
refreshNodes(nnIndex);
waitNodeState(outOfServiceNode, AdminStates.NORMAL);
}
protected void putNodeInService(int nnIndex,
String datanodeUuid) throws IOException {
DatanodeInfo datanodeInfo =
getDatanodeDesriptor(cluster.getNamesystem(nnIndex), datanodeUuid);
putNodeInService(nnIndex, datanodeInfo);
}
/*
* Wait till node is transitioned to the expected state.
*/
protected void waitNodeState(DatanodeInfo node,
AdminStates state) {
boolean done = state == node.getAdminState();
while (!done) {
LOG.info("Waiting for node " + node + " to change state to "
+ state + " current state: " + node.getAdminState());
try {
Thread.sleep(HEARTBEAT_INTERVAL * 500);
} catch (InterruptedException e) {
// nothing
}
done = state == node.getAdminState();
}
LOG.info("node " + node + " reached the state " + state);
}
protected void initIncludeHost(String hostNameAndPort) throws IOException {
hostsFileWriter.initIncludeHost(hostNameAndPort);
}
protected void initIncludeHosts(String[] hostNameAndPorts)
throws IOException {
hostsFileWriter.initIncludeHosts(hostNameAndPorts);
}
protected void initExcludeHost(String hostNameAndPort) throws IOException {
hostsFileWriter.initExcludeHost(hostNameAndPort);
}
protected void initExcludeHosts(List<String> hostNameAndPorts)
throws IOException {
hostsFileWriter.initExcludeHosts(hostNameAndPorts);
}
/* Get DFSClient to the namenode */
protected DFSClient getDfsClient(final int nnIndex) throws IOException {
return new DFSClient(cluster.getNameNode(nnIndex).getNameNodeAddress(),
conf);
}
/* Validate cluster has expected number of datanodes */
protected static void validateCluster(DFSClient client, int numDNs)
throws IOException {
DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
assertEquals("Number of Datanodes ", numDNs, info.length);
}
/** Start a MiniDFSCluster.
* @throws IOException */
protected void startCluster(int numNameNodes, int numDatanodes,
boolean setupHostsFile, long[] nodesCapacity,
boolean checkDataNodeHostConfig) throws IOException {
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(numNameNodes))
.numDataNodes(numDatanodes);
if (setupHostsFile) {
builder.setupHostsFile(setupHostsFile);
}
if (nodesCapacity != null) {
builder.simulatedCapacities(nodesCapacity);
}
if (checkDataNodeHostConfig) {
builder.checkDataNodeHostConfig(checkDataNodeHostConfig);
}
cluster = builder.build();
cluster.waitActive();
for (int i = 0; i < numNameNodes; i++) {
DFSClient client = getDfsClient(i);
validateCluster(client, numDatanodes);
}
}
protected void startCluster(int numNameNodes, int numDatanodes)
throws IOException {
startCluster(numNameNodes, numDatanodes, false, null, false);
}
protected void startSimpleHACluster(int numDatanodes) throws IOException {
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(
numDatanodes).build();
cluster.transitionToActive(0);
cluster.waitActive();
}
protected void shutdownCluster() {
if (cluster != null) {
cluster.shutdown();
}
}
protected void refreshNodes(final int nnIndex) throws IOException {
cluster.getNamesystem(nnIndex).getBlockManager().getDatanodeManager().
refreshNodes(conf);
}
protected DatanodeDescriptor getDatanodeDesriptor(
final FSNamesystem ns, final String datanodeUuid) {
return ns.getBlockManager().getDatanodeManager().getDatanode(datanodeUuid);
}
protected void cleanupFile(FileSystem fileSys, Path name) throws IOException {
assertTrue(fileSys.exists(name));
fileSys.delete(name, true);
assertTrue(!fileSys.exists(name));
}
}

View File

@ -0,0 +1,310 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.util.Time;
import org.junit.Test;
/**
* This class tests node maintenance.
*/
public class TestMaintenanceState extends AdminStatesBaseTest {
public static final Log LOG = LogFactory.getLog(TestMaintenanceState.class);
static private final long EXPIRATION_IN_MS = 500;
public TestMaintenanceState() {
setUseCombinedHostFileManager();
}
/**
* Verify a node can transition from AdminStates.ENTERING_MAINTENANCE to
* AdminStates.NORMAL.
*/
@Test(timeout = 360000)
public void testTakeNodeOutOfEnteringMaintenance() throws Exception {
LOG.info("Starting testTakeNodeOutOfEnteringMaintenance");
final int replicas = 1;
final int numNamenodes = 1;
final int numDatanodes = 1;
final Path file1 = new Path("/testTakeNodeOutOfEnteringMaintenance.dat");
startCluster(numNamenodes, numDatanodes);
FileSystem fileSys = getCluster().getFileSystem(0);
writeFile(fileSys, file1, replicas, 1);
DatanodeInfo nodeOutofService = takeNodeOutofService(0,
null, Long.MAX_VALUE, null, AdminStates.ENTERING_MAINTENANCE);
putNodeInService(0, nodeOutofService.getDatanodeUuid());
cleanupFile(fileSys, file1);
}
/**
* Verify a AdminStates.ENTERING_MAINTENANCE node can expire and transition
* to AdminStates.NORMAL upon timeout.
*/
@Test(timeout = 360000)
public void testEnteringMaintenanceExpiration() throws Exception {
LOG.info("Starting testEnteringMaintenanceExpiration");
final int replicas = 1;
final int numNamenodes = 1;
final int numDatanodes = 1;
final Path file1 = new Path("/testTakeNodeOutOfEnteringMaintenance.dat");
startCluster(numNamenodes, numDatanodes);
FileSystem fileSys = getCluster().getFileSystem(0);
writeFile(fileSys, file1, replicas, 1);
// expires in 500 milliseconds
DatanodeInfo nodeOutofService = takeNodeOutofService(0, null,
Time.monotonicNow() + EXPIRATION_IN_MS, null,
AdminStates.ENTERING_MAINTENANCE);
waitNodeState(nodeOutofService, AdminStates.NORMAL);
cleanupFile(fileSys, file1);
}
/**
* Verify node stays in AdminStates.NORMAL with invalid expiration.
*/
@Test(timeout = 360000)
public void testInvalidExpiration() throws Exception {
LOG.info("Starting testInvalidExpiration");
final int replicas = 1;
final int numNamenodes = 1;
final int numDatanodes = 1;
final Path file1 = new Path("/testTakeNodeOutOfEnteringMaintenance.dat");
startCluster(numNamenodes, numDatanodes);
FileSystem fileSys = getCluster().getFileSystem(0);
writeFile(fileSys, file1, replicas, 1);
// expiration has to be greater than Time.monotonicNow().
takeNodeOutofService(0, null, Time.monotonicNow(), null,
AdminStates.NORMAL);
cleanupFile(fileSys, file1);
}
/**
* When a dead node is put to maintenance, it transitions directly to
* AdminStates.IN_MAINTENANCE.
*/
@Test(timeout = 360000)
public void testPutDeadNodeToMaintenance() throws Exception {
LOG.info("Starting testPutDeadNodeToMaintenance");
final int numNamenodes = 1;
final int numDatanodes = 1;
final int replicas = 1;
final Path file1 = new Path("/testPutDeadNodeToMaintenance.dat");
startCluster(numNamenodes, numDatanodes);
FileSystem fileSys = getCluster().getFileSystem(0);
FSNamesystem ns = getCluster().getNamesystem(0);
writeFile(fileSys, file1, replicas, 1);
MiniDFSCluster.DataNodeProperties dnProp = getCluster().stopDataNode(0);
DFSTestUtil.waitForDatanodeState(
getCluster(), dnProp.datanode.getDatanodeUuid(), false, 20000);
int deadInMaintenance = ns.getNumInMaintenanceDeadDataNodes();
int liveInMaintenance = ns.getNumInMaintenanceLiveDataNodes();
takeNodeOutofService(0, dnProp.datanode.getDatanodeUuid(), Long.MAX_VALUE,
null, AdminStates.IN_MAINTENANCE);
assertEquals(deadInMaintenance + 1, ns.getNumInMaintenanceDeadDataNodes());
assertEquals(liveInMaintenance, ns.getNumInMaintenanceLiveDataNodes());
cleanupFile(fileSys, file1);
}
/**
* When a dead node is put to maintenance, it transitions directly to
* AdminStates.IN_MAINTENANCE. Then AdminStates.IN_MAINTENANCE expires and
* transitions to AdminStates.NORMAL.
*/
@Test(timeout = 360000)
public void testPutDeadNodeToMaintenanceWithExpiration() throws Exception {
LOG.info("Starting testPutDeadNodeToMaintenanceWithExpiration");
final int numNamenodes = 1;
final int numDatanodes = 1;
final int replicas = 1;
final Path file1 = new Path("/testPutDeadNodeToMaintenance.dat");
startCluster(numNamenodes, numDatanodes);
FileSystem fileSys = getCluster().getFileSystem(0);
FSNamesystem ns = getCluster().getNamesystem(0);
writeFile(fileSys, file1, replicas, 1);
MiniDFSCluster.DataNodeProperties dnProp = getCluster().stopDataNode(0);
DFSTestUtil.waitForDatanodeState(
getCluster(), dnProp.datanode.getDatanodeUuid(), false, 20000);
int deadInMaintenance = ns.getNumInMaintenanceDeadDataNodes();
int liveInMaintenance = ns.getNumInMaintenanceLiveDataNodes();
DatanodeInfo nodeOutofService = takeNodeOutofService(0,
dnProp.datanode.getDatanodeUuid(),
Time.monotonicNow() + EXPIRATION_IN_MS, null,
AdminStates.IN_MAINTENANCE);
waitNodeState(nodeOutofService, AdminStates.NORMAL);
// no change
assertEquals(deadInMaintenance, ns.getNumInMaintenanceDeadDataNodes());
assertEquals(liveInMaintenance, ns.getNumInMaintenanceLiveDataNodes());
cleanupFile(fileSys, file1);
}
/**
* Transition from decommissioned state to maintenance state.
*/
@Test(timeout = 360000)
public void testTransitionFromDecommissioned() throws IOException {
LOG.info("Starting testTransitionFromDecommissioned");
final int numNamenodes = 1;
final int numDatanodes = 4;
final int replicas = 3;
final Path file1 = new Path("/testTransitionFromDecommissioned.dat");
startCluster(numNamenodes, numDatanodes);
FileSystem fileSys = getCluster().getFileSystem(0);
writeFile(fileSys, file1, replicas, 1);
DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, 0, null,
AdminStates.DECOMMISSIONED);
takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(), Long.MAX_VALUE,
null, AdminStates.IN_MAINTENANCE);
cleanupFile(fileSys, file1);
}
/**
* Transition from decommissioned state to maintenance state.
* After the maintenance state expires, it is transitioned to NORMAL.
*/
@Test(timeout = 360000)
public void testTransitionFromDecommissionedAndExpired() throws IOException {
LOG.info("Starting testTransitionFromDecommissionedAndExpired");
final int numNamenodes = 1;
final int numDatanodes = 4;
final int replicas = 3;
final Path file1 = new Path("/testTransitionFromDecommissioned.dat");
startCluster(numNamenodes, numDatanodes);
FileSystem fileSys = getCluster().getFileSystem(0);
writeFile(fileSys, file1, replicas, 1);
DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, 0, null,
AdminStates.DECOMMISSIONED);
takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(),
Time.monotonicNow() + EXPIRATION_IN_MS, null,
AdminStates.IN_MAINTENANCE);
waitNodeState(nodeOutofService, AdminStates.NORMAL);
cleanupFile(fileSys, file1);
}
/**
* When a node is put to maintenance, it first transitions to
* AdminStates.ENTERING_MAINTENANCE. It makes sure all blocks have minimal
* replication before it can be transitioned to AdminStates.IN_MAINTENANCE.
* If node becomes dead when it is in AdminStates.ENTERING_MAINTENANCE, admin
* state should stay in AdminStates.ENTERING_MAINTENANCE state.
*/
@Test(timeout = 360000)
public void testNodeDeadWhenInEnteringMaintenance() throws Exception {
LOG.info("Starting testNodeDeadWhenInEnteringMaintenance");
final int numNamenodes = 1;
final int numDatanodes = 1;
final int replicas = 1;
final Path file1 = new Path("/testNodeDeadWhenInEnteringMaintenance.dat");
startCluster(numNamenodes, numDatanodes);
FileSystem fileSys = getCluster().getFileSystem(0);
FSNamesystem ns = getCluster().getNamesystem(0);
writeFile(fileSys, file1, replicas, 1);
DatanodeInfo nodeOutofService = takeNodeOutofService(0,
getFirstBlockFirstReplicaUuid(fileSys, file1), Long.MAX_VALUE, null,
AdminStates.ENTERING_MAINTENANCE);
assertEquals(1, ns.getNumEnteringMaintenanceDataNodes());
MiniDFSCluster.DataNodeProperties dnProp =
getCluster().stopDataNode(nodeOutofService.getXferAddr());
DFSTestUtil.waitForDatanodeState(
getCluster(), nodeOutofService.getDatanodeUuid(), false, 20000);
DFSClient client = getDfsClient(0);
assertEquals("maintenance node shouldn't be alive", numDatanodes - 1,
client.datanodeReport(DatanodeReportType.LIVE).length);
getCluster().restartDataNode(dnProp, true);
getCluster().waitActive();
waitNodeState(nodeOutofService, AdminStates.ENTERING_MAINTENANCE);
assertEquals(1, ns.getNumEnteringMaintenanceDataNodes());
cleanupFile(fileSys, file1);
}
static protected String getFirstBlockFirstReplicaUuid(FileSystem fileSys,
Path name) throws IOException {
// need a raw stream
assertTrue("Not HDFS:"+fileSys.getUri(),
fileSys instanceof DistributedFileSystem);
HdfsDataInputStream dis = (HdfsDataInputStream)fileSys.open(name);
Collection<LocatedBlock> dinfo = dis.getAllBlocks();
for (LocatedBlock blk : dinfo) { // for each block
DatanodeInfo[] nodes = blk.getLocations();
if (nodes.length > 0) {
return nodes[0].getDatanodeUuid();
}
}
return null;
}
}

View File

@ -158,7 +158,7 @@ public class TestDecommissioningStatus {
// write nodename into the exclude file.
ArrayList<String> nodes = new ArrayList<String>(decommissionedNodes);
nodes.add(dnName);
hostsFileWriter.initExcludeHosts(nodes.toArray(new String[0]));
hostsFileWriter.initExcludeHosts(nodes);
}
private void checkDecommissionStatus(DatanodeDescriptor decommNode,

View File

@ -20,8 +20,11 @@ package org.apache.hadoop.hdfs.util;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
@ -73,30 +76,60 @@ public class HostsFileWriter {
}
public void initExcludeHost(String hostNameAndPort) throws IOException {
initExcludeHosts(hostNameAndPort);
ArrayList<String> nodes = new ArrayList<>();
nodes.add(hostNameAndPort);
initExcludeHosts(nodes);
}
public void initExcludeHosts(String... hostNameAndPorts) throws IOException {
public void initExcludeHosts(List<String> hostNameAndPorts)
throws IOException {
initOutOfServiceHosts(hostNameAndPorts, null);
}
public void initOutOfServiceHosts(List<String> decommissionHostNameAndPorts,
Map<String, Long> maintenanceHosts) throws IOException {
StringBuilder excludeHosts = new StringBuilder();
if (isLegacyHostsFile) {
for (String hostNameAndPort : hostNameAndPorts) {
if (maintenanceHosts != null && maintenanceHosts.size() > 0) {
throw new UnsupportedOperationException(
"maintenance support isn't supported by legacy hosts file");
}
for (String hostNameAndPort : decommissionHostNameAndPorts) {
excludeHosts.append(hostNameAndPort).append("\n");
}
DFSTestUtil.writeFile(localFileSys, excludeFile, excludeHosts.toString());
DFSTestUtil.writeFile(localFileSys, excludeFile,
excludeHosts.toString());
} else {
HashSet<DatanodeAdminProperties> allDNs = new HashSet<>();
for (String hostNameAndPort : hostNameAndPorts) {
DatanodeAdminProperties dn = new DatanodeAdminProperties();
String[] hostAndPort = hostNameAndPort.split(":");
dn.setHostName(hostAndPort[0]);
dn.setPort(Integer.parseInt(hostAndPort[1]));
dn.setAdminState(AdminStates.DECOMMISSIONED);
allDNs.add(dn);
if (decommissionHostNameAndPorts != null) {
for (String hostNameAndPort : decommissionHostNameAndPorts) {
DatanodeAdminProperties dn = new DatanodeAdminProperties();
String[] hostAndPort = hostNameAndPort.split(":");
dn.setHostName(hostAndPort[0]);
dn.setPort(Integer.parseInt(hostAndPort[1]));
dn.setAdminState(AdminStates.DECOMMISSIONED);
allDNs.add(dn);
}
}
if (maintenanceHosts != null) {
for (Map.Entry<String, Long> hostEntry : maintenanceHosts.entrySet()) {
DatanodeAdminProperties dn = new DatanodeAdminProperties();
String[] hostAndPort = hostEntry.getKey().split(":");
dn.setHostName(hostAndPort[0]);
dn.setPort(Integer.parseInt(hostAndPort[1]));
dn.setAdminState(AdminStates.IN_MAINTENANCE);
dn.setMaintenanceExpireTimeInMS(hostEntry.getValue());
allDNs.add(dn);
}
}
CombinedHostsFileWriter.writeFile(combinedFile.toString(), allDNs);
}
}
public void initIncludeHost(String hostNameAndPort) throws IOException {
initIncludeHosts(new String[]{hostNameAndPort});
}
public void initIncludeHosts(String[] hostNameAndPorts) throws IOException {
StringBuilder includeHosts = new StringBuilder();
if (isLegacyHostsFile) {

View File

@ -62,7 +62,7 @@ public class TestCombinedHostsFileReader {
public void testLoadExistingJsonFile() throws Exception {
Set<DatanodeAdminProperties> all =
CombinedHostsFileReader.readFile(EXISTING_FILE.getAbsolutePath());
assertEquals(5, all.size());
assertEquals(7, all.size());
}
/*

View File

@ -3,3 +3,5 @@
{"hostName": "host3", "adminState": "DECOMMISSIONED"}
{"hostName": "host4", "upgradeDomain": "ud2", "adminState": "DECOMMISSIONED"}
{"hostName": "host5", "port": 8090}
{"hostName": "host6", "adminState": "IN_MAINTENANCE"}
{"hostName": "host7", "adminState": "IN_MAINTENANCE", "maintenanceExpireTimeInMS": "112233"}