HDFS-11119. Support for parallel checking of StorageLocations on DataNode startup.
This closes #155.
parent 4484b48498
commit 3d26717777
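For orientation, below is a minimal sketch of how a DataNode startup path might drive the new checker, using only the APIs added in this commit; the data-directory paths and the use of StorageLocation.parse() are illustrative assumptions, not part of this diff.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
import org.apache.hadoop.util.Timer;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class StartupCheckSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();

    // Hypothetical data directories; a real DataNode derives these from
    // dfs.datanode.data.dir.
    List<StorageLocation> dataDirs = Arrays.asList(
        StorageLocation.parse("/data/dn1"),
        StorageLocation.parse("/data/dn2"));

    StorageLocationChecker checker =
        new StorageLocationChecker(conf, new Timer());
    try {
      // check() schedules all locations in parallel and returns only the
      // usable ones; it throws IOException if too many volumes failed.
      List<StorageLocation> good = checker.check(conf, dataDirs);
      System.out.println("Usable volumes: " + good);
    } finally {
      checker.shutdownAndWait(10, TimeUnit.SECONDS);
    }
  }
}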
@@ -526,6 +526,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
  public static final String DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY = "dfs.datanode.directoryscan.threads";
  public static final int DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT = 1;

  public static final String DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY =
      "dfs.datanode.disk.check.min.gap";
  public static final String DFS_DATANODE_DISK_CHECK_MIN_GAP_DEFAULT =
      "15m";

  public static final String DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY =
      "dfs.datanode.disk.check.timeout";
  public static final String DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT =
      "10m";

  public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_KEY = "dfs.datanode.ec.reconstruction.stripedread.threads";
  public static final int DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_DEFAULT = 20;
  public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.ec.reconstruction.stripedread.buffer.size";
@@ -31,12 +31,14 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.StringUtils;
@@ -48,8 +50,10 @@ import org.apache.hadoop.util.StringUtils;
 *
 */
@InterfaceAudience.Private
public class StorageLocation implements Comparable<StorageLocation>{
  final StorageType storageType;
public class StorageLocation
    implements Checkable<StorageLocation.CheckContext, VolumeCheckResult>,
    Comparable<StorageLocation> {
  private final StorageType storageType;
  private final URI baseURI;
  /** Regular expression that describes a storage uri with a storage type.
   * e.g. [Disk]/storages/storage1/
@@ -206,4 +210,27 @@ public class StorageLocation implements Comparable<StorageLocation>{
          ": " + e.getMessage());
    }
  }

  @Override // Checkable
  public VolumeCheckResult check(CheckContext context) throws IOException {
    DiskChecker.checkDir(
        context.localFileSystem,
        new Path(baseURI),
        context.expectedPermission);
    return VolumeCheckResult.HEALTHY;
  }

  /**
   * Class to hold the parameters for running a {@link #check}.
   */
  public static final class CheckContext {
    private final LocalFileSystem localFileSystem;
    private final FsPermission expectedPermission;

    public CheckContext(LocalFileSystem localFileSystem,
                        FsPermission expectedPermission) {
      this.localFileSystem = localFileSystem;
      this.expectedPermission = expectedPermission;
    }
  }
}
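The new Checkable hook can also be exercised directly against a single directory. A hedged sketch follows; the StorageLocation.parse() helper, the directory path, and the "700" permission are illustrative assumptions rather than part of this hunk.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;

public class SingleLocationCheckSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    LocalFileSystem localFS = FileSystem.getLocal(conf);

    // Illustrative directory and permission; the DataNode normally takes
    // the expected permission from dfs.datanode.data.dir.perm.
    StorageLocation location = StorageLocation.parse("/data/dn1");
    StorageLocation.CheckContext context =
        new StorageLocation.CheckContext(localFS, new FsPermission("700"));

    // check() runs DiskChecker.checkDir and returns HEALTHY, or throws an
    // IOException if the directory is missing or has bad permissions.
    VolumeCheckResult result = location.check(context);
    System.out.println(location + " -> " + result);
  }
}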
@@ -0,0 +1,207 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.datanode.checker;

import static org.apache.hadoop.hdfs.DFSConfigKeys.*;

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation.CheckContext;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * A utility class that encapsulates checking storage locations during DataNode
 * startup.
 *
 * Some of this code was extracted from the DataNode class.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class StorageLocationChecker {
  public static final Logger LOG = LoggerFactory.getLogger(
      StorageLocationChecker.class);

  private final AsyncChecker<CheckContext, VolumeCheckResult> delegateChecker;
  private final Timer timer;

  /**
   * Max allowed time for a disk check in milliseconds. If the check
   * doesn't complete within this time we declare the disk as dead.
   */
  private final long maxAllowedTimeForCheckMs;

  /**
   * Expected filesystem permissions on the storage directory.
   */
  private final FsPermission expectedPermission;

  /**
   * Maximum number of volume failures that can be tolerated without
   * declaring a fatal error.
   */
  private final int maxVolumeFailuresTolerated;

  public StorageLocationChecker(Configuration conf, Timer timer) {
    maxAllowedTimeForCheckMs = conf.getTimeDuration(
        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT,
        TimeUnit.MILLISECONDS);

    expectedPermission = new FsPermission(
        conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
            DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));

    maxVolumeFailuresTolerated = conf.getInt(
        DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
        DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);

    this.timer = timer;

    delegateChecker = new ThrottledAsyncChecker<>(
        timer,
        conf.getTimeDuration(
            DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
            DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_DEFAULT,
            TimeUnit.MILLISECONDS),
        Executors.newCachedThreadPool(
            new ThreadFactoryBuilder()
                .setNameFormat("StorageLocationChecker thread %d")
                .setDaemon(true)
                .build()));
  }

  /**
   * Initiate a check of the supplied storage volumes and return
   * the list of healthy volumes.
   *
   * @param conf HDFS configuration.
   * @param dataDirs list of volumes to check.
   * @return the list of volumes that passed the check; failed volumes are
   *         filtered out of the returned list.
   *
   * @throws InterruptedException if the check was interrupted.
   * @throws IOException if the number of failed volumes exceeds the
   *                     maximum allowed or if there are no good
   *                     volumes.
   */
  public List<StorageLocation> check(
      final Configuration conf,
      final Collection<StorageLocation> dataDirs)
      throws InterruptedException, IOException {

    final ArrayList<StorageLocation> goodLocations = new ArrayList<>();
    final Set<StorageLocation> failedLocations = new HashSet<>();
    final Map<StorageLocation, ListenableFuture<VolumeCheckResult>> futures =
        Maps.newHashMap();
    final LocalFileSystem localFS = FileSystem.getLocal(conf);
    final CheckContext context = new CheckContext(localFS, expectedPermission);

    // Start parallel disk check operations on all StorageLocations.
    for (StorageLocation location : dataDirs) {
      futures.put(location,
          delegateChecker.schedule(location, context));
    }

    final long checkStartTimeMs = timer.monotonicNow();

    // Retrieve the results of the disk checks.
    for (Map.Entry<StorageLocation,
        ListenableFuture<VolumeCheckResult>> entry : futures.entrySet()) {

      // Determine how much time we can allow for this check to complete.
      // The cumulative wait time cannot exceed maxAllowedTimeForCheck.
      final long waitSoFarMs = (timer.monotonicNow() - checkStartTimeMs);
      final long timeLeftMs = Math.max(0,
          maxAllowedTimeForCheckMs - waitSoFarMs);
      final StorageLocation location = entry.getKey();

      try {
        final VolumeCheckResult result =
            entry.getValue().get(timeLeftMs, TimeUnit.MILLISECONDS);
        switch (result) {
        case HEALTHY:
          goodLocations.add(entry.getKey());
          break;
        case DEGRADED:
          LOG.warn("StorageLocation {} appears to be degraded.", location);
          break;
        case FAILED:
          LOG.warn("StorageLocation {} detected as failed.", location);
          failedLocations.add(location);
          break;
        default:
          LOG.error("Unexpected health check result {} for StorageLocation {}",
              result, location);
          goodLocations.add(entry.getKey());
        }
      } catch (ExecutionException|TimeoutException e) {
        LOG.warn("Exception checking StorageLocation " + location,
            e.getCause());
        failedLocations.add(location);
      }
    }

    if (failedLocations.size() > maxVolumeFailuresTolerated) {
      throw new IOException(
          "Too many failed volumes: " + failedLocations.size() +
          ". The configuration allows for a maximum of " +
          maxVolumeFailuresTolerated + " failed volumes.");
    }

    if (goodLocations.size() == 0) {
      throw new IOException("All directories in "
          + DFS_DATANODE_DATA_DIR_KEY + " are invalid: "
          + failedLocations);
    }

    return goodLocations;
  }

  public void shutdownAndWait(int gracePeriod, TimeUnit timeUnit) {
    try {
      delegateChecker.shutdownAndWait(gracePeriod, timeUnit);
    } catch (InterruptedException e) {
      LOG.warn("StorageLocationChecker interrupted during shutdown.");
      Thread.currentThread().interrupt();
    }
  }
}
@@ -0,0 +1,43 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode.checker;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * Defines the outcomes of running a disk check operation against a
 * volume.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public enum VolumeCheckResult {
  HEALTHY(1),
  DEGRADED(2),
  FAILED(3);

  private final int value;

  VolumeCheckResult(int value) {
    this.value = value;
  }

  int getValue() {
    return value;
  }
}
@@ -4324,4 +4324,27 @@
    call queue</description>
</property>

<property>
  <name>dfs.datanode.disk.check.min.gap</name>
  <value>15m</value>
  <description>
    The minimum gap between two successive checks of the same DataNode
    volume. This setting supports multiple time unit suffixes as described
    in dfs.heartbeat.interval. If no suffix is specified then milliseconds
    is assumed.
  </description>
</property>

<property>
  <name>dfs.datanode.disk.check.timeout</name>
  <value>10m</value>
  <description>
    Maximum allowed time for a disk check to complete. If the check does
    not complete within this time interval then the disk is declared as
    failed. This setting supports multiple time unit suffixes as described
    in dfs.heartbeat.interval. If no suffix is specified then milliseconds
    is assumed.
  </description>
</property>

</configuration>
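The same keys can also be set programmatically. A small sketch follows, using the Configuration time-duration APIs already exercised by this commit; the five-minute and two-minute values are arbitrary examples, not recommended settings.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

import java.util.concurrent.TimeUnit;

public class DiskCheckConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();

    // Equivalent to <value>5m</value> in hdfs-site.xml: re-check a volume
    // at most once every five minutes.
    conf.setTimeDuration(
        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, 5, TimeUnit.MINUTES);

    // Declare a volume failed if its check takes longer than two minutes.
    conf.setTimeDuration(
        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY, 2, TimeUnit.MINUTES);

    // Values are read back in milliseconds, as StorageLocationChecker does.
    System.out.println(conf.getTimeDuration(
        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT,
        TimeUnit.MILLISECONDS));
  }
}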
@@ -0,0 +1,217 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.datanode.checker;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.util.FakeTimer;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY;
import static org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult.*;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.*;

/**
 * Unit tests for the {@link StorageLocationChecker} class.
 */
public class TestStorageLocationChecker {
  public static final Logger LOG = LoggerFactory.getLogger(
      TestStorageLocationChecker.class);

  @Rule
  public ExpectedException thrown = ExpectedException.none();

  /**
   * Verify that all healthy locations are correctly handled and that the
   * check routine is invoked as expected.
   * @throws Exception
   */
  @Test(timeout=30000)
  public void testAllLocationsHealthy() throws Exception {
    final List<StorageLocation> locations =
        makeMockLocations(HEALTHY, HEALTHY, HEALTHY);
    final Configuration conf = new HdfsConfiguration();
    conf.setInt(DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 0);
    StorageLocationChecker checker =
        new StorageLocationChecker(conf, new FakeTimer());
    List<StorageLocation> filteredLocations = checker.check(conf, locations);

    // All locations should be healthy.
    assertThat(filteredLocations.size(), is(3));

    // Ensure that the check method was invoked for each location.
    for (StorageLocation location : locations) {
      verify(location).check(any(StorageLocation.CheckContext.class));
    }
  }

  /**
   * Test handling when the number of failed locations is below the
   * max volume failure threshold.
   *
   * @throws Exception
   */
  @Test(timeout=30000)
  public void testFailedLocationsBelowThreshold() throws Exception {
    final List<StorageLocation> locations =
        makeMockLocations(HEALTHY, HEALTHY, FAILED); // 2 healthy, 1 failed.
    final Configuration conf = new HdfsConfiguration();
    conf.setInt(DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
    StorageLocationChecker checker =
        new StorageLocationChecker(conf, new FakeTimer());
    List<StorageLocation> filteredLocations = checker.check(conf, locations);
    assertThat(filteredLocations.size(), is(2));
  }

  /**
   * Test handling when the number of failed locations is above the
   * max volume failure threshold.
   *
   * @throws Exception
   */
  @Test(timeout=30000)
  public void testFailedLocationsAboveThreshold() throws Exception {
    final List<StorageLocation> locations =
        makeMockLocations(HEALTHY, FAILED, FAILED); // 1 healthy, 2 failed.
    final Configuration conf = new HdfsConfiguration();
    conf.setInt(DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);

    thrown.expect(IOException.class);
    thrown.expectMessage("Too many failed volumes");
    StorageLocationChecker checker =
        new StorageLocationChecker(conf, new FakeTimer());
    checker.check(conf, locations);
  }

  /**
   * Test handling when all storage locations have failed.
   *
   * @throws Exception
   */
  @Test(timeout=30000)
  public void testAllFailedLocations() throws Exception {
    final List<StorageLocation> locations =
        makeMockLocations(FAILED, FAILED, FAILED);
    final Configuration conf = new HdfsConfiguration();
    conf.setInt(DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 3);

    thrown.expect(IOException.class);
    thrown.expectMessage("All directories in " + DFS_DATANODE_DATA_DIR_KEY +
        " are invalid");
    StorageLocationChecker checker =
        new StorageLocationChecker(conf, new FakeTimer());
    checker.check(conf, locations);
  }

  /**
   * Verify that a {@link StorageLocation#check} timeout is correctly detected
   * as a failure.
   *
   * This is hard to test without a {@link Thread#sleep} call.
   *
   * @throws Exception
   */
  @Test (timeout=300000)
  public void testTimeoutInCheck() throws Exception {
    final Configuration conf = new HdfsConfiguration();
    conf.setTimeDuration(DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
        1, TimeUnit.SECONDS);
    conf.setInt(DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
    final FakeTimer timer = new FakeTimer();

    // Generate a list of storage locations the first of which sleeps
    // for 2 seconds in its check() routine.
    final List<StorageLocation> locations = makeSlowLocations(2000, 1);
    StorageLocationChecker checker =
        new StorageLocationChecker(conf, timer);

    try {
      // Check the two locations and ensure that only one of them
      // was filtered out.
      List<StorageLocation> filteredList = checker.check(conf, locations);
      assertThat(filteredList.size(), is(1));
    } finally {
      checker.shutdownAndWait(10, TimeUnit.SECONDS);
    }
  }

  /**
   * Return a list of storage locations - one per argument - which return
   * health check results corresponding to the supplied arguments.
   */
  private List<StorageLocation> makeMockLocations(VolumeCheckResult... args)
      throws IOException {
    final List<StorageLocation> locations = new ArrayList<>(args.length);
    final AtomicInteger index = new AtomicInteger(0);

    for (VolumeCheckResult result : args) {
      final StorageLocation location = mock(StorageLocation.class);
      when(location.toString()).thenReturn("/" + index.incrementAndGet());
      when(location.check(any(StorageLocation.CheckContext.class)))
          .thenReturn(result);
      locations.add(location);
    }

    return locations;
  }

  /**
   * Return a list of storage locations - one per argument - whose check()
   * method takes at least the specified number of milliseconds to complete.
   */
  private List<StorageLocation> makeSlowLocations(long... args)
      throws IOException {
    final List<StorageLocation> locations = new ArrayList<>(args.length);
    final AtomicInteger index = new AtomicInteger(0);

    for (final long checkDelayMs: args) {
      final StorageLocation location = mock(StorageLocation.class);
      when(location.toString()).thenReturn("/" + index.incrementAndGet());
      when(location.check(any(StorageLocation.CheckContext.class)))
          .thenAnswer(new Answer<VolumeCheckResult>() {
            @Override
            public VolumeCheckResult answer(InvocationOnMock invocation)
                throws Throwable {
              Thread.sleep(checkDelayMs);
              return VolumeCheckResult.HEALTHY;
            }
          });

      locations.add(location);
    }
    return locations;
  }
}