HDFS-8826. In Balancer, add an option to specify the source node list so that balancer only selects blocks to move from those nodes.
parent 79af15fd37
commit 7245f7fad2
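For orientation before the diff: this commit adds a `-source` option to the Balancer CLI (taking either a comma-separated host list or `-f <filename>` to read the list from a file) and threads a matching `sourceNodes` set through `Balancer.Parameters`, so that only the named datanodes are used as sources of block moves. A minimal sketch of the programmatic equivalent, using only names introduced in this diff; the host entry is hypothetical and the surrounding `namenodes`/`conf` setup and imports are assumed, as in the tests at the end of this commit:

    // Sketch only: roughly equivalent to "hdfs balancer -source datanodeX"
    // ("datanodeX" is a hypothetical host; bare host name or host:port both match).
    Set<String> sourceNodes = new HashSet<>();
    sourceNodes.add("datanodeX");

    Balancer.Parameters p = new Balancer.Parameters(
        BalancingPolicy.Node.INSTANCE, 10.0,            // policy, threshold
        NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
        Collections.<String>emptySet(),                 // excludedNodes
        Collections.<String>emptySet(),                 // includedNodes
        sourceNodes,                                    // only these supply blocks
        false);                                         // runDuringUpgrade
    final int exitCode = Balancer.run(namenodes, p, conf);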
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java

@@ -80,13 +80,14 @@ public class HostsFileReader {
         String[] nodes = line.split("[ \t\n\f\r]+");
         if (nodes != null) {
           for (int i = 0; i < nodes.length; i++) {
-            if (nodes[i].trim().startsWith("#")) {
+            nodes[i] = nodes[i].trim();
+            if (nodes[i].startsWith("#")) {
               // Everything from now on is a comment
               break;
             }
             if (!nodes[i].isEmpty()) {
-              LOG.info("Adding " + nodes[i] + " to the list of " + type +
-                  " hosts from " + filename);
+              LOG.info("Adding a node \"" + nodes[i] + "\" to the list of "
+                  + type + " hosts from " + filename);
               set.add(nodes[i]);
             }
           }
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java

@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.util;
 
-import com.google.common.base.Preconditions;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.net.URI;
@@ -28,7 +27,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -45,6 +43,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.net.NetUtils;
 
+import com.google.common.base.Preconditions;
 import com.google.common.net.InetAddresses;
 
 /**
@@ -379,19 +378,6 @@ public class StringUtils {
     return str.trim().split("\\s*,\\s*");
   }
 
-  /**
-   * Trims all the strings in a Collection<String> and returns a Set<String>.
-   * @param strings
-   * @return
-   */
-  public static Set<String> getTrimmedStrings(Collection<String> strings) {
-    Set<String> trimmedStrings = new HashSet<String>();
-    for (String string: strings) {
-      trimmedStrings.add(string.trim());
-    }
-    return trimmedStrings;
-  }
-
   final public static String[] emptyStringArray = {};
   final public static char COMMA = ',';
   final public static String COMMA_STR = ",";
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -463,6 +463,9 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-8435. Support CreateFlag in WebHDFS. (Jakob Homan via cdouglas)
 
+    HDFS-8826. In Balancer, add an option to specify the source node list
+    so that balancer only selects blocks to move from those nodes. (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java

@@ -27,6 +27,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -54,6 +55,7 @@ import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
@@ -189,6 +191,7 @@ public class Balancer {
   private final Dispatcher dispatcher;
   private final NameNodeConnector nnc;
   private final BalancingPolicy policy;
+  private final Set<String> sourceNodes;
   private final boolean runDuringUpgrade;
   private final double threshold;
   private final long maxSizeToMove;
@@ -261,11 +264,12 @@
         DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT);
 
     this.nnc = theblockpool;
-    this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded,
-        p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads,
+    this.dispatcher = new Dispatcher(theblockpool, p.includedNodes,
+        p.excludedNodes, movedWinWidth, moverThreads, dispatcherThreads,
         maxConcurrentMovesPerNode, getBlocksSize, getBlocksMinBlockSize, conf);
     this.threshold = p.threshold;
     this.policy = p.policy;
+    this.sourceNodes = p.sourceNodes;
     this.runDuringUpgrade = p.runDuringUpgrade;
 
     this.maxSizeToMove = getLong(conf,
@@ -319,14 +323,23 @@
     long overLoadedBytes = 0L, underLoadedBytes = 0L;
     for(DatanodeStorageReport r : reports) {
       final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
+      final boolean isSource = Util.isIncluded(sourceNodes, dn.getDatanodeInfo());
       for(StorageType t : StorageType.getMovableTypes()) {
         final Double utilization = policy.getUtilization(r, t);
         if (utilization == null) { // datanode does not have such storage type
           continue;
         }
 
+        final double average = policy.getAvgUtilization(t);
+        if (utilization >= average && !isSource) {
+          LOG.info(dn + "[" + t + "] has utilization=" + utilization
+              + " >= average=" + average
+              + " but it is not specified as a source; skipping it.");
+          continue;
+        }
+
+        final double utilizationDiff = utilization - average;
         final long capacity = getCapacity(r, t);
-        final double utilizationDiff = utilization - policy.getAvgUtilization(t);
         final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
         final long maxSize2Move = computeMaxSize2Move(capacity,
             getRemaining(r, t), utilizationDiff, maxSizeToMove);
@@ -624,6 +637,9 @@
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000;
     LOG.info("namenodes = " + namenodes);
     LOG.info("parameters = " + p);
+    LOG.info("included nodes = " + p.includedNodes);
+    LOG.info("excluded nodes = " + p.excludedNodes);
+    LOG.info("source nodes = " + p.sourceNodes);
 
     System.out.println("Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved");
 
@@ -687,29 +703,35 @@
     static final Parameters DEFAULT = new Parameters(
         BalancingPolicy.Node.INSTANCE, 10.0,
         NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
-        Collections.<String> emptySet(), Collections.<String> emptySet(),
+        Collections.<String>emptySet(), Collections.<String>emptySet(),
+        Collections.<String>emptySet(),
         false);
 
     final BalancingPolicy policy;
     final double threshold;
     final int maxIdleIteration;
-    // exclude the nodes in this set from balancing operations
-    Set<String> nodesToBeExcluded;
-    //include only these nodes in balancing operations
-    Set<String> nodesToBeIncluded;
+    /** Exclude the nodes in this set. */
+    final Set<String> excludedNodes;
+    /** If empty, include any node; otherwise, include only these nodes. */
+    final Set<String> includedNodes;
+    /** If empty, any node can be a source;
+     * otherwise, use only these nodes as source nodes.
+     */
+    final Set<String> sourceNodes;
    /**
     * Whether to run the balancer during upgrade.
     */
     final boolean runDuringUpgrade;
 
     Parameters(BalancingPolicy policy, double threshold, int maxIdleIteration,
-        Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded,
-        boolean runDuringUpgrade) {
+        Set<String> excludedNodes, Set<String> includedNodes,
+        Set<String> sourceNodes, boolean runDuringUpgrade) {
       this.policy = policy;
       this.threshold = threshold;
       this.maxIdleIteration = maxIdleIteration;
-      this.nodesToBeExcluded = nodesToBeExcluded;
-      this.nodesToBeIncluded = nodesToBeIncluded;
+      this.excludedNodes = excludedNodes;
+      this.includedNodes = includedNodes;
+      this.sourceNodes = sourceNodes;
       this.runDuringUpgrade = runDuringUpgrade;
     }
 
@@ -717,13 +739,14 @@
     public String toString() {
       return String.format("%s.%s [%s,"
           + " threshold = %s,"
-          + " max idle iteration = %s, "
-          + "number of nodes to be excluded = %s,"
-          + " number of nodes to be included = %s,"
+          + " max idle iteration = %s,"
+          + " #excluded nodes = %s,"
+          + " #included nodes = %s,"
+          + " #source nodes = %s,"
          + " run during upgrade = %s]",
          Balancer.class.getSimpleName(), getClass().getSimpleName(),
          policy, threshold, maxIdleIteration,
-         nodesToBeExcluded.size(), nodesToBeIncluded.size(),
+         excludedNodes.size(), includedNodes.size(), sourceNodes.size(),
          runDuringUpgrade);
     }
   }
@@ -764,8 +787,9 @@
      BalancingPolicy policy = Parameters.DEFAULT.policy;
      double threshold = Parameters.DEFAULT.threshold;
      int maxIdleIteration = Parameters.DEFAULT.maxIdleIteration;
-     Set<String> nodesTobeExcluded = Parameters.DEFAULT.nodesToBeExcluded;
-     Set<String> nodesTobeIncluded = Parameters.DEFAULT.nodesToBeIncluded;
+     Set<String> excludedNodes = Parameters.DEFAULT.excludedNodes;
+     Set<String> includedNodes = Parameters.DEFAULT.includedNodes;
+     Set<String> sourceNodes = Parameters.DEFAULT.sourceNodes;
      boolean runDuringUpgrade = Parameters.DEFAULT.runDuringUpgrade;
 
      if (args != null) {
@@ -797,29 +821,14 @@
              throw e;
            }
          } else if ("-exclude".equalsIgnoreCase(args[i])) {
-           checkArgument(++i < args.length,
-               "List of nodes to exclude | -f <filename> is missing: args = "
-               + Arrays.toString(args));
-           if ("-f".equalsIgnoreCase(args[i])) {
-             checkArgument(++i < args.length,
-                 "File containing nodes to exclude is not specified: args = "
-                 + Arrays.toString(args));
-             nodesTobeExcluded = Util.getHostListFromFile(args[i], "exclude");
-           } else {
-             nodesTobeExcluded = Util.parseHostList(args[i]);
-           }
+           excludedNodes = new HashSet<>();
+           i = processHostList(args, i, "exclude", excludedNodes);
          } else if ("-include".equalsIgnoreCase(args[i])) {
-           checkArgument(++i < args.length,
-               "List of nodes to include | -f <filename> is missing: args = "
-               + Arrays.toString(args));
-           if ("-f".equalsIgnoreCase(args[i])) {
-             checkArgument(++i < args.length,
-                 "File containing nodes to include is not specified: args = "
-                 + Arrays.toString(args));
-             nodesTobeIncluded = Util.getHostListFromFile(args[i], "include");
-           } else {
-             nodesTobeIncluded = Util.parseHostList(args[i]);
-           }
+           includedNodes = new HashSet<>();
+           i = processHostList(args, i, "include", includedNodes);
+         } else if ("-source".equalsIgnoreCase(args[i])) {
+           sourceNodes = new HashSet<>();
+           i = processHostList(args, i, "source", sourceNodes);
          } else if ("-idleiterations".equalsIgnoreCase(args[i])) {
            checkArgument(++i < args.length,
                "idleiterations value is missing: args = " + Arrays
@@ -837,7 +846,7 @@
                + Arrays.toString(args));
          }
        }
-       checkArgument(nodesTobeExcluded.isEmpty() || nodesTobeIncluded.isEmpty(),
+       checkArgument(excludedNodes.isEmpty() || includedNodes.isEmpty(),
            "-exclude and -include options cannot be specified together.");
      } catch(RuntimeException e) {
        printUsage(System.err);
@@ -846,7 +855,31 @@
      }
 
      return new Parameters(policy, threshold, maxIdleIteration,
-         nodesTobeExcluded, nodesTobeIncluded, runDuringUpgrade);
+         excludedNodes, includedNodes, sourceNodes, runDuringUpgrade);
+   }
+
+   private static int processHostList(String[] args, int i, String type,
+       Set<String> nodes) {
+     Preconditions.checkArgument(++i < args.length,
+         "List of %s nodes | -f <filename> is missing: args=%s",
+         type, Arrays.toString(args));
+     if ("-f".equalsIgnoreCase(args[i])) {
+       Preconditions.checkArgument(++i < args.length,
+           "File containing %s nodes is not specified: args=%s",
+           type, Arrays.toString(args));
+
+       final String filename = args[i];
+       try {
+         HostsFileReader.readFileToSet(type, filename, nodes);
+       } catch (IOException e) {
+         throw new IllegalArgumentException(
+             "Failed to read " + type + " node list from file: " + filename);
+       }
+     } else {
+       final String[] addresses = StringUtils.getTrimmedStrings(args[i]);
+       nodes.addAll(Arrays.asList(addresses));
+     }
+     return i;
    }
 
    private static void printUsage(PrintStream out) {
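A note on host matching before the Dispatcher diff: the source check in Balancer's init() above reuses `Util.isIncluded` from `Dispatcher.Util`, whose host-matching rule survives unchanged in the last hunk below, so `-source` entries may be written either as a bare host name or as `host:port`. Restated as a sketch (the method name `matches` is hypothetical; the return expression is taken verbatim from the unchanged context line below):

    // Matching rule kept unchanged in Dispatcher.Util: a datanode matches a
    // host set if either its host name or its "host:port" form is listed.
    static boolean matches(Set<String> nodes, String host, int port) {
      return nodes.contains(host) || nodes.contains(host + ":" + port);
    }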
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java

@@ -28,7 +28,6 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumMap;
 import java.util.HashMap;
@@ -72,7 +71,6 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 
@@ -798,7 +796,11 @@ public class Dispatcher {
       if (shouldFetchMoreBlocks()) {
         // fetch new blocks
         try {
-          blocksToReceive -= getBlockList();
+          final long received = getBlockList();
+          if (received == 0) {
+            return;
+          }
+          blocksToReceive -= received;
           continue;
         } catch (IOException e) {
           LOG.warn("Exception while getting block list", e);
@@ -927,8 +929,11 @@
 
     if (decommissioned || decommissioning || excluded || notIncluded) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Excluding datanode " + dn + ": " + decommissioned + ", "
-            + decommissioning + ", " + excluded + ", " + notIncluded);
+        LOG.trace("Excluding datanode " + dn
+            + ": decommissioned=" + decommissioned
+            + ", decommissioning=" + decommissioning
+            + ", excluded=" + excluded
+            + ", notIncluded=" + notIncluded);
       }
       return true;
     }
@@ -1215,31 +1220,5 @@
       }
       return (nodes.contains(host) || nodes.contains(host + ":" + port));
     }
-
-    /**
-     * Parse a comma separated string to obtain set of host names
-     *
-     * @return set of host names
-     */
-    static Set<String> parseHostList(String string) {
-      String[] addrs = StringUtils.getTrimmedStrings(string);
-      return new HashSet<String>(Arrays.asList(addrs));
-    }
-
-    /**
-     * Read set of host names from a file
-     *
-     * @return set of host names
-     */
-    static Set<String> getHostListFromFile(String fileName, String type) {
-      Set<String> nodes = new HashSet<String>();
-      try {
-        HostsFileReader.readFileToSet(type, fileName, nodes);
-        return StringUtils.getTrimmedStrings(nodes);
-      } catch (IOException e) {
-        throw new IllegalArgumentException(
-            "Failed to read host list from file: " + fileName);
-      }
-    }
   }
 }
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java

@@ -33,6 +33,7 @@ import static org.junit.Assume.assumeTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -397,11 +398,11 @@ public class TestBalancer {
     long timeout = TIMEOUT;
     long failtime = (timeout <= 0L) ? Long.MAX_VALUE
         : Time.monotonicNow() + timeout;
-    if (!p.nodesToBeIncluded.isEmpty()) {
-      totalCapacity = p.nodesToBeIncluded.size() * CAPACITY;
+    if (!p.includedNodes.isEmpty()) {
+      totalCapacity = p.includedNodes.size() * CAPACITY;
     }
-    if (!p.nodesToBeExcluded.isEmpty()) {
-      totalCapacity -= p.nodesToBeExcluded.size() * CAPACITY;
+    if (!p.excludedNodes.isEmpty()) {
+      totalCapacity -= p.excludedNodes.size() * CAPACITY;
     }
     final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
     boolean balanced;
@@ -414,12 +415,12 @@
     for (DatanodeInfo datanode : datanodeReport) {
       double nodeUtilization = ((double)datanode.getDfsUsed())
           / datanode.getCapacity();
-      if (Dispatcher.Util.isExcluded(p.nodesToBeExcluded, datanode)) {
+      if (Dispatcher.Util.isExcluded(p.excludedNodes, datanode)) {
        assertTrue(nodeUtilization == 0);
        actualExcludedNodeCount++;
        continue;
      }
-      if (!Dispatcher.Util.isIncluded(p.nodesToBeIncluded, datanode)) {
+      if (!Dispatcher.Util.isIncluded(p.includedNodes, datanode)) {
        assertTrue(nodeUtilization == 0);
        actualExcludedNodeCount++;
        continue;
@@ -642,6 +643,7 @@
         Balancer.Parameters.DEFAULT.threshold,
         Balancer.Parameters.DEFAULT.maxIdleIteration,
         nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded(),
+        Balancer.Parameters.DEFAULT.sourceNodes,
         false);
   }
 
@@ -754,36 +756,36 @@
     args.add("datanode");
 
     File excludeHostsFile = null;
-    if (!p.nodesToBeExcluded.isEmpty()) {
+    if (!p.excludedNodes.isEmpty()) {
       args.add("-exclude");
       if (useFile) {
         excludeHostsFile = new File ("exclude-hosts-file");
         PrintWriter pw = new PrintWriter(excludeHostsFile);
-        for (String host: p.nodesToBeExcluded) {
+        for (String host: p.excludedNodes) {
           pw.write( host + "\n");
         }
         pw.close();
         args.add("-f");
         args.add("exclude-hosts-file");
       } else {
-        args.add(StringUtils.join(p.nodesToBeExcluded, ','));
+        args.add(StringUtils.join(p.excludedNodes, ','));
       }
     }
 
     File includeHostsFile = null;
-    if (!p.nodesToBeIncluded.isEmpty()) {
+    if (!p.includedNodes.isEmpty()) {
       args.add("-include");
       if (useFile) {
         includeHostsFile = new File ("include-hosts-file");
         PrintWriter pw = new PrintWriter(includeHostsFile);
-        for (String host: p.nodesToBeIncluded){
+        for (String host: p.includedNodes){
           pw.write( host + "\n");
         }
         pw.close();
         args.add("-f");
         args.add("include-hosts-file");
       } else {
-        args.add(StringUtils.join(p.nodesToBeIncluded, ','));
+        args.add(StringUtils.join(p.includedNodes, ','));
       }
     }
 
@@ -881,7 +883,8 @@
         Balancer.Parameters.DEFAULT.policy,
         Balancer.Parameters.DEFAULT.threshold,
         Balancer.Parameters.DEFAULT.maxIdleIteration,
-        datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded,
+        datanodes, Balancer.Parameters.DEFAULT.includedNodes,
+        Balancer.Parameters.DEFAULT.sourceNodes,
         false);
     final int r = Balancer.run(namenodes, p, conf);
     assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
@@ -1094,7 +1097,7 @@
     excludeHosts.add( "datanodeZ");
     doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
         new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
-        excludeHosts, Parameters.DEFAULT.nodesToBeIncluded), false, false);
+        excludeHosts, Parameters.DEFAULT.includedNodes), false, false);
   }
 
   /**
@@ -1124,7 +1127,7 @@
     excludeHosts.add( "datanodeZ");
     doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
         new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, excludeHosts,
-        Parameters.DEFAULT.nodesToBeIncluded), true, false);
+        Parameters.DEFAULT.includedNodes), true, false);
   }
 
   /**
@@ -1154,7 +1157,7 @@
     excludeHosts.add( "datanodeZ");
     doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
         new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
-        excludeHosts, Parameters.DEFAULT.nodesToBeIncluded), true, true);
+        excludeHosts, Parameters.DEFAULT.includedNodes), true, true);
   }
 
   /**
@@ -1183,7 +1186,7 @@
     includeHosts.add( "datanodeY");
     doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
         new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
-        Parameters.DEFAULT.nodesToBeExcluded, includeHosts), false, false);
+        Parameters.DEFAULT.excludedNodes, includeHosts), false, false);
   }
 
   /**
@@ -1212,7 +1215,7 @@
     includeHosts.add( "datanodeY");
     doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
         new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
-        Parameters.DEFAULT.nodesToBeExcluded, includeHosts), true, false);
+        Parameters.DEFAULT.excludedNodes, includeHosts), true, false);
   }
 
   /**
@@ -1241,7 +1244,7 @@
     includeHosts.add( "datanodeY");
     doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
         new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
-        Parameters.DEFAULT.nodesToBeExcluded, includeHosts), true, true);
+        Parameters.DEFAULT.excludedNodes, includeHosts), true, true);
   }
 
   /**
@@ -1381,8 +1384,9 @@
           new Balancer.Parameters(Parameters.DEFAULT.policy,
               Parameters.DEFAULT.threshold,
               Parameters.DEFAULT.maxIdleIteration,
-              Parameters.DEFAULT.nodesToBeExcluded,
-              Parameters.DEFAULT.nodesToBeIncluded,
+              Parameters.DEFAULT.excludedNodes,
+              Parameters.DEFAULT.includedNodes,
+              Parameters.DEFAULT.sourceNodes,
              true);
      assertEquals(ExitStatus.SUCCESS.getExitCode(),
          Balancer.run(namenodes, runDuringUpgrade, conf));
@@ -1538,6 +1542,116 @@
     }
   }
 
+  /** Balancer should not move blocks with size < minBlockSize. */
+  @Test(timeout=60000)
+  public void testMinBlockSizeAndSourceNodes() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+
+    final short replication = 3;
+    final long[] lengths = {10, 10, 10, 10};
+    final long[] capacities = new long[replication];
+    final long totalUsed = capacities.length * sum(lengths);
+    Arrays.fill(capacities, 1000);
+
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(capacities.length)
+        .simulatedCapacities(capacities)
+        .build();
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+
+    try {
+      cluster.waitActive();
+      client = NameNodeProxies.createProxy(conf, dfs.getUri(),
+          ClientProtocol.class).getProxy();
+
+      // fill up the cluster to be 80% full
+      for(int i = 0; i < lengths.length; i++) {
+        final long size = lengths[i];
+        final Path p = new Path("/file" + i + "_size" + size);
+        try(final OutputStream out = dfs.create(p)) {
+          for(int j = 0; j < size; j++) {
+            out.write(j);
+          }
+        }
+      }
+
+      // start up an empty node with the same capacity
+      cluster.startDataNodes(conf, capacities.length, true, null, null, capacities);
+      LOG.info("capacities = " + Arrays.toString(capacities));
+      LOG.info("totalUsedSpace= " + totalUsed);
+      LOG.info("lengths = " + Arrays.toString(lengths) + ", #=" + lengths.length);
+      waitForHeartBeat(totalUsed, 2*capacities[0]*capacities.length, client, cluster);
+
+      final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+
+      { // run Balancer with min-block-size=50
+        final Parameters p = new Parameters(
+            BalancingPolicy.Node.INSTANCE, 1,
+            NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+            Collections.<String> emptySet(), Collections.<String> emptySet(),
+            Collections.<String> emptySet(), false);
+
+        conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
+        final int r = Balancer.run(namenodes, p, conf);
+        assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+      }
+
+      conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+
+      { // run Balancer with empty nodes as source nodes
+        final Set<String> sourceNodes = new HashSet<>();
+        final List<DataNode> datanodes = cluster.getDataNodes();
+        for(int i = capacities.length; i < datanodes.size(); i++) {
+          sourceNodes.add(datanodes.get(i).getDisplayName());
+        }
+        final Parameters p = new Parameters(
+            BalancingPolicy.Node.INSTANCE, 1,
+            NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+            Collections.<String> emptySet(), Collections.<String> emptySet(),
+            sourceNodes, false);
+
+        conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
+        final int r = Balancer.run(namenodes, p, conf);
+        assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
+      }
+
+      { // run Balancer with a filled node as a source node
+        final Set<String> sourceNodes = new HashSet<>();
+        final List<DataNode> datanodes = cluster.getDataNodes();
+        sourceNodes.add(datanodes.get(0).getDisplayName());
+        final Parameters p = new Parameters(
+            BalancingPolicy.Node.INSTANCE, 1,
+            NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+            Collections.<String> emptySet(), Collections.<String> emptySet(),
+            sourceNodes, false);
+
+        conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+        final int r = Balancer.run(namenodes, p, conf);
+        assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
+      }
+
+      { // run Balancer with all filled node as source nodes
+        final Set<String> sourceNodes = new HashSet<>();
+        final List<DataNode> datanodes = cluster.getDataNodes();
+        for(int i = 0; i < capacities.length; i++) {
+          sourceNodes.add(datanodes.get(i).getDisplayName());
+        }
+        final Parameters p = new Parameters(
+            BalancingPolicy.Node.INSTANCE, 1,
+            NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+            Collections.<String> emptySet(), Collections.<String> emptySet(),
+            sourceNodes, false);
+
+        conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+        final int r = Balancer.run(namenodes, p, conf);
+        assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /**
    * @param args
    */