HDFS-11279. Cleanup unused DataNode#checkDiskErrorAsync(). Contributed by Hanisha Koneru
This commit is contained in:
parent
8fadd69047
commit
87bb1c49bb
|
@ -2047,25 +2047,6 @@ public class DataNode extends ReconfigurableBase
|
|||
tracer.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if there is a disk failure asynchronously
|
||||
* and if so, handle the error.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public void checkDiskErrorAsync() {
|
||||
volumeChecker.checkAllVolumesAsync(
|
||||
data, (healthyVolumes, failedVolumes) -> {
|
||||
if (failedVolumes.size() > 0) {
|
||||
LOG.warn("checkDiskErrorAsync callback got {} failed volumes: {}",
|
||||
failedVolumes.size(), failedVolumes);
|
||||
} else {
|
||||
LOG.debug("checkDiskErrorAsync: no volume failures detected");
|
||||
}
|
||||
lastDiskErrorCheck = Time.monotonicNow();
|
||||
handleVolumeFailures(failedVolumes);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if there is a disk failure asynchronously
|
||||
* and if so, handle the error.
|
||||
|
|
|
@ -232,68 +232,6 @@ public class DatasetVolumeChecker {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start checks against all volumes of a dataset, invoking the
|
||||
* given callback when the operation has completed. The function
|
||||
* does not wait for the checks to complete.
|
||||
*
|
||||
* If a volume cannot be referenced then it is already closed and
|
||||
* cannot be checked. No error is propagated to the callback for that
|
||||
* volume.
|
||||
*
|
||||
* @param dataset - FsDatasetSpi to be checked.
|
||||
* @param callback - Callback to be invoked when the checks are complete.
|
||||
* @return true if the check was scheduled and the callback will be invoked.
|
||||
* false if the check was not scheduled and the callback will not be
|
||||
* invoked.
|
||||
*/
|
||||
public boolean checkAllVolumesAsync(
|
||||
final FsDatasetSpi<? extends FsVolumeSpi> dataset,
|
||||
Callback callback) {
|
||||
final long gap = timer.monotonicNow() - lastAllVolumesCheck;
|
||||
if (gap < minDiskCheckGapMs) {
|
||||
numSkippedChecks.incrementAndGet();
|
||||
LOG.trace(
|
||||
"Skipped checking all volumes, time since last check {} is less " +
|
||||
"than the minimum gap between checks ({} ms).",
|
||||
gap, minDiskCheckGapMs);
|
||||
return false;
|
||||
}
|
||||
|
||||
final FsDatasetSpi.FsVolumeReferences references =
|
||||
dataset.getFsVolumeReferences();
|
||||
|
||||
if (references.size() == 0) {
|
||||
LOG.warn("checkAllVolumesAsync - no volumes can be referenced");
|
||||
return false;
|
||||
}
|
||||
|
||||
lastAllVolumesCheck = timer.monotonicNow();
|
||||
final Set<FsVolumeSpi> healthyVolumes = new HashSet<>();
|
||||
final Set<FsVolumeSpi> failedVolumes = new HashSet<>();
|
||||
final AtomicLong numVolumes = new AtomicLong(references.size());
|
||||
boolean added = false;
|
||||
|
||||
LOG.info("Checking {} volumes", references.size());
|
||||
for (int i = 0; i < references.size(); ++i) {
|
||||
final FsVolumeReference reference = references.getReference(i);
|
||||
// The context parameter is currently ignored.
|
||||
Optional<ListenableFuture<VolumeCheckResult>> olf =
|
||||
delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
|
||||
if (olf.isPresent()) {
|
||||
added = true;
|
||||
Futures.addCallback(olf.get(),
|
||||
new ResultHandler(reference, healthyVolumes, failedVolumes,
|
||||
numVolumes, callback));
|
||||
} else {
|
||||
IOUtils.cleanup(null, reference);
|
||||
numVolumes.decrementAndGet();
|
||||
}
|
||||
}
|
||||
numAsyncDatasetChecks.incrementAndGet();
|
||||
return added;
|
||||
}
|
||||
|
||||
/**
|
||||
* A callback interface that is supplied the result of running an
|
||||
* async disk check on multiple volumes.
|
||||
|
@ -488,13 +426,6 @@ public class DatasetVolumeChecker {
|
|||
return numSyncDatasetChecks.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the number of {@link #checkAllVolumesAsync} invocations.
|
||||
*/
|
||||
public long getNumAsyncDatasetChecks() {
|
||||
return numAsyncDatasetChecks.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the number of checks skipped because the minimum gap since the
|
||||
* last check had not elapsed.
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
||||
import org.mockito.Mockito;
|
||||
|
@ -242,4 +243,18 @@ public class DataNodeTestUtils {
|
|||
LOG.warn("Could not reconfigure DataNode.", e);
|
||||
}
|
||||
}
|
||||
|
||||
/** Get the FsVolume on the given basePath. */
|
||||
public static FsVolumeImpl getVolume(DataNode dn, File basePath) throws
|
||||
IOException {
|
||||
try (FsDatasetSpi.FsVolumeReferences volumes = dn.getFSDataset()
|
||||
.getFsVolumeReferences()) {
|
||||
for (FsVolumeSpi vol : volumes) {
|
||||
if (vol.getBaseURI().equals(basePath.toURI())) {
|
||||
return (FsVolumeImpl) vol;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -899,20 +899,6 @@ public class TestDataNodeHotSwapVolumes {
|
|||
is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
|
||||
}
|
||||
|
||||
/** Get the FsVolume on the given basePath */
|
||||
private FsVolumeImpl getVolume(DataNode dn, File basePath)
|
||||
throws IOException {
|
||||
try (FsDatasetSpi.FsVolumeReferences volumes =
|
||||
dn.getFSDataset().getFsVolumeReferences()) {
|
||||
for (FsVolumeSpi vol : volumes) {
|
||||
if (vol.getBaseURI().equals(basePath.toURI())) {
|
||||
return (FsVolumeImpl) vol;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that {@link DataNode#checkDiskError()} removes all metadata in
|
||||
* DataNode upon a volume failure. Thus we can run reconfig on the same
|
||||
|
@ -933,7 +919,7 @@ public class TestDataNodeHotSwapVolumes {
|
|||
final String oldDataDir = dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY);
|
||||
File dirToFail = new File(cluster.getDataDirectory(), "data1");
|
||||
|
||||
FsVolumeImpl failedVolume = getVolume(dn, dirToFail);
|
||||
FsVolumeImpl failedVolume = DataNodeTestUtils.getVolume(dn, dirToFail);
|
||||
assertTrue("No FsVolume was found for " + dirToFail,
|
||||
failedVolume != null);
|
||||
long used = failedVolume.getDfsUsed();
|
||||
|
@ -957,7 +943,7 @@ public class TestDataNodeHotSwapVolumes {
|
|||
is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
|
||||
|
||||
createFile(new Path("/test2"), 32, (short)2);
|
||||
FsVolumeImpl restoredVolume = getVolume(dn, dirToFail);
|
||||
FsVolumeImpl restoredVolume = DataNodeTestUtils.getVolume(dn, dirToFail);
|
||||
assertTrue(restoredVolume != null);
|
||||
assertTrue(restoredVolume != failedVolume);
|
||||
// More data has been written to this volume.
|
||||
|
|
|
@ -237,7 +237,7 @@ public class TestDataNodeVolumeFailure {
|
|||
File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1));
|
||||
DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
|
||||
DataNode dn0 = cluster.getDataNodes().get(0);
|
||||
checkDiskErrorSync(dn0);
|
||||
checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol1));
|
||||
|
||||
// Verify dn0Vol1 has been completely removed from DN0.
|
||||
// 1. dn0Vol1 is removed from DataStorage.
|
||||
|
@ -284,10 +284,10 @@ public class TestDataNodeVolumeFailure {
|
|||
assertFalse(dataDirStrs[0].contains(dn0Vol1.getAbsolutePath()));
|
||||
}
|
||||
|
||||
private static void checkDiskErrorSync(DataNode dn)
|
||||
private static void checkDiskErrorSync(DataNode dn, FsVolumeSpi volume)
|
||||
throws InterruptedException {
|
||||
final long lastDiskErrorCheck = dn.getLastDiskErrorCheck();
|
||||
dn.checkDiskErrorAsync();
|
||||
dn.checkDiskErrorAsync(volume);
|
||||
// Wait 10 seconds for checkDiskError thread to finish and discover volume
|
||||
// failures.
|
||||
int count = 100;
|
||||
|
@ -311,7 +311,8 @@ public class TestDataNodeVolumeFailure {
|
|||
final File dn0Vol2 = new File(dataDir, "data" + (2 * 0 + 2));
|
||||
DataNodeTestUtils.injectDataDirFailure(dn0Vol1, dn0Vol2);
|
||||
DataNode dn0 = cluster.getDataNodes().get(0);
|
||||
checkDiskErrorSync(dn0);
|
||||
checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol1));
|
||||
checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol2));
|
||||
|
||||
// DN0 should stop after the number of failure disks exceed tolerated
|
||||
// value (1).
|
||||
|
@ -332,7 +333,7 @@ public class TestDataNodeVolumeFailure {
|
|||
|
||||
// Fail dn0Vol1 first.
|
||||
DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
|
||||
checkDiskErrorSync(dn0);
|
||||
checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol1));
|
||||
|
||||
// Hot swap out the failure volume.
|
||||
String dataDirs = dn0Vol2.getPath();
|
||||
|
@ -351,7 +352,7 @@ public class TestDataNodeVolumeFailure {
|
|||
// Fail dn0Vol2. Now since dn0Vol1 has been fixed, DN0 has sufficient
|
||||
// resources, thus it should keep running.
|
||||
DataNodeTestUtils.injectDataDirFailure(dn0Vol2);
|
||||
checkDiskErrorSync(dn0);
|
||||
checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol2));
|
||||
assertTrue(dn0.shouldRun());
|
||||
}
|
||||
|
||||
|
@ -378,12 +379,12 @@ public class TestDataNodeVolumeFailure {
|
|||
|
||||
// Fail dn0Vol1 first and hot swap it.
|
||||
DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
|
||||
checkDiskErrorSync(dn0);
|
||||
checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol1));
|
||||
assertTrue(dn0.shouldRun());
|
||||
|
||||
// Fail dn0Vol2, now dn0 should stop, because we only tolerate 1 disk failure.
|
||||
DataNodeTestUtils.injectDataDirFailure(dn0Vol2);
|
||||
checkDiskErrorSync(dn0);
|
||||
checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol2));
|
||||
assertFalse(dn0.shouldRun());
|
||||
}
|
||||
|
||||
|
|
|
@ -161,53 +161,6 @@ public class TestDatasetVolumeChecker {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Unit test for {@link DatasetVolumeChecker#checkAllVolumesAsync}.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test(timeout=10000)
|
||||
public void testCheckAllVolumesAsync() throws Exception {
|
||||
LOG.info("Executing {}", testName.getMethodName());
|
||||
|
||||
final List<FsVolumeSpi> volumes = makeVolumes(
|
||||
NUM_VOLUMES, expectedVolumeHealth);
|
||||
final FsDatasetSpi<FsVolumeSpi> dataset = makeDataset(volumes);
|
||||
final DatasetVolumeChecker checker =
|
||||
new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer());
|
||||
checker.setDelegateChecker(new DummyChecker());
|
||||
final AtomicLong numCallbackInvocations = new AtomicLong(0);
|
||||
|
||||
boolean result = checker.checkAllVolumesAsync(
|
||||
dataset, new DatasetVolumeChecker.Callback() {
|
||||
@Override
|
||||
public void call(
|
||||
Set<FsVolumeSpi> healthyVolumes,
|
||||
Set<FsVolumeSpi> failedVolumes) {
|
||||
LOG.info("Got back {} failed volumes", failedVolumes.size());
|
||||
if (expectedVolumeHealth == null ||
|
||||
expectedVolumeHealth == FAILED) {
|
||||
assertThat(healthyVolumes.size(), is(0));
|
||||
assertThat(failedVolumes.size(), is(NUM_VOLUMES));
|
||||
} else {
|
||||
assertThat(healthyVolumes.size(), is(NUM_VOLUMES));
|
||||
assertThat(failedVolumes.size(), is(0));
|
||||
}
|
||||
numCallbackInvocations.incrementAndGet();
|
||||
}
|
||||
});
|
||||
|
||||
// The callback should be invoked exactly once.
|
||||
if (result) {
|
||||
assertThat(numCallbackInvocations.get(), is(1L));
|
||||
}
|
||||
|
||||
// Ensure each volume's check() method was called exactly once.
|
||||
for (FsVolumeSpi volume : volumes) {
|
||||
verify(volume, times(1)).check(anyObject());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A checker to wraps the result of {@link FsVolumeSpi#check} in
|
||||
* an ImmediateFuture.
|
||||
|
|
Loading…
Reference in New Issue