HDFS-6728. Dynamically add new volumes to DataStorage, formatted if necessary. Contributed by Lei Xu.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1616620 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8a3f21cb08
commit
84d57bbfa8
|
@ -112,6 +112,9 @@ Release 2.6.0 - UNRELEASED
|
|||
HDFS-6781. Separate HDFS commands from CommandsManual.apt.vm. (Akira
|
||||
Ajisaka via Arpit Agarwal)
|
||||
|
||||
HDFS-6728. Dynamically add new volumes to DataStorage, formatted if
|
||||
necessary. (Lei Xu vi atm)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-6690. Deduplicate xattr names in memory. (wang)
|
||||
|
|
|
@ -36,8 +36,10 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
|
@ -106,13 +108,22 @@ public class BlockPoolSliceStorage extends Storage {
|
|||
void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
|
||||
Collection<File> dataDirs, StartupOption startOpt) throws IOException {
|
||||
LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID());
|
||||
Set<String> existingStorageDirs = new HashSet<String>();
|
||||
for (int i = 0; i < getNumStorageDirs(); i++) {
|
||||
existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath());
|
||||
}
|
||||
|
||||
// 1. For each BP data directory analyze the state and
|
||||
// check whether all is consistent before transitioning.
|
||||
this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
|
||||
ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(
|
||||
dataDirs.size());
|
||||
for (Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
|
||||
File dataDir = it.next();
|
||||
if (existingStorageDirs.contains(dataDir.getAbsolutePath())) {
|
||||
LOG.info("Storage directory " + dataDir + " has already been used.");
|
||||
it.remove();
|
||||
continue;
|
||||
}
|
||||
StorageDirectory sd = new StorageDirectory(dataDir, null, true);
|
||||
StorageState curState;
|
||||
try {
|
||||
|
|
|
@ -149,43 +149,99 @@ public class DataStorage extends Storage {
|
|||
}
|
||||
|
||||
/**
|
||||
* Analyze storage directories.
|
||||
* Recover from previous transitions if required.
|
||||
* Perform fs state transition if necessary depending on the namespace info.
|
||||
* Read storage info.
|
||||
* <br>
|
||||
* This method should be synchronized between multiple DN threads. Only the
|
||||
* first DN thread does DN level storage dir recoverTransitionRead.
|
||||
*
|
||||
* {{@inheritDoc org.apache.hadoop.hdfs.server.common.Storage#writeAll()}}
|
||||
*/
|
||||
private void writeAll(Collection<StorageDirectory> dirs) throws IOException {
|
||||
this.layoutVersion = getServiceLayoutVersion();
|
||||
for (StorageDirectory dir : dirs) {
|
||||
writeProperties(dir);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a list of volumes to be managed by DataStorage. If the volume is empty,
|
||||
* format it, otherwise recover it from previous transitions if required.
|
||||
*
|
||||
* @param datanode the reference to DataNode.
|
||||
* @param nsInfo namespace information
|
||||
* @param dataDirs array of data storage directories
|
||||
* @param startOpt startup option
|
||||
* @throws IOException
|
||||
*/
|
||||
synchronized void recoverTransitionRead(DataNode datanode,
|
||||
synchronized void addStorageLocations(DataNode datanode,
|
||||
NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
|
||||
StartupOption startOpt)
|
||||
throws IOException {
|
||||
if (initialized) {
|
||||
// DN storage has been initialized, no need to do anything
|
||||
return;
|
||||
// Similar to recoverTransitionRead, it first ensures the datanode level
|
||||
// format is completed.
|
||||
List<StorageLocation> tmpDataDirs =
|
||||
new ArrayList<StorageLocation>(dataDirs);
|
||||
addStorageLocations(datanode, nsInfo, tmpDataDirs, startOpt, false, true);
|
||||
|
||||
Collection<File> bpDataDirs = new ArrayList<File>();
|
||||
String bpid = nsInfo.getBlockPoolID();
|
||||
for (StorageLocation dir : dataDirs) {
|
||||
File dnRoot = dir.getFile();
|
||||
File bpRoot = BlockPoolSliceStorage.getBpRoot(bpid, new File(dnRoot,
|
||||
STORAGE_DIR_CURRENT));
|
||||
bpDataDirs.add(bpRoot);
|
||||
}
|
||||
LOG.info("Data-node version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
|
||||
+ " and name-node layout version: " + nsInfo.getLayoutVersion());
|
||||
|
||||
// 1. For each data directory calculate its state and
|
||||
// check whether all is consistent before transitioning.
|
||||
// Format and recover.
|
||||
this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
|
||||
ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(dataDirs.size());
|
||||
// mkdir for the list of BlockPoolStorage
|
||||
makeBlockPoolDataDir(bpDataDirs, null);
|
||||
BlockPoolSliceStorage bpStorage = this.bpStorageMap.get(bpid);
|
||||
if (bpStorage == null) {
|
||||
bpStorage = new BlockPoolSliceStorage(
|
||||
nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(),
|
||||
nsInfo.getClusterID());
|
||||
}
|
||||
|
||||
bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt);
|
||||
addBlockPoolStorage(bpid, bpStorage);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a list of volumes to be managed by this DataStorage. If the volume is
|
||||
* empty, it formats the volume, otherwise it recovers it from previous
|
||||
* transitions if required.
|
||||
*
|
||||
* If isInitialize is false, only the directories that have finished the
|
||||
* doTransition() process will be added into DataStorage.
|
||||
*
|
||||
* @param datanode the reference to DataNode.
|
||||
* @param nsInfo namespace information
|
||||
* @param dataDirs array of data storage directories
|
||||
* @param startOpt startup option
|
||||
* @param isInitialize whether it is called when DataNode starts up.
|
||||
* @throws IOException
|
||||
*/
|
||||
private synchronized void addStorageLocations(DataNode datanode,
|
||||
NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
|
||||
StartupOption startOpt, boolean isInitialize, boolean ignoreExistingDirs)
|
||||
throws IOException {
|
||||
Set<String> existingStorageDirs = new HashSet<String>();
|
||||
for (int i = 0; i < getNumStorageDirs(); i++) {
|
||||
existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath());
|
||||
}
|
||||
|
||||
// 1. For each data directory calculate its state and check whether all is
|
||||
// consistent before transitioning. Format and recover.
|
||||
ArrayList<StorageState> dataDirStates =
|
||||
new ArrayList<StorageState>(dataDirs.size());
|
||||
List<StorageDirectory> addedStorageDirectories =
|
||||
new ArrayList<StorageDirectory>();
|
||||
for(Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext();) {
|
||||
File dataDir = it.next().getFile();
|
||||
if (existingStorageDirs.contains(dataDir.getAbsolutePath())) {
|
||||
LOG.info("Storage directory " + dataDir + " has already been used.");
|
||||
it.remove();
|
||||
continue;
|
||||
}
|
||||
StorageDirectory sd = new StorageDirectory(dataDir);
|
||||
StorageState curState;
|
||||
try {
|
||||
curState = sd.analyzeStorage(startOpt, this);
|
||||
// sd is locked but not opened
|
||||
switch(curState) {
|
||||
switch (curState) {
|
||||
case NORMAL:
|
||||
break;
|
||||
case NON_EXISTENT:
|
||||
|
@ -194,7 +250,8 @@ public class DataStorage extends Storage {
|
|||
it.remove();
|
||||
continue;
|
||||
case NOT_FORMATTED: // format
|
||||
LOG.info("Storage directory " + dataDir + " is not formatted");
|
||||
LOG.info("Storage directory " + dataDir + " is not formatted for "
|
||||
+ nsInfo.getBlockPoolID());
|
||||
LOG.info("Formatting ...");
|
||||
format(sd, nsInfo, datanode.getDatanodeUuid());
|
||||
break;
|
||||
|
@ -208,33 +265,82 @@ public class DataStorage extends Storage {
|
|||
//continue with other good dirs
|
||||
continue;
|
||||
}
|
||||
// add to the storage list
|
||||
addStorageDir(sd);
|
||||
if (isInitialize) {
|
||||
addStorageDir(sd);
|
||||
}
|
||||
addedStorageDirectories.add(sd);
|
||||
dataDirStates.add(curState);
|
||||
}
|
||||
|
||||
if (dataDirs.size() == 0 || dataDirStates.size() == 0) // none of the data dirs exist
|
||||
if (dataDirs.size() == 0 || dataDirStates.size() == 0) {
|
||||
// none of the data dirs exist
|
||||
if (ignoreExistingDirs) {
|
||||
return;
|
||||
}
|
||||
throw new IOException(
|
||||
"All specified directories are not accessible or do not exist.");
|
||||
}
|
||||
|
||||
// 2. Do transitions
|
||||
// Each storage directory is treated individually.
|
||||
// During startup some of them can upgrade or rollback
|
||||
// while others could be uptodate for the regular startup.
|
||||
try {
|
||||
for (int idx = 0; idx < getNumStorageDirs(); idx++) {
|
||||
doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
|
||||
createStorageID(getStorageDir(idx));
|
||||
// During startup some of them can upgrade or rollback
|
||||
// while others could be up-to-date for the regular startup.
|
||||
for (Iterator<StorageDirectory> it = addedStorageDirectories.iterator();
|
||||
it.hasNext(); ) {
|
||||
StorageDirectory sd = it.next();
|
||||
try {
|
||||
doTransition(datanode, sd, nsInfo, startOpt);
|
||||
createStorageID(sd);
|
||||
} catch (IOException e) {
|
||||
if (!isInitialize) {
|
||||
sd.unlock();
|
||||
it.remove();
|
||||
continue;
|
||||
}
|
||||
unlockAll();
|
||||
throw e;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
unlockAll();
|
||||
throw e;
|
||||
}
|
||||
|
||||
// 3. Update all storages. Some of them might have just been formatted.
|
||||
this.writeAll();
|
||||
// 3. Update all successfully loaded storages. Some of them might have just
|
||||
// been formatted.
|
||||
this.writeAll(addedStorageDirectories);
|
||||
|
||||
// 4. Make newly loaded storage directories visible for service.
|
||||
if (!isInitialize) {
|
||||
this.storageDirs.addAll(addedStorageDirectories);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Analyze storage directories.
|
||||
* Recover from previous transitions if required.
|
||||
* Perform fs state transition if necessary depending on the namespace info.
|
||||
* Read storage info.
|
||||
* <br>
|
||||
* This method should be synchronized between multiple DN threads. Only the
|
||||
* first DN thread does DN level storage dir recoverTransitionRead.
|
||||
*
|
||||
* @param nsInfo namespace information
|
||||
* @param dataDirs array of data storage directories
|
||||
* @param startOpt startup option
|
||||
* @throws IOException
|
||||
*/
|
||||
synchronized void recoverTransitionRead(DataNode datanode,
|
||||
NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
|
||||
StartupOption startOpt)
|
||||
throws IOException {
|
||||
if (initialized) {
|
||||
// DN storage has been initialized, no need to do anything
|
||||
return;
|
||||
}
|
||||
LOG.info("DataNode version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
|
||||
+ " and NameNode layout version: " + nsInfo.getLayoutVersion());
|
||||
|
||||
this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
|
||||
addStorageLocations(datanode, nsInfo, dataDirs, startOpt, true, false);
|
||||
|
||||
// 4. mark DN storage is initialized
|
||||
// mark DN storage is initialized
|
||||
this.initialized = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,210 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class TestDataStorage {
|
||||
private final static String DEFAULT_BPID = "bp-0";
|
||||
private final static String CLUSTER_ID = "cluster0";
|
||||
private final static String BUILD_VERSION = "2.0";
|
||||
private final static String SOFTWARE_VERSION = "2.0";
|
||||
private final static long CTIME = 1;
|
||||
private final static File TEST_DIR =
|
||||
new File(System.getProperty("test.build.data") + "/dstest");
|
||||
private final static StartupOption START_OPT = StartupOption.REGULAR;
|
||||
|
||||
private DataNode mockDN = Mockito.mock(DataNode.class);
|
||||
private NamespaceInfo nsInfo;
|
||||
private DataStorage storage;
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
storage = new DataStorage();
|
||||
nsInfo = new NamespaceInfo(0, CLUSTER_ID, DEFAULT_BPID, CTIME,
|
||||
BUILD_VERSION, SOFTWARE_VERSION);
|
||||
FileUtil.fullyDelete(TEST_DIR);
|
||||
assertTrue("Failed to make test dir.", TEST_DIR.mkdirs());
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws IOException {
|
||||
FileUtil.fullyDelete(TEST_DIR);
|
||||
}
|
||||
|
||||
private static List<StorageLocation> createStorageLocations(int numLocs)
|
||||
throws IOException {
|
||||
return createStorageLocations(numLocs, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a list of StorageLocations.
|
||||
* If asFile sets to true, create StorageLocation as regular files, otherwise
|
||||
* create directories for each location.
|
||||
* @param numLocs the total number of StorageLocations to be created.
|
||||
* @param asFile set to true to create as file.
|
||||
* @return a list of StorageLocations.
|
||||
*/
|
||||
private static List<StorageLocation> createStorageLocations(
|
||||
int numLocs, boolean asFile) throws IOException {
|
||||
List<StorageLocation> locations = new ArrayList<StorageLocation>();
|
||||
for (int i = 0; i < numLocs; i++) {
|
||||
String uri = TEST_DIR + "/data" + i;
|
||||
File file = new File(uri);
|
||||
if (asFile) {
|
||||
file.getParentFile().mkdirs();
|
||||
file.createNewFile();
|
||||
} else {
|
||||
file.mkdirs();
|
||||
}
|
||||
StorageLocation loc = StorageLocation.parse(uri);
|
||||
locations.add(loc);
|
||||
}
|
||||
return locations;
|
||||
}
|
||||
|
||||
private static List<NamespaceInfo> createNamespaceInfos(int num) {
|
||||
List<NamespaceInfo> nsInfos = new ArrayList<NamespaceInfo>();
|
||||
for (int i = 0; i < num; i++) {
|
||||
String bpid = "bp-" + i;
|
||||
nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, CTIME, BUILD_VERSION,
|
||||
SOFTWARE_VERSION));
|
||||
}
|
||||
return nsInfos;
|
||||
}
|
||||
|
||||
/** Check whether the path is a valid DataNode data directory. */
|
||||
private static void checkDir(File dataDir) {
|
||||
Storage.StorageDirectory sd = new Storage.StorageDirectory(dataDir);
|
||||
assertTrue(sd.getRoot().isDirectory());
|
||||
assertTrue(sd.getCurrentDir().isDirectory());
|
||||
assertTrue(sd.getVersionFile().isFile());
|
||||
}
|
||||
|
||||
/** Check whether the root is a valid BlockPoolSlice storage. */
|
||||
private static void checkDir(File root, String bpid) {
|
||||
Storage.StorageDirectory sd = new Storage.StorageDirectory(root);
|
||||
File bpRoot = new File(sd.getCurrentDir(), bpid);
|
||||
Storage.StorageDirectory bpSd = new Storage.StorageDirectory(bpRoot);
|
||||
assertTrue(bpSd.getRoot().isDirectory());
|
||||
assertTrue(bpSd.getCurrentDir().isDirectory());
|
||||
assertTrue(bpSd.getVersionFile().isFile());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddStorageDirectories() throws IOException,
|
||||
URISyntaxException {
|
||||
final int numLocations = 3;
|
||||
final int numNamespace = 3;
|
||||
List<StorageLocation> locations = createStorageLocations(numLocations);
|
||||
|
||||
// Add volumes for multiple namespaces.
|
||||
List<NamespaceInfo> namespaceInfos = createNamespaceInfos(numNamespace);
|
||||
for (NamespaceInfo ni : namespaceInfos) {
|
||||
storage.addStorageLocations(mockDN, ni, locations, START_OPT);
|
||||
for (StorageLocation sl : locations) {
|
||||
checkDir(sl.getFile());
|
||||
checkDir(sl.getFile(), ni.getBlockPoolID());
|
||||
}
|
||||
}
|
||||
|
||||
assertEquals(numLocations, storage.getNumStorageDirs());
|
||||
|
||||
locations = createStorageLocations(numLocations);
|
||||
try {
|
||||
storage.addStorageLocations(mockDN, namespaceInfos.get(0),
|
||||
locations, START_OPT);
|
||||
fail("Expected to throw IOException: adding active directories.");
|
||||
} catch (IOException e) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
"All specified directories are not accessible or do not exist.", e);
|
||||
}
|
||||
// The number of active storage dirs has not changed, since it tries to
|
||||
// add the storage dirs that are under service.
|
||||
assertEquals(numLocations, storage.getNumStorageDirs());
|
||||
|
||||
// Add more directories.
|
||||
locations = createStorageLocations(6);
|
||||
storage.addStorageLocations(mockDN, nsInfo, locations, START_OPT);
|
||||
assertEquals(6, storage.getNumStorageDirs());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecoverTransitionReadFailure() throws IOException {
|
||||
final int numLocations = 3;
|
||||
List<StorageLocation> locations =
|
||||
createStorageLocations(numLocations, true);
|
||||
|
||||
try {
|
||||
storage.recoverTransitionRead(mockDN, nsInfo, locations, START_OPT);
|
||||
fail("An IOException should throw: all StorageLocations are NON_EXISTENT");
|
||||
} catch (IOException e) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
"All specified directories are not accessible or do not exist.", e);
|
||||
}
|
||||
assertEquals(0, storage.getNumStorageDirs());
|
||||
}
|
||||
|
||||
/**
|
||||
* This test enforces the behavior that if there is an exception from
|
||||
* doTransition() during DN starts up, the storage directories that have
|
||||
* already been processed are still visible, i.e., in
|
||||
* DataStorage.storageDirs().
|
||||
*/
|
||||
@Test
|
||||
public void testRecoverTransitionReadDoTransitionFailure()
|
||||
throws IOException {
|
||||
final int numLocations = 3;
|
||||
List<StorageLocation> locations = createStorageLocations(numLocations);
|
||||
String bpid = nsInfo.getBlockPoolID();
|
||||
// Prepare volumes
|
||||
storage.recoverTransitionRead(mockDN, bpid, nsInfo, locations, START_OPT);
|
||||
|
||||
// Reset DataStorage
|
||||
storage.unlockAll();
|
||||
storage = new DataStorage();
|
||||
// Trigger an exception from doTransition().
|
||||
nsInfo.clusterID = "cluster1";
|
||||
try {
|
||||
storage.recoverTransitionRead(mockDN, bpid, nsInfo, locations, START_OPT);
|
||||
fail("Expect to throw an exception from doTransition()");
|
||||
} catch (IOException e) {
|
||||
GenericTestUtils.assertExceptionContains("Incompatible clusterIDs", e);
|
||||
}
|
||||
assertEquals(numLocations, storage.getNumStorageDirs());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue