From 73d14311bc847a29c2b8ec30bbfbaf59cd3cb713 Mon Sep 17 00:00:00 2001
From: Arpit Agarwal
Date: Mon, 26 Aug 2013 03:18:40 +0000
Subject: [PATCH] HDFS-5000. DataNode configuration should allow specifying
 storage type

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1517417 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hdfs/server/datanode/BPServiceActor.java  |  6 +-
 .../hadoop/hdfs/server/datanode/DataNode.java | 91 ++++++++++---------
 .../hdfs/server/datanode/DataStorage.java     | 15 +--
 .../server/datanode/TestBlockRecovery.java    | 29 ++----
 .../hdfs/server/datanode/TestDataDirs.java    | 77 +++++++++++++---
 5 files changed, 136 insertions(+), 82 deletions(-)
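This change extends dfs.datanode.data.dir so that each directory entry may carry an optional storage-type tag in square brackets. The tag is case-insensitive and defaults to DISK when omitted, as exercised by TestDataDirs below. The following is a minimal sketch of how the new form is consumed; the directory paths and the wrapper class are made up for illustration, while DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DataNode.getStorageLocations() and the StorageLocation accessors are the ones touched by this patch. getStorageLocations() is package-private, so the sketch assumes code living in org.apache.hadoop.hdfs.server.datanode.

    package org.apache.hadoop.hdfs.server.datanode;

    import java.util.Collection;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class StorageTypeConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Tags are case-insensitive; an untagged entry falls back to DISK.
        // The paths below are purely illustrative.
        conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
            "[DISK]/data/1,[SSD]/data/2,/data/3");

        Collection<StorageLocation> locations =
            DataNode.getStorageLocations(conf);
        for (StorageLocation location : locations) {
          System.out.println(location.getStorageType() + " -> " + location.getUri());
        }
      }
    }

Entries that fail to parse are logged and skipped by the new DataNode.parseStorageLocations() rather than aborting startup.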
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 75f42f959d7..e2a688af85a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -460,9 +460,9 @@ class BPServiceActor implements Runnable {
   }
 
   private String formatThreadName() {
-    Collection<URI> dataDirs = DataNode.getStorageDirs(dn.getConf());
-    return "DataNode: [" +
-        StringUtils.uriToString(dataDirs.toArray(new URI[0])) + "] " +
+    Collection<StorageLocation> dataDirs =
+        DataNode.getStorageLocations(dn.getConf());
+    return "DataNode: [" + dataDirs.toString() + "] " +
       " heartbeating to " + nnAddr;
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index acb9cf5688d..cfb323c527e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -272,7 +272,7 @@ public class DataNode extends Configured
   private JvmPauseMonitor pauseMonitor;
   private SecureResources secureResources = null;
-  private AbstractList<File> dataDirs;
+  private AbstractList<StorageLocation> dataDirs;
   private Configuration conf;
 
   private final List<String> usersWithLocalPathAccess;
@@ -280,21 +280,12 @@ public class DataNode extends Configured
   ReadaheadPool readaheadPool;
   private final boolean getHdfsBlockLocationsEnabled;
 
-  /**
-   * Create the DataNode given a configuration and an array of dataDirs.
-   * 'dataDirs' is where the blocks are stored.
-   */
-  DataNode(final Configuration conf,
-           final AbstractList<File> dataDirs) throws IOException {
-    this(conf, dataDirs, null);
-  }
-
   /**
    * Create the DataNode given a configuration, an array of dataDirs,
    * and a namenode proxy
    */
-  DataNode(final Configuration conf,
-           final AbstractList<File> dataDirs,
+  DataNode(final Configuration conf,
+           final AbstractList<StorageLocation> dataDirs,
            final SecureResources resources) throws IOException {
     super(conf);
@@ -711,7 +702,7 @@ public class DataNode extends Configured
    * @throws IOException
    */
   void startDataNode(Configuration conf,
-                     AbstractList<File> dataDirs,
+                     AbstractList<StorageLocation> dataDirs,
                     // DatanodeProtocol namenode,
                      SecureResources resources
                      ) throws IOException {
@@ -861,7 +852,7 @@ public class DataNode extends Configured
    * If this is the first block pool to register, this also initializes
    * the datanode-scoped storage.
    * 
-   * @param nsInfo the handshake response from the NN.
+   * @param bpos block pool to initialize and register with the NameNode.
    * @throws IOException if the NN is inconsistent with the local storage.
    */
   void initBlockPool(BPOfferService bpos) throws IOException {
@@ -1688,17 +1679,39 @@ public class DataNode extends Configured
       printUsage(System.err);
       return null;
     }
-    Collection<URI> dataDirs = getStorageDirs(conf);
+    Collection<StorageLocation> dataLocations = getStorageLocations(conf);
     UserGroupInformation.setConfiguration(conf);
     SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
         DFS_DATANODE_USER_NAME_KEY);
-    return makeInstance(dataDirs, conf, resources);
+    return makeInstance(dataLocations, conf, resources);
   }
 
-  static Collection<URI> getStorageDirs(Configuration conf) {
-    Collection<String> dirNames =
-      conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
-    return Util.stringCollectionAsURIs(dirNames);
+  static Collection<StorageLocation> parseStorageLocations(
+      Collection<String> rawLocations) {
+    List<StorageLocation> locations =
+        new ArrayList<StorageLocation>(rawLocations.size());
+
+    for(String locationString : rawLocations) {
+      StorageLocation location;
+      try {
+        location = StorageLocation.parse(locationString);
+      } catch (IOException ioe) {
+        LOG.error("Failed to parse storage location " + locationString);
+        continue;
+      } catch (IllegalArgumentException iae) {
+        LOG.error(iae.toString());
+        continue;
+      }
+
+      locations.add(location);
+    }
+
+    return locations;
+  }
+
+  static Collection<StorageLocation> getStorageLocations(Configuration conf) {
+    return parseStorageLocations(
+        conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY));
   }
 
   /** Instantiate & Start a single datanode daemon and wait for it to finish.
@@ -1764,51 +1777,45 @@ public class DataNode extends Configured
    *  no directory from this directory list can be created.
    * @throws IOException
    */
-  static DataNode makeInstance(Collection<URI> dataDirs, Configuration conf,
-      SecureResources resources) throws IOException {
+  static DataNode makeInstance(Collection<StorageLocation> dataDirs,
+      Configuration conf, SecureResources resources) throws IOException {
     LocalFileSystem localFS = FileSystem.getLocal(conf);
     FsPermission permission = new FsPermission(
         conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
                  DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
     DataNodeDiskChecker dataNodeDiskChecker =
         new DataNodeDiskChecker(permission);
-    ArrayList<File> dirs =
-        getDataDirsFromURIs(dataDirs, localFS, dataNodeDiskChecker);
+    ArrayList<StorageLocation> locations =
+        checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);
     DefaultMetricsSystem.initialize("DataNode");
 
-    assert dirs.size() > 0 : "number of data directories should be > 0";
-    return new DataNode(conf, dirs, resources);
+    assert locations.size() > 0 : "number of data directories should be > 0";
+    return new DataNode(conf, locations, resources);
   }
 
   // DataNode ctor expects AbstractList instead of List or Collection...
-  static ArrayList<File> getDataDirsFromURIs(Collection<URI> dataDirs,
+  static ArrayList<StorageLocation> checkStorageLocations(
+      Collection<StorageLocation> dataDirs,
       LocalFileSystem localFS, DataNodeDiskChecker dataNodeDiskChecker)
           throws IOException {
-    ArrayList<File> dirs = new ArrayList<File>();
+    ArrayList<StorageLocation> locations = new ArrayList<StorageLocation>();
     StringBuilder invalidDirs = new StringBuilder();
-    for (URI dirURI : dataDirs) {
-      if (!"file".equalsIgnoreCase(dirURI.getScheme())) {
-        LOG.warn("Unsupported URI schema in " + dirURI + ". Ignoring ...");
-        invalidDirs.append("\"").append(dirURI).append("\" ");
-        continue;
-      }
-      // drop any (illegal) authority in the URI for backwards compatibility
-      File dir = new File(dirURI.getPath());
+    for (StorageLocation location : dataDirs) {
       try {
-        dataNodeDiskChecker.checkDir(localFS, new Path(dir.toURI()));
-        dirs.add(dir);
+        dataNodeDiskChecker.checkDir(localFS, new Path(location.getUri()));
+        locations.add(location);
       } catch (IOException ioe) {
         LOG.warn("Invalid " + DFS_DATANODE_DATA_DIR_KEY + " "
-            + dir + " : ", ioe);
-        invalidDirs.append("\"").append(dir.getCanonicalPath()).append("\" ");
+            + location.getFile() + " : ", ioe);
+        invalidDirs.append("\"").append(location.getFile().getCanonicalPath()).append("\" ");
       }
     }
-    if (dirs.size() == 0) {
+    if (locations.size() == 0) {
       throw new IOException("All directories in "
           + DFS_DATANODE_DATA_DIR_KEY + " are invalid: "
          + invalidDirs);
     }
-    return dirs;
+    return locations;
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index b04f46cedf1..7fbe13c90c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -129,7 +129,8 @@ public class DataStorage extends Storage {
    * @throws IOException
    */
   synchronized void recoverTransitionRead(DataNode datanode,
-      NamespaceInfo nsInfo, Collection<File> dataDirs, StartupOption startOpt)
+      NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
+      StartupOption startOpt)
       throws IOException {
     if (initialized) {
       // DN storage has been initialized, no need to do anything
@@ -145,8 +146,8 @@ public class DataStorage extends Storage {
     // Format and recover.
     this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
     ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(dataDirs.size());
-    for(Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
-      File dataDir = it.next();
+    for(Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext();) {
+      File dataDir = it.next().getFile();
       StorageDirectory sd = new StorageDirectory(dataDir);
       StorageState curState;
       try {
@@ -215,14 +216,14 @@ public class DataStorage extends Storage {
    * @throws IOException on error
    */
   void recoverTransitionRead(DataNode datanode, String bpID, NamespaceInfo nsInfo,
-      Collection<File> dataDirs, StartupOption startOpt) throws IOException {
+      Collection<StorageLocation> dataDirs, StartupOption startOpt) throws IOException {
     // First ensure datanode level format/snapshot/rollback is completed
     recoverTransitionRead(datanode, nsInfo, dataDirs, startOpt);
-    
+
     // Create list of storage directories for the block pool
     Collection<File> bpDataDirs = new ArrayList<File>();
-    for(Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
-      File dnRoot = it.next();
+    for(Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext();) {
+      File dnRoot = it.next().getFile();
       File bpRoot = BlockPoolSliceStorage.getBpRoot(bpID, new File(dnRoot,
           STORAGE_DIR_CURRENT));
       bpDataDirs.add(bpRoot);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index a5792ad217f..eb9f1769029 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -35,6 +35,8 @@ import static org.mockito.Mockito.when;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -44,18 +46,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -80,10 +73,7 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.log4j.Level;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.*;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -121,7 +111,7 @@ public class TestBlockRecovery {
    * @throws IOException
    */
   @Before
-  public void startUp() throws IOException {
+  public void startUp() throws IOException, URISyntaxException {
     tearDownDone = false;
     conf = new HdfsConfiguration();
     conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DATA_DIR);
@@ -131,11 +121,12 @@ public class TestBlockRecovery {
     conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
     FileSystem.setDefaultUri(conf,
         "hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort());
-    ArrayList<File> dirs = new ArrayList<File>();
+    ArrayList<StorageLocation> locations = new ArrayList<StorageLocation>();
     File dataDir = new File(DATA_DIR);
     FileUtil.fullyDelete(dataDir);
     dataDir.mkdirs();
-    dirs.add(dataDir);
+    StorageLocation location = new StorageLocation(new URI(dataDir.getPath()));
+    locations.add(location);
     final DatanodeProtocolClientSideTranslatorPB namenode =
       mock(DatanodeProtocolClientSideTranslatorPB.class);
@@ -161,7 +152,7 @@ public class TestBlockRecovery {
             new DatanodeCommand[0],
             new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1)));
 
-    dn = new DataNode(conf, dirs, null) {
+    dn = new DataNode(conf, locations, null) {
       @Override
       DatanodeProtocolClientSideTranslatorPB connectToNN(
           InetSocketAddress nnAddr) throws IOException {
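The TestBlockRecovery change above is the pattern for any caller that used to hand the DataNode constructor an ArrayList of File objects: each directory is now wrapped in a StorageLocation, which carries the default storage type. Below is a hedged helper showing that migration in one place; the class and method names are invented for illustration, and only the StorageLocation(URI) constructor comes from the diff.

    package org.apache.hadoop.hdfs.server.datanode;

    import java.io.File;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.util.ArrayList;
    import java.util.List;

    // Sketch: wraps plain data directories the way TestBlockRecovery now does.
    public class DataDirMigrationSketch {
      static ArrayList<StorageLocation> toStorageLocations(List<File> dirs)
          throws URISyntaxException {
        ArrayList<StorageLocation> locations =
            new ArrayList<StorageLocation>(dirs.size());
        for (File dir : dirs) {
          // Mirrors the test: the URI form of the path, default storage type.
          locations.add(new StorageLocation(new URI(dir.getPath())));
        }
        return locations;
      }
    }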
+ String locations2 = "[BadMediaType]/dir0,[ssd]/dir1,[disk]/dir2"; + conf.set(DFS_DATANODE_DATA_DIR_KEY, locations2); + locations = new ArrayList(DataNode.getStorageLocations(conf)); + assertThat(locations.size(), is(3)); + assertThat(locations.get(0).getStorageType(), is(StorageType.DISK)); + assertThat(locations.get(0).getUri(), is(dir0.toURI())); + assertThat(locations.get(1).getStorageType(), is(StorageType.SSD)); + assertThat(locations.get(1).getUri(), is(dir1.toURI())); + assertThat(locations.get(2).getStorageType(), is(StorageType.DISK)); + assertThat(locations.get(2).getUri(), is(dir2.toURI())); + + // Assert that a string with no storage type specified is + // correctly parsed and the default storage type is picked up. + String locations3 = "/dir0,/dir1"; + conf.set(DFS_DATANODE_DATA_DIR_KEY, locations3); + locations = new ArrayList(DataNode.getStorageLocations(conf)); + assertThat(locations.size(), is(2)); + assertThat(locations.get(0).getStorageType(), is(StorageType.DISK)); + assertThat(locations.get(0).getUri(), is(dir0.toURI())); + assertThat(locations.get(1).getStorageType(), is(StorageType.DISK)); + assertThat(locations.get(1).getUri(), is(dir1.toURI())); + } + + @Test (timeout = 30000) + public void testDataDirValidation() throws Throwable { DataNodeDiskChecker diskChecker = mock(DataNodeDiskChecker.class); doThrow(new IOException()).doThrow(new IOException()).doNothing() .when(diskChecker).checkDir(any(LocalFileSystem.class), any(Path.class)); LocalFileSystem fs = mock(LocalFileSystem.class); - Collection uris = Arrays.asList(new URI("file:/p1/"), - new URI("file:/p2/"), new URI("file:/p3/")); + AbstractList locations = new ArrayList(); - List dirs = DataNode.getDataDirsFromURIs(uris, fs, diskChecker); - assertEquals("number of valid data dirs", 1, dirs.size()); - String validDir = dirs.iterator().next().getPath(); - assertEquals("p3 should be valid", new File("/p3").getPath(), validDir); + locations.add(new StorageLocation(new URI("file:/p1/"))); + locations.add(new StorageLocation(new URI("file:/p2/"))); + locations.add(new StorageLocation(new URI("file:/p3/"))); + + ArrayList checkedLocations = + DataNode.checkStorageLocations(locations, fs, diskChecker); + assertEquals("number of valid data dirs", 1, checkedLocations.size()); + String validDir = checkedLocations.iterator().next().getFile().getPath(); + assertThat("p3 should be valid", new File("/p3/").getPath(), is(validDir)); } }