HDFS-9543. DiskBalancer: Add Data mover. Contributed by Anu Engineer.

This commit is contained in:
Anu Engineer 2016-04-28 16:12:04 -07:00 committed by Arpit Agarwal
parent 7820737cfa
commit 1594b472bb
10 changed files with 668 additions and 68 deletions

View File

@ -3324,8 +3324,6 @@ public class DataNode extends ReconfigurableBase
* @param planID - Hash value of the plan.
* @param planVersion - Plan version, reserved for future use. We have only
* version 1 now.
* @param bandwidth - Max disk bandwidth to use, 0 means use value defined
* in the configration.
* @param plan - Actual plan
* @throws IOException
*/

View File

@ -23,7 +23,9 @@ import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.DiskBalancerWorkEntry;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus
.DiskBalancerWorkEntry;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@ -39,6 +41,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@ -48,18 +52,21 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
* Worker class for Disk Balancer.
* <p/>
* <p>
* Here is the high level logic executed by this class. Users can submit disk
* balancing plans using submitPlan calls. After a set of sanity checks the plan
* is admitted and put into workMap.
* <p/>
* <p>
* The executePlan launches a thread that picks up work from workMap and hands
* it over to the BlockMover#copyBlocks function.
* <p/>
* <p>
* Constraints :
* <p/>
* <p>
* Only one plan can be executing in a datanode at any given time. This is
* ensured by checking the future handle of the worker thread in submitPlan.
*/
@ -127,11 +134,12 @@ public class DiskBalancer {
* Shutdown the executor.
*/
private void shutdownExecutor() {
final int secondsTowait = 10;
scheduler.shutdown();
try {
if(!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
if (!scheduler.awaitTermination(secondsTowait, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
if (!scheduler.awaitTermination(secondsTowait, TimeUnit.SECONDS)) {
LOG.error("Disk Balancer : Scheduler did not terminate.");
}
}
@ -207,6 +215,7 @@ public class DiskBalancer {
/**
* Cancels a running plan.
*
* @param planID - Hash of the plan to cancel.
* @throws DiskBalancerException
*/
@ -604,13 +613,15 @@ public class DiskBalancer {
/**
* Actual DataMover class for DiskBalancer.
* <p/>
* <p>
*/
public static class DiskBalancerMover implements BlockMover {
private final FsDatasetSpi dataset;
private long diskBandwidth;
private long blockTolerance;
private long maxDiskErrors;
private int poolIndex;
private AtomicBoolean shouldRun;
/**
* Constructs diskBalancerMover.
@ -620,6 +631,7 @@ public class DiskBalancer {
*/
public DiskBalancerMover(FsDatasetSpi dataset, Configuration conf) {
this.dataset = dataset;
shouldRun = new AtomicBoolean(false);
this.diskBandwidth = conf.getLong(
DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THRUPUT,
@ -658,6 +670,222 @@ public class DiskBalancer {
}
}
/**
* Sets Diskmover copyblocks into runnable state.
*/
@Override
public void setRunnable() {
this.shouldRun.set(true);
}
/**
* Signals copy block to exit.
*/
@Override
public void setExitFlag() {
this.shouldRun.set(false);
}
/**
* Returns the shouldRun boolean flag.
*/
public boolean shouldRun() {
return this.shouldRun.get();
}
/**
* Checks if a given block is less than needed size to meet our goal.
*
* @param blockSize - block len
* @param item - Work item
* @return true if this block meets our criteria, false otherwise.
*/
private boolean isLessThanNeeded(long blockSize,
DiskBalancerWorkItem item) {
long bytesToCopy = item.getBytesToCopy() - item.getBytesCopied();
bytesToCopy = bytesToCopy +
((bytesToCopy * getBlockTolerancePercentage(item)) / 100);
return (blockSize <= bytesToCopy) ? true : false;
}
/**
* Returns the default block tolerance if the plan does not have value of
* tolerance specified.
*
* @param item - DiskBalancerWorkItem
* @return long
*/
private long getBlockTolerancePercentage(DiskBalancerWorkItem item) {
return item.getTolerancePercent() <= 0 ? this.blockTolerance :
item.getTolerancePercent();
}
/**
* Inflates bytesCopied and returns true or false. This allows us to stop
* copying if we have reached close enough.
*
* @param item DiskBalancerWorkItem
* @return -- false if we need to copy more, true if we are done
*/
private boolean isCloseEnough(DiskBalancerWorkItem item) {
long temp = item.getBytesCopied() +
((item.getBytesCopied() * getBlockTolerancePercentage(item)) / 100);
return (item.getBytesToCopy() >= temp) ? false : true;
}
/**
* Returns disk bandwidth associated with this plan, if none is specified
* returns the global default.
*
* @param item DiskBalancerWorkItem.
* @return MB/s - long
*/
private long getDiskBandwidth(DiskBalancerWorkItem item) {
return item.getBandwidth() <= 0 ? this.diskBandwidth : item
.getBandwidth();
}
/**
* Computes sleep delay needed based on the block that just got copied. we
* copy using a burst mode, that is we let the copy proceed in full
* throttle. Once a copy is done, we compute how many bytes have been
* transferred and try to average it over the user specified bandwidth. In
* other words, This code implements a poor man's token bucket algorithm for
* traffic shaping.
*
* @param bytesCopied - byteCopied.
* @param timeUsed in milliseconds
* @param item DiskBalancerWorkItem
* @return sleep delay in Milliseconds.
*/
private long computeDelay(long bytesCopied, long timeUsed,
DiskBalancerWorkItem item) {
// we had an overflow, ignore this reading and continue.
if (timeUsed == 0) {
return 0;
}
final int megaByte = 1024 * 1024;
long bytesInMB = bytesCopied / megaByte;
long lastThroughput = bytesInMB / SECONDS.convert(timeUsed,
TimeUnit.MILLISECONDS);
long delay = (bytesInMB / getDiskBandwidth(item)) - lastThroughput;
return (delay <= 0) ? 0 : MILLISECONDS.convert(delay, TimeUnit.SECONDS);
}
/**
* Returns maximum errors to tolerate for the specific plan or the default.
*
* @param item - DiskBalancerWorkItem
* @return maximum error counts to tolerate.
*/
private long getMaxError(DiskBalancerWorkItem item) {
return item.getMaxDiskErrors() <= 0 ? this.maxDiskErrors :
item.getMaxDiskErrors();
}
/**
* Gets the next block that we can copy, returns null if we cannot find a
* block that fits our parameters or if have run out of blocks.
*
* @param iter Block Iter
* @param item - Work item
* @return Extended block or null if no copyable block is found.
*/
private ExtendedBlock getBlockToCopy(FsVolumeSpi.BlockIterator iter,
DiskBalancerWorkItem item) {
while (!iter.atEnd() && item.getErrorCount() < getMaxError(item)) {
try {
ExtendedBlock block = iter.nextBlock();
// A valid block is a finalized block, we iterate until we get
// finalized blocks
if (!this.dataset.isValidBlock(block)) {
continue;
}
// We don't look for the best, we just do first fit
if (isLessThanNeeded(block.getNumBytes(), item)) {
return block;
}
} catch (IOException e) {
item.incErrorCount();
}
}
if (item.getErrorCount() >= getMaxError(item)) {
item.setErrMsg("Error count exceeded.");
LOG.info("Maximum error count exceeded. Error count: {} Max error:{} "
, item.getErrorCount(), item.getMaxDiskErrors());
}
return null;
}
/**
* Opens all Block pools on a given volume.
*
* @param source Source
* @param poolIters List of PoolIters to maintain.
*/
private void openPoolIters(FsVolumeSpi source, List<FsVolumeSpi
.BlockIterator> poolIters) {
Preconditions.checkNotNull(source);
Preconditions.checkNotNull(poolIters);
for (String blockPoolID : source.getBlockPoolList()) {
poolIters.add(source.newBlockIterator(blockPoolID,
"DiskBalancerSource"));
}
}
/**
* Returns the next block that we copy from all the block pools. This
* function looks across all block pools to find the next block to copy.
*
* @param poolIters - List of BlockIterators
* @return ExtendedBlock.
*/
ExtendedBlock getNextBlock(List<FsVolumeSpi.BlockIterator> poolIters,
DiskBalancerWorkItem item) {
Preconditions.checkNotNull(poolIters);
int currentCount = 0;
ExtendedBlock block = null;
while (block == null && currentCount < poolIters.size()) {
currentCount++;
poolIndex = poolIndex++ % poolIters.size();
FsVolumeSpi.BlockIterator currentPoolIter = poolIters.get(poolIndex);
block = getBlockToCopy(currentPoolIter, item);
}
if (block == null) {
try {
item.setErrMsg("No source blocks found to move.");
LOG.error("No movable source blocks found. {}", item.toJson());
} catch (IOException e) {
LOG.error("Unable to get json from Item.");
}
}
return block;
}
/**
* Close all Pool Iters.
*
* @param poolIters List of BlockIters
*/
private void closePoolIters(List<FsVolumeSpi.BlockIterator> poolIters) {
Preconditions.checkNotNull(poolIters);
for (FsVolumeSpi.BlockIterator iter : poolIters) {
try {
iter.close();
} catch (IOException ex) {
LOG.error("Error closing a block pool iter. ex: {}", ex);
}
}
}
/**
* Copies blocks from a set of volumes.
*
@ -666,23 +894,110 @@ public class DiskBalancer {
*/
@Override
public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
FsVolumeSpi source = pair.getSource();
FsVolumeSpi dest = pair.getDest();
List<FsVolumeSpi.BlockIterator> poolIters = new LinkedList<>();
if (source.isTransientStorage() || dest.isTransientStorage()) {
return;
}
/**
* Begin the actual copy operations. This is useful in testing.
*/
@Override
public void setRunnable() {
try {
openPoolIters(source, poolIters);
if (poolIters.size() == 0) {
LOG.error("No block pools found on volume. volume : {}. Exiting.",
source.getBasePath());
return;
}
/**
* Tells copyBlocks to exit from the copy routine.
*/
@Override
public void setExitFlag() {
while (shouldRun()) {
try {
// Check for the max error count constraint.
if (item.getErrorCount() > getMaxError(item)) {
LOG.error("Exceeded the max error count. source {}, dest: {} " +
"error count: {}", source.getBasePath(),
dest.getBasePath(), item.getErrorCount());
this.setExitFlag();
continue;
}
// Check for the block tolerance constraint.
if (isCloseEnough(item)) {
LOG.info("Copy from {} to {} done. copied {} bytes and {} " +
"blocks.",
source.getBasePath(), dest.getBasePath(),
item.getBytesCopied(), item.getBlocksCopied());
this.setExitFlag();
continue;
}
ExtendedBlock block = getNextBlock(poolIters, item);
// we are not able to find any blocks to copy.
if (block == null) {
this.setExitFlag();
LOG.error("No source blocks, exiting the copy. Source: {}, " +
"dest:{}", source.getBasePath(), dest.getBasePath());
continue;
}
// check if someone told us exit, treat this as an interruption
// point
// for the thread, since both getNextBlock and moveBlocAcrossVolume
// can take some time.
if (!shouldRun()) {
continue;
}
long timeUsed;
// There is a race condition here, but we will get an IOException
// if dest has no space, which we handle anyway.
if (dest.getAvailable() > item.getBytesToCopy()) {
long begin = System.nanoTime();
this.dataset.moveBlockAcrossVolumes(block, dest);
long now = System.nanoTime();
timeUsed = (now - begin) > 0 ? now - begin : 0;
} else {
// Technically it is possible for us to find a smaller block and
// make another copy, but opting for the safer choice of just
// exiting here.
LOG.error("Destination volume: {} does not have enough space to" +
" accommodate a block. Block Size: {} Exiting from" +
" copyBlocks.", dest.getBasePath(), block.getNumBytes());
this.setExitFlag();
continue;
}
LOG.debug("Moved block with size {} from {} to {}",
block.getNumBytes(), source.getBasePath(),
dest.getBasePath());
item.incCopiedSoFar(block.getNumBytes());
item.incBlocksCopied();
// Check for the max throughput constraint.
// We sleep here to keep the promise that we will not
// copy more than Max MB/sec. we sleep enough time
// to make sure that our promise is good on average.
// Because we sleep, if a shutdown or cancel call comes in
// we exit via Thread Interrupted exception.
Thread.sleep(computeDelay(block.getNumBytes(), timeUsed, item));
} catch (IOException ex) {
LOG.error("Exception while trying to copy blocks. error: {}", ex);
item.incErrorCount();
} catch (InterruptedException e) {
LOG.error("Copy Block Thread interrupted, exiting the copy.");
Thread.currentThread().interrupt();
item.incErrorCount();
this.setExitFlag();
}
}
} finally {
// Close all Iters.
closePoolIters(poolIters);
}
}
/**

View File

@ -28,7 +28,7 @@ import java.util.Map;
* between a set of Nodes.
*/
public class DiskBalancerDataNode implements Comparable<DiskBalancerDataNode> {
private float nodeDataDensity;
private double nodeDataDensity;
private Map<String, DiskBalancerVolumeSet> volumeSets;
private String dataNodeUUID;
private String dataNodeIP;
@ -159,17 +159,17 @@ public class DiskBalancerDataNode implements Comparable<DiskBalancerDataNode> {
public int compareTo(DiskBalancerDataNode that) {
Preconditions.checkNotNull(that);
if (Float.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0)
if (Double.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0)
< 0) {
return -1;
}
if (Float.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0)
if (Double.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0)
== 0) {
return 0;
}
if (Float.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0)
if (Double.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0)
> 0) {
return 1;
}
@ -190,7 +190,7 @@ public class DiskBalancerDataNode implements Comparable<DiskBalancerDataNode> {
*
* @return float
*/
public float getNodeDataDensity() {
public double getNodeDataDensity() {
return nodeDataDensity;
}
@ -201,7 +201,7 @@ public class DiskBalancerDataNode implements Comparable<DiskBalancerDataNode> {
* spread across a set of volumes inside the node.
*/
public void computeNodeDensity() {
float sum = 0;
double sum = 0;
int volcount = 0;
for (DiskBalancerVolumeSet vset : volumeSets.values()) {
for (DiskBalancerVolume vol : vset.getVolumes()) {
@ -249,6 +249,7 @@ public class DiskBalancerDataNode implements Comparable<DiskBalancerDataNode> {
vSet = volumeSets.get(volumeSetKey);
} else {
vSet = new DiskBalancerVolumeSet(volume.isTransient());
vSet.setStorageType(volumeSetKey);
volumeSets.put(volumeSetKey, vSet);
}

View File

@ -38,7 +38,7 @@ public class DiskBalancerVolume {
private String uuid;
private boolean failed;
private boolean isTransient;
private float volumeDataDensity;
private double volumeDataDensity;
private boolean skip = false;
private boolean isReadOnly;
@ -69,7 +69,7 @@ public class DiskBalancerVolume {
*
* @return float.
*/
public float getVolumeDataDensity() {
public double getVolumeDataDensity() {
return volumeDataDensity;
}
@ -78,7 +78,7 @@ public class DiskBalancerVolume {
*
* @param volDataDensity - density
*/
public void setVolumeDataDensity(float volDataDensity) {
public void setVolumeDataDensity(double volDataDensity) {
this.volumeDataDensity = volDataDensity;
}

View File

@ -53,7 +53,7 @@ public class DiskBalancerVolumeSet {
private String storageType;
private String setID;
private float idealUsed;
private double idealUsed;
/**
@ -142,19 +142,32 @@ public class DiskBalancerVolumeSet {
}
if (totalCapacity != 0) {
this.idealUsed = totalUsed / (float) totalCapacity;
this.idealUsed = truncateDecimals(totalUsed /
(double) totalCapacity);
}
for (DiskBalancerVolume volume : volumes) {
if (!volume.isFailed() && !volume.isSkip()) {
float dfsUsedRatio =
volume.getUsed() / (float) volume.computeEffectiveCapacity();
double dfsUsedRatio =
truncateDecimals(volume.getUsed() /
(double) volume.computeEffectiveCapacity());
volume.setVolumeDataDensity(this.idealUsed - dfsUsedRatio);
sortedQueue.add(volume);
}
}
}
/**
* Truncate to 4 digits since uncontrolled precision is some times
* counter intitive to what users expect.
* @param value - double.
* @return double.
*/
private double truncateDecimals(double value) {
final int multiplier = 10000;
return (double) ((long) (value * multiplier)) / multiplier;
}
private void skipMisConfiguredVolume(DiskBalancerVolume volume) {
//probably points to some sort of mis-configuration. Log this and skip
// processing this volume.
@ -255,7 +268,7 @@ public class DiskBalancerVolumeSet {
* @return true if balancing is needed false otherwise.
*/
public boolean isBalancingNeeded(float thresholdPercentage) {
float threshold = thresholdPercentage / 100.0f;
double threshold = thresholdPercentage / 100.0d;
if(volumes == null || volumes.size() <= 1) {
// there is nothing we can do with a single volume.
@ -265,7 +278,10 @@ public class DiskBalancerVolumeSet {
for (DiskBalancerVolume vol : volumes) {
boolean notSkip = !vol.isFailed() && !vol.isTransient() && !vol.isSkip();
if ((Math.abs(vol.getVolumeDataDensity()) > threshold) && notSkip) {
Double absDensity =
truncateDecimals(Math.abs(vol.getVolumeDataDensity()));
if ((absDensity > threshold) && notSkip) {
return true;
}
}
@ -306,7 +322,7 @@ public class DiskBalancerVolumeSet {
*/
@JsonIgnore
public float getIdealUsed() {
public double getIdealUsed() {
return this.idealUsed;
}
@ -319,8 +335,8 @@ public class DiskBalancerVolumeSet {
*/
@Override
public int compare(DiskBalancerVolume first, DiskBalancerVolume second) {
return Float
.compare(second.getVolumeDataDensity(), first.getVolumeDataDensity());
return Double.compare(second.getVolumeDataDensity(),
first.getVolumeDataDensity());
}
}
}

View File

@ -38,7 +38,7 @@ import org.apache.htrace.fasterxml.jackson.annotation.JsonInclude;
public class MoveStep implements Step {
private DiskBalancerVolume sourceVolume;
private DiskBalancerVolume destinationVolume;
private float idealStorage;
private double idealStorage;
private long bytesToMove;
private String volumeSetID;
@ -55,7 +55,7 @@ public class MoveStep implements Step {
* @param bytesToMove - number of bytes to move
* @param volumeSetID - a diskBalancer generated id.
*/
public MoveStep(DiskBalancerVolume sourceVolume, float idealStorage,
public MoveStep(DiskBalancerVolume sourceVolume, double idealStorage,
DiskBalancerVolume destinationVolume, long bytesToMove,
String volumeSetID) {
this.destinationVolume = destinationVolume;
@ -98,7 +98,7 @@ public class MoveStep implements Step {
* @return float
*/
@Override
public float getIdealStorage() {
public double getIdealStorage() {
return idealStorage;
}
@ -146,7 +146,7 @@ public class MoveStep implements Step {
*
* @param idealStorage - ideal Storage
*/
public void setIdealStorage(float idealStorage) {
public void setIdealStorage(double idealStorage) {
this.idealStorage = idealStorage;
}
@ -199,6 +199,7 @@ public class MoveStep implements Step {
* move operation is aborted.
* @return long.
*/
@Override
public long getMaxDiskErrors() {
return maxDiskErrors;
}
@ -208,6 +209,7 @@ public class MoveStep implements Step {
* step is aborted.
* @param maxDiskErrors - long
*/
@Override
public void setMaxDiskErrors(long maxDiskErrors) {
this.maxDiskErrors = maxDiskErrors;
}
@ -223,6 +225,7 @@ public class MoveStep implements Step {
*
* @return tolerance percentage.
*/
@Override
public long getTolerancePercent() {
return tolerancePercent;
}
@ -231,6 +234,7 @@ public class MoveStep implements Step {
* Sets the tolerance percentage.
* @param tolerancePercent - long
*/
@Override
public void setTolerancePercent(long tolerancePercent) {
this.tolerancePercent = tolerancePercent;
}
@ -241,6 +245,7 @@ public class MoveStep implements Step {
* datanode while data node is in use.
* @return long.
*/
@Override
public long getBandwidth() {
return bandwidth;
}
@ -250,6 +255,7 @@ public class MoveStep implements Step {
* @param bandwidth - Long, MB / Sec of data to be moved between
* source and destinatin volume.
*/
@Override
public void setBandwidth(long bandwidth) {
this.bandwidth = bandwidth;
}

View File

@ -42,7 +42,7 @@ public interface Step {
*
* @return idealStorage
*/
float getIdealStorage();
double getIdealStorage();
/**
* Gets Source Volume.
@ -87,5 +87,23 @@ public interface Step {
*/
long getBandwidth();
/**
* Sets Tolerance percent on a specific step.
* @param tolerancePercent - tolerance in percentage.
*/
void setTolerancePercent(long tolerancePercent);
/**
* Set Bandwidth on a specific step.
* @param bandwidth - in MB/s
*/
void setBandwidth(long bandwidth);
/**
* Set maximum errors to tolerate before disk balancer step fails.
* @param maxDiskErrors - error count.
*/
void setMaxDiskErrors(long maxDiskErrors);
}

View File

@ -264,7 +264,8 @@ public class TestBalancer {
}
/* create a file with a length of <code>fileLen</code> */
static void createFile(MiniDFSCluster cluster, Path filePath, long fileLen,
public static void createFile(MiniDFSCluster cluster, Path filePath, long
fileLen,
short replicationFactor, int nnIndex)
throws IOException, InterruptedException, TimeoutException {
FileSystem fs = cluster.getFileSystem(nnIndex);

View File

@ -0,0 +1,247 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdfs.server.diskbalancer;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.Test;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class TestDiskBalancer {
@Test
public void TestDiskBalancerNameNodeConnectivity() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
final int numDatanodes = 2;
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numDatanodes).build();
try {
cluster.waitActive();
ClusterConnector nameNodeConnector =
ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
DiskBalancerCluster DiskBalancerCluster = new DiskBalancerCluster
(nameNodeConnector);
DiskBalancerCluster.readClusterInfo();
assertEquals(DiskBalancerCluster.getNodes().size(), numDatanodes);
DataNode dnNode = cluster.getDataNodes().get(0);
DiskBalancerDataNode dbDnNode =
DiskBalancerCluster.getNodeByUUID(dnNode.getDatanodeUuid());
assertEquals(dnNode.getDatanodeUuid(), dbDnNode.getDataNodeUUID());
assertEquals(dnNode.getDatanodeId().getIpAddr(),
dbDnNode.getDataNodeIP());
assertEquals(dnNode.getDatanodeId().getHostName(),
dbDnNode.getDataNodeName());
try (FsDatasetSpi.FsVolumeReferences ref = dnNode.getFSDataset()
.getFsVolumeReferences()) {
assertEquals(ref.size(), dbDnNode.getVolumeCount());
}
} finally {
cluster.shutdown();
}
}
/**
* This test simulates a real Data node working with DiskBalancer.
*
* Here is the overview of this test.
*
* 1. Write a bunch of blocks and move them to one disk to create imbalance.
* 2. Rewrite the capacity of the disks in DiskBalancer Model so that
* planner will produce a move plan.
* 3. Execute the move plan and wait unitl the plan is done.
* 4. Verify the source disk has blocks now.
*
* @throws Exception
*/
@Test
public void TestDiskBalancerEndToEnd() throws Exception {
Configuration conf = new HdfsConfiguration();
final int DEFAULT_BLOCK_SIZE = 100;
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
final int numDatanodes = 1;
final String fileName = "/tmp.txt";
final Path filePath = new Path(fileName);
final int blocks = 100;
final int blocksSize = 1024;
final int fileLen = blocks * blocksSize;
// Write a file and restart the cluster
long [] capacities = new long[]{ DEFAULT_BLOCK_SIZE * 2 * fileLen,
DEFAULT_BLOCK_SIZE * 2 * fileLen };
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numDatanodes)
.storageCapacities(capacities)
.storageTypes(new StorageType[] {StorageType.DISK, StorageType.DISK})
.storagesPerDatanode(2)
.build();
FsVolumeImpl source = null;
FsVolumeImpl dest = null;
try {
cluster.waitActive();
Random r = new Random();
FileSystem fs = cluster.getFileSystem(0);
TestBalancer.createFile(cluster, filePath, fileLen, (short) 1,
numDatanodes - 1);
DFSTestUtil.waitReplication(fs, filePath, (short) 1);
cluster.restartDataNodes();
cluster.waitActive();
// Get the data node and move all data to one disk.
DataNode dnNode = cluster.getDataNodes().get(numDatanodes - 1);
try (FsDatasetSpi.FsVolumeReferences refs =
dnNode.getFSDataset().getFsVolumeReferences()) {
source = (FsVolumeImpl) refs.get(0);
dest = (FsVolumeImpl) refs.get(1);
assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0);
DiskBalancerTestUtil.moveAllDataToDestVolume(
dnNode.getFSDataset(), source, dest);
assertTrue(DiskBalancerTestUtil.getBlockCount(source) == 0);
}
cluster.restartDataNodes();
cluster.waitActive();
// Start up a disk balancer and read the cluster info.
final DataNode newDN = cluster.getDataNodes().get(numDatanodes - 1);
ClusterConnector nameNodeConnector =
ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
DiskBalancerCluster diskBalancerCluster =
new DiskBalancerCluster(nameNodeConnector);
diskBalancerCluster.readClusterInfo();
List<DiskBalancerDataNode> nodesToProcess = new LinkedList<>();
// Rewrite the capacity in the model to show that disks need
// re-balancing.
setVolumeCapacity(diskBalancerCluster, DEFAULT_BLOCK_SIZE * 2 * fileLen, "DISK");
// Pick a node to process.
nodesToProcess.add(diskBalancerCluster.getNodeByUUID(dnNode
.getDatanodeUuid()));
diskBalancerCluster.setNodesToProcess(nodesToProcess);
// Compute a plan.
List<NodePlan> clusterplan = diskBalancerCluster.computePlan(0.0f);
// Now we must have a plan,since the node is imbalanced and we
// asked the disk balancer to create a plan.
assertTrue(clusterplan.size() == 1);
NodePlan plan = clusterplan.get(0);
plan.setNodeUUID(dnNode.getDatanodeUuid());
plan.setTimeStamp(Time.now());
String planJson = plan.toJson();
String planID = DigestUtils.sha512Hex(planJson);
assertNotNull(plan.getVolumeSetPlans());
assertTrue(plan.getVolumeSetPlans().size() > 0);
plan.getVolumeSetPlans().get(0).setTolerancePercent(10);
// Submit the plan and wait till the execution is done.
newDN.submitDiskBalancerPlan(planID, 1, planJson, false);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
return newDN.queryDiskBalancerPlan().getResult() ==
DiskBalancerWorkStatus.Result.PLAN_DONE;
} catch (IOException ex) {
return false;
}
}
}, 1000, 100000);
//verify that it worked.
dnNode = cluster.getDataNodes().get(numDatanodes - 1);
assertEquals(dnNode.queryDiskBalancerPlan().getResult(),
DiskBalancerWorkStatus.Result.PLAN_DONE);
try (FsDatasetSpi.FsVolumeReferences refs =
dnNode.getFSDataset().getFsVolumeReferences()) {
source = (FsVolumeImpl) refs.get(0);
assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0);
}
// Tolerance
long delta = (plan.getVolumeSetPlans().get(0).getBytesToMove()
* 10) / 100;
assertTrue(
(DiskBalancerTestUtil.getBlockCount(source) *
DEFAULT_BLOCK_SIZE + delta) >=
plan.getVolumeSetPlans().get(0).getBytesToMove());
} finally {
cluster.shutdown();
}
}
/**
* Sets alll Disks capacity to size specified.
* @param cluster - DiskBalancerCluster
* @param size - new size of the disk
*/
private void setVolumeCapacity(DiskBalancerCluster cluster, long size,
String diskType) {
Preconditions.checkNotNull(cluster);
for(DiskBalancerDataNode node : cluster.getNodes()) {
for (DiskBalancerVolume vol :
node.getVolumeSets().get(diskType).getVolumes()) {
vol.setCapacity(size);
}
node.getVolumeSets().get(diskType).computeVolumeDataDensity();
}
}
}

View File

@ -21,11 +21,9 @@ import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.NullConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
.DiskBalancerVolumeSet;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.GreedyPlanner;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
@ -143,9 +141,9 @@ public class TestPlanner {
cluster.readClusterInfo();
Assert.assertEquals(1, cluster.getNodes().size());
GreedyPlanner planner = new GreedyPlanner(10.0f, node);
NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort
());
GreedyPlanner planner = new GreedyPlanner(5.0f, node);
NodePlan plan = new NodePlan(node.getDataNodeUUID(),
node.getDataNodePort());
planner.balanceVolumeSet(node, node.getVolumeSets().get("SSD"), plan);
// We should have only one planned move from
@ -184,8 +182,8 @@ public class TestPlanner {
cluster.readClusterInfo();
Assert.assertEquals(1, cluster.getNodes().size());
GreedyPlanner planner = new GreedyPlanner(10.0f, node);
NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort
GreedyPlanner planner = new GreedyPlanner(5.0f, node);
NodePlan plan = new NodePlan(node.getDataNodeUUID(), node.getDataNodePort
());
planner.balanceVolumeSet(node, node.getVolumeSets().get("SSD"), plan);
@ -262,11 +260,10 @@ public class TestPlanner {
assertEquals(2, plan.getVolumeSetPlans().size());
Step step = plan.getVolumeSetPlans().get(0);
assertEquals("volume100", step.getSourceVolume().getPath());
assertEquals("33.3 G", step.getSizeString(step.getBytesToMove()));
assertTrue(step.getSizeString(step.getBytesToMove()).matches("33.[2|3|4] G"));
step = plan.getVolumeSetPlans().get(1);
assertEquals("volume100", step.getSourceVolume().getPath());
assertEquals("33.3 G", step.getSizeString(step.getBytesToMove()));
assertTrue(step.getSizeString(step.getBytesToMove()).matches("33.[2|3|4] G"));
}
@Test
@ -318,11 +315,12 @@ public class TestPlanner {
Step step = newPlan.getVolumeSetPlans().get(0);
assertEquals("volume100", step.getSourceVolume().getPath());
assertEquals("18.8 G", step.getSizeString(step.getBytesToMove()));
assertTrue(step.getSizeString(step.getBytesToMove()).matches("18.[6|7|8] G"));
step = newPlan.getVolumeSetPlans().get(1);
assertEquals("volume100", step.getSourceVolume().getPath());
assertEquals("18.8 G", step.getSizeString(step.getBytesToMove()));
assertTrue(step.getSizeString(step.getBytesToMove()).matches("18.[6|7|8] G"));
}
@Test
@ -364,7 +362,7 @@ public class TestPlanner {
if (step.getDestinationVolume().getPath().equals("volume0-1")) {
assertEquals("volume100", step.getSourceVolume().getPath());
assertEquals("28.6 G",
assertEquals("28.5 G",
step.getSizeString(step.getBytesToMove()));
}