HDFS-11149. Support for parallel checking of FsVolumes.

Change-Id: I6e7ada81562f380a260f8f561d0fa6a4cb102560
This commit is contained in:
Arpit Agarwal 2016-11-29 20:31:02 -08:00
parent fd3b1ca26c
commit 6f35700f04
10 changed files with 951 additions and 11 deletions

View File

@ -0,0 +1,442 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.checker;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.VolumeCheckContext;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.nio.channels.ClosedChannelException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY;
/**
* A class that encapsulates running disk checks against each volume of an
* {@link FsDatasetSpi} and allows retrieving a list of failed volumes.
*
* This splits out behavior that was originally implemented across
* DataNode, FsDatasetImpl and FsVolumeList.
*/
public class DatasetVolumeChecker {
public static final Logger LOG =
LoggerFactory.getLogger(DatasetVolumeChecker.class);
private AsyncChecker<VolumeCheckContext, VolumeCheckResult> delegateChecker;
private final AtomicLong numVolumeChecks = new AtomicLong(0);
private final AtomicLong numSyncDatasetChecks = new AtomicLong(0);
private final AtomicLong numAsyncDatasetChecks = new AtomicLong(0);
private final AtomicLong numSkippedChecks = new AtomicLong(0);
/**
* Max allowed time for a disk check in milliseconds. If the check
* doesn't complete within this time we declare the disk as dead.
*/
private final long maxAllowedTimeForCheckMs;
/**
* Maximum number of volume failures that can be tolerated without
* declaring a fatal error.
*/
private final int maxVolumeFailuresTolerated;
/**
* Minimum time between two successive disk checks of a volume.
*/
private final long minDiskCheckGapMs;
/**
* Timestamp of the last check of all volumes.
*/
private long lastAllVolumesCheck;
private final Timer timer;
private static final VolumeCheckContext IGNORED_CONTEXT =
new VolumeCheckContext();
/**
* @param conf Configuration object.
* @param timer {@link Timer} object used for throttling checks.
*/
public DatasetVolumeChecker(Configuration conf, Timer timer)
throws DiskErrorException {
maxAllowedTimeForCheckMs = conf.getTimeDuration(
DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
if (maxAllowedTimeForCheckMs <= 0) {
throw new DiskErrorException("Invalid value configured for "
+ DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY + " - "
+ maxAllowedTimeForCheckMs + " (should be > 0)");
}
this.timer = timer;
maxVolumeFailuresTolerated = conf.getInt(
DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
minDiskCheckGapMs = conf.getTimeDuration(
DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_DEFAULT,
TimeUnit.MILLISECONDS);
if (minDiskCheckGapMs < 0) {
throw new DiskErrorException("Invalid value configured for "
+ DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY + " - "
+ minDiskCheckGapMs + " (should be >= 0)");
}
lastAllVolumesCheck = timer.monotonicNow() - minDiskCheckGapMs;
if (maxVolumeFailuresTolerated < 0) {
throw new DiskErrorException("Invalid value configured for "
+ DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY + " - "
+ maxVolumeFailuresTolerated + " (should be non-negative)");
}
delegateChecker = new ThrottledAsyncChecker<>(
timer, minDiskCheckGapMs, Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("DataNode DiskChecker thread %d")
.setDaemon(true)
.build()));
}
/**
* Run checks against all volumes of a dataset.
*
* This check may be performed at service startup and subsequently at
* regular intervals to detect and handle failed volumes.
*
* @param dataset - FsDatasetSpi to be checked.
* @return set of failed volumes.
*/
public Set<FsVolumeSpi> checkAllVolumes(
final FsDatasetSpi<? extends FsVolumeSpi> dataset)
throws InterruptedException {
if (timer.monotonicNow() - lastAllVolumesCheck < minDiskCheckGapMs) {
numSkippedChecks.incrementAndGet();
return Collections.emptySet();
}
lastAllVolumesCheck = timer.monotonicNow();
final Set<FsVolumeSpi> healthyVolumes = new HashSet<>();
final Set<FsVolumeSpi> failedVolumes = new HashSet<>();
final Set<FsVolumeSpi> allVolumes = new HashSet<>();
final FsDatasetSpi.FsVolumeReferences references =
dataset.getFsVolumeReferences();
final CountDownLatch resultsLatch = new CountDownLatch(references.size());
for (int i = 0; i < references.size(); ++i) {
final FsVolumeReference reference = references.getReference(i);
allVolumes.add(reference.getVolume());
ListenableFuture<VolumeCheckResult> future =
delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
LOG.info("Scheduled health check for volume {}", reference.getVolume());
Futures.addCallback(future, new ResultHandler(
reference, healthyVolumes, failedVolumes, resultsLatch, null));
}
// Wait until our timeout elapses, after which we give up on
// the remaining volumes.
if (!resultsLatch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS)) {
LOG.warn("checkAllVolumes timed out after {} ms" +
maxAllowedTimeForCheckMs);
}
numSyncDatasetChecks.incrementAndGet();
synchronized (this) {
// All volumes that have not been detected as healthy should be
// considered failed. This is a superset of 'failedVolumes'.
//
// Make a copy under the mutex as Sets.difference() returns a view
// of a potentially changing set.
return new HashSet<>(Sets.difference(allVolumes, healthyVolumes));
}
}
/**
* Start checks against all volumes of a dataset, invoking the
* given callback when the operation has completed. The function
* does not wait for the checks to complete.
*
* If a volume cannot be referenced then it is already closed and
* cannot be checked. No error is propagated to the callback for that
* volume.
*
* @param dataset - FsDatasetSpi to be checked.
* @param callback - Callback to be invoked when the checks are complete.
* @return true if the check was scheduled and the callback will be invoked.
* false if the check was not scheduled and the callback will not be
* invoked.
*/
public boolean checkAllVolumesAsync(
final FsDatasetSpi<? extends FsVolumeSpi> dataset,
Callback callback) {
if (timer.monotonicNow() - lastAllVolumesCheck < minDiskCheckGapMs) {
numSkippedChecks.incrementAndGet();
return false;
}
lastAllVolumesCheck = timer.monotonicNow();
final Set<FsVolumeSpi> healthyVolumes = new HashSet<>();
final Set<FsVolumeSpi> failedVolumes = new HashSet<>();
final FsDatasetSpi.FsVolumeReferences references =
dataset.getFsVolumeReferences();
final CountDownLatch latch = new CountDownLatch(references.size());
LOG.info("Checking {} volumes", references.size());
for (int i = 0; i < references.size(); ++i) {
final FsVolumeReference reference = references.getReference(i);
// The context parameter is currently ignored.
ListenableFuture<VolumeCheckResult> future =
delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
Futures.addCallback(future, new ResultHandler(
reference, healthyVolumes, failedVolumes, latch, callback));
}
numAsyncDatasetChecks.incrementAndGet();
return true;
}
/**
* A callback interface that is supplied the result of running an
* async disk check on multiple volumes.
*/
public interface Callback {
/**
* @param healthyVolumes set of volumes that passed disk checks.
* @param failedVolumes set of volumes that failed disk checks.
*/
void call(Set<FsVolumeSpi> healthyVolumes,
Set<FsVolumeSpi> failedVolumes);
}
/**
* Check a single volume, returning a {@link ListenableFuture}
* that can be used to retrieve the final result.
*
* If the volume cannot be referenced then it is already closed and
* cannot be checked. No error is propagated to the callback.
*
* @param volume the volume that is to be checked.
* @param callback callback to be invoked when the volume check completes.
*/
public void checkVolume(
final FsVolumeSpi volume,
Callback callback) {
FsVolumeReference volumeReference;
try {
volumeReference = volume.obtainReference();
} catch (ClosedChannelException e) {
// The volume has already been closed.
callback.call(new HashSet<FsVolumeSpi>(), new HashSet<FsVolumeSpi>());
return;
}
ListenableFuture<VolumeCheckResult> future =
delegateChecker.schedule(volume, IGNORED_CONTEXT);
numVolumeChecks.incrementAndGet();
Futures.addCallback(future, new ResultHandler(
volumeReference, new HashSet<FsVolumeSpi>(),
new HashSet<FsVolumeSpi>(), new CountDownLatch(1), callback));
}
/**
* A callback to process the results of checking a volume.
*/
private class ResultHandler
implements FutureCallback<VolumeCheckResult> {
private final FsVolumeReference reference;
private final Set<FsVolumeSpi> failedVolumes;
private final Set<FsVolumeSpi> healthyVolumes;
private final CountDownLatch latch;
private final AtomicLong numVolumes;
@Nullable
private final Callback callback;
ResultHandler(FsVolumeReference reference,
Set<FsVolumeSpi> healthyVolumes,
Set<FsVolumeSpi> failedVolumes,
CountDownLatch latch,
@Nullable Callback callback) {
Preconditions.checkState(reference != null);
this.reference = reference;
this.healthyVolumes = healthyVolumes;
this.failedVolumes = failedVolumes;
this.latch = latch;
this.callback = callback;
numVolumes = new AtomicLong(latch.getCount());
}
@Override
public void onSuccess(@Nonnull VolumeCheckResult result) {
switch(result) {
case HEALTHY:
case DEGRADED:
LOG.debug("Volume {} is {}.", reference.getVolume(), result);
markHealthy();
break;
case FAILED:
LOG.warn("Volume {} detected as being unhealthy",
reference.getVolume());
markFailed();
break;
default:
LOG.error("Unexpected health check result {} for volume {}",
result, reference.getVolume());
markHealthy();
break;
}
cleanup();
}
@Override
public void onFailure(@Nonnull Throwable t) {
Throwable exception = (t instanceof ExecutionException) ?
t.getCause() : t;
LOG.warn("Exception running disk checks against volume " +
reference.getVolume(), exception);
markFailed();
cleanup();
}
private void markHealthy() {
synchronized (DatasetVolumeChecker.this) {
healthyVolumes.add(reference.getVolume());
}
}
private void markFailed() {
synchronized (DatasetVolumeChecker.this) {
failedVolumes.add(reference.getVolume());
}
}
private void cleanup() {
IOUtils.cleanup(null, reference);
invokeCallback();
}
private void invokeCallback() {
try {
latch.countDown();
if (numVolumes.decrementAndGet() == 0 &&
callback != null) {
callback.call(healthyVolumes, failedVolumes);
}
} catch(Exception e) {
// Propagating this exception is unlikely to be helpful.
LOG.warn("Unexpected exception", e);
}
}
}
/**
* Shutdown the checker and its associated ExecutorService.
*
* See {@link ExecutorService#awaitTermination} for the interpretation
* of the parameters.
*/
public void shutdownAndWait(int gracePeriod, TimeUnit timeUnit) {
try {
delegateChecker.shutdownAndWait(gracePeriod, timeUnit);
} catch (InterruptedException e) {
LOG.warn("DatasetVolumeChecker interrupted during shutdown.");
Thread.currentThread().interrupt();
}
}
/**
* This method is for testing only.
*
* @param testDelegate
*/
@VisibleForTesting
void setDelegateChecker(
AsyncChecker<VolumeCheckContext, VolumeCheckResult> testDelegate) {
delegateChecker = testDelegate;
}
/**
* Return the number of {@link #checkVolume} invocations.
*/
public long getNumVolumeChecks() {
return numVolumeChecks.get();
}
/**
* Return the number of {@link #checkAllVolumes} invocations.
*/
public long getNumSyncDatasetChecks() {
return numSyncDatasetChecks.get();
}
/**
* Return the number of {@link #checkAllVolumesAsync} invocations.
*/
public long getNumAsyncDatasetChecks() {
return numAsyncDatasetChecks.get();
}
/**
* Return the number of checks skipped because the minimum gap since the
* last check had not elapsed.
*/
public long getNumSkippedChecks() {
return numSkippedChecks.get();
}
}

View File

@ -160,6 +160,13 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
return references.get(index).getVolume();
}
/**
* Get the reference for a given index.
*/
public FsVolumeReference getReference(int index) {
return references.get(index);
}
@Override
public void close() throws IOException {
IOException ioe = null;

View File

@ -24,11 +24,15 @@ import java.nio.channels.ClosedChannelException;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
/**
* This is an interface for the underlying volume.
*/
public interface FsVolumeSpi {
public interface FsVolumeSpi
extends Checkable<FsVolumeSpi.VolumeCheckContext, VolumeCheckResult> {
/**
* Obtain a reference object that had increased 1 reference count of the
* volume.
@ -197,4 +201,10 @@ public interface FsVolumeSpi {
*/
byte[] loadLastPartialChunkChecksum(File blockFile, File metaFile)
throws IOException;
/**
* Context for the {@link #check} call.
*/
class VolumeCheckContext {
}
}

View File

@ -46,21 +46,22 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.CloseableReferenceCount;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
@ -859,7 +860,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
@Override
public FsDatasetSpi getDataset() {
public FsDatasetSpi<? extends FsVolumeSpi> getDataset() {
return dataset;
}
@ -902,6 +903,16 @@ public class FsVolumeImpl implements FsVolumeSpi {
s.checkDirs();
}
}
@Override
public VolumeCheckResult check(VolumeCheckContext ignored)
throws DiskErrorException {
// TODO:FEDERATION valid synchronization
for(BlockPoolSlice s : bpSlices.values()) {
s.checkDirs();
}
return VolumeCheckResult.HEALTHY;
}
void getVolumeMap(ReplicaMap volumeMap,
final RamDiskReplicaTracker ramDiskReplicaMap)

View File

@ -4186,11 +4186,11 @@
<name>dfs.datanode.disk.check.timeout</name>
<value>10m</value>
<description>
Maximum allowed time for a disk check to complete. If the check does
not complete within this time interval then the disk is declared as
failed. This setting supports multiple time unit suffixes as described
in dfs.heartbeat.interval. If no suffix is specified then milliseconds
is assumed.
Maximum allowed time for a disk check to complete during DataNode
startup. If the check does not complete within this time interval
then the disk is declared as failed. This setting supports
multiple time unit suffixes as described in dfs.heartbeat.interval.
If no suffix is specified then milliseconds is assumed.
</description>
</property>

View File

@ -38,6 +38,7 @@ import javax.management.StandardMBean;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@ -529,6 +530,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
File blockFile, File metaFile) throws IOException {
return null;
}
@Override
public VolumeCheckResult check(VolumeCheckContext context)
throws Exception {
return VolumeCheckResult.HEALTHY;
}
}
private final Map<String, Map<Block, BInfo>> blockMap

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -884,6 +885,12 @@ public class TestDirectoryScanner {
File blockFile, File metaFile) throws IOException {
return null;
}
@Override
public VolumeCheckResult check(VolumeCheckContext context)
throws Exception {
return VolumeCheckResult.HEALTHY;
}
}
private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();

View File

@ -0,0 +1,259 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.checker;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.*;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.VolumeCheckContext;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.FakeTimer;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult.*;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
* Tests for {@link DatasetVolumeChecker} when the {@link FsVolumeSpi#check}
* method returns different values of {@link VolumeCheckResult}.
*/
@RunWith(Parameterized.class)
public class TestDatasetVolumeChecker {
public static final Logger LOG =
LoggerFactory.getLogger(TestDatasetVolumeChecker.class);
@Rule
public TestName testName = new TestName();
/**
* Run each test case for each possible value of {@link VolumeCheckResult}.
* Including "null" for 'throw exception'.
* @return
*/
@Parameters(name="{0}")
public static Collection<Object[]> data() {
List<Object[]> values = new ArrayList<>();
for (VolumeCheckResult result : VolumeCheckResult.values()) {
values.add(new Object[] {result});
}
values.add(new Object[] {null});
return values;
}
/**
* When null, the check call should throw an exception.
*/
private final VolumeCheckResult expectedVolumeHealth;
private static final int NUM_VOLUMES = 2;
public TestDatasetVolumeChecker(VolumeCheckResult expectedVolumeHealth) {
this.expectedVolumeHealth = expectedVolumeHealth;
}
/**
* Test {@link DatasetVolumeChecker#checkVolume} propagates the
* check to the delegate checker.
*
* @throws Exception
*/
@Test(timeout = 10000)
public void testCheckOneVolume() throws Exception {
LOG.info("Executing {}", testName.getMethodName());
final FsVolumeSpi volume = makeVolumes(1, expectedVolumeHealth).get(0);
final DatasetVolumeChecker checker =
new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer());
checker.setDelegateChecker(new DummyChecker());
final AtomicLong numCallbackInvocations = new AtomicLong(0);
/**
* Request a check and ensure it triggered {@link FsVolumeSpi#check}.
*/
checker.checkVolume(volume, new DatasetVolumeChecker.Callback() {
@Override
public void call(Set<FsVolumeSpi> healthyVolumes,
Set<FsVolumeSpi> failedVolumes) {
numCallbackInvocations.incrementAndGet();
if (expectedVolumeHealth != null && expectedVolumeHealth != FAILED) {
assertThat(healthyVolumes.size(), is(1));
assertThat(failedVolumes.size(), is(0));
} else {
assertThat(healthyVolumes.size(), is(0));
assertThat(failedVolumes.size(), is(1));
}
}
});
// Ensure that the check was invoked at least once.
verify(volume, times(1)).check(any(VolumeCheckContext.class));
assertThat(numCallbackInvocations.get(), is(1L));
}
/**
* Test {@link DatasetVolumeChecker#checkAllVolumes} propagates
* checks for all volumes to the delegate checker.
*
* @throws Exception
*/
@Test(timeout = 10000)
public void testCheckAllVolumes() throws Exception {
LOG.info("Executing {}", testName.getMethodName());
final List<FsVolumeSpi> volumes = makeVolumes(
NUM_VOLUMES, expectedVolumeHealth);
final FsDatasetSpi<FsVolumeSpi> dataset = makeDataset(volumes);
final DatasetVolumeChecker checker =
new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer());
checker.setDelegateChecker(new DummyChecker());
Set<FsVolumeSpi> failedVolumes = checker.checkAllVolumes(dataset);
LOG.info("Got back {} failed volumes", failedVolumes.size());
if (expectedVolumeHealth == null || expectedVolumeHealth == FAILED) {
assertThat(failedVolumes.size(), is(NUM_VOLUMES));
} else {
assertTrue(failedVolumes.isEmpty());
}
// Ensure each volume's check() method was called exactly once.
for (FsVolumeSpi volume : volumes) {
verify(volume, times(1)).check(any(VolumeCheckContext.class));
}
}
/**
* Unit test for {@link DatasetVolumeChecker#checkAllVolumesAsync}.
*
* @throws Exception
*/
@Test(timeout=10000)
public void testCheckAllVolumesAsync() throws Exception {
LOG.info("Executing {}", testName.getMethodName());
final List<FsVolumeSpi> volumes = makeVolumes(
NUM_VOLUMES, expectedVolumeHealth);
final FsDatasetSpi<FsVolumeSpi> dataset = makeDataset(volumes);
final DatasetVolumeChecker checker =
new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer());
checker.setDelegateChecker(new DummyChecker());
final AtomicLong numCallbackInvocations = new AtomicLong(0);
checker.checkAllVolumesAsync(
dataset, new DatasetVolumeChecker.Callback() {
@Override
public void call(
Set<FsVolumeSpi> healthyVolumes,
Set<FsVolumeSpi> failedVolumes) {
LOG.info("Got back {} failed volumes", failedVolumes.size());
if (expectedVolumeHealth == null ||
expectedVolumeHealth == FAILED) {
assertThat(healthyVolumes.size(), is(0));
assertThat(failedVolumes.size(), is(NUM_VOLUMES));
} else {
assertThat(healthyVolumes.size(), is(NUM_VOLUMES));
assertThat(failedVolumes.size(), is(0));
}
numCallbackInvocations.incrementAndGet();
}
});
// The callback should be invoked exactly once.
assertThat(numCallbackInvocations.get(), is(1L));
// Ensure each volume's check() method was called exactly once.
for (FsVolumeSpi volume : volumes) {
verify(volume, times(1)).check(any(VolumeCheckContext.class));
}
}
/**
* A checker to wraps the result of {@link FsVolumeSpi#check} in
* an ImmediateFuture.
*/
static class DummyChecker
implements AsyncChecker<VolumeCheckContext, VolumeCheckResult> {
@Override
public ListenableFuture<VolumeCheckResult> schedule(
Checkable<VolumeCheckContext, VolumeCheckResult> target,
VolumeCheckContext context) {
try {
return Futures.immediateFuture(target.check(context));
} catch (Exception e) {
LOG.info("check routine threw exception " + e);
return Futures.immediateFailedFuture(e);
}
}
@Override
public void shutdownAndWait(long timeout, TimeUnit timeUnit)
throws InterruptedException {
// Nothing to cancel.
}
}
/**
* Create a dataset with the given volumes.
*/
static FsDatasetSpi<FsVolumeSpi> makeDataset(List<FsVolumeSpi> volumes)
throws Exception {
// Create dataset and init volume health.
final FsDatasetSpi<FsVolumeSpi> dataset = mock(FsDatasetSpi.class);
final FsDatasetSpi.FsVolumeReferences references = new
FsDatasetSpi.FsVolumeReferences(volumes);
when(dataset.getFsVolumeReferences()).thenReturn(references);
return dataset;
}
private static List<FsVolumeSpi> makeVolumes(
int numVolumes, VolumeCheckResult health) throws Exception {
final List<FsVolumeSpi> volumes = new ArrayList<>(numVolumes);
for (int i = 0; i < numVolumes; ++i) {
final FsVolumeSpi volume = mock(FsVolumeSpi.class);
final FsVolumeReference reference = mock(FsVolumeReference.class);
when(reference.getVolume()).thenReturn(volume);
when(volume.obtainReference()).thenReturn(reference);
if (health != null) {
when(volume.check(any(VolumeCheckContext.class))).thenReturn(health);
} else {
final DiskErrorException de = new DiskErrorException("Fake Exception");
when(volume.check(any(VolumeCheckContext.class))).thenThrow(de);
}
volumes.add(volume);
}
return volumes;
}
}

View File

@ -0,0 +1,190 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.checker;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.*;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.VolumeCheckContext;
import org.apache.hadoop.util.FakeTimer;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeUnit;
import java.util.*;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.*;
/**
* Test a few more conditions not covered by TestDatasetVolumeChecker.
*/
public class TestDatasetVolumeCheckerFailures {
public static final Logger LOG =LoggerFactory.getLogger(
TestDatasetVolumeCheckerFailures.class);
/**
* Test timeout in {@link DatasetVolumeChecker#checkAllVolumes}.
* @throws Exception
*/
@Test(timeout=60000)
public void testTimeout() throws Exception {
// Add a volume whose check routine hangs forever.
final List<FsVolumeSpi> volumes =
Collections.singletonList(makeHungVolume());
final FsDatasetSpi<FsVolumeSpi> dataset =
TestDatasetVolumeChecker.makeDataset(volumes);
// Create a disk checker with a very low timeout.
final HdfsConfiguration conf = new HdfsConfiguration();
conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
1, TimeUnit.SECONDS);
final DatasetVolumeChecker checker =
new DatasetVolumeChecker(conf, new FakeTimer());
// Ensure that the hung volume is detected as failed.
Set<FsVolumeSpi> failedVolumes = checker.checkAllVolumes(dataset);
assertThat(failedVolumes.size(), is(1));
}
/**
* Test checking a closed volume i.e. one which cannot be referenced.
*
* @throws Exception
*/
@Test(timeout=60000)
public void testCheckingClosedVolume() throws Exception {
// Add a volume that cannot be referenced.
final List<FsVolumeSpi> volumes =
Collections.singletonList(makeClosedVolume());
final FsDatasetSpi<FsVolumeSpi> dataset =
TestDatasetVolumeChecker.makeDataset(volumes);
DatasetVolumeChecker checker =
new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer());
Set<FsVolumeSpi> failedVolumes = checker.checkAllVolumes(dataset);
assertThat(failedVolumes.size(), is(0));
// The closed volume should not have been checked as it cannot
// be referenced.
verify(volumes.get(0), times(0)).check(any(VolumeCheckContext.class));
}
@Test(timeout=60000)
public void testMinGapIsEnforcedForSyncChecks() throws Exception {
final FsDatasetSpi<FsVolumeSpi> dataset =
TestDatasetVolumeChecker.makeDataset(new ArrayList<FsVolumeSpi>());
final FakeTimer timer = new FakeTimer();
final Configuration conf = new HdfsConfiguration();
final long minGapMs = 100;
conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
minGapMs, TimeUnit.MILLISECONDS);
final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer);
checker.checkAllVolumes(dataset);
assertThat(checker.getNumSyncDatasetChecks(), is(1L));
// Re-check without advancing the timer. Ensure the check is skipped.
checker.checkAllVolumes(dataset);
assertThat(checker.getNumSyncDatasetChecks(), is(1L));
assertThat(checker.getNumSkippedChecks(), is(1L));
// Re-check after advancing the timer. Ensure the check is performed.
timer.advance(minGapMs);
checker.checkAllVolumes(dataset);
assertThat(checker.getNumSyncDatasetChecks(), is(2L));
assertThat(checker.getNumSkippedChecks(), is(1L));
}
@Test(timeout=60000)
public void testMinGapIsEnforcedForASyncChecks() throws Exception {
final FsDatasetSpi<FsVolumeSpi> dataset =
TestDatasetVolumeChecker.makeDataset(new ArrayList<FsVolumeSpi>());
final FakeTimer timer = new FakeTimer();
final Configuration conf = new HdfsConfiguration();
final long minGapMs = 100;
conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
minGapMs, TimeUnit.MILLISECONDS);
final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer);
checker.checkAllVolumesAsync(dataset, null);
assertThat(checker.getNumAsyncDatasetChecks(), is(1L));
// Re-check without advancing the timer. Ensure the check is skipped.
checker.checkAllVolumesAsync(dataset, null);
assertThat(checker.getNumAsyncDatasetChecks(), is(1L));
assertThat(checker.getNumSkippedChecks(), is(1L));
// Re-check after advancing the timer. Ensure the check is performed.
timer.advance(minGapMs);
checker.checkAllVolumesAsync(dataset, null);
assertThat(checker.getNumAsyncDatasetChecks(), is(2L));
assertThat(checker.getNumSkippedChecks(), is(1L));
}
/**
* Create a mock FsVolumeSpi whose {@link FsVolumeSpi#check} routine
* hangs forever.
*
* @return volume
* @throws Exception
*/
private static FsVolumeSpi makeHungVolume() throws Exception {
final FsVolumeSpi volume = mock(FsVolumeSpi.class);
final FsVolumeReference reference = mock(FsVolumeReference.class);
when(reference.getVolume()).thenReturn(volume);
when(volume.obtainReference()).thenReturn(reference);
when(volume.check(any(VolumeCheckContext.class))).thenAnswer(
new Answer<VolumeCheckResult>() {
@Override
public VolumeCheckResult answer(InvocationOnMock invocation)
throws Throwable {
Thread.sleep(Long.MAX_VALUE); // Sleep forever.
return VolumeCheckResult.HEALTHY; // unreachable.
}
});
return volume;
}
/**
* Create a mock FsVolumeSpi which is closed and hence cannot
* be referenced.
*
* @return volume
* @throws Exception
*/
private static FsVolumeSpi makeClosedVolume() throws Exception {
final FsVolumeSpi volume = mock(FsVolumeSpi.class);
when(volume.obtainReference()).thenThrow(new ClosedChannelException());
return volume;
}
}

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@ -107,4 +108,10 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
public FsDatasetSpi getDataset() {
return null;
}
@Override
public VolumeCheckResult check(VolumeCheckContext context)
throws Exception {
return VolumeCheckResult.HEALTHY;
}
}