From 6f35700f0427348a60a65abcea20816d13cb3ecf Mon Sep 17 00:00:00 2001 From: Arpit Agarwal Date: Tue, 29 Nov 2016 20:31:02 -0800 Subject: [PATCH] HDFS-11149. Support for parallel checking of FsVolumes. Change-Id: I6e7ada81562f380a260f8f561d0fa6a4cb102560 --- .../checker/DatasetVolumeChecker.java | 442 ++++++++++++++++++ .../datanode/fsdataset/FsDatasetSpi.java | 7 + .../datanode/fsdataset/FsVolumeSpi.java | 12 +- .../datanode/fsdataset/impl/FsVolumeImpl.java | 21 +- .../src/main/resources/hdfs-default.xml | 10 +- .../server/datanode/SimulatedFSDataset.java | 7 + .../server/datanode/TestDirectoryScanner.java | 7 + .../checker/TestDatasetVolumeChecker.java | 259 ++++++++++ .../TestDatasetVolumeCheckerFailures.java | 190 ++++++++ .../extdataset/ExternalVolumeImpl.java | 7 + 10 files changed, 951 insertions(+), 11 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java new file mode 100644 index 00000000000..e49ec7d6f08 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java @@ -0,0 +1,442 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.checker; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.VolumeCheckContext; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.DiskChecker.DiskErrorException; +import org.apache.hadoop.util.Timer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.nio.channels.ClosedChannelException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY; + +/** + * A class that encapsulates running disk checks against each volume of an + * {@link FsDatasetSpi} and allows retrieving a list of failed volumes. + * + * This splits out behavior that was originally implemented across + * DataNode, FsDatasetImpl and FsVolumeList. + */ +public class DatasetVolumeChecker { + + public static final Logger LOG = + LoggerFactory.getLogger(DatasetVolumeChecker.class); + + private AsyncChecker delegateChecker; + + private final AtomicLong numVolumeChecks = new AtomicLong(0); + private final AtomicLong numSyncDatasetChecks = new AtomicLong(0); + private final AtomicLong numAsyncDatasetChecks = new AtomicLong(0); + private final AtomicLong numSkippedChecks = new AtomicLong(0); + + /** + * Max allowed time for a disk check in milliseconds. If the check + * doesn't complete within this time we declare the disk as dead. + */ + private final long maxAllowedTimeForCheckMs; + + /** + * Maximum number of volume failures that can be tolerated without + * declaring a fatal error. + */ + private final int maxVolumeFailuresTolerated; + + /** + * Minimum time between two successive disk checks of a volume. 
+ */ + private final long minDiskCheckGapMs; + + /** + * Timestamp of the last check of all volumes. + */ + private long lastAllVolumesCheck; + + private final Timer timer; + + private static final VolumeCheckContext IGNORED_CONTEXT = + new VolumeCheckContext(); + + /** + * @param conf Configuration object. + * @param timer {@link Timer} object used for throttling checks. + */ + public DatasetVolumeChecker(Configuration conf, Timer timer) + throws DiskErrorException { + maxAllowedTimeForCheckMs = conf.getTimeDuration( + DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY, + DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + + if (maxAllowedTimeForCheckMs <= 0) { + throw new DiskErrorException("Invalid value configured for " + + DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY + " - " + + maxAllowedTimeForCheckMs + " (should be > 0)"); + } + + this.timer = timer; + + maxVolumeFailuresTolerated = conf.getInt( + DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, + DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT); + + minDiskCheckGapMs = conf.getTimeDuration( + DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, + DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_DEFAULT, + TimeUnit.MILLISECONDS); + + if (minDiskCheckGapMs < 0) { + throw new DiskErrorException("Invalid value configured for " + + DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY + " - " + + minDiskCheckGapMs + " (should be >= 0)"); + } + + lastAllVolumesCheck = timer.monotonicNow() - minDiskCheckGapMs; + + if (maxVolumeFailuresTolerated < 0) { + throw new DiskErrorException("Invalid value configured for " + + DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY + " - " + + maxVolumeFailuresTolerated + " (should be non-negative)"); + } + + delegateChecker = new ThrottledAsyncChecker<>( + timer, minDiskCheckGapMs, Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setNameFormat("DataNode DiskChecker thread %d") + .setDaemon(true) + .build())); + } + + /** + * Run checks against all volumes of a dataset. + * + * This check may be performed at service startup and subsequently at + * regular intervals to detect and handle failed volumes. + * + * @param dataset - FsDatasetSpi to be checked. + * @return set of failed volumes. + */ + public Set checkAllVolumes( + final FsDatasetSpi dataset) + throws InterruptedException { + + if (timer.monotonicNow() - lastAllVolumesCheck < minDiskCheckGapMs) { + numSkippedChecks.incrementAndGet(); + return Collections.emptySet(); + } + + lastAllVolumesCheck = timer.monotonicNow(); + final Set healthyVolumes = new HashSet<>(); + final Set failedVolumes = new HashSet<>(); + final Set allVolumes = new HashSet<>(); + + final FsDatasetSpi.FsVolumeReferences references = + dataset.getFsVolumeReferences(); + final CountDownLatch resultsLatch = new CountDownLatch(references.size()); + + for (int i = 0; i < references.size(); ++i) { + final FsVolumeReference reference = references.getReference(i); + allVolumes.add(reference.getVolume()); + ListenableFuture future = + delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT); + LOG.info("Scheduled health check for volume {}", reference.getVolume()); + Futures.addCallback(future, new ResultHandler( + reference, healthyVolumes, failedVolumes, resultsLatch, null)); + } + + // Wait until our timeout elapses, after which we give up on + // the remaining volumes. 
+ if (!resultsLatch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS)) { + LOG.warn("checkAllVolumes timed out after {} ms" + + maxAllowedTimeForCheckMs); + } + + numSyncDatasetChecks.incrementAndGet(); + synchronized (this) { + // All volumes that have not been detected as healthy should be + // considered failed. This is a superset of 'failedVolumes'. + // + // Make a copy under the mutex as Sets.difference() returns a view + // of a potentially changing set. + return new HashSet<>(Sets.difference(allVolumes, healthyVolumes)); + } + } + + /** + * Start checks against all volumes of a dataset, invoking the + * given callback when the operation has completed. The function + * does not wait for the checks to complete. + * + * If a volume cannot be referenced then it is already closed and + * cannot be checked. No error is propagated to the callback for that + * volume. + * + * @param dataset - FsDatasetSpi to be checked. + * @param callback - Callback to be invoked when the checks are complete. + * @return true if the check was scheduled and the callback will be invoked. + * false if the check was not scheduled and the callback will not be + * invoked. + */ + public boolean checkAllVolumesAsync( + final FsDatasetSpi dataset, + Callback callback) { + + if (timer.monotonicNow() - lastAllVolumesCheck < minDiskCheckGapMs) { + numSkippedChecks.incrementAndGet(); + return false; + } + + lastAllVolumesCheck = timer.monotonicNow(); + final Set healthyVolumes = new HashSet<>(); + final Set failedVolumes = new HashSet<>(); + final FsDatasetSpi.FsVolumeReferences references = + dataset.getFsVolumeReferences(); + final CountDownLatch latch = new CountDownLatch(references.size()); + + LOG.info("Checking {} volumes", references.size()); + for (int i = 0; i < references.size(); ++i) { + final FsVolumeReference reference = references.getReference(i); + // The context parameter is currently ignored. + ListenableFuture future = + delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT); + Futures.addCallback(future, new ResultHandler( + reference, healthyVolumes, failedVolumes, latch, callback)); + } + numAsyncDatasetChecks.incrementAndGet(); + return true; + } + + /** + * A callback interface that is supplied the result of running an + * async disk check on multiple volumes. + */ + public interface Callback { + /** + * @param healthyVolumes set of volumes that passed disk checks. + * @param failedVolumes set of volumes that failed disk checks. + */ + void call(Set healthyVolumes, + Set failedVolumes); + } + + /** + * Check a single volume, returning a {@link ListenableFuture} + * that can be used to retrieve the final result. + * + * If the volume cannot be referenced then it is already closed and + * cannot be checked. No error is propagated to the callback. + * + * @param volume the volume that is to be checked. + * @param callback callback to be invoked when the volume check completes. + */ + public void checkVolume( + final FsVolumeSpi volume, + Callback callback) { + FsVolumeReference volumeReference; + try { + volumeReference = volume.obtainReference(); + } catch (ClosedChannelException e) { + // The volume has already been closed. 
+ callback.call(new HashSet(), new HashSet()); + return; + } + ListenableFuture future = + delegateChecker.schedule(volume, IGNORED_CONTEXT); + numVolumeChecks.incrementAndGet(); + Futures.addCallback(future, new ResultHandler( + volumeReference, new HashSet(), + new HashSet(), new CountDownLatch(1), callback)); + } + + /** + * A callback to process the results of checking a volume. + */ + private class ResultHandler + implements FutureCallback { + private final FsVolumeReference reference; + private final Set failedVolumes; + private final Set healthyVolumes; + private final CountDownLatch latch; + private final AtomicLong numVolumes; + + @Nullable + private final Callback callback; + + ResultHandler(FsVolumeReference reference, + Set healthyVolumes, + Set failedVolumes, + CountDownLatch latch, + @Nullable Callback callback) { + Preconditions.checkState(reference != null); + this.reference = reference; + this.healthyVolumes = healthyVolumes; + this.failedVolumes = failedVolumes; + this.latch = latch; + this.callback = callback; + numVolumes = new AtomicLong(latch.getCount()); + } + + @Override + public void onSuccess(@Nonnull VolumeCheckResult result) { + switch(result) { + case HEALTHY: + case DEGRADED: + LOG.debug("Volume {} is {}.", reference.getVolume(), result); + markHealthy(); + break; + case FAILED: + LOG.warn("Volume {} detected as being unhealthy", + reference.getVolume()); + markFailed(); + break; + default: + LOG.error("Unexpected health check result {} for volume {}", + result, reference.getVolume()); + markHealthy(); + break; + } + cleanup(); + } + + @Override + public void onFailure(@Nonnull Throwable t) { + Throwable exception = (t instanceof ExecutionException) ? + t.getCause() : t; + LOG.warn("Exception running disk checks against volume " + + reference.getVolume(), exception); + markFailed(); + cleanup(); + } + + private void markHealthy() { + synchronized (DatasetVolumeChecker.this) { + healthyVolumes.add(reference.getVolume()); + } + } + + private void markFailed() { + synchronized (DatasetVolumeChecker.this) { + failedVolumes.add(reference.getVolume()); + } + } + + private void cleanup() { + IOUtils.cleanup(null, reference); + invokeCallback(); + } + + private void invokeCallback() { + try { + latch.countDown(); + + if (numVolumes.decrementAndGet() == 0 && + callback != null) { + callback.call(healthyVolumes, failedVolumes); + } + } catch(Exception e) { + // Propagating this exception is unlikely to be helpful. + LOG.warn("Unexpected exception", e); + } + } + } + + /** + * Shutdown the checker and its associated ExecutorService. + * + * See {@link ExecutorService#awaitTermination} for the interpretation + * of the parameters. + */ + public void shutdownAndWait(int gracePeriod, TimeUnit timeUnit) { + try { + delegateChecker.shutdownAndWait(gracePeriod, timeUnit); + } catch (InterruptedException e) { + LOG.warn("DatasetVolumeChecker interrupted during shutdown."); + Thread.currentThread().interrupt(); + } + } + + /** + * This method is for testing only. + * + * @param testDelegate + */ + @VisibleForTesting + void setDelegateChecker( + AsyncChecker testDelegate) { + delegateChecker = testDelegate; + } + + /** + * Return the number of {@link #checkVolume} invocations. + */ + public long getNumVolumeChecks() { + return numVolumeChecks.get(); + } + + /** + * Return the number of {@link #checkAllVolumes} invocations. 
+ */ + public long getNumSyncDatasetChecks() { + return numSyncDatasetChecks.get(); + } + + /** + * Return the number of {@link #checkAllVolumesAsync} invocations. + */ + public long getNumAsyncDatasetChecks() { + return numAsyncDatasetChecks.get(); + } + + /** + * Return the number of checks skipped because the minimum gap since the + * last check had not elapsed. + */ + public long getNumSkippedChecks() { + return numSkippedChecks.get(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index f7c3711357b..52201bee2ab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -160,6 +160,13 @@ public interface FsDatasetSpi extends FSDatasetMBean { return references.get(index).getVolume(); } + /** + * Get the reference for a given index. + */ + public FsVolumeReference getReference(int index) { + return references.get(index); + } + @Override public void close() throws IOException { IOException ioe = null; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java index d74fe09ba2f..13c7e46d8b3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java @@ -24,11 +24,15 @@ import java.nio.channels.ClosedChannelException; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.checker.Checkable; +import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; /** * This is an interface for the underlying volume. */ -public interface FsVolumeSpi { +public interface FsVolumeSpi + extends Checkable { + /** * Obtain a reference object that had increased 1 reference count of the * volume. @@ -197,4 +201,10 @@ public interface FsVolumeSpi { */ byte[] loadLastPartialChunkChecksum(File blockFile, File metaFile) throws IOException; + + /** + * Context for the {@link #check} call. 
+ */ + class VolumeCheckContext { + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index ee60d7ee831..6d96c9b6907 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -46,21 +46,22 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.DF; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.CloseableReferenceCount; -import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; @@ -859,7 +860,7 @@ public class FsVolumeImpl implements FsVolumeSpi { } @Override - public FsDatasetSpi getDataset() { + public FsDatasetSpi getDataset() { return dataset; } @@ -902,6 +903,16 @@ public class FsVolumeImpl implements FsVolumeSpi { s.checkDirs(); } } + + @Override + public VolumeCheckResult check(VolumeCheckContext ignored) + throws DiskErrorException { + // TODO:FEDERATION valid synchronization + for(BlockPoolSlice s : bpSlices.values()) { + s.checkDirs(); + } + return VolumeCheckResult.HEALTHY; + } void getVolumeMap(ReplicaMap volumeMap, final RamDiskReplicaTracker ramDiskReplicaMap) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 30f9e2af58d..baa1f2d1366 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4186,11 +4186,11 @@ dfs.datanode.disk.check.timeout 10m - Maximum allowed time for a disk check to complete. If the check does - not complete within this time interval then the disk is declared as - failed. This setting supports multiple time unit suffixes as described - in dfs.heartbeat.interval. If no suffix is specified then milliseconds - is assumed. + Maximum allowed time for a disk check to complete during DataNode + startup. If the check does not complete within this time interval + then the disk is declared as failed. 
This setting supports + multiple time unit suffixes as described in dfs.heartbeat.interval. + If no suffix is specified then milliseconds is assumed. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 5a54a754409..28192df1cec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -38,6 +38,7 @@ import javax.management.StandardMBean; import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; @@ -529,6 +530,12 @@ public class SimulatedFSDataset implements FsDatasetSpi { File blockFile, File metaFile) throws IOException { return null; } + + @Override + public VolumeCheckResult check(VolumeCheckContext context) + throws Exception { + return VolumeCheckResult.HEALTHY; + } } private final Map> blockMap diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java index 35bd7e8bd37..a2b639842dd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -884,6 +885,12 @@ public class TestDirectoryScanner { File blockFile, File metaFile) throws IOException { return null; } + + @Override + public VolumeCheckResult check(VolumeCheckContext context) + throws Exception { + return VolumeCheckResult.HEALTHY; + } } private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java new file mode 100644 index 00000000000..a9041abd750 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.checker; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.*; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.VolumeCheckContext; +import org.apache.hadoop.util.DiskChecker.DiskErrorException; +import org.apache.hadoop.util.FakeTimer; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult.*; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + + +/** + * Tests for {@link DatasetVolumeChecker} when the {@link FsVolumeSpi#check} + * method returns different values of {@link VolumeCheckResult}. + */ +@RunWith(Parameterized.class) +public class TestDatasetVolumeChecker { + public static final Logger LOG = + LoggerFactory.getLogger(TestDatasetVolumeChecker.class); + + @Rule + public TestName testName = new TestName(); + + /** + * Run each test case for each possible value of {@link VolumeCheckResult}. + * Including "null" for 'throw exception'. + * @return + */ + @Parameters(name="{0}") + public static Collection data() { + List values = new ArrayList<>(); + for (VolumeCheckResult result : VolumeCheckResult.values()) { + values.add(new Object[] {result}); + } + values.add(new Object[] {null}); + return values; + } + + /** + * When null, the check call should throw an exception. + */ + private final VolumeCheckResult expectedVolumeHealth; + private static final int NUM_VOLUMES = 2; + + + public TestDatasetVolumeChecker(VolumeCheckResult expectedVolumeHealth) { + this.expectedVolumeHealth = expectedVolumeHealth; + } + + /** + * Test {@link DatasetVolumeChecker#checkVolume} propagates the + * check to the delegate checker. + * + * @throws Exception + */ + @Test(timeout = 10000) + public void testCheckOneVolume() throws Exception { + LOG.info("Executing {}", testName.getMethodName()); + final FsVolumeSpi volume = makeVolumes(1, expectedVolumeHealth).get(0); + final DatasetVolumeChecker checker = + new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer()); + checker.setDelegateChecker(new DummyChecker()); + final AtomicLong numCallbackInvocations = new AtomicLong(0); + + /** + * Request a check and ensure it triggered {@link FsVolumeSpi#check}. 
+ */ + checker.checkVolume(volume, new DatasetVolumeChecker.Callback() { + @Override + public void call(Set healthyVolumes, + Set failedVolumes) { + numCallbackInvocations.incrementAndGet(); + if (expectedVolumeHealth != null && expectedVolumeHealth != FAILED) { + assertThat(healthyVolumes.size(), is(1)); + assertThat(failedVolumes.size(), is(0)); + } else { + assertThat(healthyVolumes.size(), is(0)); + assertThat(failedVolumes.size(), is(1)); + } + } + }); + + // Ensure that the check was invoked at least once. + verify(volume, times(1)).check(any(VolumeCheckContext.class)); + assertThat(numCallbackInvocations.get(), is(1L)); + } + + /** + * Test {@link DatasetVolumeChecker#checkAllVolumes} propagates + * checks for all volumes to the delegate checker. + * + * @throws Exception + */ + @Test(timeout = 10000) + public void testCheckAllVolumes() throws Exception { + LOG.info("Executing {}", testName.getMethodName()); + + final List volumes = makeVolumes( + NUM_VOLUMES, expectedVolumeHealth); + final FsDatasetSpi dataset = makeDataset(volumes); + final DatasetVolumeChecker checker = + new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer()); + checker.setDelegateChecker(new DummyChecker()); + + Set failedVolumes = checker.checkAllVolumes(dataset); + LOG.info("Got back {} failed volumes", failedVolumes.size()); + + if (expectedVolumeHealth == null || expectedVolumeHealth == FAILED) { + assertThat(failedVolumes.size(), is(NUM_VOLUMES)); + } else { + assertTrue(failedVolumes.isEmpty()); + } + + // Ensure each volume's check() method was called exactly once. + for (FsVolumeSpi volume : volumes) { + verify(volume, times(1)).check(any(VolumeCheckContext.class)); + } + } + + /** + * Unit test for {@link DatasetVolumeChecker#checkAllVolumesAsync}. + * + * @throws Exception + */ + @Test(timeout=10000) + public void testCheckAllVolumesAsync() throws Exception { + LOG.info("Executing {}", testName.getMethodName()); + + final List volumes = makeVolumes( + NUM_VOLUMES, expectedVolumeHealth); + final FsDatasetSpi dataset = makeDataset(volumes); + final DatasetVolumeChecker checker = + new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer()); + checker.setDelegateChecker(new DummyChecker()); + final AtomicLong numCallbackInvocations = new AtomicLong(0); + + checker.checkAllVolumesAsync( + dataset, new DatasetVolumeChecker.Callback() { + @Override + public void call( + Set healthyVolumes, + Set failedVolumes) { + LOG.info("Got back {} failed volumes", failedVolumes.size()); + if (expectedVolumeHealth == null || + expectedVolumeHealth == FAILED) { + assertThat(healthyVolumes.size(), is(0)); + assertThat(failedVolumes.size(), is(NUM_VOLUMES)); + } else { + assertThat(healthyVolumes.size(), is(NUM_VOLUMES)); + assertThat(failedVolumes.size(), is(0)); + } + numCallbackInvocations.incrementAndGet(); + } + }); + + // The callback should be invoked exactly once. + assertThat(numCallbackInvocations.get(), is(1L)); + + // Ensure each volume's check() method was called exactly once. + for (FsVolumeSpi volume : volumes) { + verify(volume, times(1)).check(any(VolumeCheckContext.class)); + } + } + + /** + * A checker to wraps the result of {@link FsVolumeSpi#check} in + * an ImmediateFuture. 
+ */ + static class DummyChecker + implements AsyncChecker { + @Override + public ListenableFuture schedule( + Checkable target, + VolumeCheckContext context) { + try { + return Futures.immediateFuture(target.check(context)); + } catch (Exception e) { + LOG.info("check routine threw exception " + e); + return Futures.immediateFailedFuture(e); + } + } + + @Override + public void shutdownAndWait(long timeout, TimeUnit timeUnit) + throws InterruptedException { + // Nothing to cancel. + } + } + + /** + * Create a dataset with the given volumes. + */ + static FsDatasetSpi makeDataset(List volumes) + throws Exception { + // Create dataset and init volume health. + final FsDatasetSpi dataset = mock(FsDatasetSpi.class); + final FsDatasetSpi.FsVolumeReferences references = new + FsDatasetSpi.FsVolumeReferences(volumes); + when(dataset.getFsVolumeReferences()).thenReturn(references); + return dataset; + } + + private static List makeVolumes( + int numVolumes, VolumeCheckResult health) throws Exception { + final List volumes = new ArrayList<>(numVolumes); + for (int i = 0; i < numVolumes; ++i) { + final FsVolumeSpi volume = mock(FsVolumeSpi.class); + final FsVolumeReference reference = mock(FsVolumeReference.class); + + when(reference.getVolume()).thenReturn(volume); + when(volume.obtainReference()).thenReturn(reference); + + if (health != null) { + when(volume.check(any(VolumeCheckContext.class))).thenReturn(health); + } else { + final DiskErrorException de = new DiskErrorException("Fake Exception"); + when(volume.check(any(VolumeCheckContext.class))).thenThrow(de); + } + volumes.add(volume); + } + return volumes; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java new file mode 100644 index 00000000000..56446de8620 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.checker; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.*; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.VolumeCheckContext; +import org.apache.hadoop.util.FakeTimer; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.channels.ClosedChannelException; +import java.util.concurrent.TimeUnit; +import java.util.*; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.*; + + +/** + * Test a few more conditions not covered by TestDatasetVolumeChecker. + */ +public class TestDatasetVolumeCheckerFailures { + public static final Logger LOG =LoggerFactory.getLogger( + TestDatasetVolumeCheckerFailures.class); + + /** + * Test timeout in {@link DatasetVolumeChecker#checkAllVolumes}. + * @throws Exception + */ + @Test(timeout=60000) + public void testTimeout() throws Exception { + // Add a volume whose check routine hangs forever. + final List volumes = + Collections.singletonList(makeHungVolume()); + + final FsDatasetSpi dataset = + TestDatasetVolumeChecker.makeDataset(volumes); + + // Create a disk checker with a very low timeout. + final HdfsConfiguration conf = new HdfsConfiguration(); + conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY, + 1, TimeUnit.SECONDS); + final DatasetVolumeChecker checker = + new DatasetVolumeChecker(conf, new FakeTimer()); + + // Ensure that the hung volume is detected as failed. + Set failedVolumes = checker.checkAllVolumes(dataset); + assertThat(failedVolumes.size(), is(1)); + } + + /** + * Test checking a closed volume i.e. one which cannot be referenced. + * + * @throws Exception + */ + @Test(timeout=60000) + public void testCheckingClosedVolume() throws Exception { + // Add a volume that cannot be referenced. + final List volumes = + Collections.singletonList(makeClosedVolume()); + + final FsDatasetSpi dataset = + TestDatasetVolumeChecker.makeDataset(volumes); + + DatasetVolumeChecker checker = + new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer()); + Set failedVolumes = checker.checkAllVolumes(dataset); + assertThat(failedVolumes.size(), is(0)); + + // The closed volume should not have been checked as it cannot + // be referenced. 
+ verify(volumes.get(0), times(0)).check(any(VolumeCheckContext.class)); + } + + @Test(timeout=60000) + public void testMinGapIsEnforcedForSyncChecks() throws Exception { + final FsDatasetSpi dataset = + TestDatasetVolumeChecker.makeDataset(new ArrayList()); + final FakeTimer timer = new FakeTimer(); + final Configuration conf = new HdfsConfiguration(); + final long minGapMs = 100; + conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, + minGapMs, TimeUnit.MILLISECONDS); + final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer); + + checker.checkAllVolumes(dataset); + assertThat(checker.getNumSyncDatasetChecks(), is(1L)); + + // Re-check without advancing the timer. Ensure the check is skipped. + checker.checkAllVolumes(dataset); + assertThat(checker.getNumSyncDatasetChecks(), is(1L)); + assertThat(checker.getNumSkippedChecks(), is(1L)); + + // Re-check after advancing the timer. Ensure the check is performed. + timer.advance(minGapMs); + checker.checkAllVolumes(dataset); + assertThat(checker.getNumSyncDatasetChecks(), is(2L)); + assertThat(checker.getNumSkippedChecks(), is(1L)); + } + + @Test(timeout=60000) + public void testMinGapIsEnforcedForASyncChecks() throws Exception { + final FsDatasetSpi dataset = + TestDatasetVolumeChecker.makeDataset(new ArrayList()); + final FakeTimer timer = new FakeTimer(); + final Configuration conf = new HdfsConfiguration(); + final long minGapMs = 100; + conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, + minGapMs, TimeUnit.MILLISECONDS); + final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer); + + checker.checkAllVolumesAsync(dataset, null); + assertThat(checker.getNumAsyncDatasetChecks(), is(1L)); + + // Re-check without advancing the timer. Ensure the check is skipped. + checker.checkAllVolumesAsync(dataset, null); + assertThat(checker.getNumAsyncDatasetChecks(), is(1L)); + assertThat(checker.getNumSkippedChecks(), is(1L)); + + // Re-check after advancing the timer. Ensure the check is performed. + timer.advance(minGapMs); + checker.checkAllVolumesAsync(dataset, null); + assertThat(checker.getNumAsyncDatasetChecks(), is(2L)); + assertThat(checker.getNumSkippedChecks(), is(1L)); + } + + /** + * Create a mock FsVolumeSpi whose {@link FsVolumeSpi#check} routine + * hangs forever. + * + * @return volume + * @throws Exception + */ + private static FsVolumeSpi makeHungVolume() throws Exception { + final FsVolumeSpi volume = mock(FsVolumeSpi.class); + final FsVolumeReference reference = mock(FsVolumeReference.class); + + when(reference.getVolume()).thenReturn(volume); + when(volume.obtainReference()).thenReturn(reference); + when(volume.check(any(VolumeCheckContext.class))).thenAnswer( + new Answer() { + @Override + public VolumeCheckResult answer(InvocationOnMock invocation) + throws Throwable { + Thread.sleep(Long.MAX_VALUE); // Sleep forever. + return VolumeCheckResult.HEALTHY; // unreachable. + } + }); + return volume; + } + + /** + * Create a mock FsVolumeSpi which is closed and hence cannot + * be referenced. 
+   *
+   * @return volume
+   * @throws Exception
+   */
+  private static FsVolumeSpi makeClosedVolume() throws Exception {
+    final FsVolumeSpi volume = mock(FsVolumeSpi.class);
+
+    when(volume.obtainReference()).thenThrow(new ClosedChannelException());
+    return volume;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
index 071b1a61042..5c62b5b6702 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
 
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -107,4 +108,10 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
   public FsDatasetSpi getDataset() {
     return null;
   }
+
+  @Override
+  public VolumeCheckResult check(VolumeCheckContext context)
+      throws Exception {
+    return VolumeCheckResult.HEALTHY;
+  }
 }
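
Illustrative usage (not part of the patch): a caller such as the DataNode could drive the new DatasetVolumeChecker roughly as sketched below. The class and method names in the sketch are hypothetical; only the constructor, checkAllVolumes, checkVolume, Callback and shutdownAndWait APIs introduced above are assumed, and the generic type parameters are an assumption since they are not visible in this patch text.

import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.util.Timer;

/** Hypothetical sketch of a DatasetVolumeChecker client. */
public final class VolumeCheckerUsageSketch {

  /**
   * Startup-style check: block until every volume has been probed or
   * dfs.datanode.disk.check.timeout elapses, then return the failed set.
   */
  static Set<FsVolumeSpi> checkAllAtStartup(
      Configuration conf,
      FsDatasetSpi<? extends FsVolumeSpi> dataset) throws Exception {
    DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, new Timer());
    try {
      return checker.checkAllVolumes(dataset);
    } finally {
      checker.shutdownAndWait(1, TimeUnit.MINUTES);
    }
  }

  /**
   * Error-path check: schedule an async probe of one suspect volume and
   * return immediately; the callback fires once the probe completes.
   */
  static void recheckSuspectVolume(DatasetVolumeChecker checker,
                                   FsVolumeSpi suspect) {
    checker.checkVolume(suspect, new DatasetVolumeChecker.Callback() {
      @Override
      public void call(Set<FsVolumeSpi> healthyVolumes,
                       Set<FsVolumeSpi> failedVolumes) {
        // Per the ResultHandler above, HEALTHY and DEGRADED volumes land in
        // healthyVolumes; FAILED volumes and check exceptions land in
        // failedVolumes. React here, e.g. take the volume out of service.
      }
    });
  }
}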