HDFS-11148. Update DataNode to use StorageLocationChecker at startup.

Arpit Agarwal 2016-11-22 10:50:25 -08:00
parent afcf8d38e7
commit 613b902b98
9 changed files with 79 additions and 120 deletions
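In outline: DataNode.makeInstance() previously validated each configured data directory synchronously through the small DataNodeDiskChecker wrapper; with this change it delegates to StorageLocationChecker, which runs the disk checks in parallel and is then handed to the DataNode so it can be shut down with it. The sketch below condenses the new startup path from the DataNode.java hunks that follow; it is a simplified restatement, not code added by the commit.

static DataNode makeInstance(Collection<StorageLocation> dataDirs,
    Configuration conf, SecureResources resources) throws IOException {
  // The checker validates its own configuration up front and can throw
  // DiskErrorException for bad timeout or failed-volumes-tolerated values.
  StorageLocationChecker storageLocationChecker =
      new StorageLocationChecker(conf, new Timer());
  List<StorageLocation> locations;
  try {
    // Checks all configured volumes in parallel and returns only the healthy
    // ones, preserving the input order.
    locations = storageLocationChecker.check(conf, dataDirs);
  } catch (InterruptedException ie) {
    throw new IOException("Failed to instantiate DataNode", ie);
  }
  DefaultMetricsSystem.initialize("DataNode");
  // The checker is retained by the DataNode so shutdown() can stop it.
  return new DataNode(conf, locations, storageLocationChecker, resources);
}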

DataNode.java

@ -21,8 +21,6 @@ package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_KEY;
@ -67,7 +65,6 @@ import java.io.PrintStream;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.channels.ServerSocketChannel;
import java.security.PrivilegedExceptionAction;
@ -92,6 +89,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import javax.management.ObjectName;
import javax.net.SocketFactory;
@ -103,16 +101,14 @@ import org.apache.hadoop.conf.ReconfigurableBase;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@ -203,7 +199,6 @@ import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
import org.apache.hadoop.tracing.TraceUtils;
import org.apache.hadoop.tracing.TracerConfigurationManager;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.InvalidChecksumSizeException;
@ -211,6 +206,7 @@ import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.ServicePlugin;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Timer;
import org.apache.hadoop.util.VersionInfo;
import org.apache.htrace.core.Tracer;
import org.eclipse.jetty.util.ajax.JSON;
@ -389,6 +385,9 @@ public class DataNode extends ReconfigurableBase
private static final double CONGESTION_RATIO = 1.5;
private DiskBalancer diskBalancer;
@Nullable
private final StorageLocationChecker storageLocationChecker;
private final SocketFactory socketFactory;
@ -423,6 +422,7 @@ public class DataNode extends ReconfigurableBase
ThreadLocalRandom.current().nextInt(5000, (int) (5000 * 1.25));
this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
initOOBTimeout();
storageLocationChecker = null;
}
/**
@ -431,6 +431,7 @@ public class DataNode extends ReconfigurableBase
*/
DataNode(final Configuration conf,
final List<StorageLocation> dataDirs,
final StorageLocationChecker storageLocationChecker,
final SecureResources resources) throws IOException {
super(conf);
this.tracer = createTracer(conf);
@ -506,6 +507,7 @@ public class DataNode extends ReconfigurableBase
});
initOOBTimeout();
this.storageLocationChecker = storageLocationChecker;
}
@Override // ReconfigurableBase
@ -1935,6 +1937,10 @@ public class DataNode extends ReconfigurableBase
}
}
if (storageLocationChecker != null) {
storageLocationChecker.shutdownAndWait(1, TimeUnit.SECONDS);
}
if (pauseMonitor != null) {
pauseMonitor.stop();
}
@ -2620,21 +2626,6 @@ public class DataNode extends ReconfigurableBase
}
}
// Small wrapper around the DiskChecker class that provides means to mock
// DiskChecker static methods and unittest DataNode#getDataDirsFromURIs.
static class DataNodeDiskChecker {
private final FsPermission expectedPermission;
public DataNodeDiskChecker(FsPermission expectedPermission) {
this.expectedPermission = expectedPermission;
}
public void checkDir(LocalFileSystem localFS, Path path)
throws DiskErrorException, IOException {
DiskChecker.checkDir(localFS, path, expectedPermission);
}
}
/**
* Make an instance of DataNode after ensuring that at least one of the
* given data directories (and their parent directories, if necessary)
@ -2649,44 +2640,18 @@ public class DataNode extends ReconfigurableBase
*/
static DataNode makeInstance(Collection<StorageLocation> dataDirs,
Configuration conf, SecureResources resources) throws IOException {
LocalFileSystem localFS = FileSystem.getLocal(conf);
FsPermission permission = new FsPermission(
conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
DataNodeDiskChecker dataNodeDiskChecker =
new DataNodeDiskChecker(permission);
List<StorageLocation> locations =
checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);
List<StorageLocation> locations;
StorageLocationChecker storageLocationChecker =
new StorageLocationChecker(conf, new Timer());
try {
locations = storageLocationChecker.check(conf, dataDirs);
} catch (InterruptedException ie) {
throw new IOException("Failed to instantiate DataNode", ie);
}
DefaultMetricsSystem.initialize("DataNode");
assert locations.size() > 0 : "number of data directories should be > 0";
return new DataNode(conf, locations, resources);
}
// DataNode ctor expects AbstractList instead of List or Collection...
static List<StorageLocation> checkStorageLocations(
Collection<StorageLocation> dataDirs,
LocalFileSystem localFS, DataNodeDiskChecker dataNodeDiskChecker)
throws IOException {
ArrayList<StorageLocation> locations = new ArrayList<StorageLocation>();
StringBuilder invalidDirs = new StringBuilder();
for (StorageLocation location : dataDirs) {
final URI uri = location.getUri();
try {
dataNodeDiskChecker.checkDir(localFS, new Path(uri));
locations.add(location);
} catch (IOException ioe) {
LOG.warn("Invalid " + DFS_DATANODE_DATA_DIR_KEY + " "
+ location + " : ", ioe);
invalidDirs.append("\"").append(uri.getPath()).append("\" ");
}
}
if (locations.size() == 0) {
throw new IOException("All directories in "
+ DFS_DATANODE_DATA_DIR_KEY + " are invalid: "
+ invalidDirs);
}
return locations;
return new DataNode(conf, locations, storageLocationChecker, resources);
}
@Override
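Taken together, the DataNode.java hunks above give the checker a simple lifecycle: the test-only constructor leaves the field null, the full constructor retains the instance created in makeInstance(), and shutdown() stops it if present. A compressed restatement follows (hypothetical sketch class, unrelated members elided; not literal code from the commit).

public class CheckerLifecycleSketch {
  @Nullable
  private final StorageLocationChecker storageLocationChecker;

  // Test-only constructor: no startup disk check is run, so there is nothing
  // to stop later and the field stays null.
  CheckerLifecycleSketch(Configuration conf) {
    this.storageLocationChecker = null;
  }

  // Full constructor: keep the checker created in makeInstance() so its
  // async checker threads can be stopped when the DataNode stops.
  CheckerLifecycleSketch(Configuration conf, List<StorageLocation> dataDirs,
      StorageLocationChecker storageLocationChecker, SecureResources resources) {
    this.storageLocationChecker = storageLocationChecker;
  }

  void shutdown() {
    if (storageLocationChecker != null) {
      // Give in-flight checks a brief grace period, then stop the checker.
      storageLocationChecker.shutdownAndWait(1, TimeUnit.SECONDS);
    }
  }
}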

StorageLocationChecker.java

@ -32,6 +32,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation.CheckContext;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,7 +40,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -80,12 +83,19 @@ public class StorageLocationChecker {
*/
private final int maxVolumeFailuresTolerated;
public StorageLocationChecker(Configuration conf, Timer timer) {
public StorageLocationChecker(Configuration conf, Timer timer)
throws DiskErrorException {
maxAllowedTimeForCheckMs = conf.getTimeDuration(
DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT,
DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
if (maxAllowedTimeForCheckMs <= 0) {
throw new DiskErrorException("Invalid value configured for "
+ DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY + " - "
+ maxAllowedTimeForCheckMs + " (should be > 0)");
}
expectedPermission = new FsPermission(
conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
@ -94,6 +104,12 @@ public class StorageLocationChecker {
DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
if (maxVolumeFailuresTolerated < 0) {
throw new DiskErrorException("Invalid value configured for "
+ DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY + " - "
+ maxVolumeFailuresTolerated + " (should be non-negative)");
}
this.timer = timer;
delegateChecker = new ThrottledAsyncChecker<>(
@ -113,6 +129,9 @@ public class StorageLocationChecker {
* Initiate a check of the supplied storage volumes and return
* a list of healthy volumes.
*
* StorageLocations are returned in the same order as the input
* for compatibility with existing unit tests.
*
* @param conf HDFS configuration.
* @param dataDirs list of volumes to check.
* @return a list of healthy volumes. Returns the empty list if
@ -128,7 +147,8 @@ public class StorageLocationChecker {
final Collection<StorageLocation> dataDirs)
throws InterruptedException, IOException {
final ArrayList<StorageLocation> goodLocations = new ArrayList<>();
final HashMap<StorageLocation, Boolean> goodLocations =
new LinkedHashMap<>();
final Set<StorageLocation> failedLocations = new HashSet<>();
final Map<StorageLocation, ListenableFuture<VolumeCheckResult>> futures =
Maps.newHashMap();
@ -137,10 +157,18 @@ public class StorageLocationChecker {
// Start parallel disk check operations on all StorageLocations.
for (StorageLocation location : dataDirs) {
goodLocations.put(location, true);
futures.put(location,
delegateChecker.schedule(location, context));
}
if (maxVolumeFailuresTolerated >= dataDirs.size()) {
throw new DiskErrorException("Invalid value configured for "
+ DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY + " - "
+ maxVolumeFailuresTolerated + ". Value configured is >= "
+ "to the number of configured volumes (" + dataDirs.size() + ").");
}
final long checkStartTimeMs = timer.monotonicNow();
// Retrieve the results of the disk checks.
@ -159,7 +187,6 @@ public class StorageLocationChecker {
entry.getValue().get(timeLeftMs, TimeUnit.MILLISECONDS);
switch (result) {
case HEALTHY:
goodLocations.add(entry.getKey());
break;
case DEGRADED:
LOG.warn("StorageLocation {} appears to be degraded.", location);
@ -167,16 +194,17 @@ public class StorageLocationChecker {
case FAILED:
LOG.warn("StorageLocation {} detected as failed.", location);
failedLocations.add(location);
goodLocations.remove(location);
break;
default:
LOG.error("Unexpected health check result {} for StorageLocation {}",
result, location);
goodLocations.add(entry.getKey());
}
} catch (ExecutionException|TimeoutException e) {
LOG.warn("Exception checking StorageLocation " + location,
e.getCause());
failedLocations.add(location);
goodLocations.remove(location);
}
}
@ -193,7 +221,7 @@ public class StorageLocationChecker {
+ failedLocations);
}
return goodLocations;
return new ArrayList<>(goodLocations.keySet());
}
public void shutdownAndWait(int gracePeriod, TimeUnit timeUnit) {
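The switch from an ArrayList that is appended to as results arrive, to a LinkedHashMap that is pre-populated with every location and pruned on failure, is what backs the ordering note in the javadoc above: the returned list reflects the input (insertion) order rather than the order in which the parallel checks happen to complete. A small self-contained illustration of that pattern (the paths are made up for the example):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OrderPreservingPruneExample {
  public static void main(String[] args) {
    List<String> volumes = Arrays.asList("/data/1", "/data/2", "/data/3");

    // Pre-populate in input order, assuming every volume is healthy.
    Map<String, Boolean> good = new LinkedHashMap<>();
    for (String v : volumes) {
      good.put(v, true);
    }

    // Results can arrive in any order; removing a failed volume does not
    // disturb the ordering of the remaining entries.
    good.remove("/data/2");

    // Prints [/data/1, /data/3], matching the original configuration order.
    System.out.println(new ArrayList<>(good.keySet()));
  }
}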

InternalDataNodeTestUtils.java

@ -141,7 +141,7 @@ public class InternalDataNodeTestUtils {
HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current()
.nextLong() | 1L));
DataNode dn = new DataNode(conf, locations, null) {
DataNode dn = new DataNode(conf, locations, null, null) {
@Override
DatanodeProtocolClientSideTranslatorPB connectToNN(
InetSocketAddress nnAddr) throws IOException {

TestBlockRecovery.java

@ -223,7 +223,7 @@ public class TestBlockRecovery {
new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
null, ThreadLocalRandom.current().nextLong() | 1L));
dn = new DataNode(conf, locations, null) {
dn = new DataNode(conf, locations, null, null) {
@Override
DatanodeProtocolClientSideTranslatorPB connectToNN(
InetSocketAddress nnAddr) throws IOException {

TestDataDirs.java

@ -28,11 +28,6 @@ import org.junit.Test;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.datanode.DataNode.DataNodeDiskChecker;
public class TestDataDirs {
@ -96,26 +91,4 @@ public class TestDataDirs {
assertThat(locations.get(1).getStorageType(), is(StorageType.DISK));
assertThat(locations.get(1).getUri(), is(dir1.toURI()));
}
@Test(timeout = 30000)
public void testDataDirValidation() throws Throwable {
DataNodeDiskChecker diskChecker = mock(DataNodeDiskChecker.class);
doThrow(new IOException()).doThrow(new IOException()).doNothing()
.when(diskChecker)
.checkDir(any(LocalFileSystem.class), any(Path.class));
LocalFileSystem fs = mock(LocalFileSystem.class);
AbstractList<StorageLocation> locations = new ArrayList<StorageLocation>();
locations.add(StorageLocation.parse("file:/p1/"));
locations.add(StorageLocation.parse("file:/p2/"));
locations.add(StorageLocation.parse("file:/p3/"));
List<StorageLocation> checkedLocations =
DataNode.checkStorageLocations(locations, fs, diskChecker);
assertEquals("number of valid data dirs", 1, checkedLocations.size());
String validDir =
new File(checkedLocations.iterator().next().getUri()).getPath();
assertThat("p3 should be valid", new File("/p3/").getPath(), is(validDir));
}
}

TestDataNodeUUID.java

@ -55,7 +55,7 @@ public class TestDataNodeUUID {
"hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort());
ArrayList<StorageLocation> locations = new ArrayList<>();
DataNode dn = new DataNode(conf, locations, null);
DataNode dn = new DataNode(conf, locations, null, null);
//Assert that Node iD is null
String nullString = null;

TestDataNodeVolumeFailure.java

@ -500,26 +500,21 @@ public class TestDataNodeVolumeFailure {
// bring up one more DataNode
assertEquals(repl, cluster.getDataNodes().size());
cluster.startDataNodes(newConf, 1, false, null, null);
try {
cluster.startDataNodes(newConf, 1, false, null, null);
assertTrue("Failed to get expected IOException", tolerated);
} catch (IOException ioe) {
assertFalse("Unexpected IOException " + ioe, tolerated);
return;
}
assertEquals(repl + 1, cluster.getDataNodes().size());
if (tolerated) {
// create new file and it should be able to replicate to 3 nodes
final Path p = new Path("/test1.txt");
DFSTestUtil.createFile(fs, p, block_size * blocks_num, (short) 3, 1L);
DFSTestUtil.waitReplication(fs, p, (short) (repl + 1));
} else {
// DataNode should stop soon if it does not tolerate disk failure
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
final String bpid = cluster.getNamesystem().getBlockPoolId();
final BPOfferService bpos = cluster.getDataNodes().get(2)
.getBPOfferService(bpid);
return !bpos.isAlive();
}
}, 100, 30 * 1000);
}
// create new file and it should be able to replicate to 3 nodes
final Path p = new Path("/test1.txt");
DFSTestUtil.createFile(fs, p, block_size * blocks_num, (short) 3, 1L);
DFSTestUtil.waitReplication(fs, p, (short) (repl + 1));
}
/**

TestDatanodeProtocolRetryPolicy.java

@ -220,7 +220,7 @@ public class TestDatanodeProtocolRetryPolicy {
Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean());
dn = new DataNode(conf, locations, null) {
dn = new DataNode(conf, locations, null, null) {
@Override
DatanodeProtocolClientSideTranslatorPB connectToNN(
InetSocketAddress nnAddr) throws IOException {

TestStorageLocationChecker.java

@ -36,7 +36,6 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY;
import static org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult.*;
@ -97,8 +96,8 @@ public class TestStorageLocationChecker {
}
/**
* Test handling when the number of failed locations is above the
* max volume failure threshold.
* Test handling when the number of volume failures tolerated is the
* same as the number of volumes.
*
* @throws Exception
*/
@ -122,15 +121,14 @@ public class TestStorageLocationChecker {
* @throws Exception
*/
@Test(timeout=30000)
public void testAllFailedLocations() throws Exception {
public void testBadConfiguration() throws Exception {
final List<StorageLocation> locations =
makeMockLocations(FAILED, FAILED, FAILED);
makeMockLocations(HEALTHY, HEALTHY, HEALTHY);
final Configuration conf = new HdfsConfiguration();
conf.setInt(DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 3);
thrown.expect(IOException.class);
thrown.expectMessage("All directories in " + DFS_DATANODE_DATA_DIR_KEY +
" are invalid");
thrown.expectMessage("Invalid value configured");
StorageLocationChecker checker =
new StorageLocationChecker(conf, new FakeTimer());
checker.check(conf, locations);
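The renamed test reflects that misconfiguration is now rejected eagerly ("Invalid value configured") instead of surfacing later as a set of failed directories. Condensing the validation that the hunks above add, two checks in the StorageLocationChecker constructor and one in check(), into plain conditions; the helper below is a hypothetical restatement, not code from the commit.

import org.apache.hadoop.util.DiskChecker.DiskErrorException;

final class StorageCheckerConfigRules {
  static void validate(long diskCheckTimeoutMs, int failedVolumesTolerated,
      int configuredVolumes) throws DiskErrorException {
    // The disk check timeout must be a positive duration.
    if (diskCheckTimeoutMs <= 0) {
      throw new DiskErrorException("disk check timeout must be > 0");
    }
    // The failed-volumes-tolerated setting may not be negative...
    if (failedVolumesTolerated < 0) {
      throw new DiskErrorException("failed volumes tolerated must be >= 0");
    }
    // ...and must be strictly smaller than the number of configured volumes,
    // otherwise a DataNode could "tolerate" losing every disk it has.
    if (failedVolumesTolerated >= configuredVolumes) {
      throw new DiskErrorException("failed volumes tolerated must be less than "
          + "the number of configured data directories");
    }
  }
}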