HDFS-11114. Support for running async disk checks in DataNode.

This closes #153.
This commit is contained in:
Arpit Agarwal 2016-11-07 18:45:53 -08:00
parent 3dbad5d823
commit 3fff158587
5 changed files with 638 additions and 0 deletions

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.checker;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* A class that can be used to schedule an asynchronous check on a given
* {@link Checkable}. If the check is successfully scheduled then a
* {@link ListenableFuture} is returned.
*
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface AsyncChecker<K, V> {
/**
* Schedule an asynchronous check for the given object.
*
* @param target object to be checked.
*
* @param context the interpretation of the context depends on the
* target.
*
* @return returns a {@link ListenableFuture} that can be used to
* retrieve the result of the asynchronous check.
*/
ListenableFuture<V> schedule(Checkable<K, V> target, K context);
/**
* Cancel all executing checks and wait for them to complete.
* First attempts a graceful cancellation, then cancels forcefully.
* Waits for the supplied timeout after both attempts.
*
* See {@link ExecutorService#awaitTermination} for a description of
* the parameters.
*
* @throws InterruptedException
*/
void shutdownAndWait(long timeout, TimeUnit timeUnit)
throws InterruptedException;
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.checker;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* A Checkable is an object whose health can be probed by invoking its
* {@link #check} method.
*
* e.g. a {@link Checkable} instance may represent a single hardware
* resource.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface Checkable<K, V> {
/**
* Query the health of this object. This method may hang
* indefinitely depending on the status of the target resource.
*
* @param context for the probe operation. May be null depending
* on the implementation.
*
* @return result of the check operation.
*
* @throws Exception encountered during the check operation. An
* exception indicates that the check failed.
*/
V check(K context) throws Exception;
}

View File

@ -0,0 +1,224 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.checker;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* An implementation of {@link AsyncChecker} that skips checking recently
* checked objects. It will enforce at least {@link minMsBetweenChecks}
* milliseconds between two successive checks of any one object.
*
* It is assumed that the total number of Checkable objects in the system
* is small, (not more than a few dozen) since the checker uses O(Checkables)
* storage and also potentially O(Checkables) threads.
*
* {@link minMsBetweenChecks} should be configured reasonably
* by the caller to avoid spinning up too many threads frequently.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
public static final Logger LOG =
LoggerFactory.getLogger(ThrottledAsyncChecker.class);
private final Timer timer;
/**
* The ExecutorService used to schedule asynchronous checks.
*/
private final ListeningExecutorService executorService;
/**
* The minimum gap in milliseconds between two successive checks
* of the same object. This is the throttle.
*/
private final long minMsBetweenChecks;
/**
* Map of checks that are currently in progress. Protected by the object
* lock.
*/
private final Map<Checkable, ListenableFuture<V>> checksInProgress;
/**
* Maps Checkable objects to a future that can be used to retrieve
* the results of the operation.
* Protected by the object lock.
*/
private final Map<Checkable, LastCheckResult<V>> completedChecks;
ThrottledAsyncChecker(final Timer timer,
final long minMsBetweenChecks,
final ExecutorService executorService) {
this.timer = timer;
this.minMsBetweenChecks = minMsBetweenChecks;
this.executorService = MoreExecutors.listeningDecorator(executorService);
this.checksInProgress = new HashMap<>();
this.completedChecks = new WeakHashMap<>();
}
/**
* See {@link AsyncChecker#schedule}
*
* If the object has been checked recently then the check will
* be skipped. Multiple concurrent checks for the same object
* will receive the same Future.
*/
@Override
public synchronized ListenableFuture<V> schedule(
final Checkable<K, V> target,
final K context) {
LOG.debug("Scheduling a check of {}", target);
if (checksInProgress.containsKey(target)) {
return checksInProgress.get(target);
}
if (completedChecks.containsKey(target)) {
final LastCheckResult<V> result = completedChecks.get(target);
final long msSinceLastCheck = timer.monotonicNow() - result.completedAt;
if (msSinceLastCheck < minMsBetweenChecks) {
LOG.debug("Skipped checking {}. Time since last check {}ms " +
"is less than the min gap {}ms.",
target, msSinceLastCheck, minMsBetweenChecks);
return result.result != null ?
Futures.immediateFuture(result.result) :
Futures.immediateFailedFuture(result.exception);
}
}
final ListenableFuture<V> lf = executorService.submit(
new Callable<V>() {
@Override
public V call() throws Exception {
return target.check(context);
}
});
checksInProgress.put(target, lf);
addResultCachingCallback(target, lf);
return lf;
}
/**
* Register a callback to cache the result of a check.
* @param target
* @param lf
*/
private void addResultCachingCallback(
Checkable<K, V> target, ListenableFuture<V> lf) {
Futures.addCallback(lf, new FutureCallback<V>() {
@Override
public void onSuccess(@Nullable V result) {
synchronized (ThrottledAsyncChecker.this) {
checksInProgress.remove(target);
completedChecks.put(target, new LastCheckResult<>(
result, timer.monotonicNow()));
}
}
@Override
public void onFailure(@Nonnull Throwable t) {
synchronized (ThrottledAsyncChecker.this) {
checksInProgress.remove(target);
completedChecks.put(target, new LastCheckResult<>(
t, timer.monotonicNow()));
}
}
});
}
/**
* {@inheritDoc}.
*/
@Override
public void shutdownAndWait(long timeout, TimeUnit timeUnit)
throws InterruptedException {
// Try orderly shutdown.
executorService.shutdown();
if (!executorService.awaitTermination(timeout, timeUnit)) {
// Interrupt executing tasks and wait again.
executorService.shutdownNow();
executorService.awaitTermination(timeout, timeUnit);
}
}
/**
* Status of running a check. It can either be a result or an
* exception, depending on whether the check completed or threw.
*/
private static final class LastCheckResult<V> {
/**
* Timestamp at which the check completed.
*/
private final long completedAt;
/**
* Result of running the check if it completed. null if it threw.
*/
@Nullable
private final V result;
/**
* Exception thrown by the check. null if it returned a result.
*/
private final Throwable exception; // null on success.
/**
* Initialize with a result.
* @param result
*/
private LastCheckResult(V result, long completedAt) {
this.result = result;
this.exception = null;
this.completedAt = completedAt;
}
/**
* Initialize with an exception.
* @param completedAt
* @param t
*/
private LastCheckResult(Throwable t, long completedAt) {
this.result = null;
this.exception = t;
this.completedAt = completedAt;
}
}
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Datanode support for running disk checks.
*/
@InterfaceAudience.LimitedPrivate({"HDFS"})
@InterfaceStability.Evolving
package org.apache.hadoop.hdfs.server.datanode.checker;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -0,0 +1,276 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.checker;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.FakeTimer;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.core.Is.isA;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Verify functionality of {@link ThrottledAsyncChecker}.
*/
public class TestThrottledAsyncChecker {
public static final Logger LOG =
LoggerFactory.getLogger(TestThrottledAsyncChecker.class);
private static final long MIN_ERROR_CHECK_GAP = 1000;
@Rule
public ExpectedException thrown = ExpectedException.none();
/**
* Test various scheduling combinations to ensure scheduling and
* throttling behave as expected.
*/
@Test(timeout=60000)
public void testScheduler() throws Exception {
final NoOpCheckable target1 = new NoOpCheckable();
final NoOpCheckable target2 = new NoOpCheckable();
final FakeTimer timer = new FakeTimer();
ThrottledAsyncChecker<Boolean, Boolean> checker =
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
getExecutorService());
// check target1 and ensure we get back the expected result.
assertTrue(checker.schedule(target1, true).get());
assertThat(target1.numChecks.get(), is(1L));
// Check target1 again without advancing the timer. target1 should not
// be checked again and the cached result should be returned.
assertTrue(checker.schedule(target1, true).get());
assertThat(target1.numChecks.get(), is(1L));
// Schedule target2 scheduled without advancing the timer.
// target2 should be checked as it has never been checked before.
assertTrue(checker.schedule(target2, true).get());
assertThat(target2.numChecks.get(), is(1L));
// Advance the timer but just short of the min gap.
// Neither target1 nor target2 should be checked again.
timer.advance(MIN_ERROR_CHECK_GAP - 1);
assertTrue(checker.schedule(target1, true).get());
assertThat(target1.numChecks.get(), is(1L));
assertTrue(checker.schedule(target2, true).get());
assertThat(target2.numChecks.get(), is(1L));
// Advance the timer again.
// Both targets should be checked now.
timer.advance(MIN_ERROR_CHECK_GAP);
assertTrue(checker.schedule(target1, true).get());
assertThat(target1.numChecks.get(), is(2L));
assertTrue(checker.schedule(target2, true).get());
assertThat(target1.numChecks.get(), is(2L));
}
@Test (timeout=60000)
public void testCancellation() throws Exception {
LatchedCheckable target = new LatchedCheckable();
final FakeTimer timer = new FakeTimer();
final LatchedCallback callback = new LatchedCallback(target);
ThrottledAsyncChecker<Boolean, Boolean> checker =
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
getExecutorService());
ListenableFuture<Boolean> lf = checker.schedule(target, true);
Futures.addCallback(lf, callback);
// Request immediate cancellation.
checker.shutdownAndWait(0, TimeUnit.MILLISECONDS);
try {
assertFalse(lf.get());
fail("Failed to get expected InterruptedException");
} catch (ExecutionException ee) {
assertTrue(ee.getCause() instanceof InterruptedException);
}
callback.failureLatch.await();
}
@Test (timeout=60000)
public void testConcurrentChecks() throws Exception {
LatchedCheckable target = new LatchedCheckable();
final FakeTimer timer = new FakeTimer();
ThrottledAsyncChecker<Boolean, Boolean> checker =
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
getExecutorService());
final ListenableFuture<Boolean> lf1 = checker.schedule(target, true);
final ListenableFuture<Boolean> lf2 = checker.schedule(target, true);
// Ensure that concurrent requests return the same future object.
assertTrue(lf1 == lf2);
// Unblock the latch and wait for it to finish execution.
target.latch.countDown();
lf1.get();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
// We should not get back the same future as before.
// This can take a short while until the internal callback in
// ThrottledAsyncChecker is scheduled for execution.
// Also this should not trigger a new check operation as the timer
// was not advanced. If it does trigger a new check then the test
// will fail with a timeout.
final ListenableFuture<Boolean> lf3 = checker.schedule(target, true);
return lf3 != lf2;
}
}, 100, 10000);
}
/**
* Ensure that the context is passed through to the Checkable#check
* method.
* @throws Exception
*/
@Test(timeout=60000)
public void testContextIsPassed() throws Exception {
final NoOpCheckable target1 = new NoOpCheckable();
final FakeTimer timer = new FakeTimer();
ThrottledAsyncChecker<Boolean, Boolean> checker =
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
getExecutorService());
assertTrue(checker.schedule(target1, true).get());
assertThat(target1.numChecks.get(), is(1L));
timer.advance(MIN_ERROR_CHECK_GAP + 1);
assertFalse(checker.schedule(target1, false).get());
assertThat(target1.numChecks.get(), is(2L));
}
/**
* Ensure that the exeption from a failed check is cached
* and returned without re-running the check when the minimum
* gap has not elapsed.
*
* @throws Exception
*/
@Test(timeout=60000)
public void testExceptionCaching() throws Exception {
final ThrowingCheckable target1 = new ThrowingCheckable();
final FakeTimer timer = new FakeTimer();
ThrottledAsyncChecker<Boolean, Boolean> checker =
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
getExecutorService());
thrown.expectCause(isA(DummyException.class));
checker.schedule(target1, true).get();
assertThat(target1.numChecks.get(), is(1L));
thrown.expectCause(isA(DummyException.class));
checker.schedule(target1, true).get();
assertThat(target1.numChecks.get(), is(2L));
}
/**
* A simple ExecutorService for testing.
*/
private ExecutorService getExecutorService() {
return new ScheduledThreadPoolExecutor(1);
}
/**
* A Checkable that just returns its input.
*/
private static class NoOpCheckable
implements Checkable<Boolean, Boolean> {
private final AtomicLong numChecks = new AtomicLong(0);
@Override
public Boolean check(Boolean context) {
numChecks.incrementAndGet();
return context;
}
}
private static class ThrowingCheckable
implements Checkable<Boolean, Boolean> {
private final AtomicLong numChecks = new AtomicLong(0);
@Override
public Boolean check(Boolean context) throws DummyException {
numChecks.incrementAndGet();
throw new DummyException();
}
}
private static class DummyException extends Exception {
}
/**
* A checkable that hangs until signaled.
*/
private static class LatchedCheckable
implements Checkable<Boolean, Boolean> {
private final CountDownLatch latch = new CountDownLatch(1);
@Override
public Boolean check(Boolean ignored) throws InterruptedException {
LOG.info("LatchedCheckable {} waiting.", this);
latch.await();
return true; // Unreachable.
}
}
/**
* A {@link FutureCallback} that counts its invocations.
*/
private static final class LatchedCallback
implements FutureCallback<Boolean> {
private final CountDownLatch successLatch = new CountDownLatch(1);
private final CountDownLatch failureLatch = new CountDownLatch(1);
private final Checkable target;
private LatchedCallback(Checkable target) {
this.target = target;
}
@Override
public void onSuccess(@Nonnull Boolean result) {
LOG.info("onSuccess callback invoked for {}", target);
successLatch.countDown();
}
@Override
public void onFailure(@Nonnull Throwable t) {
LOG.info("onFailure callback invoked for {} with exception", target, t);
failureLatch.countDown();
}
}
}