diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index e893c5e70e7..28d627a3901 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2047,25 +2047,6 @@ public class DataNode extends ReconfigurableBase
     tracer.close();
   }
 
-  /**
-   * Check if there is a disk failure asynchronously
-   * and if so, handle the error.
-   */
-  @VisibleForTesting
-  public void checkDiskErrorAsync() {
-    volumeChecker.checkAllVolumesAsync(
-        data, (healthyVolumes, failedVolumes) -> {
-          if (failedVolumes.size() > 0) {
-            LOG.warn("checkDiskErrorAsync callback got {} failed volumes: {}",
-                failedVolumes.size(), failedVolumes);
-          } else {
-            LOG.debug("checkDiskErrorAsync: no volume failures detected");
-          }
-          lastDiskErrorCheck = Time.monotonicNow();
-          handleVolumeFailures(failedVolumes);
-        });
-  }
-
   /**
    * Check if there is a disk failure asynchronously
    * and if so, handle the error.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
index cab612281bd..9ad47f012d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
@@ -232,68 +232,6 @@ public class DatasetVolumeChecker {
     }
   }
 
-  /**
-   * Start checks against all volumes of a dataset, invoking the
-   * given callback when the operation has completed. The function
-   * does not wait for the checks to complete.
-   *
-   * If a volume cannot be referenced then it is already closed and
-   * cannot be checked. No error is propagated to the callback for that
-   * volume.
-   *
-   * @param dataset - FsDatasetSpi to be checked.
-   * @param callback - Callback to be invoked when the checks are complete.
-   * @return true if the check was scheduled and the callback will be invoked.
-   *         false if the check was not scheduled and the callback will not be
-   *         invoked.
-   */
-  public boolean checkAllVolumesAsync(
-      final FsDatasetSpi<? extends FsVolumeSpi> dataset,
-      Callback callback) {
-    final long gap = timer.monotonicNow() - lastAllVolumesCheck;
-    if (gap < minDiskCheckGapMs) {
-      numSkippedChecks.incrementAndGet();
-      LOG.trace(
-          "Skipped checking all volumes, time since last check {} is less " +
-          "than the minimum gap between checks ({} ms).",
-          gap, minDiskCheckGapMs);
-      return false;
-    }
-
-    final FsDatasetSpi.FsVolumeReferences references =
-        dataset.getFsVolumeReferences();
-
-    if (references.size() == 0) {
-      LOG.warn("checkAllVolumesAsync - no volumes can be referenced");
-      return false;
-    }
-
-    lastAllVolumesCheck = timer.monotonicNow();
-    final Set<FsVolumeSpi> healthyVolumes = new HashSet<>();
-    final Set<FsVolumeSpi> failedVolumes = new HashSet<>();
-    final AtomicLong numVolumes = new AtomicLong(references.size());
-    boolean added = false;
-
-    LOG.info("Checking {} volumes", references.size());
-    for (int i = 0; i < references.size(); ++i) {
-      final FsVolumeReference reference = references.getReference(i);
-      // The context parameter is currently ignored.
-      Optional<ListenableFuture<VolumeCheckResult>> olf =
-          delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
-      if (olf.isPresent()) {
-        added = true;
-        Futures.addCallback(olf.get(),
-            new ResultHandler(reference, healthyVolumes, failedVolumes,
-                numVolumes, callback));
-      } else {
-        IOUtils.cleanup(null, reference);
-        numVolumes.decrementAndGet();
-      }
-    }
-    numAsyncDatasetChecks.incrementAndGet();
-    return added;
-  }
-
   /**
    * A callback interface that is supplied the result of running an
    * async disk check on multiple volumes.
@@ -488,13 +426,6 @@ public class DatasetVolumeChecker {
     return numSyncDatasetChecks.get();
   }
 
-  /**
-   * Return the number of {@link #checkAllVolumesAsync} invocations.
-   */
-  public long getNumAsyncDatasetChecks() {
-    return numAsyncDatasetChecks.get();
-  }
-
   /**
    * Return the number of checks skipped because the minimum gap since the
    * last check had not elapsed.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
index 5a1ad87ed2a..cf5b724ac71 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.mockito.Mockito;
@@ -242,4 +243,18 @@ public class DataNodeTestUtils {
       LOG.warn("Could not reconfigure DataNode.", e);
     }
   }
+
+  /** Get the FsVolume on the given basePath. */
+  public static FsVolumeImpl getVolume(DataNode dn, File basePath) throws
+      IOException {
+    try (FsDatasetSpi.FsVolumeReferences volumes = dn.getFSDataset()
+        .getFsVolumeReferences()) {
+      for (FsVolumeSpi vol : volumes) {
+        if (vol.getBaseURI().equals(basePath.toURI())) {
+          return (FsVolumeImpl) vol;
+        }
+      }
+    }
+    return null;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
index 0401a8120f2..80ca0ff362e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@ -899,20 +899,6 @@ public class TestDataNodeHotSwapVolumes {
         is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
   }
 
-  /** Get the FsVolume on the given basePath */
-  private FsVolumeImpl getVolume(DataNode dn, File basePath)
-      throws IOException {
-    try (FsDatasetSpi.FsVolumeReferences volumes =
-        dn.getFSDataset().getFsVolumeReferences()) {
-      for (FsVolumeSpi vol : volumes) {
-        if (vol.getBaseURI().equals(basePath.toURI())) {
-          return (FsVolumeImpl) vol;
-        }
-      }
-    }
-    return null;
-  }
-
   /**
    * Verify that {@link DataNode#checkDiskError()} removes all metadata in
    * DataNode upon a volume failure. Thus we can run reconfig on the same
@@ -933,7 +919,7 @@ public class TestDataNodeHotSwapVolumes {
     final String oldDataDir = dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY);
     File dirToFail = new File(cluster.getDataDirectory(), "data1");
 
-    FsVolumeImpl failedVolume = getVolume(dn, dirToFail);
+    FsVolumeImpl failedVolume = DataNodeTestUtils.getVolume(dn, dirToFail);
     assertTrue("No FsVolume was found for " + dirToFail,
         failedVolume != null);
     long used = failedVolume.getDfsUsed();
@@ -957,7 +943,7 @@ public class TestDataNodeHotSwapVolumes {
         is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
     createFile(new Path("/test2"), 32, (short)2);
 
-    FsVolumeImpl restoredVolume = getVolume(dn, dirToFail);
+    FsVolumeImpl restoredVolume = DataNodeTestUtils.getVolume(dn, dirToFail);
     assertTrue(restoredVolume != null);
     assertTrue(restoredVolume != failedVolume);
     // More data has been written to this volume.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index 06e287144f8..970b83bdd9f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -237,7 +237,7 @@ public class TestDataNodeVolumeFailure {
     File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1));
     DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
     DataNode dn0 = cluster.getDataNodes().get(0);
-    checkDiskErrorSync(dn0);
+    checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol1));
 
     // Verify dn0Vol1 has been completely removed from DN0.
     // 1. dn0Vol1 is removed from DataStorage.
@@ -284,10 +284,10 @@ public class TestDataNodeVolumeFailure {
     assertFalse(dataDirStrs[0].contains(dn0Vol1.getAbsolutePath()));
   }
 
-  private static void checkDiskErrorSync(DataNode dn)
+  private static void checkDiskErrorSync(DataNode dn, FsVolumeSpi volume)
       throws InterruptedException {
     final long lastDiskErrorCheck = dn.getLastDiskErrorCheck();
-    dn.checkDiskErrorAsync();
+    dn.checkDiskErrorAsync(volume);
     // Wait 10 seconds for checkDiskError thread to finish and discover volume
     // failures.
     int count = 100;
@@ -311,7 +311,8 @@ public class TestDataNodeVolumeFailure {
     final File dn0Vol2 = new File(dataDir, "data" + (2 * 0 + 2));
     DataNodeTestUtils.injectDataDirFailure(dn0Vol1, dn0Vol2);
     DataNode dn0 = cluster.getDataNodes().get(0);
-    checkDiskErrorSync(dn0);
+    checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol1));
+    checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol2));
 
     // DN0 should stop after the number of failure disks exceed tolerated
     // value (1).
@@ -332,7 +333,7 @@ public class TestDataNodeVolumeFailure {
 
     // Fail dn0Vol1 first.
     DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
-    checkDiskErrorSync(dn0);
+    checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol1));
 
     // Hot swap out the failure volume.
     String dataDirs = dn0Vol2.getPath();
@@ -351,7 +352,7 @@ public class TestDataNodeVolumeFailure {
     // Fail dn0Vol2. Now since dn0Vol1 has been fixed, DN0 has sufficient
    // resources, thus it should keep running.
     DataNodeTestUtils.injectDataDirFailure(dn0Vol2);
-    checkDiskErrorSync(dn0);
+    checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol2));
     assertTrue(dn0.shouldRun());
   }
 
@@ -378,12 +379,12 @@ public class TestDataNodeVolumeFailure {
 
     // Fail dn0Vol1 first and hot swap it.
     DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
-    checkDiskErrorSync(dn0);
+    checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol1));
     assertTrue(dn0.shouldRun());
 
     // Fail dn0Vol2, now dn0 should stop, because we only tolerate 1 disk failure.
     DataNodeTestUtils.injectDataDirFailure(dn0Vol2);
-    checkDiskErrorSync(dn0);
+    checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol2));
     assertFalse(dn0.shouldRun());
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
index f5bb8078b40..b37cc75e6e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
@@ -161,53 +161,6 @@ public class TestDatasetVolumeChecker {
     }
   }
 
-  /**
-   * Unit test for {@link DatasetVolumeChecker#checkAllVolumesAsync}.
-   *
-   * @throws Exception
-   */
-  @Test(timeout=10000)
-  public void testCheckAllVolumesAsync() throws Exception {
-    LOG.info("Executing {}", testName.getMethodName());
-
-    final List<FsVolumeSpi> volumes = makeVolumes(
-        NUM_VOLUMES, expectedVolumeHealth);
-    final FsDatasetSpi<FsVolumeSpi> dataset = makeDataset(volumes);
-    final DatasetVolumeChecker checker =
-        new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer());
-    checker.setDelegateChecker(new DummyChecker());
-    final AtomicLong numCallbackInvocations = new AtomicLong(0);
-
-    boolean result = checker.checkAllVolumesAsync(
-        dataset, new DatasetVolumeChecker.Callback() {
-          @Override
-          public void call(
-              Set<FsVolumeSpi> healthyVolumes,
-              Set<FsVolumeSpi> failedVolumes) {
-            LOG.info("Got back {} failed volumes", failedVolumes.size());
-            if (expectedVolumeHealth == null ||
-                expectedVolumeHealth == FAILED) {
-              assertThat(healthyVolumes.size(), is(0));
-              assertThat(failedVolumes.size(), is(NUM_VOLUMES));
-            } else {
-              assertThat(healthyVolumes.size(), is(NUM_VOLUMES));
-              assertThat(failedVolumes.size(), is(0));
-            }
-            numCallbackInvocations.incrementAndGet();
-          }
-        });
-
-    // The callback should be invoked exactly once.
-    if (result) {
-      assertThat(numCallbackInvocations.get(), is(1L));
-    }
-
-    // Ensure each volume's check() method was called exactly once.
-    for (FsVolumeSpi volume : volumes) {
-      verify(volume, times(1)).check(anyObject());
-    }
-  }
-
   /**
    * A checker to wraps the result of {@link FsVolumeSpi#check} in
    * an ImmediateFuture.
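Reviewer note: a minimal usage sketch of the per-volume check path that remains after this patch, using only names visible in the changes above (DataNodeTestUtils.getVolume, DataNodeTestUtils.injectDataDirFailure, DataNode#checkDiskErrorAsync(FsVolumeSpi)). The MiniDFSCluster setup with the dataDir and cluster variables is assumed to match the existing TestDataNodeVolumeFailure tests; this is not part of the patch itself.

    // Simulate a failure of one data directory, then ask the DataNode to
    // check only the volume backing that directory.
    File dn0Vol1 = new File(dataDir, "data1");
    DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
    DataNode dn0 = cluster.getDataNodes().get(0);

    // Resolve the FsVolumeImpl for the failed directory via the new shared
    // test helper and schedule an async check of that single volume.
    FsVolumeSpi volume = DataNodeTestUtils.getVolume(dn0, dn0Vol1);
    dn0.checkDiskErrorAsync(volume);

    // The check runs asynchronously; the tests poll dn0.getLastDiskErrorCheck()
    // until it advances before asserting on dn0.shouldRun().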