HDFS-13715:diskbalancer does not work if one of the blockpools are empty on a Federated cluster. Contributed by Bharat Viswanadham

(cherry picked from commit 59a3038bc3)
This commit is contained in:
Bharat Viswanadham 2018-07-02 21:43:18 -07:00
parent 177ccbc43f
commit 0d7b811ee6
3 changed files with 96 additions and 16 deletions

View File

@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.classification.InterfaceAudience;
@ -77,7 +78,8 @@ import static java.util.concurrent.TimeUnit.SECONDS;
@InterfaceAudience.Private
public class DiskBalancer {
private static final Logger LOG = LoggerFactory.getLogger(DiskBalancer
@VisibleForTesting
public static final Logger LOG = LoggerFactory.getLogger(DiskBalancer
.class);
private final FsDatasetSpi<?> dataset;
private final String dataNodeUUID;
@ -902,6 +904,7 @@ public class DiskBalancer {
try {
ExtendedBlock block = iter.nextBlock();
if (block != null) {
// A valid block is a finalized block, we iterate until we get
// finalized blocks
if (!this.dataset.isValidBlock(block)) {
@ -912,6 +915,9 @@ public class DiskBalancer {
if (isLessThanNeeded(block.getNumBytes(), item)) {
return block;
}
} else {
LOG.info("There are no blocks in the blockPool {}", iter.getBlockPoolId());
}
} catch (IOException e) {
item.incErrorCount();
@ -1126,6 +1132,11 @@ public class DiskBalancer {
Thread.currentThread().interrupt();
item.incErrorCount();
this.setExitFlag();
} catch (RuntimeException ex) {
// Exiting if any run time exceptions.
LOG.error("Got an unexpected Runtime Exception {}", ex);
item.incErrorCount();
this.setExitFlag();
}
}
} finally {

View File

@ -1927,6 +1927,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
*/
@Override // FsDatasetSpi
public boolean isValidBlock(ExtendedBlock b) {
// If block passed is null, we should return false.
if (b == null) {
return false;
}
return isValid(b, ReplicaState.FINALIZED);
}
@ -1935,6 +1939,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
*/
@Override // {@link FsDatasetSpi}
public boolean isValidRbw(final ExtendedBlock b) {
// If block passed is null, we should return false.
if (b == null) {
return false;
}
return isValid(b, ReplicaState.RBW);
}

View File

@ -154,7 +154,7 @@ public class TestDiskBalancer {
NodePlan plan = dataMover.generatePlan();
dataMover.executePlan(plan);
dataMover.verifyPlanExectionDone();
dataMover.verifyAllVolumesHaveData();
dataMover.verifyAllVolumesHaveData(true);
dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
} finally {
cluster.shutdown();
@ -209,7 +209,7 @@ public class TestDiskBalancer {
NodePlan plan = dataMover.generatePlan();
dataMover.executePlan(plan);
dataMover.verifyPlanExectionDone();
dataMover.verifyAllVolumesHaveData();
dataMover.verifyAllVolumesHaveData(true);
dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
} finally {
cluster.shutdown();
@ -217,6 +217,66 @@ public class TestDiskBalancer {
}
@Test
public void testDiskBalancerWithFedClusterWithOneNameServiceEmpty() throws
Exception {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
final int blockCount = 100;
final int blockSize = 1024;
final int diskCount = 2;
final int dataNodeCount = 1;
final int dataNodeIndex = 0;
final int sourceDiskIndex = 0;
final long cap = blockSize * 3L * blockCount;
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
.numDataNodes(dataNodeCount)
.storagesPerDatanode(diskCount)
.storageCapacities(new long[] {cap, cap})
.build();
cluster.waitActive();
DFSTestUtil.setFederatedConfiguration(cluster, conf);
final String fileName = "/tmp.txt";
final Path filePath = new Path(fileName);
long fileLen = blockCount * blockSize;
//Writing data only to one nameservice.
FileSystem fs = cluster.getFileSystem(0);
TestBalancer.createFile(cluster, filePath, fileLen, (short) 1,
0);
DFSTestUtil.waitReplication(fs, filePath, (short) 1);
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
.captureLogs(DiskBalancer.LOG);
try {
DataMover dataMover = new DataMover(cluster, dataNodeIndex,
sourceDiskIndex, conf, blockSize, blockCount);
dataMover.moveDataToSourceDisk();
NodePlan plan = dataMover.generatePlan();
dataMover.executePlan(plan);
dataMover.verifyPlanExectionDone();
//Because here we have one nameservice empty, don't check
// blockPoolCount.
dataMover.verifyAllVolumesHaveData(false);
} finally {
Assert.assertTrue(logCapturer.getOutput().contains("There are no " +
"blocks in the blockPool"));
cluster.shutdown();
}
}
@Test
public void testBalanceDataBetweenMultiplePairsOfVolumes()
throws Exception {
@ -255,7 +315,7 @@ public class TestDiskBalancer {
dataMover.executePlan(plan);
dataMover.verifyPlanExectionDone();
dataMover.verifyAllVolumesHaveData();
dataMover.verifyAllVolumesHaveData(true);
dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
} finally {
cluster.shutdown();
@ -296,7 +356,7 @@ public class TestDiskBalancer {
dataMover.moveDataToSourceDisk();
NodePlan plan = dataMover.generatePlan();
dataMover.executePlanDuringDiskRemove(plan);
dataMover.verifyAllVolumesHaveData();
dataMover.verifyAllVolumesHaveData(true);
dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
} catch (Exception e) {
Assert.fail("Unexpected exception: " + e);
@ -651,14 +711,15 @@ public class TestDiskBalancer {
/**
* Once diskBalancer is run, all volumes mush has some data.
*/
public void verifyAllVolumesHaveData() throws IOException {
public void verifyAllVolumesHaveData(boolean checkblockPoolCount) throws
IOException {
node = cluster.getDataNodes().get(dataNodeIndex);
try (FsDatasetSpi.FsVolumeReferences refs =
node.getFSDataset().getFsVolumeReferences()) {
for (FsVolumeSpi volume : refs) {
assertTrue(DiskBalancerTestUtil.getBlockCount(volume, true) > 0);
assertTrue(DiskBalancerTestUtil.getBlockCount(volume, checkblockPoolCount) > 0);
LOG.info("{} : Block Count : {}", refs, DiskBalancerTestUtil
.getBlockCount(volume, true));
.getBlockCount(volume, checkblockPoolCount));
}
}
}