From 59a3038bc3d7913dca3de971026bc32cef536a2d Mon Sep 17 00:00:00 2001
From: Bharat Viswanadham
Date: Mon, 2 Jul 2018 21:43:18 -0700
Subject: [PATCH] HDFS-13715: diskbalancer does not work if one of the
 blockpools is empty on a Federated cluster. Contributed by Bharat
 Viswanadham

---
 .../hdfs/server/datanode/DiskBalancer.java    | 29 ++++---
 .../fsdataset/impl/FsDatasetImpl.java         |  8 ++
 .../server/diskbalancer/TestDiskBalancer.java | 75 +++++++++++++++++--
 3 files changed, 96 insertions(+), 16 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index 91c3624024d..53db022dbf5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -77,7 +78,8 @@ import static java.util.concurrent.TimeUnit.SECONDS;
 @InterfaceAudience.Private
 public class DiskBalancer {
 
-  private static final Logger LOG = LoggerFactory.getLogger(DiskBalancer
+  @VisibleForTesting
+  public static final Logger LOG = LoggerFactory.getLogger(DiskBalancer
       .class);
   private final FsDatasetSpi dataset;
   private final String dataNodeUUID;
@@ -902,15 +904,19 @@ public class DiskBalancer {
       try {
         ExtendedBlock block = iter.nextBlock();
 
-        // A valid block is a finalized block, we iterate until we get
-        // finalized blocks
-        if (!this.dataset.isValidBlock(block)) {
-          continue;
-        }
+        if (block != null) {
+          // A valid block is a finalized block; we iterate until we get
+          // a finalized block.
+          if (!this.dataset.isValidBlock(block)) {
+            continue;
+          }
 
-        // We don't look for the best, we just do first fit
-        if (isLessThanNeeded(block.getNumBytes(), item)) {
-          return block;
+          // We don't look for the best fit; we just take the first fit.
+          if (isLessThanNeeded(block.getNumBytes(), item)) {
+            return block;
+          }
+        } else {
+          LOG.info("There are no blocks in the blockPool {}", iter.getBlockPoolId());
         }
 
       } catch (IOException e) {
@@ -1126,6 +1132,11 @@ public class DiskBalancer {
           Thread.currentThread().interrupt();
           item.incErrorCount();
           this.setExitFlag();
+        } catch (RuntimeException ex) {
+          // Exit if we hit any runtime exceptions.
+          LOG.error("Got an unexpected Runtime Exception {}", ex);
+          item.incErrorCount();
+          this.setExitFlag();
         }
       }
     } finally {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 695a421d410..89c278a34c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1927,6 +1927,10 @@ class FsDatasetImpl implements FsDatasetSpi {
    */
   @Override // FsDatasetSpi
   public boolean isValidBlock(ExtendedBlock b) {
+    // If the block passed in is null, return false.
+    if (b == null) {
+      return false;
+    }
     return isValid(b, ReplicaState.FINALIZED);
   }
 
@@ -1935,6 +1939,10 @@ class FsDatasetImpl implements FsDatasetSpi {
    */
   @Override // {@link FsDatasetSpi}
   public boolean isValidRbw(final ExtendedBlock b) {
+    // If the block passed in is null, return false.
+    if (b == null) {
+      return false;
+    }
     return isValid(b, ReplicaState.RBW);
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
index deae6eab7ac..55cc57ed03f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
@@ -154,7 +154,7 @@ public class TestDiskBalancer {
       NodePlan plan = dataMover.generatePlan();
       dataMover.executePlan(plan);
       dataMover.verifyPlanExectionDone();
-      dataMover.verifyAllVolumesHaveData();
+      dataMover.verifyAllVolumesHaveData(true);
       dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
     } finally {
       cluster.shutdown();
@@ -209,7 +209,7 @@ public class TestDiskBalancer {
       NodePlan plan = dataMover.generatePlan();
       dataMover.executePlan(plan);
       dataMover.verifyPlanExectionDone();
-      dataMover.verifyAllVolumesHaveData();
+      dataMover.verifyAllVolumesHaveData(true);
       dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
     } finally {
       cluster.shutdown();
@@ -217,6 +217,66 @@ public class TestDiskBalancer {
     }
   }
 
+
+  @Test
+  public void testDiskBalancerWithFedClusterWithOneNameServiceEmpty() throws
+      Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+    final int blockCount = 100;
+    final int blockSize = 1024;
+    final int diskCount = 2;
+    final int dataNodeCount = 1;
+    final int dataNodeIndex = 0;
+    final int sourceDiskIndex = 0;
+    final long cap = blockSize * 3L * blockCount;
+
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
+        .numDataNodes(dataNodeCount)
+        .storagesPerDatanode(diskCount)
+        .storageCapacities(new long[] {cap, cap})
+        .build();
+    cluster.waitActive();
+
+    DFSTestUtil.setFederatedConfiguration(cluster, conf);
+
+    final String fileName = "/tmp.txt";
+    final Path filePath = new Path(fileName);
+    long fileLen = blockCount * blockSize;
+
+
+    // Write data to only one nameservice.
+    FileSystem fs = cluster.getFileSystem(0);
+    TestBalancer.createFile(cluster, filePath, fileLen, (short) 1,
+        0);
+    DFSTestUtil.waitReplication(fs, filePath, (short) 1);
+
+
+    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+        .captureLogs(DiskBalancer.LOG);
+
+    try {
+      DataMover dataMover = new DataMover(cluster, dataNodeIndex,
+          sourceDiskIndex, conf, blockSize, blockCount);
+      dataMover.moveDataToSourceDisk();
+      NodePlan plan = dataMover.generatePlan();
+      dataMover.executePlan(plan);
+      dataMover.verifyPlanExectionDone();
+      // Because one nameservice is empty here, don't check
+      // blockPoolCount.
+      dataMover.verifyAllVolumesHaveData(false);
+    } finally {
+      Assert.assertTrue(logCapturer.getOutput().contains("There are no " +
+          "blocks in the blockPool"));
+      cluster.shutdown();
+    }
+
+  }
+
   @Test
   public void testBalanceDataBetweenMultiplePairsOfVolumes()
       throws Exception {
@@ -255,7 +315,7 @@ public class TestDiskBalancer {
 
       dataMover.executePlan(plan);
       dataMover.verifyPlanExectionDone();
-      dataMover.verifyAllVolumesHaveData();
+      dataMover.verifyAllVolumesHaveData(true);
       dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
     } finally {
       cluster.shutdown();
@@ -296,7 +356,7 @@ public class TestDiskBalancer {
       dataMover.moveDataToSourceDisk();
       NodePlan plan = dataMover.generatePlan();
       dataMover.executePlanDuringDiskRemove(plan);
-      dataMover.verifyAllVolumesHaveData();
+      dataMover.verifyAllVolumesHaveData(true);
       dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
     } catch (Exception e) {
       Assert.fail("Unexpected exception: " + e);
@@ -651,14 +711,15 @@ public class TestDiskBalancer {
     /**
      * Once diskBalancer is run, all volumes must have some data.
      */
-    public void verifyAllVolumesHaveData() throws IOException {
+    public void verifyAllVolumesHaveData(boolean checkblockPoolCount) throws
+        IOException {
       node = cluster.getDataNodes().get(dataNodeIndex);
       try (FsDatasetSpi.FsVolumeReferences refs =
               node.getFSDataset().getFsVolumeReferences()) {
         for (FsVolumeSpi volume : refs) {
-          assertTrue(DiskBalancerTestUtil.getBlockCount(volume, true) > 0);
+          assertTrue(DiskBalancerTestUtil.getBlockCount(volume, checkblockPoolCount) > 0);
           LOG.info("{} : Block Count : {}", refs, DiskBalancerTestUtil
-              .getBlockCount(volume, true));
+              .getBlockCount(volume, checkblockPoolCount));
         }
       }
     }
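
For reference, a minimal standalone sketch of the pattern this patch applies, not part of the patch itself: on a federated cluster with an empty block pool, iter.nextBlock() can return null, so the block-selection loop must log and skip rather than pass null to dataset.isValidBlock(), and isValidBlock() itself now rejects null. The Block class, the Iterator source, and the printed message below are hypothetical simplifications for illustration, not the real FsDatasetSpi or BlockIterator APIs.

import java.util.Arrays;
import java.util.Iterator;

public class FirstFitNullGuardSketch {

  /** Hypothetical block: just a size and a finalized flag. */
  static final class Block {
    final long numBytes;
    final boolean finalized;
    Block(long numBytes, boolean finalized) {
      this.numBytes = numBytes;
      this.finalized = finalized;
    }
  }

  /**
   * Hypothetical stand-in for FsDatasetSpi#isValidBlock: a null block is
   * never valid, mirroring the guard this patch adds in FsDatasetImpl.
   */
  static boolean isValidBlock(Block b) {
    if (b == null) {
      return false;
    }
    return b.finalized;
  }

  /**
   * First-fit selection over a source that may yield null (an empty block
   * pool). Returns the first finalized block that fits, or null if none.
   */
  static Block getFirstFit(Iterator<Block> iter, long bytesNeeded) {
    while (iter.hasNext()) {
      Block block = iter.next();
      if (block == null) {
        // Empty block pool: log and skip instead of dereferencing null.
        System.out.println("There are no blocks in this blockPool");
        continue;
      }
      if (!isValidBlock(block)) {
        continue;
      }
      // We don't look for the best fit; we just take the first fit.
      if (block.numBytes <= bytesNeeded) {
        return block;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    // One empty pool (null) followed by a pool with two blocks.
    Iterator<Block> iter = Arrays.asList(
        null,
        new Block(2048, false),  // not finalized: skipped
        new Block(1024, true)    // finalized and fits: chosen
    ).iterator();
    Block chosen = getFirstFit(iter, 4096);
    System.out.println("chosen block bytes = "
        + (chosen == null ? "none" : chosen.numBytes));
  }
}

Run standalone, this prints "chosen block bytes = 1024": the null from the empty pool is skipped with a log message, the non-finalized block is rejected, and the first finalized block that fits is selected, which is the behavior the new test asserts via the "There are no blocks in the blockPool" log line.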