HDFS-9850. DiskBalancer: Explore removing references to FsVolumeSpi. Contributed by Manoj Govindassamy.

This commit is contained in:
Anu Engineer 2016-09-27 21:35:06 -07:00
parent 6437ba18c5
commit 03f519a757
2 changed files with 300 additions and 68 deletions

View File

@ -22,6 +22,8 @@ import com.google.common.base.Preconditions;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi
.FsVolumeReferences;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -41,6 +43,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.LinkedList;
@ -192,7 +195,30 @@ public class DiskBalancer {
}
/**
* Returns the Current Work Status of a submitted Plan.
* Get FsVolume by volume UUID.
* @param fsDataset
* @param volUuid
* @return FsVolumeSpi
*/
private static FsVolumeSpi getFsVolume(final FsDatasetSpi<?> fsDataset,
final String volUuid) {
FsVolumeSpi fsVolume = null;
try (FsVolumeReferences volumeReferences =
fsDataset.getFsVolumeReferences()) {
for (int i = 0; i < volumeReferences.size(); i++) {
if (volumeReferences.get(i).getStorageID().equals(volUuid)) {
fsVolume = volumeReferences.get(i);
break;
}
}
} catch (IOException e) {
LOG.warn("Disk Balancer - Error when closing volume references: ", e);
}
return fsVolume;
}
/**
* Returns the current work status of a previously submitted Plan.
*
* @return DiskBalancerWorkStatus.
* @throws DiskBalancerException
@ -214,8 +240,8 @@ public class DiskBalancer {
for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
workMap.entrySet()) {
DiskBalancerWorkEntry workEntry = new DiskBalancerWorkEntry(
entry.getKey().getSource().getBasePath(),
entry.getKey().getDest().getBasePath(),
entry.getKey().getSourceVolBasePath(),
entry.getKey().getDestVolBasePath(),
entry.getValue());
status.addWorkEntry(workEntry);
}
@ -269,12 +295,7 @@ public class DiskBalancer {
lock.lock();
try {
checkDiskBalancerEnabled();
Map<String, String> pathMap = new HashMap<>();
Map<String, FsVolumeSpi> volMap = getStorageIDToVolumeMap();
for (Map.Entry<String, FsVolumeSpi> entry : volMap.entrySet()) {
pathMap.put(entry.getKey(), entry.getValue().getBasePath());
}
return JsonUtil.toJsonString(pathMap);
return JsonUtil.toJsonString(getStorageIDToVolumeBasePathMap());
} catch (DiskBalancerException ex) {
throw ex;
} catch (IOException e) {
@ -434,47 +455,52 @@ public class DiskBalancer {
// Cleanup any residual work in the map.
workMap.clear();
Map<String, FsVolumeSpi> pathMap = getStorageIDToVolumeMap();
Map<String, String> storageIDToVolBasePathMap =
getStorageIDToVolumeBasePathMap();
for (Step step : plan.getVolumeSetPlans()) {
String sourceuuid = step.getSourceVolume().getUuid();
String destinationuuid = step.getDestinationVolume().getUuid();
String sourceVolUuid = step.getSourceVolume().getUuid();
String destVolUuid = step.getDestinationVolume().getUuid();
FsVolumeSpi sourceVol = pathMap.get(sourceuuid);
if (sourceVol == null) {
LOG.error("Disk Balancer - Unable to find source volume. submitPlan " +
"failed.");
throw new DiskBalancerException("Unable to find source volume.",
String sourceVolBasePath = storageIDToVolBasePathMap.get(sourceVolUuid);
if (sourceVolBasePath == null) {
final String errMsg = "Disk Balancer - Unable to find volume: "
+ step.getSourceVolume().getPath() + ". SubmitPlan failed.";
LOG.error(errMsg);
throw new DiskBalancerException(errMsg,
DiskBalancerException.Result.INVALID_VOLUME);
}
FsVolumeSpi destVol = pathMap.get(destinationuuid);
if (destVol == null) {
LOG.error("Disk Balancer - Unable to find destination volume. " +
"submitPlan failed.");
throw new DiskBalancerException("Unable to find destination volume.",
String destVolBasePath = storageIDToVolBasePathMap.get(destVolUuid);
if (destVolBasePath == null) {
final String errMsg = "Disk Balancer - Unable to find volume: "
+ step.getDestinationVolume().getPath() + ". SubmitPlan failed.";
LOG.error(errMsg);
throw new DiskBalancerException(errMsg,
DiskBalancerException.Result.INVALID_VOLUME);
}
createWorkPlan(sourceVol, destVol, step);
VolumePair volumePair = new VolumePair(sourceVolUuid,
sourceVolBasePath, destVolUuid, destVolBasePath);
createWorkPlan(volumePair, step);
}
}
/**
* Returns a path to Volume Map.
* Returns volume UUID to volume base path map.
*
* @return Map
* @throws DiskBalancerException
*/
private Map<String, FsVolumeSpi> getStorageIDToVolumeMap()
private Map<String, String> getStorageIDToVolumeBasePathMap()
throws DiskBalancerException {
Map<String, FsVolumeSpi> pathMap = new HashMap<>();
Map<String, String> storageIDToVolBasePathMap = new HashMap<>();
FsDatasetSpi.FsVolumeReferences references;
try {
try(AutoCloseableLock lock = this.dataset.acquireDatasetLock()) {
references = this.dataset.getFsVolumeReferences();
for (int ndx = 0; ndx < references.size(); ndx++) {
FsVolumeSpi vol = references.get(ndx);
pathMap.put(vol.getStorageID(), vol);
storageIDToVolBasePathMap.put(vol.getStorageID(), vol.getBasePath());
}
references.close();
}
@ -483,7 +509,7 @@ public class DiskBalancer {
throw new DiskBalancerException("Internal error", ex,
DiskBalancerException.Result.INTERNAL_ERROR);
}
return pathMap;
return storageIDToVolBasePathMap;
}
/**
@ -513,26 +539,24 @@ public class DiskBalancer {
/**
* Insert work items to work map.
*
* @param source - Source vol
* @param dest - destination volume
* @param volumePair - VolumePair
* @param step - Move Step
*/
private void createWorkPlan(FsVolumeSpi source, FsVolumeSpi dest,
Step step) throws DiskBalancerException {
if (source.getStorageID().equals(dest.getStorageID())) {
LOG.info("Disk Balancer - source & destination volumes are same.");
throw new DiskBalancerException("source and destination volumes are " +
"same.", DiskBalancerException.Result.INVALID_MOVE);
private void createWorkPlan(final VolumePair volumePair, Step step)
throws DiskBalancerException {
if (volumePair.getSourceVolUuid().equals(volumePair.getDestVolUuid())) {
final String errMsg = "Disk Balancer - Source and destination volumes " +
"are same: " + volumePair.getSourceVolUuid();
LOG.warn(errMsg);
throw new DiskBalancerException(errMsg,
DiskBalancerException.Result.INVALID_MOVE);
}
VolumePair pair = new VolumePair(source, dest);
long bytesToMove = step.getBytesToMove();
// In case we have a plan with more than
// one line of same <source, dest>
// one line of same VolumePair
// we compress that into one work order.
if (workMap.containsKey(pair)) {
bytesToMove += workMap.get(pair).getBytesToCopy();
if (workMap.containsKey(volumePair)) {
bytesToMove += workMap.get(volumePair).getBytesToCopy();
}
DiskBalancerWorkItem work = new DiskBalancerWorkItem(bytesToMove, 0);
@ -542,7 +566,7 @@ public class DiskBalancer {
work.setBandwidth(step.getBandwidth());
work.setTolerancePercent(step.getTolerancePercent());
work.setMaxDiskErrors(step.getMaxDiskErrors());
workMap.put(pair, work);
workMap.put(volumePair, work);
}
/**
@ -591,39 +615,63 @@ public class DiskBalancer {
}
/**
* Holds references to actual volumes that we will be operating against.
* Holds source and dest volumes UUIDs and their BasePaths
* that disk balancer will be operating against.
*/
public static class VolumePair {
private final FsVolumeSpi source;
private final FsVolumeSpi dest;
private final String sourceVolUuid;
private final String destVolUuid;
private final String sourceVolBasePath;
private final String destVolBasePath;
/**
* Constructs a volume pair.
*
* @param source - Source Volume
* @param dest - Destination Volume
* @param sourceVolUuid - Source Volume
* @param sourceVolBasePath - Source Volume Base Path
* @param destVolUuid - Destination Volume
* @param destVolBasePath - Destination Volume Base Path
*/
public VolumePair(FsVolumeSpi source, FsVolumeSpi dest) {
this.source = source;
this.dest = dest;
public VolumePair(final String sourceVolUuid,
final String sourceVolBasePath, final String destVolUuid,
final String destVolBasePath) {
this.sourceVolUuid = sourceVolUuid;
this.sourceVolBasePath = sourceVolBasePath;
this.destVolUuid = destVolUuid;
this.destVolBasePath = destVolBasePath;
}
/**
* gets source volume.
* Gets source volume UUID.
*
* @return volume
* @return UUID String
*/
public FsVolumeSpi getSource() {
return source;
public String getSourceVolUuid() {
return sourceVolUuid;
}
/**
* Gets Destination volume.
*
* @return volume.
* Gets source volume base path.
* @return String
*/
public FsVolumeSpi getDest() {
return dest;
public String getSourceVolBasePath() {
return sourceVolBasePath;
}
/**
* Gets destination volume UUID.
*
* @return UUID String
*/
public String getDestVolUuid() {
return destVolUuid;
}
/**
* Gets desitnation volume base path.
*
* @return String
*/
public String getDestVolBasePath() {
return destVolBasePath;
}
@Override
@ -636,13 +684,21 @@ public class DiskBalancer {
}
VolumePair that = (VolumePair) o;
return source.equals(that.source) && dest.equals(that.dest);
return sourceVolUuid.equals(that.sourceVolUuid)
&& sourceVolBasePath.equals(that.sourceVolBasePath)
&& destVolUuid.equals(that.destVolUuid)
&& destVolBasePath.equals(that.destVolBasePath);
}
@Override
public int hashCode() {
int result = source.getBasePath().hashCode();
result = 31 * result + dest.getBasePath().hashCode();
final int primeNum = 31;
final List<String> volumeStrList = Arrays.asList(sourceVolUuid,
sourceVolBasePath, destVolUuid, destVolBasePath);
int result = 1;
for (String str : volumeStrList) {
result = (result * primeNum) + str.hashCode();
}
return result;
}
}
@ -932,8 +988,28 @@ public class DiskBalancer {
*/
@Override
public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
FsVolumeSpi source = pair.getSource();
FsVolumeSpi dest = pair.getDest();
String sourceVolUuid = pair.getSourceVolUuid();
String destVolUuuid = pair.getDestVolUuid();
// When any of the DiskBalancerWorkItem volumes are not
// available, return after setting error in item.
FsVolumeSpi source = getFsVolume(this.dataset, sourceVolUuid);
if (source == null) {
final String errMsg = "Disk Balancer - Unable to find source volume: "
+ pair.getDestVolBasePath();
LOG.error(errMsg);
item.setErrMsg(errMsg);
return;
}
FsVolumeSpi dest = getFsVolume(this.dataset, destVolUuuid);
if (dest == null) {
final String errMsg = "Disk Balancer - Unable to find dest volume: "
+ pair.getDestVolBasePath();
LOG.error(errMsg);
item.setErrMsg(errMsg);
return;
}
List<FsVolumeSpi.BlockIterator> poolIters = new LinkedList<>();
startTime = Time.now();
item.setStartTime(startTime);
@ -977,7 +1053,7 @@ public class DiskBalancer {
// we are not able to find any blocks to copy.
if (block == null) {
LOG.error("No source blocks, exiting the copy. Source: {}, " +
"dest:{}", source.getBasePath(), dest.getBasePath());
"Dest:{}", source.getBasePath(), dest.getBasePath());
this.setExitFlag();
continue;
}

View File

@ -20,6 +20,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
@ -27,9 +28,15 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancer;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancer.DiskBalancerMover;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancer.VolumePair;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkItem;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
@ -41,18 +48,30 @@ import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
/**
* Test Disk Balancer.
@ -185,6 +204,47 @@ public class TestDiskBalancer {
}
}
/**
* Test disk balancer behavior when one of the disks involved
* in balancing operation is removed after submitting the plan.
* @throws Exception
*/
@Test
public void testDiskBalancerWhenRemovingVolumes() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
final int blockCount = 100;
final int blockSize = 1024;
final int diskCount = 2;
final int dataNodeCount = 1;
final int dataNodeIndex = 0;
final int sourceDiskIndex = 0;
MiniDFSCluster cluster = new ClusterBuilder()
.setBlockCount(blockCount)
.setBlockSize(blockSize)
.setDiskCount(diskCount)
.setNumDatanodes(dataNodeCount)
.setConf(conf)
.build();
try {
DataMover dataMover = new DataMover(cluster, dataNodeIndex,
sourceDiskIndex, conf, blockSize, blockCount);
dataMover.moveDataToSourceDisk();
NodePlan plan = dataMover.generatePlan();
dataMover.executePlanDuringDiskRemove(plan);
dataMover.verifyAllVolumesHaveData();
dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
} catch (Exception e) {
Assert.fail("Unexpected exception: " + e);
} finally {
cluster.shutdown();
}
}
/**
* Sets alll Disks capacity to size specified.
*
@ -446,6 +506,102 @@ public class TestDiskBalancer {
}, 1000, 100000);
}
public void executePlanDuringDiskRemove(NodePlan plan) throws
IOException, TimeoutException, InterruptedException {
CountDownLatch createWorkPlanLatch = new CountDownLatch(1);
CountDownLatch removeDiskLatch = new CountDownLatch(1);
AtomicInteger errorCount = new AtomicInteger(0);
LOG.info("FSDataSet: " + node.getFSDataset());
final FsDatasetSpi<?> fsDatasetSpy = Mockito.spy(node.getFSDataset());
doAnswer(new Answer<Object>() {
public Object answer(InvocationOnMock invocation) {
try {
node.getFSDataset().moveBlockAcrossVolumes(
(ExtendedBlock)invocation.getArguments()[0],
(FsVolumeSpi) invocation.getArguments()[1]);
} catch (Exception e) {
errorCount.incrementAndGet();
}
return null;
}
}).when(fsDatasetSpy).moveBlockAcrossVolumes(
any(ExtendedBlock.class), any(FsVolumeSpi.class));
DiskBalancerMover diskBalancerMover = new DiskBalancerMover(
fsDatasetSpy, conf);
diskBalancerMover.setRunnable();
DiskBalancerMover diskBalancerMoverSpy = Mockito.spy(diskBalancerMover);
doAnswer(new Answer<Object>() {
public Object answer(InvocationOnMock invocation) {
createWorkPlanLatch.countDown();
LOG.info("Waiting for the disk removal!");
try {
removeDiskLatch.await();
} catch (InterruptedException e) {
LOG.info("Encountered " + e);
}
LOG.info("Got disk removal notification, resuming copyBlocks!");
diskBalancerMover.copyBlocks((VolumePair)(invocation
.getArguments()[0]), (DiskBalancerWorkItem)(invocation
.getArguments()[1]));
return null;
}
}).when(diskBalancerMoverSpy).copyBlocks(
any(VolumePair.class), any(DiskBalancerWorkItem.class));
DiskBalancer diskBalancer = new DiskBalancer(node.getDatanodeUuid(),
conf, diskBalancerMoverSpy);
List<String> oldDirs = new ArrayList<String>(node.getConf().
getTrimmedStringCollection(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
final String newDirs = oldDirs.get(0);
LOG.info("Reconfigure newDirs:" + newDirs);
Thread reconfigThread = new Thread() {
public void run() {
try {
LOG.info("Waiting for work plan creation!");
createWorkPlanLatch.await();
LOG.info("Work plan created. Removing disk!");
assertThat(
"DN did not update its own config", node.
reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDirs),
is(node.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
Thread.sleep(1000);
LOG.info("Removed disk!");
removeDiskLatch.countDown();
} catch (ReconfigurationException | InterruptedException e) {
Assert.fail("Unexpected error while reconfiguring: " + e);
}
}
};
reconfigThread.start();
String planJson = plan.toJson();
String planID = DigestUtils.shaHex(planJson);
diskBalancer.submitPlan(planID, 1, PLAN_FILE, planJson, false);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
LOG.info("Work Status: " + diskBalancer.
queryWorkStatus().toJsonString());
Result result = diskBalancer.queryWorkStatus().getResult();
return (result == Result.PLAN_DONE);
} catch (IOException e) {
return false;
}
}
}, 1000, 100000);
assertTrue("Disk balancer operation hit max errors!", errorCount.get() <
DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_ERRORS_DEFAULT);
createWorkPlanLatch.await();
removeDiskLatch.await();
}
/**
* Verifies the Plan Execution has been done.
*/