HDFS-11279. Cleanup unused DataNode#checkDiskErrorAsync(). Contributed by Hanisha Koneru.

Arpit Agarwal 2017-01-16 16:37:38 -08:00
parent eafaddca1a
commit 8ec9dca2e1
8 changed files with 25 additions and 168 deletions


@@ -2021,29 +2021,6 @@ public void shutdown() {
     tracer.close();
   }
 
-  /**
-   * Check if there is a disk failure asynchronously
-   * and if so, handle the error.
-   */
-  @VisibleForTesting
-  public void checkDiskErrorAsync() {
-    volumeChecker.checkAllVolumesAsync(
-        data, new DatasetVolumeChecker.Callback() {
-          @Override
-          public void call(Set<FsVolumeSpi> healthyVolumes,
-                           Set<FsVolumeSpi> failedVolumes) {
-            if (failedVolumes.size() > 0) {
-              LOG.warn("checkDiskErrorAsync callback got {} failed volumes: {}",
-                  failedVolumes.size(), failedVolumes);
-            } else {
-              LOG.debug("checkDiskErrorAsync: no volume failures detected");
-            }
-            lastDiskErrorCheck = Time.monotonicNow();
-            handleVolumeFailures(failedVolumes);
-          }
-        });
-  }
-
   /**
    * Check if there is a disk failure asynchronously
    * and if so, handle the error.
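
Note: the per-volume overload DataNode#checkDiskErrorAsync(FsVolumeSpi) is kept
(its Javadoc is the trailing context above), so callers now pass an explicit
volume rather than triggering a dataset-wide check. A minimal sketch of the
surviving call path, using the DataNodeTestUtils.getVolume helper added later in
this commit; dn and volDir are illustrative names, not part of the patch:

    // Look up the FsVolumeImpl backing volDir, then schedule an async check
    // of just that volume; the DataNode handles any detected failure.
    FsVolumeImpl volume = DataNodeTestUtils.getVolume(dn, volDir);
    if (volume != null) {
      dn.checkDiskErrorAsync(volume);
    }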


@@ -232,68 +232,6 @@ public void call(Set<FsVolumeSpi> ignored1,
     }
   }
 
-  /**
-   * Start checks against all volumes of a dataset, invoking the
-   * given callback when the operation has completed. The function
-   * does not wait for the checks to complete.
-   *
-   * If a volume cannot be referenced then it is already closed and
-   * cannot be checked. No error is propagated to the callback for that
-   * volume.
-   *
-   * @param dataset - FsDatasetSpi to be checked.
-   * @param callback - Callback to be invoked when the checks are complete.
-   * @return true if the check was scheduled and the callback will be invoked.
-   *         false if the check was not scheduled and the callback will not be
-   *         invoked.
-   */
-  public boolean checkAllVolumesAsync(
-      final FsDatasetSpi<? extends FsVolumeSpi> dataset,
-      Callback callback) {
-    final long gap = timer.monotonicNow() - lastAllVolumesCheck;
-    if (gap < minDiskCheckGapMs) {
-      numSkippedChecks.incrementAndGet();
-      LOG.trace(
-          "Skipped checking all volumes, time since last check {} is less " +
-              "than the minimum gap between checks ({} ms).",
-          gap, minDiskCheckGapMs);
-      return false;
-    }
-
-    final FsDatasetSpi.FsVolumeReferences references =
-        dataset.getFsVolumeReferences();
-    if (references.size() == 0) {
-      LOG.warn("checkAllVolumesAsync - no volumes can be referenced");
-      return false;
-    }
-
-    lastAllVolumesCheck = timer.monotonicNow();
-    final Set<FsVolumeSpi> healthyVolumes = new HashSet<>();
-    final Set<FsVolumeSpi> failedVolumes = new HashSet<>();
-    final AtomicLong numVolumes = new AtomicLong(references.size());
-    boolean added = false;
-
-    LOG.info("Checking {} volumes", references.size());
-    for (int i = 0; i < references.size(); ++i) {
-      final FsVolumeReference reference = references.getReference(i);
-      // The context parameter is currently ignored.
-      Optional<ListenableFuture<VolumeCheckResult>> olf =
-          delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
-      if (olf.isPresent()) {
-        added = true;
-        Futures.addCallback(olf.get(),
-            new ResultHandler(reference, healthyVolumes, failedVolumes,
-                numVolumes, callback));
-      } else {
-        IOUtils.cleanup(null, reference);
-        numVolumes.decrementAndGet();
-      }
-    }
-
-    numAsyncDatasetChecks.incrementAndGet();
-    return added;
-  }
-
   /**
    * A callback interface that is supplied the result of running an
    * async disk check on multiple volumes.

@@ -489,13 +427,6 @@ public long getNumSyncDatasetChecks() {
     return numSyncDatasetChecks.get();
   }
 
-  /**
-   * Return the number of {@link #checkAllVolumesAsync} invocations.
-   */
-  public long getNumAsyncDatasetChecks() {
-    return numAsyncDatasetChecks.get();
-  }
-
   /**
    * Return the number of checks skipped because the minimum gap since the
    * last check had not elapsed.
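
Only the asynchronous dataset-wide path is removed here; the synchronous
DatasetVolumeChecker#checkAllVolumes and its getNumSyncDatasetChecks counter
remain. A minimal sketch of the surviving path, assuming checkAllVolumes at this
revision blocks and returns the set of volumes that failed their check:

    // Check every volume in the dataset synchronously (the call can throw
    // InterruptedException) and let the caller handle any failed volumes.
    Set<FsVolumeSpi> failedVolumes = checker.checkAllVolumes(dataset);
    if (!failedVolumes.isEmpty()) {
      LOG.warn("Found {} failed volumes: {}",
          failedVolumes.size(), failedVolumes);
    }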


@@ -30,7 +30,9 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
@@ -214,4 +216,16 @@ public static void reconfigureDataNode(DataNode dn, File... newVols)
       LOG.warn("Could not reconfigure DataNode.", e);
     }
   }
+
+  /** Get the FsVolume on the given basePath. */
+  public static FsVolumeImpl getVolume(DataNode dn, File basePath) throws
+      IOException {
+    try (FsDatasetSpi.FsVolumeReferences volumes = dn.getFSDataset()
+        .getFsVolumeReferences()) {
+      for (FsVolumeSpi vol : volumes) {
+        // Only return the volume rooted at the requested base path.
+        if (vol.getBasePath().equals(basePath.getPath())) {
+          return (FsVolumeImpl) vol;
+        }
+      }
+    }
+    return null;
+  }
 }


@@ -900,20 +900,6 @@ public void testAddBackRemovedVolume()
         is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
   }
 
-  /** Get the FsVolume on the given basePath */
-  private FsVolumeImpl getVolume(DataNode dn, File basePath)
-      throws IOException {
-    try (FsDatasetSpi.FsVolumeReferences volumes =
-        dn.getFSDataset().getFsVolumeReferences()) {
-      for (FsVolumeSpi vol : volumes) {
-        if (vol.getBasePath().equals(basePath.getPath())) {
-          return (FsVolumeImpl) vol;
-        }
-      }
-    }
-    return null;
-  }
-
   /**
    * Verify that {@link DataNode#checkDiskError()} removes all metadata in
    * DataNode upon a volume failure. Thus we can run reconfig on the same

@@ -934,7 +920,7 @@ public void testDirectlyReloadAfterCheckDiskError()
     final String oldDataDir = dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY);
     File dirToFail = new File(cluster.getDataDirectory(), "data1");
 
-    FsVolumeImpl failedVolume = getVolume(dn, dirToFail);
+    FsVolumeImpl failedVolume = DataNodeTestUtils.getVolume(dn, dirToFail);
     assertTrue("No FsVolume was found for " + dirToFail,
         failedVolume != null);
     long used = failedVolume.getDfsUsed();

@@ -957,7 +943,7 @@
         is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
     createFile(new Path("/test2"), 32, (short)2);
 
-    FsVolumeImpl restoredVolume = getVolume(dn, dirToFail);
+    FsVolumeImpl restoredVolume = DataNodeTestUtils.getVolume(dn, dirToFail);
     assertTrue(restoredVolume != null);
     assertTrue(restoredVolume != failedVolume);
     // More data has been written to this volume.


@@ -239,7 +239,7 @@ public void testFailedVolumeBeingRemovedFromDataNode()
     File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1));
     DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
     DataNode dn0 = cluster.getDataNodes().get(0);
-    checkDiskErrorSync(dn0);
+    checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol1));
 
     // Verify dn0Vol1 has been completely removed from DN0.
     // 1. dn0Vol1 is removed from DataStorage.

@@ -285,10 +285,10 @@
     assertFalse(dataDirStrs[0].contains(dn0Vol1.getAbsolutePath()));
   }
 
-  private static void checkDiskErrorSync(DataNode dn)
+  private static void checkDiskErrorSync(DataNode dn, FsVolumeSpi volume)
       throws InterruptedException {
     final long lastDiskErrorCheck = dn.getLastDiskErrorCheck();
-    dn.checkDiskErrorAsync();
+    dn.checkDiskErrorAsync(volume);
     // Wait 10 seconds for checkDiskError thread to finish and discover volume
     // failures.
     int count = 100;
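
The view truncates the helper here. The remainder polls
DataNode#getLastDiskErrorCheck until it moves past the value captured above,
sleeping between attempts so the whole wait is bounded by roughly 10 seconds. A
sketch of that pattern (not the verbatim test body; the message string is
illustrative):

    // Poll until the async checker records a fresh disk-error check,
    // giving up after count iterations of 100 ms each.
    while (count > 0 && dn.getLastDiskErrorCheck() == lastDiskErrorCheck) {
      Thread.sleep(100);
      count--;
    }
    assertTrue("Disk checker did not finish within 10 seconds", count > 0);
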
@@ -312,7 +312,8 @@ public void testDataNodeShutdownAfterNumFailedVolumeExceedsTolerated()
     final File dn0Vol2 = new File(dataDir, "data" + (2 * 0 + 2));
     DataNodeTestUtils.injectDataDirFailure(dn0Vol1, dn0Vol2);
     DataNode dn0 = cluster.getDataNodes().get(0);
-    checkDiskErrorSync(dn0);
+    checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol1));
+    checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol2));
 
     // DN0 should stop after the number of failure disks exceed tolerated
     // value (1).

@@ -333,7 +334,7 @@ public void testVolumeFailureRecoveredByHotSwappingVolume()
 
     // Fail dn0Vol1 first.
     DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
-    checkDiskErrorSync(dn0);
+    checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol1));
 
     // Hot swap out the failure volume.
     String dataDirs = dn0Vol2.getPath();

@@ -352,7 +353,7 @@
     // Fail dn0Vol2. Now since dn0Vol1 has been fixed, DN0 has sufficient
     // resources, thus it should keep running.
     DataNodeTestUtils.injectDataDirFailure(dn0Vol2);
-    checkDiskErrorSync(dn0);
+    checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol2));
     assertTrue(dn0.shouldRun());
   }
 
@@ -379,12 +380,12 @@ public void testTolerateVolumeFailuresAfterAddingMoreVolumes()
 
     // Fail dn0Vol1 first and hot swap it.
     DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
-    checkDiskErrorSync(dn0);
+    checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol1));
     assertTrue(dn0.shouldRun());
 
     // Fail dn0Vol2, now dn0 should stop, because we only tolerate 1 disk failure.
     DataNodeTestUtils.injectDataDirFailure(dn0Vol2);
-    checkDiskErrorSync(dn0);
+    checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol2));
     assertFalse(dn0.shouldRun());
   }


@@ -17,9 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
-import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
-import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;


@@ -54,8 +54,6 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
-import org.apache.hadoop.util.AutoCloseableLock;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;


@@ -160,53 +160,6 @@ public void testCheckAllVolumes() throws Exception {
     }
   }
 
-  /**
-   * Unit test for {@link DatasetVolumeChecker#checkAllVolumesAsync}.
-   *
-   * @throws Exception
-   */
-  @Test(timeout=10000)
-  public void testCheckAllVolumesAsync() throws Exception {
-    LOG.info("Executing {}", testName.getMethodName());
-
-    final List<FsVolumeSpi> volumes = makeVolumes(
-        NUM_VOLUMES, expectedVolumeHealth);
-    final FsDatasetSpi<FsVolumeSpi> dataset = makeDataset(volumes);
-    final DatasetVolumeChecker checker =
-        new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer());
-    checker.setDelegateChecker(new DummyChecker());
-    final AtomicLong numCallbackInvocations = new AtomicLong(0);
-
-    boolean result = checker.checkAllVolumesAsync(
-        dataset, new DatasetVolumeChecker.Callback() {
-          @Override
-          public void call(
-              Set<FsVolumeSpi> healthyVolumes,
-              Set<FsVolumeSpi> failedVolumes) {
-            LOG.info("Got back {} failed volumes", failedVolumes.size());
-            if (expectedVolumeHealth == null ||
-                expectedVolumeHealth == FAILED) {
-              assertThat(healthyVolumes.size(), is(0));
-              assertThat(failedVolumes.size(), is(NUM_VOLUMES));
-            } else {
-              assertThat(healthyVolumes.size(), is(NUM_VOLUMES));
-              assertThat(failedVolumes.size(), is(0));
-            }
-            numCallbackInvocations.incrementAndGet();
-          }
-        });
-
-    // The callback should be invoked exactly once.
-    if (result) {
-      assertThat(numCallbackInvocations.get(), is(1L));
-    }
-
-    // Ensure each volume's check() method was called exactly once.
-    for (FsVolumeSpi volume : volumes) {
-      verify(volume, times(1)).check(any(VolumeCheckContext.class));
-    }
-  }
-
   /**
    * A checker to wraps the result of {@link FsVolumeSpi#check} in
    * an ImmediateFuture.