HDFS-11119. Support for parallel checking of StorageLocations on DataNode startup.

Change-Id: Iddedbeefd056af165be3557ccad199fe2878b591
Arpit Agarwal 2016-11-11 15:02:52 -08:00
parent d1aa844dc6
commit 76f1ab524c
6 changed files with 537 additions and 1 deletion

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -500,6 +500,17 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT = 21600;
public static final String DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY = "dfs.datanode.directoryscan.threads";
public static final int DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT = 1;
public static final String DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY =
"dfs.datanode.disk.check.min.gap";
public static final long DFS_DATANODE_DISK_CHECK_MIN_GAP_DEFAULT =
900000; // 15 minutes.
public static final String DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY =
"dfs.datanode.disk.check.timeout";
public static final long DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT =
600000; // 10 minutes.
public static final String
    DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY =
    "dfs.datanode.directoryscan.throttle.limit.ms.per.sec";

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java

@@ -26,8 +26,18 @@
import java.util.regex.Matcher;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.StringUtils;
/**
@@ -37,7 +47,8 @@
 *
 */
@InterfaceAudience.Private
public class StorageLocation
    implements Checkable<StorageLocation.CheckContext, VolumeCheckResult> {
final StorageType storageType;
final File file;
@@ -116,4 +127,27 @@ public boolean equals(Object obj) {
public int hashCode() {
return toString().hashCode();
}
@Override // Checkable
public VolumeCheckResult check(CheckContext context) throws IOException {
DiskChecker.checkDir(
context.localFileSystem,
new Path(file.toURI()),
context.expectedPermission);
return VolumeCheckResult.HEALTHY;
}
/**
* Class to hold the parameters for running a {@link #check}.
*/
public static final class CheckContext {
private final LocalFileSystem localFileSystem;
private final FsPermission expectedPermission;
public CheckContext(LocalFileSystem localFileSystem,
FsPermission expectedPermission) {
this.localFileSystem = localFileSystem;
this.expectedPermission = expectedPermission;
}
}
}
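
For illustration, a minimal sketch of invoking the new Checkable hook directly, outside the checker framework; the storage path and the "700" permission are assumptions, and StorageLocation.parse() is used here only as a convenient way to build a location:

// Sketch: direct use of StorageLocation.check() (path is hypothetical).
Configuration conf = new HdfsConfiguration();
LocalFileSystem localFS = FileSystem.getLocal(conf);
FsPermission expected = new FsPermission("700");  // assumed permission
StorageLocation location = StorageLocation.parse("/data/1/dfs/dn");
VolumeCheckResult result =
    location.check(new StorageLocation.CheckContext(localFS, expected));
// DiskChecker.checkDir() throws on a bad directory, so reaching this
// line means the location reported HEALTHY.

Note that this implementation either returns HEALTHY or throws; it is the caller (StorageLocationChecker below) that maps exceptions and timeouts to failed volumes.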

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java

@@ -0,0 +1,207 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.checker;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation.CheckContext;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* A utility class that encapsulates checking storage locations during DataNode
* startup.
*
* Some of this code was extracted from the DataNode class.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class StorageLocationChecker {
public static final Logger LOG = LoggerFactory.getLogger(
StorageLocationChecker.class);
private final AsyncChecker<CheckContext, VolumeCheckResult> delegateChecker;
private final Timer timer;
/**
* Max allowed time for a disk check in milliseconds. If the check
* doesn't complete within this time we declare the disk as dead.
*/
private final long maxAllowedTimeForCheckMs;
/**
* Expected filesystem permissions on the storage directory.
*/
private final FsPermission expectedPermission;
/**
* Maximum number of volume failures that can be tolerated without
* declaring a fatal error.
*/
private final int maxVolumeFailuresTolerated;
public StorageLocationChecker(Configuration conf, Timer timer) {
maxAllowedTimeForCheckMs = conf.getTimeDuration(
DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
expectedPermission = new FsPermission(
conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
maxVolumeFailuresTolerated = conf.getInt(
DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
this.timer = timer;
delegateChecker = new ThrottledAsyncChecker<>(
timer,
conf.getTimeDuration(
DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_DEFAULT,
TimeUnit.MILLISECONDS),
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("StorageLocationChecker thread %d")
.setDaemon(true)
.build()));
}
/**
 * Initiate a check of the supplied storage volumes and return
 * the list of healthy volumes.
 *
 * @param conf HDFS configuration.
 * @param dataDirs list of volumes to check.
 * @return the list of volumes that passed the check. Degraded
 *         volumes are excluded from the result but are not
 *         counted as failed.
 *
 * @throws InterruptedException if the check was interrupted.
 * @throws IOException if the number of failed volumes exceeds the
 *                     maximum allowed or if there are no good
 *                     volumes.
 */
public List<StorageLocation> check(
final Configuration conf,
final Collection<StorageLocation> dataDirs)
throws InterruptedException, IOException {
final ArrayList<StorageLocation> goodLocations = new ArrayList<>();
final Set<StorageLocation> failedLocations = new HashSet<>();
final Map<StorageLocation, ListenableFuture<VolumeCheckResult>> futures =
Maps.newHashMap();
final LocalFileSystem localFS = FileSystem.getLocal(conf);
final CheckContext context = new CheckContext(localFS, expectedPermission);
// Start parallel disk check operations on all StorageLocations.
for (StorageLocation location : dataDirs) {
futures.put(location,
delegateChecker.schedule(location, context));
}
final long checkStartTimeMs = timer.monotonicNow();
// Retrieve the results of the disk checks.
for (Map.Entry<StorageLocation,
ListenableFuture<VolumeCheckResult>> entry : futures.entrySet()) {
// Determine how much time we can allow for this check to complete.
// The cumulative wait time cannot exceed maxAllowedTimeForCheck.
final long waitSoFarMs = (timer.monotonicNow() - checkStartTimeMs);
final long timeLeftMs = Math.max(0,
maxAllowedTimeForCheckMs - waitSoFarMs);
final StorageLocation location = entry.getKey();
try {
final VolumeCheckResult result =
entry.getValue().get(timeLeftMs, TimeUnit.MILLISECONDS);
switch (result) {
case HEALTHY:
goodLocations.add(entry.getKey());
break;
case DEGRADED:
LOG.warn("StorageLocation {} appears to be degraded.", location);
break;
case FAILED:
LOG.warn("StorageLocation {} detected as failed.", location);
failedLocations.add(location);
break;
default:
LOG.error("Unexpected health check result {} for StorageLocation {}",
result, location);
goodLocations.add(entry.getKey());
}
} catch (ExecutionException|TimeoutException e) {
// Log the exception itself rather than its cause: a TimeoutException
// carries no cause, while an ExecutionException keeps its cause
// available via the exception chain.
LOG.warn("Exception checking StorageLocation " + location, e);
failedLocations.add(location);
}
}
if (failedLocations.size() > maxVolumeFailuresTolerated) {
throw new IOException(
"Too many failed volumes: " + failedLocations.size() +
". The configuration allows for a maximum of " +
maxVolumeFailuresTolerated + " failed volumes.");
}
if (goodLocations.size() == 0) {
throw new IOException("All directories in "
+ DFS_DATANODE_DATA_DIR_KEY + " are invalid: "
+ failedLocations);
}
return goodLocations;
}
public void shutdownAndWait(int gracePeriod, TimeUnit timeUnit) {
try {
delegateChecker.shutdownAndWait(gracePeriod, timeUnit);
} catch (InterruptedException e) {
LOG.warn("StorageLocationChecker interrupted during shutdown.");
Thread.currentThread().interrupt();
}
}
}
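
Putting it together, a hedged sketch of how DataNode startup might drive the checker; the data directory paths are illustrative, and StorageLocation.parse() is assumed as the way locations get built:

// Sketch: startup-time parallel volume checking (paths hypothetical).
Configuration conf = new HdfsConfiguration();
List<StorageLocation> dataDirs = new ArrayList<>();
dataDirs.add(StorageLocation.parse("/data/1/dfs/dn"));
dataDirs.add(StorageLocation.parse("/data/2/dfs/dn"));

StorageLocationChecker checker =
    new StorageLocationChecker(conf, new Timer());
try {
  // Throws IOException if failures exceed
  // dfs.datanode.failed.volumes.tolerated or if no volume is usable.
  List<StorageLocation> goodDirs = checker.check(conf, dataDirs);
} finally {
  checker.shutdownAndWait(10, TimeUnit.SECONDS);
}

Because all locations are scheduled before any result is awaited, the total wall-clock cost is bounded by the slowest disk (capped at dfs.datanode.disk.check.timeout) rather than by the sum of the individual checks.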

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/VolumeCheckResult.java

@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.checker;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Defines the outcomes of running a disk check operation against a
* volume.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public enum VolumeCheckResult {
HEALTHY(1),
DEGRADED(2),
FAILED(3);
private final int value;
VolumeCheckResult(int value) {
this.value = value;
}
int getValue() {
return value;
}
}
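
For orientation, the Checkable interface that ties StorageLocation to this enum is, as used throughout this change, a single-method generic contract; the following is a sketch of that contract rather than the committed source:

// Sketch of the assumed Checkable<K, V> contract: K is the context
// handed to the check, V the result type (VolumeCheckResult here).
public interface Checkable<K, V> {
  V check(K context) throws Exception;
}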

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -4170,4 +4170,28 @@
The size buffer to be used when creating or opening httpfs filesystem IO stream.
</description>
</property>
<property>
<name>dfs.datanode.disk.check.min.gap</name>
<value>15m</value>
<description>
The minimum gap between two successive checks of the same DataNode
volume. This setting supports multiple time unit suffixes as described
in dfs.heartbeat.interval. If no suffix is specified then milliseconds
is assumed.
</description>
</property>
<property>
<name>dfs.datanode.disk.check.timeout</name>
<value>10m</value>
<description>
Maximum allowed time for a disk check to complete. If the check does
not complete within this time interval then the disk is declared as
failed. This setting supports multiple time unit suffixes as described
in dfs.heartbeat.interval. If no suffix is specified then milliseconds
is assumed.
</description>
</property>
</configuration>
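
Since both properties are parsed with Configuration.getTimeDuration(), an operator can override them with any supported suffix; a small sketch of the parsing behavior:

// Sketch: time-unit suffix handling for the new properties.
Configuration conf = new HdfsConfiguration();
conf.set("dfs.datanode.disk.check.timeout", "5m");  // five minutes
long timeoutMs = conf.getTimeDuration(
    "dfs.datanode.disk.check.timeout", 600000, TimeUnit.MILLISECONDS);
// timeoutMs == 300000; a bare "300000" would mean the same interval.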

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestStorageLocationChecker.java

@@ -0,0 +1,217 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.checker;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.util.FakeTimer;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY;
import static org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult.*;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.*;
/**
* Unit tests for the {@link StorageLocationChecker} class.
*/
public class TestStorageLocationChecker {
public static final Logger LOG = LoggerFactory.getLogger(
TestStorageLocationChecker.class);
@Rule
public ExpectedException thrown = ExpectedException.none();
/**
* Verify that all healthy locations are correctly handled and that the
* check routine is invoked as expected.
* @throws Exception
*/
@Test(timeout=30000)
public void testAllLocationsHealthy() throws Exception {
final List<StorageLocation> locations =
makeMockLocations(HEALTHY, HEALTHY, HEALTHY);
final Configuration conf = new HdfsConfiguration();
conf.setInt(DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 0);
StorageLocationChecker checker =
new StorageLocationChecker(conf, new FakeTimer());
List<StorageLocation> filteredLocations = checker.check(conf, locations);
// All locations should be healthy.
assertThat(filteredLocations.size(), is(3));
// Ensure that the check method was invoked for each location.
for (StorageLocation location : locations) {
verify(location).check(any(StorageLocation.CheckContext.class));
}
}
/**
* Test handling when the number of failed locations is below the
* max volume failure threshold.
*
* @throws Exception
*/
@Test(timeout=30000)
public void testFailedLocationsBelowThreshold() throws Exception {
final List<StorageLocation> locations =
makeMockLocations(HEALTHY, HEALTHY, FAILED); // 2 healthy, 1 failed.
final Configuration conf = new HdfsConfiguration();
conf.setInt(DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
StorageLocationChecker checker =
new StorageLocationChecker(conf, new FakeTimer());
List<StorageLocation> filteredLocations = checker.check(conf, locations);
assertThat(filteredLocations.size(), is(2));
}
/**
* Test handling when the number of failed locations is above the
* max volume failure threshold.
*
* @throws Exception
*/
@Test(timeout=30000)
public void testFailedLocationsAboveThreshold() throws Exception {
final List<StorageLocation> locations =
makeMockLocations(HEALTHY, FAILED, FAILED); // 1 healthy, 2 failed.
final Configuration conf = new HdfsConfiguration();
conf.setInt(DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
thrown.expect(IOException.class);
thrown.expectMessage("Too many failed volumes");
StorageLocationChecker checker =
new StorageLocationChecker(conf, new FakeTimer());
checker.check(conf, locations);
}
/**
* Test handling when all storage locations have failed.
*
* @throws Exception
*/
@Test(timeout=30000)
public void testAllFailedLocations() throws Exception {
final List<StorageLocation> locations =
makeMockLocations(FAILED, FAILED, FAILED);
final Configuration conf = new HdfsConfiguration();
conf.setInt(DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 3);
thrown.expect(IOException.class);
thrown.expectMessage("All directories in " + DFS_DATANODE_DATA_DIR_KEY +
" are invalid");
StorageLocationChecker checker =
new StorageLocationChecker(conf, new FakeTimer());
checker.check(conf, locations);
}
/**
* Verify that a {@link StorageLocation#check} timeout is correctly detected
* as a failure.
*
* This is hard to test without a {@link Thread#sleep} call.
*
* @throws Exception
*/
@Test (timeout=300000)
public void testTimeoutInCheck() throws Exception {
final Configuration conf = new HdfsConfiguration();
conf.setTimeDuration(DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
1, TimeUnit.SECONDS);
conf.setInt(DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
final FakeTimer timer = new FakeTimer();
// Generate a list of storage locations, the first of which sleeps
// for 2 seconds in its check() routine.
final List<StorageLocation> locations = makeSlowLocations(2000, 1);
StorageLocationChecker checker =
new StorageLocationChecker(conf, timer);
try {
// Check the two locations and ensure that only one of them
// was filtered out.
List<StorageLocation> filteredList = checker.check(conf, locations);
assertThat(filteredList.size(), is(1));
} finally {
checker.shutdownAndWait(10, TimeUnit.SECONDS);
}
}
/**
* Return a list of storage locations - one per argument - which return
* health check results corresponding to the supplied arguments.
*/
private List<StorageLocation> makeMockLocations(VolumeCheckResult... args)
throws IOException {
final List<StorageLocation> locations = new ArrayList<>(args.length);
final AtomicInteger index = new AtomicInteger(0);
for (VolumeCheckResult result : args) {
final StorageLocation location = mock(StorageLocation.class);
when(location.toString()).thenReturn("/" + index.incrementAndGet());
when(location.check(any(StorageLocation.CheckContext.class)))
.thenReturn(result);
locations.add(location);
}
return locations;
}
/**
* Return a list of storage locations - one per argument - whose check()
* method takes at least the specified number of milliseconds to complete.
*/
private List<StorageLocation> makeSlowLocations(long... args)
throws IOException {
final List<StorageLocation> locations = new ArrayList<>(args.length);
final AtomicInteger index = new AtomicInteger(0);
for (final long checkDelayMs: args) {
final StorageLocation location = mock(StorageLocation.class);
when(location.toString()).thenReturn("/" + index.incrementAndGet());
when(location.check(any(StorageLocation.CheckContext.class)))
.thenAnswer(new Answer<VolumeCheckResult>() {
@Override
public VolumeCheckResult answer(InvocationOnMock invocation)
throws Throwable {
Thread.sleep(checkDelayMs);
return VolumeCheckResult.HEALTHY;
}
});
locations.add(location);
}
return locations;
}
}