diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 4047a51905e..97cfbd1b6de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -370,6 +370,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6781. Separate HDFS commands from CommandsManual.apt.vm. (Akira
     Ajisaka via Arpit Agarwal)
 
+    HDFS-6728. Dynamically add new volumes to DataStorage, formatted if
+    necessary. (Lei Xu via atm)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
index d065b5736e6..8e65dd0b548 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
@@ -36,8 +36,10 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Properties;
+import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -106,13 +108,22 @@ public class BlockPoolSliceStorage extends Storage {
   void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
       Collection<File> dataDirs, StartupOption startOpt) throws IOException {
     LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID());
+    Set<String> existingStorageDirs = new HashSet<String>();
+    for (int i = 0; i < getNumStorageDirs(); i++) {
+      existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath());
+    }
+
     // 1. For each BP data directory analyze the state and
     // check whether all is consistent before transitioning.
-    this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
     ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(
         dataDirs.size());
     for (Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
       File dataDir = it.next();
+      if (existingStorageDirs.contains(dataDir.getAbsolutePath())) {
+        LOG.info("Storage directory " + dataDir + " has already been used.");
+        it.remove();
+        continue;
+      }
       StorageDirectory sd = new StorageDirectory(dataDir, null, true);
       StorageState curState;
       try {
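Both BlockPoolSliceStorage#recoverTransitionRead() above and the DataStorage#addStorageLocations() changes below use the same guard against re-adding a volume: collect the absolute paths of the storage directories already in service, then drop any requested directory that matches, logging and skipping it rather than failing the whole batch. A minimal, self-contained sketch of that filtering step (the class and method names here are illustrative, not part of the patch):

    import java.io.File;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Set;

    // Illustrative only: mirrors the duplicate-volume guard added in this patch.
    public class DuplicateVolumeFilterSketch {
      /** Drop candidate dirs whose absolute path is already managed. */
      static List<File> filterNewVolumes(List<File> candidates, List<File> active) {
        Set<String> existing = new HashSet<String>();
        for (File dir : active) {
          existing.add(dir.getAbsolutePath());
        }
        for (Iterator<File> it = candidates.iterator(); it.hasNext();) {
          if (existing.contains(it.next().getAbsolutePath())) {
            it.remove();  // already in service: skip it, do not fail the batch
          }
        }
        return candidates;
      }

      public static void main(String[] args) {
        List<File> active = Arrays.asList(new File("/data/1"), new File("/data/2"));
        List<File> candidates = new ArrayList<File>(
            Arrays.asList(new File("/data/2"), new File("/data/3")));
        System.out.println(filterNewVolumes(candidates, active));  // [/data/3]
      }
    }

Note that File#getAbsolutePath() does not resolve symlinks (that would take getCanonicalPath()), so two path aliases of the same directory would not be detected as duplicates.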
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 5a55d094e11..4b9656eb8e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -55,6 +55,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -172,43 +173,99 @@ public class DataStorage extends Storage {
   }
 
   /**
-   * Analyze storage directories.
-   * Recover from previous transitions if required.
-   * Perform fs state transition if necessary depending on the namespace info.
-   * Read storage info.
-   * <br>
-   * This method should be synchronized between multiple DN threads.  Only the
-   * first DN thread does DN level storage dir recoverTransitionRead.
-   *
+   * See {@link org.apache.hadoop.hdfs.server.common.Storage#writeAll()}.
+   */
+  private void writeAll(Collection<StorageDirectory> dirs) throws IOException {
+    this.layoutVersion = getServiceLayoutVersion();
+    for (StorageDirectory dir : dirs) {
+      writeProperties(dir);
+    }
+  }
+
+  /**
+   * Add a list of volumes to be managed by DataStorage. If a volume is empty,
+   * format it; otherwise recover it from previous transitions if required.
+   *
+   * @param datanode the reference to DataNode.
    * @param nsInfo namespace information
    * @param dataDirs array of data storage directories
    * @param startOpt startup option
    * @throws IOException
    */
-  synchronized void recoverTransitionRead(DataNode datanode,
+  synchronized void addStorageLocations(DataNode datanode,
       NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
       StartupOption startOpt)
       throws IOException {
-    if (initialized) {
-      // DN storage has been initialized, no need to do anything
-      return;
+    // Similar to recoverTransitionRead, it first ensures the datanode-level
+    // format is completed.
+    List<StorageLocation> tmpDataDirs =
+        new ArrayList<StorageLocation>(dataDirs);
+    addStorageLocations(datanode, nsInfo, tmpDataDirs, startOpt, false, true);
+
+    Collection<File> bpDataDirs = new ArrayList<File>();
+    String bpid = nsInfo.getBlockPoolID();
+    for (StorageLocation dir : dataDirs) {
+      File dnRoot = dir.getFile();
+      File bpRoot = BlockPoolSliceStorage.getBpRoot(bpid, new File(dnRoot,
+          STORAGE_DIR_CURRENT));
+      bpDataDirs.add(bpRoot);
     }
-    LOG.info("Data-node version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
-        + " and name-node layout version: " + nsInfo.getLayoutVersion());
-
-    // 1. For each data directory calculate its state and
-    // check whether all is consistent before transitioning.
-    // Format and recover.
-    this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
-    ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(dataDirs.size());
+    // mkdir for the list of BlockPoolStorage
+    makeBlockPoolDataDir(bpDataDirs, null);
+    BlockPoolSliceStorage bpStorage = this.bpStorageMap.get(bpid);
+    if (bpStorage == null) {
+      bpStorage = new BlockPoolSliceStorage(
+          nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(),
+          nsInfo.getClusterID());
+    }
+
+    bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt);
+    addBlockPoolStorage(bpid, bpStorage);
+  }
+
+  /**
+   * Add a list of volumes to be managed by this DataStorage. If a volume is
+   * empty, it formats the volume; otherwise it recovers the volume from
+   * previous transitions if required.
+   *
+   * If isInitialize is false, only the directories that have finished the
+   * doTransition() process will be added into DataStorage.
+   *
+   * @param datanode the reference to DataNode.
+   * @param nsInfo namespace information
+   * @param dataDirs array of data storage directories
+   * @param startOpt startup option
+   * @param isInitialize whether it is called when DataNode starts up.
+   * @param ignoreExistingDirs whether to return silently, instead of
+   *          throwing, when none of the requested directories can be added.
+   * @throws IOException
+   */
+  private synchronized void addStorageLocations(DataNode datanode,
+      NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
+      StartupOption startOpt, boolean isInitialize, boolean ignoreExistingDirs)
+      throws IOException {
+    Set<String> existingStorageDirs = new HashSet<String>();
+    for (int i = 0; i < getNumStorageDirs(); i++) {
+      existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath());
+    }
+
+    // 1. For each data directory calculate its state and check whether all
+    // is consistent before transitioning. Format and recover.
+    ArrayList<StorageState> dataDirStates =
+        new ArrayList<StorageState>(dataDirs.size());
+    List<StorageDirectory> addedStorageDirectories =
+        new ArrayList<StorageDirectory>();
     for (Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext();) {
       File dataDir = it.next().getFile();
+      if (existingStorageDirs.contains(dataDir.getAbsolutePath())) {
+        LOG.info("Storage directory " + dataDir + " has already been used.");
+        it.remove();
+        continue;
+      }
       StorageDirectory sd = new StorageDirectory(dataDir);
       StorageState curState;
       try {
         curState = sd.analyzeStorage(startOpt, this);
         // sd is locked but not opened
-        switch(curState) {
+        switch (curState) {
         case NORMAL:
           break;
         case NON_EXISTENT:
@@ -217,7 +274,8 @@ public class DataStorage extends Storage {
         it.remove();
         continue;
       case NOT_FORMATTED: // format
-        LOG.info("Storage directory " + dataDir + " is not formatted");
+        LOG.info("Storage directory " + dataDir + " is not formatted for "
+            + nsInfo.getBlockPoolID());
         LOG.info("Formatting ...");
         format(sd, nsInfo, datanode.getDatanodeUuid());
         break;
@@ -231,33 +289,82 @@ public class DataStorage extends Storage {
         //continue with other good dirs
         continue;
       }
-      // add to the storage list
-      addStorageDir(sd);
+      if (isInitialize) {
+        addStorageDir(sd);
+      }
+      addedStorageDirectories.add(sd);
       dataDirStates.add(curState);
     }
 
-    if (dataDirs.size() == 0 || dataDirStates.size() == 0) // none of the data dirs exist
+    if (dataDirs.size() == 0 || dataDirStates.size() == 0) {
+      // none of the data dirs exist
+      if (ignoreExistingDirs) {
+        return;
+      }
       throw new IOException(
           "All specified directories are not accessible or do not exist.");
+    }
 
     // 2. Do transitions
     // Each storage directory is treated individually.
-    // During startup some of them can upgrade or rollback
-    // while others could be uptodate for the regular startup.
-    try {
-      for (int idx = 0; idx < getNumStorageDirs(); idx++) {
-        doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
-        createStorageID(getStorageDir(idx));
+    // During startup some of them can upgrade or roll back
+    // while others could be up-to-date for the regular startup.
+    for (Iterator<StorageDirectory> it = addedStorageDirectories.iterator();
+        it.hasNext(); ) {
+      StorageDirectory sd = it.next();
+      try {
+        doTransition(datanode, sd, nsInfo, startOpt);
+        createStorageID(sd);
+      } catch (IOException e) {
+        if (!isInitialize) {
+          sd.unlock();
+          it.remove();
+          continue;
+        }
+        unlockAll();
+        throw e;
       }
-    } catch (IOException e) {
-      unlockAll();
-      throw e;
     }
 
-    // 3. Update all storages. Some of them might have just been formatted.
-    this.writeAll();
+    // 3. Update all successfully loaded storages. Some of them might have
+    // just been formatted.
+    this.writeAll(addedStorageDirectories);
+
+    // 4. Make newly loaded storage directories visible for service.
+    if (!isInitialize) {
+      this.storageDirs.addAll(addedStorageDirectories);
+    }
+  }
+
+  /**
+   * Analyze storage directories.
+   * Recover from previous transitions if required.
+   * Perform fs state transition if necessary depending on the namespace info.
+   * Read storage info.
+   * <br>
+   * This method should be synchronized between multiple DN threads.  Only the
+   * first DN thread does DN level storage dir recoverTransitionRead.
+   *
+   * @param nsInfo namespace information
+   * @param dataDirs array of data storage directories
+   * @param startOpt startup option
+   * @throws IOException
+   */
+  synchronized void recoverTransitionRead(DataNode datanode,
+      NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
+      StartupOption startOpt)
+      throws IOException {
+    if (initialized) {
+      // DN storage has been initialized, no need to do anything
+      return;
+    }
+    LOG.info("DataNode version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
+        + " and NameNode layout version: " + nsInfo.getLayoutVersion());
+
+    this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
+    addStorageLocations(datanode, nsInfo, dataDirs, startOpt, true, false);
 
-    // 4. mark DN storage is initialized
+    // mark DN storage is initialized
     this.initialized = true;
   }
+ */ + +package org.apache.hadoop.hdfs.server.datanode; + +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; +import org.apache.hadoop.hdfs.server.common.Storage; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class TestDataStorage { + private final static String DEFAULT_BPID = "bp-0"; + private final static String CLUSTER_ID = "cluster0"; + private final static String BUILD_VERSION = "2.0"; + private final static String SOFTWARE_VERSION = "2.0"; + private final static long CTIME = 1; + private final static File TEST_DIR = + new File(System.getProperty("test.build.data") + "/dstest"); + private final static StartupOption START_OPT = StartupOption.REGULAR; + + private DataNode mockDN = Mockito.mock(DataNode.class); + private NamespaceInfo nsInfo; + private DataStorage storage; + + @Before + public void setUp() throws IOException { + storage = new DataStorage(); + nsInfo = new NamespaceInfo(0, CLUSTER_ID, DEFAULT_BPID, CTIME, + BUILD_VERSION, SOFTWARE_VERSION); + FileUtil.fullyDelete(TEST_DIR); + assertTrue("Failed to make test dir.", TEST_DIR.mkdirs()); + } + + @After + public void tearDown() throws IOException { + FileUtil.fullyDelete(TEST_DIR); + } + + private static List createStorageLocations(int numLocs) + throws IOException { + return createStorageLocations(numLocs, false); + } + + /** + * Create a list of StorageLocations. + * If asFile sets to true, create StorageLocation as regular files, otherwise + * create directories for each location. + * @param numLocs the total number of StorageLocations to be created. + * @param asFile set to true to create as file. + * @return a list of StorageLocations. + */ + private static List createStorageLocations( + int numLocs, boolean asFile) throws IOException { + List locations = new ArrayList(); + for (int i = 0; i < numLocs; i++) { + String uri = TEST_DIR + "/data" + i; + File file = new File(uri); + if (asFile) { + file.getParentFile().mkdirs(); + file.createNewFile(); + } else { + file.mkdirs(); + } + StorageLocation loc = StorageLocation.parse(uri); + locations.add(loc); + } + return locations; + } + + private static List createNamespaceInfos(int num) { + List nsInfos = new ArrayList(); + for (int i = 0; i < num; i++) { + String bpid = "bp-" + i; + nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, CTIME, BUILD_VERSION, + SOFTWARE_VERSION)); + } + return nsInfos; + } + + /** Check whether the path is a valid DataNode data directory. */ + private static void checkDir(File dataDir) { + Storage.StorageDirectory sd = new Storage.StorageDirectory(dataDir); + assertTrue(sd.getRoot().isDirectory()); + assertTrue(sd.getCurrentDir().isDirectory()); + assertTrue(sd.getVersionFile().isFile()); + } + + /** Check whether the root is a valid BlockPoolSlice storage. 
+  private static void checkDir(File root, String bpid) {
+    Storage.StorageDirectory sd = new Storage.StorageDirectory(root);
+    File bpRoot = new File(sd.getCurrentDir(), bpid);
+    Storage.StorageDirectory bpSd = new Storage.StorageDirectory(bpRoot);
+    assertTrue(bpSd.getRoot().isDirectory());
+    assertTrue(bpSd.getCurrentDir().isDirectory());
+    assertTrue(bpSd.getVersionFile().isFile());
+  }
+
+  @Test
+  public void testAddStorageDirectories() throws IOException,
+      URISyntaxException {
+    final int numLocations = 3;
+    final int numNamespace = 3;
+    List<StorageLocation> locations = createStorageLocations(numLocations);
+
+    // Add volumes for multiple namespaces.
+    List<NamespaceInfo> namespaceInfos = createNamespaceInfos(numNamespace);
+    for (NamespaceInfo ni : namespaceInfos) {
+      storage.addStorageLocations(mockDN, ni, locations, START_OPT);
+      for (StorageLocation sl : locations) {
+        checkDir(sl.getFile());
+        checkDir(sl.getFile(), ni.getBlockPoolID());
+      }
+    }
+
+    assertEquals(numLocations, storage.getNumStorageDirs());
+
+    locations = createStorageLocations(numLocations);
+    try {
+      storage.addStorageLocations(mockDN, namespaceInfos.get(0),
+          locations, START_OPT);
+      fail("Expected to throw IOException: adding active directories.");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains(
+          "All specified directories are not accessible or do not exist.", e);
+    }
+    // The number of active storage dirs has not changed, since the call
+    // tried to add storage dirs that are already in service.
+    assertEquals(numLocations, storage.getNumStorageDirs());
+
+    // Add more directories.
+    locations = createStorageLocations(6);
+    storage.addStorageLocations(mockDN, nsInfo, locations, START_OPT);
+    assertEquals(6, storage.getNumStorageDirs());
+  }
+
+  @Test
+  public void testRecoverTransitionReadFailure() throws IOException {
+    final int numLocations = 3;
+    List<StorageLocation> locations =
+        createStorageLocations(numLocations, true);
+
+    try {
+      storage.recoverTransitionRead(mockDN, nsInfo, locations, START_OPT);
+      fail("An IOException should be thrown: all StorageLocations are NON_EXISTENT.");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains(
+          "All specified directories are not accessible or do not exist.", e);
+    }
+    assertEquals(0, storage.getNumStorageDirs());
+  }
+
+  /**
+   * This test enforces the behavior that if there is an exception from
+   * doTransition() during DN startup, the storage directories that have
+   * already been processed are still visible, i.e., in
+   * DataStorage#storageDirs.
+   */
+  @Test
+  public void testRecoverTransitionReadDoTransitionFailure()
+      throws IOException {
+    final int numLocations = 3;
+    List<StorageLocation> locations = createStorageLocations(numLocations);
+    String bpid = nsInfo.getBlockPoolID();
+    // Prepare volumes
+    storage.recoverTransitionRead(mockDN, bpid, nsInfo, locations, START_OPT);
+
+    // Reset DataStorage
+    storage.unlockAll();
+    storage = new DataStorage();
+    // Trigger an exception from doTransition().
+    nsInfo.clusterID = "cluster1";
+    try {
+      storage.recoverTransitionRead(mockDN, bpid, nsInfo, locations, START_OPT);
+      fail("Expected an exception from doTransition().");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains("Incompatible clusterIDs", e);
+    }
+    assertEquals(numLocations, storage.getNumStorageDirs());
+  }
+}
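The entry point for hot-adding volumes is the new package-private addStorageLocations(DataNode, NamespaceInfo, Collection, StartupOption) overload added above. A sketch of the intended call sequence, assuming the caller already holds the DataNode reference and the NamespaceInfo of a registered block pool; the wrapper class below is hypothetical, not part of this patch, and sits in org.apache.hadoop.hdfs.server.datanode only because the entry point is package-private:

    package org.apache.hadoop.hdfs.server.datanode;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
    import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;

    // Hypothetical caller; not part of this patch.
    class VolumeHotAddSketch {
      static void hotAdd(DataNode datanode, DataStorage storage,
          NamespaceInfo nsInfo, String newDataDir) throws IOException {
        List<StorageLocation> volumes = new ArrayList<StorageLocation>();
        volumes.add(StorageLocation.parse(newDataDir));
        // Formats the volume if it is empty and recovers it otherwise; on
        // this path (isInitialize == false) a directory that fails
        // doTransition() is unlocked and skipped rather than failing the call.
        storage.addStorageLocations(datanode, nsInfo, volumes,
            StartupOption.REGULAR);
      }
    }

Because the private overload only appends to storageDirs after doTransition() succeeds, a partially bad batch leaves the volumes already in service untouched.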
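The "Incompatible clusterIDs" message asserted in testRecoverTransitionReadDoTransitionFailure() comes from the cluster-ID consistency check inside DataStorage#doTransition(), which is why overwriting nsInfo.clusterID is enough to trigger the failure. A simplified paraphrase of that check, assuming the stored cluster ID has already been read from the VERSION file (this is not the literal Hadoop code, which also handles upgrade and rollback):

    import java.io.File;
    import java.io.IOException;

    // Paraphrase only: the real check lives in DataStorage#doTransition().
    class ClusterIdCheckSketch {
      static void checkClusterId(File storageRoot, String datanodeClusterId,
          String namenodeClusterId) throws IOException {
        if (!datanodeClusterId.equals(namenodeClusterId)) {
          throw new IOException("Incompatible clusterIDs in " + storageRoot
              + ": namenode clusterID = " + namenodeClusterId
              + "; datanode clusterID = " + datanodeClusterId);
        }
      }
    }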