HDFS-6837. Code cleanup for Balancer and Dispatcher. Contributed by Tsz Wo Nicholas Sze.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1617337 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
946be75704
commit
e60673697d
|
@ -387,6 +387,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
HDFS-6828. Separate block replica dispatching from Balancer. (szetszwo via
|
HDFS-6828. Separate block replica dispatching from Balancer. (szetszwo via
|
||||||
jing9)
|
jing9)
|
||||||
|
|
||||||
|
HDFS-6837. Code cleanup for Balancer and Dispatcher. (szetszwo via
|
||||||
|
jing9)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-6690. Deduplicate xattr names in memory. (wang)
|
HDFS-6690. Deduplicate xattr names in memory. (wang)
|
||||||
|
|
|
@ -44,7 +44,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.StorageType;
|
import org.apache.hadoop.hdfs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.BalancerDatanode;
|
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode;
|
||||||
|
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
|
||||||
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source;
|
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source;
|
||||||
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Task;
|
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Task;
|
||||||
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Util;
|
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Util;
|
||||||
|
@ -184,10 +185,10 @@ public class Balancer {
|
||||||
// all data node lists
|
// all data node lists
|
||||||
private final Collection<Source> overUtilized = new LinkedList<Source>();
|
private final Collection<Source> overUtilized = new LinkedList<Source>();
|
||||||
private final Collection<Source> aboveAvgUtilized = new LinkedList<Source>();
|
private final Collection<Source> aboveAvgUtilized = new LinkedList<Source>();
|
||||||
private final Collection<BalancerDatanode.StorageGroup> belowAvgUtilized
|
private final Collection<StorageGroup> belowAvgUtilized
|
||||||
= new LinkedList<BalancerDatanode.StorageGroup>();
|
= new LinkedList<StorageGroup>();
|
||||||
private final Collection<BalancerDatanode.StorageGroup> underUtilized
|
private final Collection<StorageGroup> underUtilized
|
||||||
= new LinkedList<BalancerDatanode.StorageGroup>();
|
= new LinkedList<StorageGroup>();
|
||||||
|
|
||||||
/* Check that this Balancer is compatible with the Block Placement Policy
|
/* Check that this Balancer is compatible with the Block Placement Policy
|
||||||
* used by the Namenode.
|
* used by the Namenode.
|
||||||
|
@ -209,8 +210,22 @@ public class Balancer {
|
||||||
* when connection fails.
|
* when connection fails.
|
||||||
*/
|
*/
|
||||||
Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
|
Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
|
||||||
|
final long movedWinWidth = conf.getLong(
|
||||||
|
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
|
||||||
|
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
|
||||||
|
final int moverThreads = conf.getInt(
|
||||||
|
DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY,
|
||||||
|
DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT);
|
||||||
|
final int dispatcherThreads = conf.getInt(
|
||||||
|
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
|
||||||
|
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT);
|
||||||
|
final int maxConcurrentMovesPerNode = conf.getInt(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
|
||||||
|
|
||||||
this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded,
|
this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded,
|
||||||
p.nodesToBeExcluded, conf);
|
p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads,
|
||||||
|
maxConcurrentMovesPerNode, conf);
|
||||||
this.threshold = p.threshold;
|
this.threshold = p.threshold;
|
||||||
this.policy = p.policy;
|
this.policy = p.policy;
|
||||||
}
|
}
|
||||||
|
@ -255,7 +270,7 @@ public class Balancer {
|
||||||
// over-utilized, above-average, below-average and under-utilized.
|
// over-utilized, above-average, below-average and under-utilized.
|
||||||
long overLoadedBytes = 0L, underLoadedBytes = 0L;
|
long overLoadedBytes = 0L, underLoadedBytes = 0L;
|
||||||
for(DatanodeStorageReport r : reports) {
|
for(DatanodeStorageReport r : reports) {
|
||||||
final BalancerDatanode dn = dispatcher.newDatanode(r);
|
final DDatanode dn = dispatcher.newDatanode(r);
|
||||||
for(StorageType t : StorageType.asList()) {
|
for(StorageType t : StorageType.asList()) {
|
||||||
final Double utilization = policy.getUtilization(r, t);
|
final Double utilization = policy.getUtilization(r, t);
|
||||||
if (utilization == null) { // datanode does not have such storage type
|
if (utilization == null) { // datanode does not have such storage type
|
||||||
|
@ -268,9 +283,9 @@ public class Balancer {
|
||||||
final long maxSize2Move = computeMaxSize2Move(capacity,
|
final long maxSize2Move = computeMaxSize2Move(capacity,
|
||||||
getRemaining(r, t), utilizationDiff, threshold);
|
getRemaining(r, t), utilizationDiff, threshold);
|
||||||
|
|
||||||
final BalancerDatanode.StorageGroup g;
|
final StorageGroup g;
|
||||||
if (utilizationDiff > 0) {
|
if (utilizationDiff > 0) {
|
||||||
final Source s = dn.addSource(t, utilization, maxSize2Move, dispatcher);
|
final Source s = dn.addSource(t, maxSize2Move, dispatcher);
|
||||||
if (thresholdDiff <= 0) { // within threshold
|
if (thresholdDiff <= 0) { // within threshold
|
||||||
aboveAvgUtilized.add(s);
|
aboveAvgUtilized.add(s);
|
||||||
} else {
|
} else {
|
||||||
|
@ -279,7 +294,7 @@ public class Balancer {
|
||||||
}
|
}
|
||||||
g = s;
|
g = s;
|
||||||
} else {
|
} else {
|
||||||
g = dn.addStorageGroup(t, utilization, maxSize2Move);
|
g = dn.addStorageGroup(t, maxSize2Move);
|
||||||
if (thresholdDiff <= 0) { // within threshold
|
if (thresholdDiff <= 0) { // within threshold
|
||||||
belowAvgUtilized.add(g);
|
belowAvgUtilized.add(g);
|
||||||
} else {
|
} else {
|
||||||
|
@ -328,7 +343,7 @@ public class Balancer {
|
||||||
logUtilizationCollection("underutilized", underUtilized);
|
logUtilizationCollection("underutilized", underUtilized);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <T extends BalancerDatanode.StorageGroup>
|
private static <T extends StorageGroup>
|
||||||
void logUtilizationCollection(String name, Collection<T> items) {
|
void logUtilizationCollection(String name, Collection<T> items) {
|
||||||
LOG.info(items.size() + " " + name + ": " + items);
|
LOG.info(items.size() + " " + name + ": " + items);
|
||||||
}
|
}
|
||||||
|
@ -381,8 +396,7 @@ public class Balancer {
|
||||||
* datanodes or the candidates are source nodes with (utilization > Avg), and
|
* datanodes or the candidates are source nodes with (utilization > Avg), and
|
||||||
* the others are target nodes with (utilization < Avg).
|
* the others are target nodes with (utilization < Avg).
|
||||||
*/
|
*/
|
||||||
private <G extends BalancerDatanode.StorageGroup,
|
private <G extends StorageGroup, C extends StorageGroup>
|
||||||
C extends BalancerDatanode.StorageGroup>
|
|
||||||
void chooseStorageGroups(Collection<G> groups, Collection<C> candidates,
|
void chooseStorageGroups(Collection<G> groups, Collection<C> candidates,
|
||||||
Matcher matcher) {
|
Matcher matcher) {
|
||||||
for(final Iterator<G> i = groups.iterator(); i.hasNext();) {
|
for(final Iterator<G> i = groups.iterator(); i.hasNext();) {
|
||||||
|
@ -398,9 +412,8 @@ public class Balancer {
|
||||||
* For the given datanode, choose a candidate and then schedule it.
|
* For the given datanode, choose a candidate and then schedule it.
|
||||||
* @return true if a candidate is chosen; false if no candidate is chosen.
|
* @return true if a candidate is chosen; false if no candidate is chosen.
|
||||||
*/
|
*/
|
||||||
private <C extends BalancerDatanode.StorageGroup>
|
private <C extends StorageGroup> boolean choose4One(StorageGroup g,
|
||||||
boolean choose4One(BalancerDatanode.StorageGroup g,
|
Collection<C> candidates, Matcher matcher) {
|
||||||
Collection<C> candidates, Matcher matcher) {
|
|
||||||
final Iterator<C> i = candidates.iterator();
|
final Iterator<C> i = candidates.iterator();
|
||||||
final C chosen = chooseCandidate(g, i, matcher);
|
final C chosen = chooseCandidate(g, i, matcher);
|
||||||
|
|
||||||
|
@ -418,8 +431,7 @@ public class Balancer {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void matchSourceWithTargetToMove(Source source,
|
private void matchSourceWithTargetToMove(Source source, StorageGroup target) {
|
||||||
BalancerDatanode.StorageGroup target) {
|
|
||||||
long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
|
long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
|
||||||
final Task task = new Task(target, size);
|
final Task task = new Task(target, size);
|
||||||
source.addTask(task);
|
source.addTask(task);
|
||||||
|
@ -430,8 +442,7 @@ public class Balancer {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Choose a candidate for the given datanode. */
|
/** Choose a candidate for the given datanode. */
|
||||||
private <G extends BalancerDatanode.StorageGroup,
|
private <G extends StorageGroup, C extends StorageGroup>
|
||||||
C extends BalancerDatanode.StorageGroup>
|
|
||||||
C chooseCandidate(G g, Iterator<C> candidates, Matcher matcher) {
|
C chooseCandidate(G g, Iterator<C> candidates, Matcher matcher) {
|
||||||
if (g.hasSpaceForScheduling()) {
|
if (g.hasSpaceForScheduling()) {
|
||||||
for(; candidates.hasNext(); ) {
|
for(; candidates.hasNext(); ) {
|
||||||
|
@ -439,7 +450,7 @@ public class Balancer {
|
||||||
if (!c.hasSpaceForScheduling()) {
|
if (!c.hasSpaceForScheduling()) {
|
||||||
candidates.remove();
|
candidates.remove();
|
||||||
} else if (matcher.match(dispatcher.getCluster(),
|
} else if (matcher.match(dispatcher.getCluster(),
|
||||||
g.getDatanode(), c.getDatanode())) {
|
g.getDatanodeInfo(), c.getDatanodeInfo())) {
|
||||||
return c;
|
return c;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -457,34 +468,15 @@ public class Balancer {
|
||||||
dispatcher.reset(conf);;
|
dispatcher.reset(conf);;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Exit status
|
|
||||||
enum ReturnStatus {
|
|
||||||
// These int values will map directly to the balancer process's exit code.
|
|
||||||
SUCCESS(0),
|
|
||||||
IN_PROGRESS(1),
|
|
||||||
ALREADY_RUNNING(-1),
|
|
||||||
NO_MOVE_BLOCK(-2),
|
|
||||||
NO_MOVE_PROGRESS(-3),
|
|
||||||
IO_EXCEPTION(-4),
|
|
||||||
ILLEGAL_ARGS(-5),
|
|
||||||
INTERRUPTED(-6);
|
|
||||||
|
|
||||||
final int code;
|
|
||||||
|
|
||||||
ReturnStatus(int code) {
|
|
||||||
this.code = code;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Run an iteration for all datanodes. */
|
/** Run an iteration for all datanodes. */
|
||||||
private ReturnStatus run(int iteration, Formatter formatter,
|
private ExitStatus run(int iteration, Formatter formatter,
|
||||||
Configuration conf) {
|
Configuration conf) {
|
||||||
try {
|
try {
|
||||||
final List<DatanodeStorageReport> reports = dispatcher.init();
|
final List<DatanodeStorageReport> reports = dispatcher.init();
|
||||||
final long bytesLeftToMove = init(reports);
|
final long bytesLeftToMove = init(reports);
|
||||||
if (bytesLeftToMove == 0) {
|
if (bytesLeftToMove == 0) {
|
||||||
System.out.println("The cluster is balanced. Exiting...");
|
System.out.println("The cluster is balanced. Exiting...");
|
||||||
return ReturnStatus.SUCCESS;
|
return ExitStatus.SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove)
|
LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove)
|
||||||
+ " to make the cluster balanced." );
|
+ " to make the cluster balanced." );
|
||||||
|
@ -498,7 +490,7 @@ public class Balancer {
|
||||||
final long bytesToMove = chooseStorageGroups();
|
final long bytesToMove = chooseStorageGroups();
|
||||||
if (bytesToMove == 0) {
|
if (bytesToMove == 0) {
|
||||||
System.out.println("No block can be moved. Exiting...");
|
System.out.println("No block can be moved. Exiting...");
|
||||||
return ReturnStatus.NO_MOVE_BLOCK;
|
return ExitStatus.NO_MOVE_BLOCK;
|
||||||
} else {
|
} else {
|
||||||
LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) +
|
LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) +
|
||||||
" in this iteration");
|
" in this iteration");
|
||||||
|
@ -519,19 +511,19 @@ public class Balancer {
|
||||||
* Exit if no byte has been moved for 5 consecutive iterations.
|
* Exit if no byte has been moved for 5 consecutive iterations.
|
||||||
*/
|
*/
|
||||||
if (!dispatcher.dispatchAndCheckContinue()) {
|
if (!dispatcher.dispatchAndCheckContinue()) {
|
||||||
return ReturnStatus.NO_MOVE_PROGRESS;
|
return ExitStatus.NO_MOVE_PROGRESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
return ReturnStatus.IN_PROGRESS;
|
return ExitStatus.IN_PROGRESS;
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
System.out.println(e + ". Exiting ...");
|
System.out.println(e + ". Exiting ...");
|
||||||
return ReturnStatus.ILLEGAL_ARGS;
|
return ExitStatus.ILLEGAL_ARGUMENTS;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
System.out.println(e + ". Exiting ...");
|
System.out.println(e + ". Exiting ...");
|
||||||
return ReturnStatus.IO_EXCEPTION;
|
return ExitStatus.IO_EXCEPTION;
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
System.out.println(e + ". Exiting ...");
|
System.out.println(e + ". Exiting ...");
|
||||||
return ReturnStatus.INTERRUPTED;
|
return ExitStatus.INTERRUPTED;
|
||||||
} finally {
|
} finally {
|
||||||
dispatcher.shutdownNow();
|
dispatcher.shutdownNow();
|
||||||
}
|
}
|
||||||
|
@ -570,14 +562,14 @@ public class Balancer {
|
||||||
Collections.shuffle(connectors);
|
Collections.shuffle(connectors);
|
||||||
for(NameNodeConnector nnc : connectors) {
|
for(NameNodeConnector nnc : connectors) {
|
||||||
final Balancer b = new Balancer(nnc, p, conf);
|
final Balancer b = new Balancer(nnc, p, conf);
|
||||||
final ReturnStatus r = b.run(iteration, formatter, conf);
|
final ExitStatus r = b.run(iteration, formatter, conf);
|
||||||
// clean all lists
|
// clean all lists
|
||||||
b.resetData(conf);
|
b.resetData(conf);
|
||||||
if (r == ReturnStatus.IN_PROGRESS) {
|
if (r == ExitStatus.IN_PROGRESS) {
|
||||||
done = false;
|
done = false;
|
||||||
} else if (r != ReturnStatus.SUCCESS) {
|
} else if (r != ExitStatus.SUCCESS) {
|
||||||
//must be an error status, return.
|
//must be an error status, return.
|
||||||
return r.code;
|
return r.getExitCode();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -590,7 +582,7 @@ public class Balancer {
|
||||||
nnc.close();
|
nnc.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ReturnStatus.SUCCESS.code;
|
return ExitStatus.SUCCESS.getExitCode();
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Given elapsedTime in ms, return a printable string */
|
/* Given elapsedTime in ms, return a printable string */
|
||||||
|
@ -661,10 +653,10 @@ public class Balancer {
|
||||||
return Balancer.run(namenodes, parse(args), conf);
|
return Balancer.run(namenodes, parse(args), conf);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
System.out.println(e + ". Exiting ...");
|
System.out.println(e + ". Exiting ...");
|
||||||
return ReturnStatus.IO_EXCEPTION.code;
|
return ExitStatus.IO_EXCEPTION.getExitCode();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
System.out.println(e + ". Exiting ...");
|
System.out.println(e + ". Exiting ...");
|
||||||
return ReturnStatus.INTERRUPTED.code;
|
return ExitStatus.INTERRUPTED.getExitCode();
|
||||||
} finally {
|
} finally {
|
||||||
System.out.format("%-24s ", DateFormat.getDateTimeInstance().format(new Date()));
|
System.out.format("%-24s ", DateFormat.getDateTimeInstance().format(new Date()));
|
||||||
System.out.println("Balancing took " + time2Str(Time.now()-startTime));
|
System.out.println("Balancing took " + time2Str(Time.now()-startTime));
|
||||||
|
|
|
@ -48,7 +48,6 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.StorageType;
|
import org.apache.hadoop.hdfs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
@ -63,6 +62,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
|
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
|
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
|
||||||
|
@ -91,7 +91,6 @@ public class Dispatcher {
|
||||||
// minutes
|
// minutes
|
||||||
|
|
||||||
private final NameNodeConnector nnc;
|
private final NameNodeConnector nnc;
|
||||||
private final KeyManager keyManager;
|
|
||||||
private final SaslDataTransferClient saslClient;
|
private final SaslDataTransferClient saslClient;
|
||||||
|
|
||||||
/** Set of datanodes to be excluded. */
|
/** Set of datanodes to be excluded. */
|
||||||
|
@ -100,11 +99,10 @@ public class Dispatcher {
|
||||||
private final Set<String> includedNodes;
|
private final Set<String> includedNodes;
|
||||||
|
|
||||||
private final Collection<Source> sources = new HashSet<Source>();
|
private final Collection<Source> sources = new HashSet<Source>();
|
||||||
private final Collection<BalancerDatanode.StorageGroup> targets
|
private final Collection<StorageGroup> targets = new HashSet<StorageGroup>();
|
||||||
= new HashSet<BalancerDatanode.StorageGroup>();
|
|
||||||
|
|
||||||
private final GlobalBlockMap globalBlocks = new GlobalBlockMap();
|
private final GlobalBlockMap globalBlocks = new GlobalBlockMap();
|
||||||
private final MovedBlocks<BalancerDatanode.StorageGroup> movedBlocks;
|
private final MovedBlocks<StorageGroup> movedBlocks;
|
||||||
|
|
||||||
/** Map (datanodeUuid,storageType -> StorageGroup) */
|
/** Map (datanodeUuid,storageType -> StorageGroup) */
|
||||||
private final StorageGroupMap storageGroupMap = new StorageGroupMap();
|
private final StorageGroupMap storageGroupMap = new StorageGroupMap();
|
||||||
|
@ -135,8 +133,7 @@ public class Dispatcher {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Remove all blocks except for the moved blocks. */
|
/** Remove all blocks except for the moved blocks. */
|
||||||
private void removeAllButRetain(
|
private void removeAllButRetain(MovedBlocks<StorageGroup> movedBlocks) {
|
||||||
MovedBlocks<BalancerDatanode.StorageGroup> movedBlocks) {
|
|
||||||
for (Iterator<Block> i = map.keySet().iterator(); i.hasNext();) {
|
for (Iterator<Block> i = map.keySet().iterator(); i.hasNext();) {
|
||||||
if (!movedBlocks.contains(i.next())) {
|
if (!movedBlocks.contains(i.next())) {
|
||||||
i.remove();
|
i.remove();
|
||||||
|
@ -150,17 +147,15 @@ public class Dispatcher {
|
||||||
return datanodeUuid + ":" + storageType;
|
return datanodeUuid + ":" + storageType;
|
||||||
}
|
}
|
||||||
|
|
||||||
private final Map<String, BalancerDatanode.StorageGroup> map
|
private final Map<String, StorageGroup> map = new HashMap<String, StorageGroup>();
|
||||||
= new HashMap<String, BalancerDatanode.StorageGroup>();
|
|
||||||
|
|
||||||
BalancerDatanode.StorageGroup get(String datanodeUuid,
|
StorageGroup get(String datanodeUuid, StorageType storageType) {
|
||||||
StorageType storageType) {
|
|
||||||
return map.get(toKey(datanodeUuid, storageType));
|
return map.get(toKey(datanodeUuid, storageType));
|
||||||
}
|
}
|
||||||
|
|
||||||
void put(BalancerDatanode.StorageGroup g) {
|
void put(StorageGroup g) {
|
||||||
final String key = toKey(g.getDatanode().getDatanodeUuid(), g.storageType);
|
final String key = toKey(g.getDatanodeInfo().getDatanodeUuid(), g.storageType);
|
||||||
final BalancerDatanode.StorageGroup existing = map.put(key, g);
|
final StorageGroup existing = map.put(key, g);
|
||||||
Preconditions.checkState(existing == null);
|
Preconditions.checkState(existing == null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -177,8 +172,8 @@ public class Dispatcher {
|
||||||
private class PendingMove {
|
private class PendingMove {
|
||||||
private DBlock block;
|
private DBlock block;
|
||||||
private Source source;
|
private Source source;
|
||||||
private BalancerDatanode proxySource;
|
private DDatanode proxySource;
|
||||||
private BalancerDatanode.StorageGroup target;
|
private StorageGroup target;
|
||||||
|
|
||||||
private PendingMove() {
|
private PendingMove() {
|
||||||
}
|
}
|
||||||
|
@ -235,24 +230,24 @@ public class Dispatcher {
|
||||||
* @return true if a proxy is found; otherwise false
|
* @return true if a proxy is found; otherwise false
|
||||||
*/
|
*/
|
||||||
private boolean chooseProxySource() {
|
private boolean chooseProxySource() {
|
||||||
final DatanodeInfo targetDN = target.getDatanode();
|
final DatanodeInfo targetDN = target.getDatanodeInfo();
|
||||||
// if node group is supported, first try add nodes in the same node group
|
// if node group is supported, first try add nodes in the same node group
|
||||||
if (cluster.isNodeGroupAware()) {
|
if (cluster.isNodeGroupAware()) {
|
||||||
for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
|
for (StorageGroup loc : block.getLocations()) {
|
||||||
if (cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN)
|
if (cluster.isOnSameNodeGroup(loc.getDatanodeInfo(), targetDN)
|
||||||
&& addTo(loc)) {
|
&& addTo(loc)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// check if there is replica which is on the same rack with the target
|
// check if there is replica which is on the same rack with the target
|
||||||
for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
|
for (StorageGroup loc : block.getLocations()) {
|
||||||
if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) {
|
if (cluster.isOnSameRack(loc.getDatanodeInfo(), targetDN) && addTo(loc)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// find out a non-busy replica
|
// find out a non-busy replica
|
||||||
for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
|
for (StorageGroup loc : block.getLocations()) {
|
||||||
if (addTo(loc)) {
|
if (addTo(loc)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -261,10 +256,10 @@ public class Dispatcher {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** add to a proxy source for specific block movement */
|
/** add to a proxy source for specific block movement */
|
||||||
private boolean addTo(BalancerDatanode.StorageGroup g) {
|
private boolean addTo(StorageGroup g) {
|
||||||
final BalancerDatanode bdn = g.getBalancerDatanode();
|
final DDatanode dn = g.getDDatanode();
|
||||||
if (bdn.addPendingBlock(this)) {
|
if (dn.addPendingBlock(this)) {
|
||||||
proxySource = bdn;
|
proxySource = dn;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
@ -281,14 +276,13 @@ public class Dispatcher {
|
||||||
DataInputStream in = null;
|
DataInputStream in = null;
|
||||||
try {
|
try {
|
||||||
sock.connect(
|
sock.connect(
|
||||||
NetUtils.createSocketAddr(target.getDatanode().getXferAddr()),
|
NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()),
|
||||||
HdfsServerConstants.READ_TIMEOUT);
|
HdfsServerConstants.READ_TIMEOUT);
|
||||||
/*
|
/*
|
||||||
* Unfortunately we don't have a good way to know if the Datanode is
|
* Unfortunately we don't have a good way to know if the Datanode is
|
||||||
* taking a really long time to move a block, OR something has gone
|
* taking a really long time to move a block, OR something has gone
|
||||||
* wrong and it's never going to finish. To deal with this scenario, we
|
* wrong and it's never going to finish. To deal with this scenario, we
|
||||||
* set a long timeout (20 minutes) to avoid hanging the balancer
|
* set a long timeout (20 minutes) to avoid hanging indefinitely.
|
||||||
* indefinitely.
|
|
||||||
*/
|
*/
|
||||||
sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT);
|
sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT);
|
||||||
|
|
||||||
|
@ -298,9 +292,10 @@ public class Dispatcher {
|
||||||
InputStream unbufIn = sock.getInputStream();
|
InputStream unbufIn = sock.getInputStream();
|
||||||
ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(),
|
ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(),
|
||||||
block.getBlock());
|
block.getBlock());
|
||||||
Token<BlockTokenIdentifier> accessToken = keyManager.getAccessToken(eb);
|
final KeyManager km = nnc.getKeyManager();
|
||||||
|
Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb);
|
||||||
IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
|
IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
|
||||||
unbufIn, keyManager, accessToken, target.getDatanode());
|
unbufIn, km, accessToken, target.getDatanodeInfo());
|
||||||
unbufOut = saslStreams.out;
|
unbufOut = saslStreams.out;
|
||||||
unbufIn = saslStreams.in;
|
unbufIn = saslStreams.in;
|
||||||
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
|
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
|
||||||
|
@ -314,21 +309,19 @@ public class Dispatcher {
|
||||||
LOG.info("Successfully moved " + this);
|
LOG.info("Successfully moved " + this);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Failed to move " + this + ": " + e.getMessage());
|
LOG.warn("Failed to move " + this + ": " + e.getMessage());
|
||||||
/*
|
// Proxy or target may have some issues, delay before using these nodes
|
||||||
* proxy or target may have an issue, insert a small delay before using
|
// further in order to avoid a potential storm of "threads quota
|
||||||
* these nodes further. This avoids a potential storm of
|
// exceeded" warnings when the dispatcher gets out of sync with work
|
||||||
* "threads quota exceeded" Warnings when the balancer gets out of sync
|
// going on in datanodes.
|
||||||
* with work going on in datanode.
|
|
||||||
*/
|
|
||||||
proxySource.activateDelay(DELAY_AFTER_ERROR);
|
proxySource.activateDelay(DELAY_AFTER_ERROR);
|
||||||
target.getBalancerDatanode().activateDelay(DELAY_AFTER_ERROR);
|
target.getDDatanode().activateDelay(DELAY_AFTER_ERROR);
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.closeStream(out);
|
IOUtils.closeStream(out);
|
||||||
IOUtils.closeStream(in);
|
IOUtils.closeStream(in);
|
||||||
IOUtils.closeSocket(sock);
|
IOUtils.closeSocket(sock);
|
||||||
|
|
||||||
proxySource.removePendingBlock(this);
|
proxySource.removePendingBlock(this);
|
||||||
target.getBalancerDatanode().removePendingBlock(this);
|
target.getDDatanode().removePendingBlock(this);
|
||||||
|
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
reset();
|
reset();
|
||||||
|
@ -342,8 +335,8 @@ public class Dispatcher {
|
||||||
/** Send a block replace request to the output stream */
|
/** Send a block replace request to the output stream */
|
||||||
private void sendRequest(DataOutputStream out, ExtendedBlock eb,
|
private void sendRequest(DataOutputStream out, ExtendedBlock eb,
|
||||||
Token<BlockTokenIdentifier> accessToken) throws IOException {
|
Token<BlockTokenIdentifier> accessToken) throws IOException {
|
||||||
new Sender(out).replaceBlock(eb, target.storageType, accessToken, source
|
new Sender(out).replaceBlock(eb, target.storageType, accessToken,
|
||||||
.getDatanode().getDatanodeUuid(), proxySource.datanode);
|
source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Receive a block copy response from the input stream */
|
/** Receive a block copy response from the input stream */
|
||||||
|
@ -368,8 +361,7 @@ public class Dispatcher {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** A class for keeping track of block locations in the dispatcher. */
|
/** A class for keeping track of block locations in the dispatcher. */
|
||||||
private static class DBlock extends
|
private static class DBlock extends MovedBlocks.Locations<StorageGroup> {
|
||||||
MovedBlocks.Locations<BalancerDatanode.StorageGroup> {
|
|
||||||
DBlock(Block block) {
|
DBlock(Block block) {
|
||||||
super(block);
|
super(block);
|
||||||
}
|
}
|
||||||
|
@ -377,10 +369,10 @@ public class Dispatcher {
|
||||||
|
|
||||||
/** The class represents a desired move. */
|
/** The class represents a desired move. */
|
||||||
static class Task {
|
static class Task {
|
||||||
private final BalancerDatanode.StorageGroup target;
|
private final StorageGroup target;
|
||||||
private long size; // bytes scheduled to move
|
private long size; // bytes scheduled to move
|
||||||
|
|
||||||
Task(BalancerDatanode.StorageGroup target, long size) {
|
Task(StorageGroup target, long size) {
|
||||||
this.target = target;
|
this.target = target;
|
||||||
this.size = size;
|
this.size = size;
|
||||||
}
|
}
|
||||||
|
@ -391,28 +383,25 @@ public class Dispatcher {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** A class that keeps track of a datanode. */
|
/** A class that keeps track of a datanode. */
|
||||||
static class BalancerDatanode {
|
static class DDatanode {
|
||||||
|
|
||||||
/** A group of storages in a datanode with the same storage type. */
|
/** A group of storages in a datanode with the same storage type. */
|
||||||
class StorageGroup {
|
class StorageGroup {
|
||||||
final StorageType storageType;
|
final StorageType storageType;
|
||||||
final double utilization;
|
|
||||||
final long maxSize2Move;
|
final long maxSize2Move;
|
||||||
private long scheduledSize = 0L;
|
private long scheduledSize = 0L;
|
||||||
|
|
||||||
private StorageGroup(StorageType storageType, double utilization,
|
private StorageGroup(StorageType storageType, long maxSize2Move) {
|
||||||
long maxSize2Move) {
|
|
||||||
this.storageType = storageType;
|
this.storageType = storageType;
|
||||||
this.utilization = utilization;
|
|
||||||
this.maxSize2Move = maxSize2Move;
|
this.maxSize2Move = maxSize2Move;
|
||||||
}
|
}
|
||||||
|
|
||||||
BalancerDatanode getBalancerDatanode() {
|
private DDatanode getDDatanode() {
|
||||||
return BalancerDatanode.this;
|
return DDatanode.this;
|
||||||
}
|
}
|
||||||
|
|
||||||
DatanodeInfo getDatanode() {
|
DatanodeInfo getDatanodeInfo() {
|
||||||
return BalancerDatanode.this.datanode;
|
return DDatanode.this.datanode;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Decide if still need to move more bytes */
|
/** Decide if still need to move more bytes */
|
||||||
|
@ -447,7 +436,7 @@ public class Dispatcher {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "" + utilization;
|
return getDisplayName();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -461,10 +450,10 @@ public class Dispatcher {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return getClass().getSimpleName() + ":" + datanode + ":" + storageMap;
|
return getClass().getSimpleName() + ":" + datanode + ":" + storageMap.values();
|
||||||
}
|
}
|
||||||
|
|
||||||
private BalancerDatanode(DatanodeStorageReport r, int maxConcurrentMoves) {
|
private DDatanode(DatanodeStorageReport r, int maxConcurrentMoves) {
|
||||||
this.datanode = r.getDatanodeInfo();
|
this.datanode = r.getDatanodeInfo();
|
||||||
this.maxConcurrentMoves = maxConcurrentMoves;
|
this.maxConcurrentMoves = maxConcurrentMoves;
|
||||||
this.pendings = new ArrayList<PendingMove>(maxConcurrentMoves);
|
this.pendings = new ArrayList<PendingMove>(maxConcurrentMoves);
|
||||||
|
@ -475,18 +464,14 @@ public class Dispatcher {
|
||||||
Preconditions.checkState(existing == null);
|
Preconditions.checkState(existing == null);
|
||||||
}
|
}
|
||||||
|
|
||||||
StorageGroup addStorageGroup(StorageType storageType, double utilization,
|
StorageGroup addStorageGroup(StorageType storageType, long maxSize2Move) {
|
||||||
long maxSize2Move) {
|
final StorageGroup g = new StorageGroup(storageType, maxSize2Move);
|
||||||
final StorageGroup g = new StorageGroup(storageType, utilization,
|
|
||||||
maxSize2Move);
|
|
||||||
put(storageType, g);
|
put(storageType, g);
|
||||||
return g;
|
return g;
|
||||||
}
|
}
|
||||||
|
|
||||||
Source addSource(StorageType storageType, double utilization,
|
Source addSource(StorageType storageType, long maxSize2Move, Dispatcher d) {
|
||||||
long maxSize2Move, Dispatcher balancer) {
|
final Source s = d.new Source(storageType, maxSize2Move, this);
|
||||||
final Source s = balancer.new Source(storageType, utilization,
|
|
||||||
maxSize2Move, this);
|
|
||||||
put(storageType, s);
|
put(storageType, s);
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
@ -528,7 +513,7 @@ public class Dispatcher {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** A node that can be the sources of a block move */
|
/** A node that can be the sources of a block move */
|
||||||
class Source extends BalancerDatanode.StorageGroup {
|
class Source extends DDatanode.StorageGroup {
|
||||||
|
|
||||||
private final List<Task> tasks = new ArrayList<Task>(2);
|
private final List<Task> tasks = new ArrayList<Task>(2);
|
||||||
private long blocksToReceive = 0L;
|
private long blocksToReceive = 0L;
|
||||||
|
@ -539,9 +524,8 @@ public class Dispatcher {
|
||||||
*/
|
*/
|
||||||
private final List<DBlock> srcBlocks = new ArrayList<DBlock>();
|
private final List<DBlock> srcBlocks = new ArrayList<DBlock>();
|
||||||
|
|
||||||
private Source(StorageType storageType, double utilization,
|
private Source(StorageType storageType, long maxSize2Move, DDatanode dn) {
|
||||||
long maxSize2Move, BalancerDatanode dn) {
|
dn.super(storageType, maxSize2Move);
|
||||||
dn.super(storageType, utilization, maxSize2Move);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Add a task */
|
/** Add a task */
|
||||||
|
@ -565,7 +549,7 @@ public class Dispatcher {
|
||||||
*/
|
*/
|
||||||
private long getBlockList() throws IOException {
|
private long getBlockList() throws IOException {
|
||||||
final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
|
final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
|
||||||
final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanode(), size);
|
final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size);
|
||||||
|
|
||||||
long bytesReceived = 0;
|
long bytesReceived = 0;
|
||||||
for (BlockWithLocations blk : newBlocks.getBlocks()) {
|
for (BlockWithLocations blk : newBlocks.getBlocks()) {
|
||||||
|
@ -579,7 +563,7 @@ public class Dispatcher {
|
||||||
final String[] datanodeUuids = blk.getDatanodeUuids();
|
final String[] datanodeUuids = blk.getDatanodeUuids();
|
||||||
final StorageType[] storageTypes = blk.getStorageTypes();
|
final StorageType[] storageTypes = blk.getStorageTypes();
|
||||||
for (int i = 0; i < datanodeUuids.length; i++) {
|
for (int i = 0; i < datanodeUuids.length; i++) {
|
||||||
final BalancerDatanode.StorageGroup g = storageGroupMap.get(
|
final StorageGroup g = storageGroupMap.get(
|
||||||
datanodeUuids[i], storageTypes[i]);
|
datanodeUuids[i], storageTypes[i]);
|
||||||
if (g != null) { // not unknown
|
if (g != null) { // not unknown
|
||||||
block.addLocation(g);
|
block.addLocation(g);
|
||||||
|
@ -617,7 +601,7 @@ public class Dispatcher {
|
||||||
private PendingMove chooseNextMove() {
|
private PendingMove chooseNextMove() {
|
||||||
for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
|
for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
|
||||||
final Task task = i.next();
|
final Task task = i.next();
|
||||||
final BalancerDatanode target = task.target.getBalancerDatanode();
|
final DDatanode target = task.target.getDDatanode();
|
||||||
PendingMove pendingBlock = new PendingMove();
|
PendingMove pendingBlock = new PendingMove();
|
||||||
if (target.addPendingBlock(pendingBlock)) {
|
if (target.addPendingBlock(pendingBlock)) {
|
||||||
// target is not busy, so do a tentative block allocation
|
// target is not busy, so do a tentative block allocation
|
||||||
|
@ -670,7 +654,7 @@ public class Dispatcher {
|
||||||
final long startTime = Time.monotonicNow();
|
final long startTime = Time.monotonicNow();
|
||||||
this.blocksToReceive = 2 * getScheduledSize();
|
this.blocksToReceive = 2 * getScheduledSize();
|
||||||
boolean isTimeUp = false;
|
boolean isTimeUp = false;
|
||||||
int noPendingBlockIteration = 0;
|
int noPendingMoveIteration = 0;
|
||||||
while (!isTimeUp && getScheduledSize() > 0
|
while (!isTimeUp && getScheduledSize() > 0
|
||||||
&& (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
|
&& (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
|
||||||
final PendingMove p = chooseNextMove();
|
final PendingMove p = chooseNextMove();
|
||||||
|
@ -699,11 +683,11 @@ public class Dispatcher {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// source node cannot find a pendingBlockToMove, iteration +1
|
// source node cannot find a pending block to move, iteration +1
|
||||||
noPendingBlockIteration++;
|
noPendingMoveIteration++;
|
||||||
// in case no blocks can be moved for source node's task,
|
// in case no blocks can be moved for source node's task,
|
||||||
// jump out of while-loop after 5 iterations.
|
// jump out of while-loop after 5 iterations.
|
||||||
if (noPendingBlockIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) {
|
if (noPendingMoveIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) {
|
||||||
resetScheduledSize();
|
resetScheduledSize();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -726,29 +710,19 @@ public class Dispatcher {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Dispatcher(NameNodeConnector theblockpool, Set<String> includedNodes,
|
public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
|
||||||
Set<String> excludedNodes, Configuration conf) {
|
Set<String> excludedNodes, long movedWinWidth, int moverThreads,
|
||||||
this.nnc = theblockpool;
|
int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) {
|
||||||
this.keyManager = nnc.getKeyManager();
|
this.nnc = nnc;
|
||||||
this.excludedNodes = excludedNodes;
|
this.excludedNodes = excludedNodes;
|
||||||
this.includedNodes = includedNodes;
|
this.includedNodes = includedNodes;
|
||||||
|
this.movedBlocks = new MovedBlocks<StorageGroup>(movedWinWidth);
|
||||||
final long movedWinWidth = conf.getLong(
|
|
||||||
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
|
|
||||||
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
|
|
||||||
movedBlocks = new MovedBlocks<BalancerDatanode.StorageGroup>(movedWinWidth);
|
|
||||||
|
|
||||||
this.cluster = NetworkTopology.getInstance(conf);
|
this.cluster = NetworkTopology.getInstance(conf);
|
||||||
|
|
||||||
this.moveExecutor = Executors.newFixedThreadPool(conf.getInt(
|
this.moveExecutor = Executors.newFixedThreadPool(moverThreads);
|
||||||
DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY,
|
this.dispatchExecutor = Executors.newFixedThreadPool(dispatcherThreads);
|
||||||
DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT));
|
this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
|
||||||
this.dispatchExecutor = Executors.newFixedThreadPool(conf.getInt(
|
|
||||||
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
|
|
||||||
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT));
|
|
||||||
this.maxConcurrentMovesPerNode = conf.getInt(
|
|
||||||
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
|
|
||||||
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
|
|
||||||
|
|
||||||
final boolean fallbackToSimpleAuthAllowed = conf.getBoolean(
|
final boolean fallbackToSimpleAuthAllowed = conf.getBoolean(
|
||||||
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
|
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
|
||||||
|
@ -784,7 +758,7 @@ public class Dispatcher {
|
||||||
return b;
|
return b;
|
||||||
}
|
}
|
||||||
|
|
||||||
void add(Source source, BalancerDatanode.StorageGroup target) {
|
void add(Source source, StorageGroup target) {
|
||||||
sources.add(source);
|
sources.add(source);
|
||||||
targets.add(target);
|
targets.add(target);
|
||||||
}
|
}
|
||||||
|
@ -826,8 +800,8 @@ public class Dispatcher {
|
||||||
return trimmed;
|
return trimmed;
|
||||||
}
|
}
|
||||||
|
|
||||||
public BalancerDatanode newDatanode(DatanodeStorageReport r) {
|
public DDatanode newDatanode(DatanodeStorageReport r) {
|
||||||
return new BalancerDatanode(r, maxConcurrentMovesPerNode);
|
return new DDatanode(r, maxConcurrentMovesPerNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean dispatchAndCheckContinue() throws InterruptedException {
|
public boolean dispatchAndCheckContinue() throws InterruptedException {
|
||||||
|
@ -884,8 +858,8 @@ public class Dispatcher {
|
||||||
private void waitForMoveCompletion() {
|
private void waitForMoveCompletion() {
|
||||||
for(;;) {
|
for(;;) {
|
||||||
boolean empty = true;
|
boolean empty = true;
|
||||||
for (BalancerDatanode.StorageGroup t : targets) {
|
for (StorageGroup t : targets) {
|
||||||
if (!t.getBalancerDatanode().isPendingQEmpty()) {
|
if (!t.getDDatanode().isPendingQEmpty()) {
|
||||||
empty = false;
|
empty = false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -907,8 +881,8 @@ public class Dispatcher {
|
||||||
* 2. the block does not have a replica on the target;
|
* 2. the block does not have a replica on the target;
|
||||||
* 3. doing the move does not reduce the number of racks that the block has
|
* 3. doing the move does not reduce the number of racks that the block has
|
||||||
*/
|
*/
|
||||||
private boolean isGoodBlockCandidate(Source source,
|
private boolean isGoodBlockCandidate(Source source, StorageGroup target,
|
||||||
BalancerDatanode.StorageGroup target, DBlock block) {
|
DBlock block) {
|
||||||
if (source.storageType != target.storageType) {
|
if (source.storageType != target.storageType) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -933,17 +907,17 @@ public class Dispatcher {
|
||||||
* Determine whether moving the given block replica from source to target
|
* Determine whether moving the given block replica from source to target
|
||||||
* would reduce the number of racks of the block replicas.
|
* would reduce the number of racks of the block replicas.
|
||||||
*/
|
*/
|
||||||
private boolean reduceNumOfRacks(Source source,
|
private boolean reduceNumOfRacks(Source source, StorageGroup target,
|
||||||
BalancerDatanode.StorageGroup target, DBlock block) {
|
DBlock block) {
|
||||||
final DatanodeInfo sourceDn = source.getDatanode();
|
final DatanodeInfo sourceDn = source.getDatanodeInfo();
|
||||||
if (cluster.isOnSameRack(sourceDn, target.getDatanode())) {
|
if (cluster.isOnSameRack(sourceDn, target.getDatanodeInfo())) {
|
||||||
// source and target are on the same rack
|
// source and target are on the same rack
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
boolean notOnSameRack = true;
|
boolean notOnSameRack = true;
|
||||||
synchronized (block) {
|
synchronized (block) {
|
||||||
for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
|
for (StorageGroup loc : block.getLocations()) {
|
||||||
if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
|
if (cluster.isOnSameRack(loc.getDatanodeInfo(), target.getDatanodeInfo())) {
|
||||||
notOnSameRack = false;
|
notOnSameRack = false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -953,8 +927,8 @@ public class Dispatcher {
|
||||||
// target is not on the same rack as any replica
|
// target is not on the same rack as any replica
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
for (BalancerDatanode.StorageGroup g : block.getLocations()) {
|
for (StorageGroup g : block.getLocations()) {
|
||||||
if (g != source && cluster.isOnSameRack(g.getDatanode(), sourceDn)) {
|
if (g != source && cluster.isOnSameRack(g.getDatanodeInfo(), sourceDn)) {
|
||||||
// source is on the same rack of another replica
|
// source is on the same rack of another replica
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -971,10 +945,10 @@ public class Dispatcher {
|
||||||
* group with target
|
* group with target
|
||||||
*/
|
*/
|
||||||
private boolean isOnSameNodeGroupWithReplicas(
|
private boolean isOnSameNodeGroupWithReplicas(
|
||||||
BalancerDatanode.StorageGroup target, DBlock block, Source source) {
|
StorageGroup target, DBlock block, Source source) {
|
||||||
final DatanodeInfo targetDn = target.getDatanode();
|
final DatanodeInfo targetDn = target.getDatanodeInfo();
|
||||||
for (BalancerDatanode.StorageGroup g : block.getLocations()) {
|
for (StorageGroup g : block.getLocations()) {
|
||||||
if (g != source && cluster.isOnSameNodeGroup(g.getDatanode(), targetDn)) {
|
if (g != source && cluster.isOnSameNodeGroup(g.getDatanodeInfo(), targetDn)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,44 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.balancer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Exit status - The values associated with each exit status is directly mapped
|
||||||
|
* to the process's exit code in command line.
|
||||||
|
*/
|
||||||
|
public enum ExitStatus {
|
||||||
|
SUCCESS(0),
|
||||||
|
IN_PROGRESS(1),
|
||||||
|
ALREADY_RUNNING(-1),
|
||||||
|
NO_MOVE_BLOCK(-2),
|
||||||
|
NO_MOVE_PROGRESS(-3),
|
||||||
|
IO_EXCEPTION(-4),
|
||||||
|
ILLEGAL_ARGUMENTS(-5),
|
||||||
|
INTERRUPTED(-6);
|
||||||
|
|
||||||
|
private final int code;
|
||||||
|
|
||||||
|
private ExitStatus(int code) {
|
||||||
|
this.code = code;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @return the command line exit code. */
|
||||||
|
public int getExitCode() {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
}
|
|
@ -570,10 +570,10 @@ public class TestBalancer {
|
||||||
final int r = Balancer.run(namenodes, p, conf);
|
final int r = Balancer.run(namenodes, p, conf);
|
||||||
if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
|
if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
|
||||||
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) {
|
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) {
|
||||||
assertEquals(Balancer.ReturnStatus.NO_MOVE_PROGRESS.code, r);
|
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
|
assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
|
||||||
}
|
}
|
||||||
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
|
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
|
||||||
LOG.info("Rebalancing with default ctor.");
|
LOG.info("Rebalancing with default ctor.");
|
||||||
|
@ -717,7 +717,7 @@ public class TestBalancer {
|
||||||
Balancer.Parameters.DEFAULT.threshold,
|
Balancer.Parameters.DEFAULT.threshold,
|
||||||
datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded);
|
datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded);
|
||||||
final int r = Balancer.run(namenodes, p, conf);
|
final int r = Balancer.run(namenodes, p, conf);
|
||||||
assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
|
assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
|
||||||
} finally {
|
} finally {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,7 +98,7 @@ public class TestBalancerWithHANameNodes {
|
||||||
assertEquals(1, namenodes.size());
|
assertEquals(1, namenodes.size());
|
||||||
assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster)));
|
assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster)));
|
||||||
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
|
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
|
||||||
assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
|
assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
|
||||||
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
|
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
|
||||||
cluster, Balancer.Parameters.DEFAULT);
|
cluster, Balancer.Parameters.DEFAULT);
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -160,7 +160,7 @@ public class TestBalancerWithMultipleNameNodes {
|
||||||
// start rebalancing
|
// start rebalancing
|
||||||
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(s.conf);
|
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(s.conf);
|
||||||
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, s.conf);
|
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, s.conf);
|
||||||
Assert.assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
|
Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
|
||||||
|
|
||||||
LOG.info("BALANCER 2");
|
LOG.info("BALANCER 2");
|
||||||
wait(s.clients, totalUsed, totalCapacity);
|
wait(s.clients, totalUsed, totalCapacity);
|
||||||
|
|
|
@ -176,7 +176,7 @@ public class TestBalancerWithNodeGroup {
|
||||||
// start rebalancing
|
// start rebalancing
|
||||||
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
||||||
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
|
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
|
||||||
assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
|
assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
|
||||||
|
|
||||||
waitForHeartBeat(totalUsedSpace, totalCapacity);
|
waitForHeartBeat(totalUsedSpace, totalCapacity);
|
||||||
LOG.info("Rebalancing with default factor.");
|
LOG.info("Rebalancing with default factor.");
|
||||||
|
@ -190,8 +190,8 @@ public class TestBalancerWithNodeGroup {
|
||||||
// start rebalancing
|
// start rebalancing
|
||||||
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
||||||
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
|
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
|
||||||
Assert.assertTrue(r == Balancer.ReturnStatus.SUCCESS.code ||
|
Assert.assertTrue(r == ExitStatus.SUCCESS.getExitCode() ||
|
||||||
(r == Balancer.ReturnStatus.NO_MOVE_PROGRESS.code));
|
(r == ExitStatus.NO_MOVE_PROGRESS.getExitCode()));
|
||||||
waitForHeartBeat(totalUsedSpace, totalCapacity);
|
waitForHeartBeat(totalUsedSpace, totalCapacity);
|
||||||
LOG.info("Rebalancing with default factor.");
|
LOG.info("Rebalancing with default factor.");
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue