HDFS-13715:diskbalancer does not work if one of the blockpools are empty on a Federated cluster. Contributed by Bharat Viswanadham
This commit is contained in:
parent
7296b644f7
commit
59a3038bc3
|
@ -18,6 +18,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.datanode;
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.commons.codec.digest.DigestUtils;
|
import org.apache.commons.codec.digest.DigestUtils;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
@ -77,7 +78,8 @@ import static java.util.concurrent.TimeUnit.SECONDS;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class DiskBalancer {
|
public class DiskBalancer {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(DiskBalancer
|
@VisibleForTesting
|
||||||
|
public static final Logger LOG = LoggerFactory.getLogger(DiskBalancer
|
||||||
.class);
|
.class);
|
||||||
private final FsDatasetSpi<?> dataset;
|
private final FsDatasetSpi<?> dataset;
|
||||||
private final String dataNodeUUID;
|
private final String dataNodeUUID;
|
||||||
|
@ -902,15 +904,19 @@ public class DiskBalancer {
|
||||||
try {
|
try {
|
||||||
ExtendedBlock block = iter.nextBlock();
|
ExtendedBlock block = iter.nextBlock();
|
||||||
|
|
||||||
// A valid block is a finalized block, we iterate until we get
|
if (block != null) {
|
||||||
// finalized blocks
|
// A valid block is a finalized block, we iterate until we get
|
||||||
if (!this.dataset.isValidBlock(block)) {
|
// finalized blocks
|
||||||
continue;
|
if (!this.dataset.isValidBlock(block)) {
|
||||||
}
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// We don't look for the best, we just do first fit
|
// We don't look for the best, we just do first fit
|
||||||
if (isLessThanNeeded(block.getNumBytes(), item)) {
|
if (isLessThanNeeded(block.getNumBytes(), item)) {
|
||||||
return block;
|
return block;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LOG.info("There are no blocks in the blockPool {}", iter.getBlockPoolId());
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -1126,6 +1132,11 @@ public class DiskBalancer {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
item.incErrorCount();
|
item.incErrorCount();
|
||||||
this.setExitFlag();
|
this.setExitFlag();
|
||||||
|
} catch (RuntimeException ex) {
|
||||||
|
// Exiting if any run time exceptions.
|
||||||
|
LOG.error("Got an unexpected Runtime Exception {}", ex);
|
||||||
|
item.incErrorCount();
|
||||||
|
this.setExitFlag();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -1927,6 +1927,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
*/
|
*/
|
||||||
@Override // FsDatasetSpi
|
@Override // FsDatasetSpi
|
||||||
public boolean isValidBlock(ExtendedBlock b) {
|
public boolean isValidBlock(ExtendedBlock b) {
|
||||||
|
// If block passed is null, we should return false.
|
||||||
|
if (b == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
return isValid(b, ReplicaState.FINALIZED);
|
return isValid(b, ReplicaState.FINALIZED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1935,6 +1939,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
*/
|
*/
|
||||||
@Override // {@link FsDatasetSpi}
|
@Override // {@link FsDatasetSpi}
|
||||||
public boolean isValidRbw(final ExtendedBlock b) {
|
public boolean isValidRbw(final ExtendedBlock b) {
|
||||||
|
// If block passed is null, we should return false.
|
||||||
|
if (b == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
return isValid(b, ReplicaState.RBW);
|
return isValid(b, ReplicaState.RBW);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -154,7 +154,7 @@ public class TestDiskBalancer {
|
||||||
NodePlan plan = dataMover.generatePlan();
|
NodePlan plan = dataMover.generatePlan();
|
||||||
dataMover.executePlan(plan);
|
dataMover.executePlan(plan);
|
||||||
dataMover.verifyPlanExectionDone();
|
dataMover.verifyPlanExectionDone();
|
||||||
dataMover.verifyAllVolumesHaveData();
|
dataMover.verifyAllVolumesHaveData(true);
|
||||||
dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
|
dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
|
||||||
} finally {
|
} finally {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
|
@ -209,7 +209,7 @@ public class TestDiskBalancer {
|
||||||
NodePlan plan = dataMover.generatePlan();
|
NodePlan plan = dataMover.generatePlan();
|
||||||
dataMover.executePlan(plan);
|
dataMover.executePlan(plan);
|
||||||
dataMover.verifyPlanExectionDone();
|
dataMover.verifyPlanExectionDone();
|
||||||
dataMover.verifyAllVolumesHaveData();
|
dataMover.verifyAllVolumesHaveData(true);
|
||||||
dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
|
dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
|
||||||
} finally {
|
} finally {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
|
@ -217,6 +217,66 @@ public class TestDiskBalancer {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDiskBalancerWithFedClusterWithOneNameServiceEmpty() throws
|
||||||
|
Exception {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
|
||||||
|
final int blockCount = 100;
|
||||||
|
final int blockSize = 1024;
|
||||||
|
final int diskCount = 2;
|
||||||
|
final int dataNodeCount = 1;
|
||||||
|
final int dataNodeIndex = 0;
|
||||||
|
final int sourceDiskIndex = 0;
|
||||||
|
final long cap = blockSize * 3L * blockCount;
|
||||||
|
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
|
||||||
|
|
||||||
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
|
||||||
|
.numDataNodes(dataNodeCount)
|
||||||
|
.storagesPerDatanode(diskCount)
|
||||||
|
.storageCapacities(new long[] {cap, cap})
|
||||||
|
.build();
|
||||||
|
cluster.waitActive();
|
||||||
|
|
||||||
|
DFSTestUtil.setFederatedConfiguration(cluster, conf);
|
||||||
|
|
||||||
|
final String fileName = "/tmp.txt";
|
||||||
|
final Path filePath = new Path(fileName);
|
||||||
|
long fileLen = blockCount * blockSize;
|
||||||
|
|
||||||
|
|
||||||
|
//Writing data only to one nameservice.
|
||||||
|
FileSystem fs = cluster.getFileSystem(0);
|
||||||
|
TestBalancer.createFile(cluster, filePath, fileLen, (short) 1,
|
||||||
|
0);
|
||||||
|
DFSTestUtil.waitReplication(fs, filePath, (short) 1);
|
||||||
|
|
||||||
|
|
||||||
|
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
|
||||||
|
.captureLogs(DiskBalancer.LOG);
|
||||||
|
|
||||||
|
try {
|
||||||
|
DataMover dataMover = new DataMover(cluster, dataNodeIndex,
|
||||||
|
sourceDiskIndex, conf, blockSize, blockCount);
|
||||||
|
dataMover.moveDataToSourceDisk();
|
||||||
|
NodePlan plan = dataMover.generatePlan();
|
||||||
|
dataMover.executePlan(plan);
|
||||||
|
dataMover.verifyPlanExectionDone();
|
||||||
|
//Because here we have one nameservice empty, don't check
|
||||||
|
// blockPoolCount.
|
||||||
|
dataMover.verifyAllVolumesHaveData(false);
|
||||||
|
} finally {
|
||||||
|
Assert.assertTrue(logCapturer.getOutput().contains("There are no " +
|
||||||
|
"blocks in the blockPool"));
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBalanceDataBetweenMultiplePairsOfVolumes()
|
public void testBalanceDataBetweenMultiplePairsOfVolumes()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
@ -255,7 +315,7 @@ public class TestDiskBalancer {
|
||||||
|
|
||||||
dataMover.executePlan(plan);
|
dataMover.executePlan(plan);
|
||||||
dataMover.verifyPlanExectionDone();
|
dataMover.verifyPlanExectionDone();
|
||||||
dataMover.verifyAllVolumesHaveData();
|
dataMover.verifyAllVolumesHaveData(true);
|
||||||
dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
|
dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
|
||||||
} finally {
|
} finally {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
|
@ -296,7 +356,7 @@ public class TestDiskBalancer {
|
||||||
dataMover.moveDataToSourceDisk();
|
dataMover.moveDataToSourceDisk();
|
||||||
NodePlan plan = dataMover.generatePlan();
|
NodePlan plan = dataMover.generatePlan();
|
||||||
dataMover.executePlanDuringDiskRemove(plan);
|
dataMover.executePlanDuringDiskRemove(plan);
|
||||||
dataMover.verifyAllVolumesHaveData();
|
dataMover.verifyAllVolumesHaveData(true);
|
||||||
dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
|
dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Assert.fail("Unexpected exception: " + e);
|
Assert.fail("Unexpected exception: " + e);
|
||||||
|
@ -651,14 +711,15 @@ public class TestDiskBalancer {
|
||||||
/**
|
/**
|
||||||
* Once diskBalancer is run, all volumes mush has some data.
|
* Once diskBalancer is run, all volumes mush has some data.
|
||||||
*/
|
*/
|
||||||
public void verifyAllVolumesHaveData() throws IOException {
|
public void verifyAllVolumesHaveData(boolean checkblockPoolCount) throws
|
||||||
|
IOException {
|
||||||
node = cluster.getDataNodes().get(dataNodeIndex);
|
node = cluster.getDataNodes().get(dataNodeIndex);
|
||||||
try (FsDatasetSpi.FsVolumeReferences refs =
|
try (FsDatasetSpi.FsVolumeReferences refs =
|
||||||
node.getFSDataset().getFsVolumeReferences()) {
|
node.getFSDataset().getFsVolumeReferences()) {
|
||||||
for (FsVolumeSpi volume : refs) {
|
for (FsVolumeSpi volume : refs) {
|
||||||
assertTrue(DiskBalancerTestUtil.getBlockCount(volume, true) > 0);
|
assertTrue(DiskBalancerTestUtil.getBlockCount(volume, checkblockPoolCount) > 0);
|
||||||
LOG.info("{} : Block Count : {}", refs, DiskBalancerTestUtil
|
LOG.info("{} : Block Count : {}", refs, DiskBalancerTestUtil
|
||||||
.getBlockCount(volume, true));
|
.getBlockCount(volume, checkblockPoolCount));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue