From 03f519a757ce83d76e7fc9f6aadf271e38bb9f6d Mon Sep 17 00:00:00 2001
From: Anu Engineer
Date: Tue, 27 Sep 2016 21:35:06 -0700
Subject: [PATCH] HDFS-9850. DiskBalancer: Explore removing references to
 FsVolumeSpi. Contributed by Manoj Govindassamy.

---
 .../hdfs/server/datanode/DiskBalancer.java     | 212 ++++++++++++------
 .../server/diskbalancer/TestDiskBalancer.java  | 156 +++++++++++++
 2 files changed, 300 insertions(+), 68 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index d853ae945e4..e7e9105e356 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -22,6 +22,8 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi
+    .FsVolumeReferences;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -41,6 +43,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.LinkedList;
@@ -192,7 +195,30 @@ public class DiskBalancer {
   }
 
   /**
-   * Returns the Current Work Status of a submitted Plan.
+   * Get FsVolume by volume UUID.
+   * @param fsDataset
+   * @param volUuid
+   * @return FsVolumeSpi
+   */
+  private static FsVolumeSpi getFsVolume(final FsDatasetSpi fsDataset,
+      final String volUuid) {
+    FsVolumeSpi fsVolume = null;
+    try (FsVolumeReferences volumeReferences =
+        fsDataset.getFsVolumeReferences()) {
+      for (int i = 0; i < volumeReferences.size(); i++) {
+        if (volumeReferences.get(i).getStorageID().equals(volUuid)) {
+          fsVolume = volumeReferences.get(i);
+          break;
+        }
+      }
+    } catch (IOException e) {
+      LOG.warn("Disk Balancer - Error when closing volume references: ", e);
+    }
+    return fsVolume;
+  }
+
+  /**
+   * Returns the current work status of a previously submitted Plan.
    *
    * @return DiskBalancerWorkStatus.
    * @throws DiskBalancerException
    */
@@ -214,8 +240,8 @@
       for (Map.Entry entry : workMap.entrySet()) {
         DiskBalancerWorkEntry workEntry = new DiskBalancerWorkEntry(
-            entry.getKey().getSource().getBasePath(),
-            entry.getKey().getDest().getBasePath(),
+            entry.getKey().getSourceVolBasePath(),
+            entry.getKey().getDestVolBasePath(),
             entry.getValue());
         status.addWorkEntry(workEntry);
       }
@@ -269,12 +295,7 @@
     lock.lock();
     try {
       checkDiskBalancerEnabled();
-      Map pathMap = new HashMap<>();
-      Map volMap = getStorageIDToVolumeMap();
-      for (Map.Entry entry : volMap.entrySet()) {
-        pathMap.put(entry.getKey(), entry.getValue().getBasePath());
-      }
-      return JsonUtil.toJsonString(pathMap);
+      return JsonUtil.toJsonString(getStorageIDToVolumeBasePathMap());
     } catch (DiskBalancerException ex) {
       throw ex;
     } catch (IOException e) {
@@ -434,47 +455,52 @@
       // Cleanup any residual work in the map.
       workMap.clear();
-      Map pathMap = getStorageIDToVolumeMap();
+      Map storageIDToVolBasePathMap =
+          getStorageIDToVolumeBasePathMap();
 
       for (Step step : plan.getVolumeSetPlans()) {
-        String sourceuuid = step.getSourceVolume().getUuid();
-        String destinationuuid = step.getDestinationVolume().getUuid();
+        String sourceVolUuid = step.getSourceVolume().getUuid();
+        String destVolUuid = step.getDestinationVolume().getUuid();
 
-        FsVolumeSpi sourceVol = pathMap.get(sourceuuid);
-        if (sourceVol == null) {
-          LOG.error("Disk Balancer - Unable to find source volume. submitPlan " +
-              "failed.");
-          throw new DiskBalancerException("Unable to find source volume.",
+        String sourceVolBasePath = storageIDToVolBasePathMap.get(sourceVolUuid);
+        if (sourceVolBasePath == null) {
+          final String errMsg = "Disk Balancer - Unable to find volume: " +
+              step.getSourceVolume().getPath() + ". SubmitPlan failed.";
+          LOG.error(errMsg);
+          throw new DiskBalancerException(errMsg,
              DiskBalancerException.Result.INVALID_VOLUME);
         }
 
-        FsVolumeSpi destVol = pathMap.get(destinationuuid);
-        if (destVol == null) {
-          LOG.error("Disk Balancer - Unable to find destination volume. " +
-              "submitPlan failed.");
-          throw new DiskBalancerException("Unable to find destination volume.",
+        String destVolBasePath = storageIDToVolBasePathMap.get(destVolUuid);
+        if (destVolBasePath == null) {
+          final String errMsg = "Disk Balancer - Unable to find volume: " +
+              step.getDestinationVolume().getPath() + ". SubmitPlan failed.";
+          LOG.error(errMsg);
+          throw new DiskBalancerException(errMsg,
              DiskBalancerException.Result.INVALID_VOLUME);
         }
-        createWorkPlan(sourceVol, destVol, step);
+        VolumePair volumePair = new VolumePair(sourceVolUuid,
+            sourceVolBasePath, destVolUuid, destVolBasePath);
+        createWorkPlan(volumePair, step);
       }
     }
 
   /**
-   * Returns a path to Volume Map.
+   * Returns volume UUID to volume base path map.
    *
    * @return Map
    * @throws DiskBalancerException
    */
-  private Map getStorageIDToVolumeMap()
+  private Map getStorageIDToVolumeBasePathMap()
       throws DiskBalancerException {
-    Map pathMap = new HashMap<>();
+    Map storageIDToVolBasePathMap = new HashMap<>();
     FsDatasetSpi.FsVolumeReferences references;
     try {
       try(AutoCloseableLock lock = this.dataset.acquireDatasetLock()) {
         references = this.dataset.getFsVolumeReferences();
         for (int ndx = 0; ndx < references.size(); ndx++) {
           FsVolumeSpi vol = references.get(ndx);
-          pathMap.put(vol.getStorageID(), vol);
+          storageIDToVolBasePathMap.put(vol.getStorageID(), vol.getBasePath());
         }
         references.close();
       }
@@ -483,7 +509,7 @@
       throw new DiskBalancerException("Internal error", ex,
           DiskBalancerException.Result.INTERNAL_ERROR);
     }
-    return pathMap;
+    return storageIDToVolBasePathMap;
   }
 
   /**
@@ -513,26 +539,24 @@
   /**
    * Insert work items to work map.
-   *
-   * @param source - Source vol
-   * @param dest - destination volume
-   * @param step - Move Step
+   * @param volumePair - VolumePair
+   * @param step - Move Step
    */
-  private void createWorkPlan(FsVolumeSpi source, FsVolumeSpi dest,
-      Step step) throws DiskBalancerException {
-
-    if (source.getStorageID().equals(dest.getStorageID())) {
-      LOG.info("Disk Balancer - source & destination volumes are same.");
-      throw new DiskBalancerException("source and destination volumes are " +
-          "same.", DiskBalancerException.Result.INVALID_MOVE);
+  private void createWorkPlan(final VolumePair volumePair, Step step)
+      throws DiskBalancerException {
+    if (volumePair.getSourceVolUuid().equals(volumePair.getDestVolUuid())) {
+      final String errMsg = "Disk Balancer - Source and destination volumes " +
+          "are same: " + volumePair.getSourceVolUuid();
+      LOG.warn(errMsg);
+      throw new DiskBalancerException(errMsg,
+          DiskBalancerException.Result.INVALID_MOVE);
     }
-    VolumePair pair = new VolumePair(source, dest);
     long bytesToMove = step.getBytesToMove();
     // In case we have a plan with more than
-    // one line of same
+    // one line of same VolumePair
     // we compress that into one work order.
-    if (workMap.containsKey(pair)) {
-      bytesToMove += workMap.get(pair).getBytesToCopy();
+    if (workMap.containsKey(volumePair)) {
+      bytesToMove += workMap.get(volumePair).getBytesToCopy();
     }
 
     DiskBalancerWorkItem work = new DiskBalancerWorkItem(bytesToMove, 0);
@@ -542,7 +566,7 @@
     work.setBandwidth(step.getBandwidth());
     work.setTolerancePercent(step.getTolerancePercent());
     work.setMaxDiskErrors(step.getMaxDiskErrors());
-    workMap.put(pair, work);
+    workMap.put(volumePair, work);
   }
 
   /**
@@ -591,39 +615,63 @@
   }
 
   /**
-   * Holds references to actual volumes that we will be operating against.
+   * Holds source and dest volumes UUIDs and their BasePaths
+   * that disk balancer will be operating against.
    */
   public static class VolumePair {
-    private final FsVolumeSpi source;
-    private final FsVolumeSpi dest;
+    private final String sourceVolUuid;
+    private final String destVolUuid;
+    private final String sourceVolBasePath;
+    private final String destVolBasePath;
 
     /**
      * Constructs a volume pair.
-     *
-     * @param source - Source Volume
-     * @param dest - Destination Volume
+     * @param sourceVolUuid - Source Volume
+     * @param sourceVolBasePath - Source Volume Base Path
+     * @param destVolUuid - Destination Volume
+     * @param destVolBasePath - Destination Volume Base Path
      */
-    public VolumePair(FsVolumeSpi source, FsVolumeSpi dest) {
-      this.source = source;
-      this.dest = dest;
+    public VolumePair(final String sourceVolUuid,
+        final String sourceVolBasePath, final String destVolUuid,
+        final String destVolBasePath) {
+      this.sourceVolUuid = sourceVolUuid;
+      this.sourceVolBasePath = sourceVolBasePath;
+      this.destVolUuid = destVolUuid;
+      this.destVolBasePath = destVolBasePath;
    }
 
     /**
-     * gets source volume.
+     * Gets source volume UUID.
      *
-     * @return volume
+     * @return UUID String
      */
-    public FsVolumeSpi getSource() {
-      return source;
+    public String getSourceVolUuid() {
+      return sourceVolUuid;
    }
 
     /**
-     * Gets Destination volume.
-     *
-     * @return volume.
+     * Gets source volume base path.
+     * @return String
      */
-    public FsVolumeSpi getDest() {
-      return dest;
+    public String getSourceVolBasePath() {
+      return sourceVolBasePath;
+    }
+    /**
+     * Gets destination volume UUID.
+     *
+     * @return UUID String
+     */
+    public String getDestVolUuid() {
+      return destVolUuid;
+    }
+
+    /**
+     * Gets destination volume base path.
+     *
+     * @return String
+     */
+    public String getDestVolBasePath() {
+      return destVolBasePath;
     }
 
     @Override
@@ -636,13 +684,21 @@
       }
 
       VolumePair that = (VolumePair) o;
-      return source.equals(that.source) && dest.equals(that.dest);
+      return sourceVolUuid.equals(that.sourceVolUuid)
+          && sourceVolBasePath.equals(that.sourceVolBasePath)
+          && destVolUuid.equals(that.destVolUuid)
+          && destVolBasePath.equals(that.destVolBasePath);
     }
 
     @Override
     public int hashCode() {
-      int result = source.getBasePath().hashCode();
-      result = 31 * result + dest.getBasePath().hashCode();
+      final int primeNum = 31;
+      final List volumeStrList = Arrays.asList(sourceVolUuid,
+          sourceVolBasePath, destVolUuid, destVolBasePath);
+      int result = 1;
+      for (String str : volumeStrList) {
+        result = (result * primeNum) + str.hashCode();
+      }
       return result;
     }
   }
@@ -932,8 +988,28 @@
      */
     @Override
     public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
-      FsVolumeSpi source = pair.getSource();
-      FsVolumeSpi dest = pair.getDest();
+      String sourceVolUuid = pair.getSourceVolUuid();
+      String destVolUuid = pair.getDestVolUuid();
+
+      // When any of the DiskBalancerWorkItem volumes are not
+      // available, return after setting error in item.
+      FsVolumeSpi source = getFsVolume(this.dataset, sourceVolUuid);
+      if (source == null) {
+        final String errMsg = "Disk Balancer - Unable to find source volume: " +
+            pair.getSourceVolBasePath();
+        LOG.error(errMsg);
+        item.setErrMsg(errMsg);
+        return;
+      }
+      FsVolumeSpi dest = getFsVolume(this.dataset, destVolUuid);
+      if (dest == null) {
+        final String errMsg = "Disk Balancer - Unable to find dest volume: " +
+            pair.getDestVolBasePath();
+        LOG.error(errMsg);
+        item.setErrMsg(errMsg);
+        return;
+      }
+
       List poolIters = new LinkedList<>();
       startTime = Time.now();
       item.setStartTime(startTime);
@@ -977,7 +1053,7 @@
         // we are not able to find any blocks to copy.
         if (block == null) {
           LOG.error("No source blocks, exiting the copy. Source: {}, " +
-              "dest:{}", source.getBasePath(), dest.getBasePath());
+              "Dest:{}", source.getBasePath(), dest.getBasePath());
           this.setExitFlag();
           continue;
         }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
index eb15bdc656c..d911e749eaf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
@@ -20,6 +20,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
@@ -27,9 +28,15 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancer;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancer.DiskBalancerMover;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancer.VolumePair;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkItem;
 import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
@@ -41,18 +48,30 @@ import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
 import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
+import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
 
 /**
  * Test Disk Balancer.
@@ -185,6 +204,47 @@ public class TestDiskBalancer {
     }
   }
 
+  /**
+   * Test disk balancer behavior when one of the disks involved
+   * in balancing operation is removed after submitting the plan.
+   * @throws Exception
+   */
+  @Test
+  public void testDiskBalancerWhenRemovingVolumes() throws Exception {
+
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+
+    final int blockCount = 100;
+    final int blockSize = 1024;
+    final int diskCount = 2;
+    final int dataNodeCount = 1;
+    final int dataNodeIndex = 0;
+    final int sourceDiskIndex = 0;
+
+    MiniDFSCluster cluster = new ClusterBuilder()
+        .setBlockCount(blockCount)
+        .setBlockSize(blockSize)
+        .setDiskCount(diskCount)
+        .setNumDatanodes(dataNodeCount)
+        .setConf(conf)
+        .build();
+
+    try {
+      DataMover dataMover = new DataMover(cluster, dataNodeIndex,
+          sourceDiskIndex, conf, blockSize, blockCount);
+      dataMover.moveDataToSourceDisk();
+      NodePlan plan = dataMover.generatePlan();
+      dataMover.executePlanDuringDiskRemove(plan);
+      dataMover.verifyAllVolumesHaveData();
+      dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
+    } catch (Exception e) {
+      Assert.fail("Unexpected exception: " + e);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /**
    * Sets alll Disks capacity to size specified.
    *
@@ -446,6 +506,102 @@
      }, 1000, 100000);
    }
 
+    public void executePlanDuringDiskRemove(NodePlan plan) throws
+        IOException, TimeoutException, InterruptedException {
+      CountDownLatch createWorkPlanLatch = new CountDownLatch(1);
+      CountDownLatch removeDiskLatch = new CountDownLatch(1);
+      AtomicInteger errorCount = new AtomicInteger(0);
+
+      LOG.info("FSDataSet: " + node.getFSDataset());
+      final FsDatasetSpi fsDatasetSpy = Mockito.spy(node.getFSDataset());
+      doAnswer(new Answer() {
+        public Object answer(InvocationOnMock invocation) {
+          try {
+            node.getFSDataset().moveBlockAcrossVolumes(
+                (ExtendedBlock)invocation.getArguments()[0],
+                (FsVolumeSpi) invocation.getArguments()[1]);
+          } catch (Exception e) {
+            errorCount.incrementAndGet();
+          }
+          return null;
+        }
+      }).when(fsDatasetSpy).moveBlockAcrossVolumes(
+          any(ExtendedBlock.class), any(FsVolumeSpi.class));
+
+      DiskBalancerMover diskBalancerMover = new DiskBalancerMover(
+          fsDatasetSpy, conf);
+      diskBalancerMover.setRunnable();
+
+      DiskBalancerMover diskBalancerMoverSpy = Mockito.spy(diskBalancerMover);
+      doAnswer(new Answer() {
+        public Object answer(InvocationOnMock invocation) {
+          createWorkPlanLatch.countDown();
+          LOG.info("Waiting for the disk removal!");
+          try {
+            removeDiskLatch.await();
+          } catch (InterruptedException e) {
+            LOG.info("Encountered " + e);
+          }
+          LOG.info("Got disk removal notification, resuming copyBlocks!");
+          diskBalancerMover.copyBlocks((VolumePair)(invocation
+              .getArguments()[0]), (DiskBalancerWorkItem)(invocation
+              .getArguments()[1]));
+          return null;
+        }
+      }).when(diskBalancerMoverSpy).copyBlocks(
+          any(VolumePair.class), any(DiskBalancerWorkItem.class));
+
+      DiskBalancer diskBalancer = new DiskBalancer(node.getDatanodeUuid(),
+          conf, diskBalancerMoverSpy);
+
+      List oldDirs = new ArrayList(node.getConf().
+          getTrimmedStringCollection(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
+      final String newDirs = oldDirs.get(0);
+      LOG.info("Reconfigure newDirs:" + newDirs);
+      Thread reconfigThread = new Thread() {
+        public void run() {
+          try {
+            LOG.info("Waiting for work plan creation!");
+            createWorkPlanLatch.await();
+            LOG.info("Work plan created. Removing disk!");
+            assertThat(
+                "DN did not update its own config", node.
+                reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDirs),
+                is(node.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
+            Thread.sleep(1000);
+            LOG.info("Removed disk!");
+            removeDiskLatch.countDown();
+          } catch (ReconfigurationException | InterruptedException e) {
+            Assert.fail("Unexpected error while reconfiguring: " + e);
+          }
+        }
+      };
+      reconfigThread.start();
+
+      String planJson = plan.toJson();
+      String planID = DigestUtils.shaHex(planJson);
+      diskBalancer.submitPlan(planID, 1, PLAN_FILE, planJson, false);
+
+      GenericTestUtils.waitFor(new Supplier() {
+        @Override
+        public Boolean get() {
+          try {
+            LOG.info("Work Status: " + diskBalancer.
+                queryWorkStatus().toJsonString());
+            Result result = diskBalancer.queryWorkStatus().getResult();
+            return (result == Result.PLAN_DONE);
+          } catch (IOException e) {
+            return false;
+          }
+        }
+      }, 1000, 100000);
+
+      assertTrue("Disk balancer operation hit max errors!", errorCount.get() <
+          DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_ERRORS_DEFAULT);
+      createWorkPlanLatch.await();
+      removeDiskLatch.await();
+    }
+
   /**
    * Verifies the Plan Execution has been done.
    */
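
Note: the following is a minimal illustrative sketch, not part of the patch. It shows the value semantics the refactored VolumePair is intended to provide: because a pair now carries only volume UUIDs and base paths (plain strings) rather than live FsVolumeSpi references, two plan steps between the same pair of volumes compare and hash equal, so createWorkPlan can fold them into a single work item. The UUIDs and paths below are made-up placeholders, and only the constructors and getters visible in this patch are used.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hdfs.server.datanode.DiskBalancer.VolumePair;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkItem;

public class VolumePairSketch {
  public static void main(String[] args) {
    // Two steps that move data between the same source and destination
    // volumes produce equal map keys.
    VolumePair first = new VolumePair("DS-source-uuid", "/data/disk1",
        "DS-dest-uuid", "/data/disk2");
    VolumePair second = new VolumePair("DS-source-uuid", "/data/disk1",
        "DS-dest-uuid", "/data/disk2");

    Map<VolumePair, DiskBalancerWorkItem> workMap = new HashMap<>();
    workMap.put(first, new DiskBalancerWorkItem(1024, 0));

    // Mirrors createWorkPlan: a second step against the same pair is
    // compressed into one work order by summing the bytes to copy.
    long bytesToMove = 2048;
    if (workMap.containsKey(second)) {
      bytesToMove += workMap.get(second).getBytesToCopy();
    }
    workMap.put(second, new DiskBalancerWorkItem(bytesToMove, 0));

    System.out.println(first.equals(second)); // true: value-based equality
    System.out.println(workMap.size());       // 1: both steps share one entry
  }
}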