HDFS-9654. Code refactoring for HDFS-8578.
parent b10d8ced21
commit d37ecbf721
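In outline, this commit removes the DataNode parameter that was threaded through BlockPoolSliceStorage and DataStorage: the storage methods now receive the Configuration (and, where needed, the pre-upgrade layout version) directly, createStorageID becomes static, and recoverTransitionRead returns the storage directories it loaded. A hedged sketch of the resulting call shape, with the scaffolding assumed rather than taken from the diff:

    // Illustrative only; bpStorage, nsInfo, bpDataDirs and startOpt are
    // assumed to be set up as in the hunks below.
    Configuration conf = datanode.getConf();
    List<StorageDirectory> loaded = bpStorage.recoverTransitionRead(
        nsInfo, bpDataDirs, startOpt, conf);  // now returns the loaded dirs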
CHANGES.txt

@@ -1776,6 +1776,8 @@ Release 2.7.3 - UNRELEASED
     HDFS-9634. webhdfs client side exceptions don't provide enough details
     (Eric Payne via kihwal)
 
+    HDFS-9654. Code refactoring for HDFS-8578. (szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES
Storage.java

@@ -640,7 +640,8 @@ public abstract class Storage extends StorageInfo {
       rename(getLastCheckpointTmp(), curDir);
       return;
     default:
-      throw new IOException("Unexpected FS state: " + curState);
+      throw new IOException("Unexpected FS state: " + curState
+          + " for storage directory: " + rootPath);
     }
   }
 
BlockPoolSliceStorage.java

@@ -18,10 +18,21 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.HardLink;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
@@ -34,18 +45,9 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.util.Daemon;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.Collections;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 /**
  * Manages storage for the set of BlockPoolSlices which share a particular
@@ -136,15 +138,15 @@ public class BlockPoolSliceStorage extends Storage {
   /**
    * Load one storage directory. Recover from previous transitions if required.
    *
-   * @param datanode datanode instance
    * @param nsInfo namespace information
    * @param dataDir the root path of the storage directory
    * @param startOpt startup option
    * @return the StorageDirectory successfully loaded.
    * @throws IOException
    */
-  private StorageDirectory loadStorageDirectory(DataNode datanode,
-      NamespaceInfo nsInfo, File dataDir, StartupOption startOpt) throws IOException {
+  private StorageDirectory loadStorageDirectory(NamespaceInfo nsInfo,
+      File dataDir, StartupOption startOpt, Configuration conf)
+      throws IOException {
     StorageDirectory sd = new StorageDirectory(dataDir, null, true);
     try {
       StorageState curState = sd.analyzeStorage(startOpt, this);
@@ -158,8 +160,8 @@ public class BlockPoolSliceStorage extends Storage {
             + " does not exist");
       case NOT_FORMATTED: // format
         LOG.info("Block pool storage directory " + dataDir
-            + " is not formatted for " + nsInfo.getBlockPoolID());
-        LOG.info("Formatting ...");
+            + " is not formatted for " + nsInfo.getBlockPoolID()
+            + ". Formatting ...");
         format(sd, nsInfo);
         break;
       default: // recovery part is common
@@ -170,10 +172,13 @@ public class BlockPoolSliceStorage extends Storage {
       // Each storage directory is treated individually.
       // During startup some of them can upgrade or roll back
       // while others could be up-to-date for the regular startup.
-      doTransition(datanode, sd, nsInfo, startOpt);
+      if (doTransition(sd, nsInfo, startOpt, conf)) {
+        return sd;
+      }
 
       if (getCTime() != nsInfo.getCTime()) {
-        throw new IOException(
-            "Data-node and name-node CTimes must be the same.");
+        throw new IOException("Datanode CTime (=" + getCTime()
+            + ") is not equal to namenode CTime (=" + nsInfo.getCTime() + ")");
       }
 
       // 3. Update successfully loaded storage.
@@ -195,16 +200,15 @@ public class BlockPoolSliceStorage extends Storage {
    * Therefore, a failure on loading any block pool storage results a faulty
    * data volume.
    *
-   * @param datanode Datanode to which this storage belongs to
    * @param nsInfo namespace information
    * @param dataDirs storage directories of block pool
    * @param startOpt startup option
    * @return an array of loaded block pool directories.
    * @throws IOException on error
    */
-  List<StorageDirectory> loadBpStorageDirectories(
-      DataNode datanode, NamespaceInfo nsInfo,
-      Collection<File> dataDirs, StartupOption startOpt) throws IOException {
+  List<StorageDirectory> loadBpStorageDirectories(NamespaceInfo nsInfo,
+      Collection<File> dataDirs, StartupOption startOpt,
+      Configuration conf) throws IOException {
     List<StorageDirectory> succeedDirs = Lists.newArrayList();
     try {
       for (File dataDir : dataDirs) {
@@ -213,8 +217,8 @@ public class BlockPoolSliceStorage extends Storage {
               "BlockPoolSliceStorage.recoverTransitionRead: " +
               "attempt to load an used block storage: " + dataDir);
         }
-        StorageDirectory sd =
-            loadStorageDirectory(datanode, nsInfo, dataDir, startOpt);
+        final StorageDirectory sd = loadStorageDirectory(
+            nsInfo, dataDir, startOpt, conf);
         succeedDirs.add(sd);
       }
     } catch (IOException e) {
@@ -232,19 +236,21 @@ public class BlockPoolSliceStorage extends Storage {
    * Therefore, a failure on loading any block pool storage results a faulty
    * data volume.
    *
-   * @param datanode Datanode to which this storage belongs to
    * @param nsInfo namespace information
    * @param dataDirs storage directories of block pool
    * @param startOpt startup option
    * @throws IOException on error
    */
-  void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
-      Collection<File> dataDirs, StartupOption startOpt) throws IOException {
+  List<StorageDirectory> recoverTransitionRead(NamespaceInfo nsInfo,
+      Collection<File> dataDirs, StartupOption startOpt, Configuration conf)
+      throws IOException {
     LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID());
-    for (StorageDirectory sd : loadBpStorageDirectories(
-        datanode, nsInfo, dataDirs, startOpt)) {
+    final List<StorageDirectory> loaded = loadBpStorageDirectories(
+        nsInfo, dataDirs, startOpt, conf);
+    for (StorageDirectory sd : loaded) {
       addStorageDir(sd);
     }
+    return loaded;
   }
 
   /**
@@ -344,10 +350,10 @@ public class BlockPoolSliceStorage extends Storage {
    * @param sd storage directory <SD>/current/<bpid>
    * @param nsInfo namespace info
    * @param startOpt startup option
-   * @throws IOException
+   * @return true if the new properties has been written.
    */
-  private void doTransition(DataNode datanode, StorageDirectory sd,
-      NamespaceInfo nsInfo, StartupOption startOpt) throws IOException {
+  private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
+      StartupOption startOpt, Configuration conf) throws IOException {
     if (startOpt == StartupOption.ROLLBACK && sd.getPreviousDir().exists()) {
       Preconditions.checkState(!getTrashRootDir(sd).exists(),
           sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not " +
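The boolean contract introduced here: doTransition returns true when the transition itself (an upgrade) already wrote the new storage properties, so loadStorageDirectory can return early instead of rewriting VERSION. Condensed from the surrounding hunks as a sketch:

    // 'true' => doUpgrade() already persisted the new properties, so the
    // caller returns the directory as-is; 'false' => regular startup, and
    // the caller falls through to write the properties itself.
    if (doTransition(sd, nsInfo, startOpt, conf)) {
      return sd;
    }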
@@ -379,7 +385,7 @@ public class BlockPoolSliceStorage extends Storage {
     }
     if (this.layoutVersion == HdfsServerConstants.DATANODE_LAYOUT_VERSION
         && this.cTime == nsInfo.getCTime()) {
-      return; // regular startup
+      return false; // regular startup
     }
     if (this.layoutVersion > HdfsServerConstants.DATANODE_LAYOUT_VERSION) {
       int restored = restoreBlockFilesFromTrash(getTrashRootDir(sd));
@@ -389,8 +395,8 @@ public class BlockPoolSliceStorage extends Storage {
     }
     if (this.layoutVersion > HdfsServerConstants.DATANODE_LAYOUT_VERSION
         || this.cTime < nsInfo.getCTime()) {
-      doUpgrade(datanode, sd, nsInfo); // upgrade
-      return;
+      doUpgrade(sd, nsInfo, conf); // upgrade
+      return true;
     }
     // layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
     // must shutdown
@@ -418,16 +424,18 @@ public class BlockPoolSliceStorage extends Storage {
    * @param nsInfo Namespace Info from the namenode
    * @throws IOException on error
    */
-  void doUpgrade(DataNode datanode, StorageDirectory bpSd, NamespaceInfo nsInfo)
-      throws IOException {
+  private void doUpgrade(final StorageDirectory bpSd,
+      final NamespaceInfo nsInfo, final Configuration conf) throws IOException {
     // Upgrading is applicable only to release with federation or after
     if (!DataNodeLayoutVersion.supports(
         LayoutVersion.Feature.FEDERATION, layoutVersion)) {
       return;
     }
+    final int oldLV = getLayoutVersion();
     LOG.info("Upgrading block pool storage directory " + bpSd.getRoot()
-        + ".\n old LV = " + this.getLayoutVersion() + "; old CTime = "
-        + this.getCTime() + ".\n new LV = " + HdfsServerConstants.DATANODE_LAYOUT_VERSION
+        + ".\n old LV = " + oldLV
+        + "; old CTime = " + this.getCTime()
+        + ".\n new LV = " + HdfsServerConstants.DATANODE_LAYOUT_VERSION
         + "; new CTime = " + nsInfo.getCTime());
     // get <SD>/previous directory
     String dnRoot = getDataNodeStorageRoot(bpSd.getRoot().getCanonicalPath());
@@ -438,8 +446,8 @@ public class BlockPoolSliceStorage extends Storage {
     if (dnPrevDir.exists()) {
       deleteDir(dnPrevDir);
     }
-    File bpCurDir = bpSd.getCurrentDir();
-    File bpPrevDir = bpSd.getPreviousDir();
+    final File bpCurDir = bpSd.getCurrentDir();
+    final File bpPrevDir = bpSd.getPreviousDir();
     assert bpCurDir.exists() : "BP level current directory must exist.";
     cleanupDetachDir(new File(bpCurDir, DataStorage.STORAGE_DIR_DETACHED));
 
@@ -447,15 +455,23 @@ public class BlockPoolSliceStorage extends Storage {
     if (bpPrevDir.exists()) {
       deleteDir(bpPrevDir);
     }
-    File bpTmpDir = bpSd.getPreviousTmp();
+    final File bpTmpDir = bpSd.getPreviousTmp();
     assert !bpTmpDir.exists() : "previous.tmp directory must not exist.";
 
     // 2. Rename <SD>/current/<bpid>/current to
     //    <SD>/current/<bpid>/previous.tmp
     rename(bpCurDir, bpTmpDir);
 
+    final String name = "block pool " + blockpoolID + " at " + bpSd.getRoot();
+    doUgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV, conf);
+  }
+
+  private void doUgrade(String name, final StorageDirectory bpSd,
+      NamespaceInfo nsInfo, final File bpPrevDir, final File bpTmpDir,
+      final File bpCurDir, final int oldLV, Configuration conf)
+      throws IOException {
     // 3. Create new <SD>/current with block files hardlinks and VERSION
-    linkAllBlocks(datanode, bpTmpDir, bpCurDir);
+    linkAllBlocks(bpTmpDir, bpCurDir, oldLV, conf);
     this.layoutVersion = HdfsServerConstants.DATANODE_LAYOUT_VERSION;
     assert this.namespaceID == nsInfo.getNamespaceID()
         : "Data-node and name-node layout versions must be the same.";
@@ -465,8 +481,7 @@ public class BlockPoolSliceStorage extends Storage {
     // 4.rename <SD>/current/<bpid>/previous.tmp to
     //    <SD>/current/<bpid>/previous
     rename(bpTmpDir, bpPrevDir);
-    LOG.info("Upgrade of block pool " + blockpoolID + " at " + bpSd.getRoot()
-        + " is complete");
+    LOG.info("Upgrade of " + name + " is complete");
   }
 
   /**
@@ -640,17 +655,17 @@ public class BlockPoolSliceStorage extends Storage {
    * @param toDir the current data directory
    * @throws IOException if error occurs during hardlink
    */
-  private void linkAllBlocks(DataNode datanode, File fromDir, File toDir)
-      throws IOException {
+  private static void linkAllBlocks(File fromDir, File toDir,
+      int diskLayoutVersion, Configuration conf) throws IOException {
     // do the link
-    int diskLayoutVersion = this.getLayoutVersion();
     // hardlink finalized blocks in tmpDir
     HardLink hardLink = new HardLink();
-    DataStorage.linkBlocks(datanode, new File(fromDir, DataStorage.STORAGE_DIR_FINALIZED),
-        new File(toDir,DataStorage.STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink);
-    DataStorage.linkBlocks(datanode, new File(fromDir, DataStorage.STORAGE_DIR_RBW),
-        new File(toDir, DataStorage.STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
-    LOG.info( hardLink.linkStats.report() );
+    DataStorage.linkBlocks(fromDir, toDir, DataStorage.STORAGE_DIR_FINALIZED,
+        diskLayoutVersion, hardLink, conf);
+    DataStorage.linkBlocks(fromDir, toDir, DataStorage.STORAGE_DIR_RBW,
+        diskLayoutVersion, hardLink, conf);
+    LOG.info("Linked blocks from " + fromDir + " to " + toDir + ". "
+        + hardLink.linkStats.report());
   }
 
   /**
DataStorage.java

@@ -18,11 +18,29 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.Futures;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileLock;
+import java.nio.channels.OverlappingFileLockException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -47,28 +65,11 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileLock;
-import java.nio.channels.OverlappingFileLockException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
 
 /**
  * Data storage information file.
@@ -104,7 +105,7 @@ public class DataStorage extends Storage {
    * upgraded from a pre-UUID version. For compatibility with prior
    * versions of Datanodes we cannot make this field a UUID.
    */
-  private String datanodeUuid = null;
+  private volatile String datanodeUuid = null;
 
   // Maps block pool IDs to block pool storage
   private final Map<String, BlockPoolSliceStorage> bpStorageMap
@@ -125,18 +126,28 @@ public class DataStorage extends Storage {
     super(storageInfo);
   }
 
-  public synchronized String getDatanodeUuid() {
+  public String getDatanodeUuid() {
     return datanodeUuid;
   }
 
-  public synchronized void setDatanodeUuid(String newDatanodeUuid) {
+  public void setDatanodeUuid(String newDatanodeUuid) {
     this.datanodeUuid = newDatanodeUuid;
   }
 
+  private static boolean createStorageID(StorageDirectory sd, int lv) {
+    // Clusters previously upgraded from layout versions earlier than
+    // ADD_DATANODE_AND_STORAGE_UUIDS failed to correctly generate a
+    // new storage ID. We check for that and fix it now.
+    final boolean haveValidStorageId = DataNodeLayoutVersion.supports(
+        LayoutVersion.Feature.ADD_DATANODE_AND_STORAGE_UUIDS, lv)
+        && DatanodeStorage.isValidStorageId(sd.getStorageUuid());
+    return createStorageID(sd, !haveValidStorageId);
+  }
+
   /** Create an ID for this storage.
    * @return true if a new storage ID was generated.
    * */
-  public synchronized boolean createStorageID(
+  public static boolean createStorageID(
       StorageDirectory sd, boolean regenerateStorageIds) {
     final String oldStorageID = sd.getStorageUuid();
     if (oldStorageID == null || regenerateStorageIds) {
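With createStorageID now static, test code can stamp a storage directory without holding a DataStorage instance, and the new private overload taking the layout version folds in the ADD_DATANODE_AND_STORAGE_UUIDS validity check that previously lived at the call site. A usage sketch, assuming an existing test directory root:

    Storage.StorageDirectory sd = new Storage.StorageDirectory(root);
    // 'false' keeps an already-valid storage ID; a fresh one is generated
    // only if the directory has none.
    DataStorage.createStorageID(sd, false);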
@@ -250,7 +261,7 @@ public class DataStorage extends Storage {
 
   private StorageDirectory loadStorageDirectory(DataNode datanode,
       NamespaceInfo nsInfo, File dataDir, StartupOption startOpt)
       throws IOException {
     StorageDirectory sd = new StorageDirectory(dataDir, null, false);
     try {
       StorageState curState = sd.analyzeStorage(startOpt, this);
@@ -263,9 +274,9 @@ public class DataStorage extends Storage {
         throw new IOException("Storage directory " + dataDir
             + " does not exist");
       case NOT_FORMATTED: // format
-        LOG.info("Storage directory " + dataDir + " is not formatted for "
-            + nsInfo.getBlockPoolID());
-        LOG.info("Formatting ...");
+        LOG.info("Storage directory " + dataDir
+            + " is not formatted for namespace " + nsInfo.getNamespaceID()
+            + ". Formatting...");
         format(sd, nsInfo, datanode.getDatanodeUuid());
         break;
       default: // recovery part is common
@@ -276,7 +287,9 @@ public class DataStorage extends Storage {
       // Each storage directory is treated individually.
       // During startup some of them can upgrade or roll back
       // while others could be up-to-date for the regular startup.
-      doTransition(datanode, sd, nsInfo, startOpt);
+      if (doTransition(sd, nsInfo, startOpt, datanode.getConf())) {
+        return sd;
+      }
 
       // 3. Update successfully loaded storage.
       setServiceLayoutVersion(getServiceLayoutVersion());
@@ -321,20 +334,10 @@ public class DataStorage extends Storage {
           nsInfo.getBlockPoolID(), new File(volume, STORAGE_DIR_CURRENT)));
       makeBlockPoolDataDir(bpDataDirs, null);
 
-      BlockPoolSliceStorage bpStorage;
-      final String bpid = nsInfo.getBlockPoolID();
-      synchronized (this) {
-        bpStorage = this.bpStorageMap.get(bpid);
-        if (bpStorage == null) {
-          bpStorage = new BlockPoolSliceStorage(
-              nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(),
-              nsInfo.getClusterID());
-          addBlockPoolStorage(bpid, bpStorage);
-        }
-      }
-      builder.addBpStorageDirectories(
-          bpid, bpStorage.loadBpStorageDirectories(
-              datanode, nsInfo, bpDataDirs, StartupOption.HOTSWAP));
+      final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
+      final List<StorageDirectory> dirs = bpStorage.loadBpStorageDirectories(
+          nsInfo, bpDataDirs, StartupOption.HOTSWAP, datanode.getConf());
+      builder.addBpStorageDirectories(nsInfo.getBlockPoolID(), dirs);
     }
     return builder;
   }
@@ -347,53 +350,68 @@ public class DataStorage extends Storage {
    * @param nsInfo namespace information
    * @param dataDirs array of data storage directories
    * @param startOpt startup option
-   * @return a list of successfully loaded volumes.
-   * @throws IOException
+   * @return a list of successfully loaded storage directories.
    */
   @VisibleForTesting
-  synchronized List<StorageLocation> addStorageLocations(DataNode datanode,
+  synchronized List<StorageDirectory> addStorageLocations(DataNode datanode,
       NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
       StartupOption startOpt) throws IOException {
-    final String bpid = nsInfo.getBlockPoolID();
-    List<StorageLocation> successVolumes = Lists.newArrayList();
+    final List<StorageLocation> successLocations = loadDataStorage(
+        datanode, nsInfo, dataDirs, startOpt);
+    return loadBlockPoolSliceStorage(
+        datanode, nsInfo, successLocations, startOpt);
+  }
+
+  private List<StorageLocation> loadDataStorage(DataNode datanode,
+      NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
+      StartupOption startOpt) throws IOException {
+    final List<StorageLocation> success = Lists.newArrayList();
     for (StorageLocation dataDir : dataDirs) {
       File root = dataDir.getFile();
       if (!containsStorageDir(root)) {
         try {
           // It first ensures the datanode level format is completed.
-          StorageDirectory sd = loadStorageDirectory(
+          final StorageDirectory sd = loadStorageDirectory(
               datanode, nsInfo, root, startOpt);
           addStorageDir(sd);
         } catch (IOException e) {
-          LOG.warn(e);
+          LOG.warn("Failed to add storage directory " + dataDir, e);
           continue;
         }
       } else {
         LOG.info("Storage directory " + dataDir + " has already been used.");
       }
+      success.add(dataDir);
+    }
+
+    return success;
+  }
+
+  private List<StorageDirectory> loadBlockPoolSliceStorage(DataNode datanode,
+      NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
+      StartupOption startOpt) throws IOException {
+    final String bpid = nsInfo.getBlockPoolID();
+    final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
+    final List<StorageDirectory> success = Lists.newArrayList();
+    for (StorageLocation dataDir : dataDirs) {
+      final File curDir = new File(dataDir.getFile(), STORAGE_DIR_CURRENT);
       List<File> bpDataDirs = new ArrayList<File>();
-      bpDataDirs.add(BlockPoolSliceStorage.getBpRoot(bpid, new File(root,
-          STORAGE_DIR_CURRENT)));
+      bpDataDirs.add(BlockPoolSliceStorage.getBpRoot(bpid, curDir));
       try {
         makeBlockPoolDataDir(bpDataDirs, null);
-        BlockPoolSliceStorage bpStorage = this.bpStorageMap.get(bpid);
-        if (bpStorage == null) {
-          bpStorage = new BlockPoolSliceStorage(
-              nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(),
-              nsInfo.getClusterID());
-        }
 
-        bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt);
-        addBlockPoolStorage(bpid, bpStorage);
+        final List<StorageDirectory> dirs = bpStorage.recoverTransitionRead(
+            nsInfo, bpDataDirs, startOpt, datanode.getConf());
+        for(StorageDirectory sd : dirs) {
+          success.add(sd);
+        }
       } catch (IOException e) {
-        LOG.warn("Failed to add storage for block pool: " + bpid + " : "
-            + e.getMessage());
-        continue;
+        LOG.warn("Failed to add storage directory " + dataDir
+            + " for block pool " + bpid, e);
       }
-      successVolumes.add(dataDir);
     }
-    return successVolumes;
+
+    return success;
   }
 
   /**
@@ -635,17 +653,13 @@ public class DataStorage extends Storage {
    * Upgrade if this.LV > LAYOUT_VERSION
    * Regular startup if this.LV = LAYOUT_VERSION
    *
-   * @param datanode Datanode to which this storage belongs to
    * @param sd storage directory
    * @param nsInfo namespace info
    * @param startOpt startup option
-   * @throws IOException
+   * @return true if the new properties has been written.
    */
-  private void doTransition( DataNode datanode,
-                             StorageDirectory sd,
-                             NamespaceInfo nsInfo,
-                             StartupOption startOpt
-                             ) throws IOException {
+  private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
+      StartupOption startOpt, Configuration conf) throws IOException {
     if (startOpt == StartupOption.ROLLBACK) {
       doRollback(sd, nsInfo); // rollback if applicable
     }
@@ -674,25 +688,16 @@ public class DataStorage extends Storage {
           + nsInfo.getClusterID() + "; datanode clusterID = " + getClusterID());
     }
 
-    // Clusters previously upgraded from layout versions earlier than
-    // ADD_DATANODE_AND_STORAGE_UUIDS failed to correctly generate a
-    // new storage ID. We check for that and fix it now.
-    boolean haveValidStorageId =
-        DataNodeLayoutVersion.supports(
-            LayoutVersion.Feature.ADD_DATANODE_AND_STORAGE_UUIDS, layoutVersion) &&
-        DatanodeStorage.isValidStorageId(sd.getStorageUuid());
-
     // regular start up.
     if (this.layoutVersion == HdfsServerConstants.DATANODE_LAYOUT_VERSION) {
-      createStorageID(sd, !haveValidStorageId);
-      return; // regular startup
+      createStorageID(sd, layoutVersion);
+      return false; // need to write properties
     }
 
     // do upgrade
     if (this.layoutVersion > HdfsServerConstants.DATANODE_LAYOUT_VERSION) {
-      doUpgrade(datanode, sd, nsInfo); // upgrade
-      createStorageID(sd, !haveValidStorageId);
-      return;
+      doUpgrade(sd, nsInfo, conf); // upgrade
+      return true; // doUgrade already has written properties
    }
 
    // layoutVersion < DATANODE_LAYOUT_VERSION. I.e. stored layout version is newer
@@ -726,8 +731,8 @@ public class DataStorage extends Storage {
    * @param sd storage directory
    * @throws IOException on error
    */
-  void doUpgrade(DataNode datanode, StorageDirectory sd, NamespaceInfo nsInfo)
-      throws IOException {
+  void doUpgrade(final StorageDirectory sd, final NamespaceInfo nsInfo,
+      final Configuration conf) throws IOException {
     // If the existing on-disk layout version supportes federation, simply
     // update its layout version.
     if (DataNodeLayoutVersion.supports(
@@ -743,15 +748,16 @@ public class DataStorage extends Storage {
       return;
     }
 
+    final int oldLV = getLayoutVersion();
     LOG.info("Upgrading storage directory " + sd.getRoot()
-        + ".\n old LV = " + this.getLayoutVersion()
+        + ".\n old LV = " + oldLV
         + "; old CTime = " + this.getCTime()
         + ".\n new LV = " + HdfsServerConstants.DATANODE_LAYOUT_VERSION
         + "; new CTime = " + nsInfo.getCTime());
 
-    File curDir = sd.getCurrentDir();
-    File prevDir = sd.getPreviousDir();
-    File bbwDir = new File(sd.getRoot(), Storage.STORAGE_1_BBW);
+    final File curDir = sd.getCurrentDir();
+    final File prevDir = sd.getPreviousDir();
+    final File bbwDir = new File(sd.getRoot(), Storage.STORAGE_1_BBW);
 
     assert curDir.exists() : "Data node current directory must exist.";
     // Cleanup directory "detach"
@@ -761,21 +767,29 @@ public class DataStorage extends Storage {
     if (prevDir.exists())
       deleteDir(prevDir);
     // get previous.tmp directory, <SD>/previous.tmp
-    File tmpDir = sd.getPreviousTmp();
+    final File tmpDir = sd.getPreviousTmp();
     assert !tmpDir.exists() :
       "Data node previous.tmp directory must not exist.";
 
     // 2. Rename <SD>/current to <SD>/previous.tmp
     rename(curDir, tmpDir);
 
-    // 3. Format BP and hard link blocks from previous directory
+    // 3.1. Format BP
     File curBpDir = BlockPoolSliceStorage.getBpRoot(nsInfo.getBlockPoolID(), curDir);
-    BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(nsInfo.getNamespaceID(),
-        nsInfo.getBlockPoolID(), nsInfo.getCTime(), nsInfo.getClusterID());
+    BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
     bpStorage.format(curDir, nsInfo);
-    linkAllBlocks(datanode, tmpDir, bbwDir, new File(curBpDir,
-        STORAGE_DIR_CURRENT));
+
+    final File toDir = new File(curBpDir, STORAGE_DIR_CURRENT);
+    doUgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf);
+  }
+
+  private void doUgrade(final StorageDirectory sd,
+      final NamespaceInfo nsInfo, final File prevDir,
+      final File tmpDir, final File bbwDir, final File toDir, final int oldLV,
+      Configuration conf) throws IOException {
+    // 3.2. Link block files from <SD>/previous.tmp to <SD>/current
+    linkAllBlocks(tmpDir, bbwDir, toDir, oldLV, conf);
 
     // 4. Write version file under <SD>/current
     layoutVersion = HdfsServerConstants.DATANODE_LAYOUT_VERSION;
     clusterID = nsInfo.getClusterID();
@@ -784,7 +798,8 @@ public class DataStorage extends Storage {
     // 5. Rename <SD>/previous.tmp to <SD>/previous
     rename(tmpDir, prevDir);
     LOG.info("Upgrade of " + sd.getRoot()+ " is complete");
-    addBlockPoolStorage(nsInfo.getBlockPoolID(), bpStorage);
+
+    createStorageID(sd, layoutVersion);
   }
 
   /**
@@ -952,23 +967,22 @@ public class DataStorage extends Storage {
    *
    * @throws IOException If error occurs during hardlink
    */
-  private void linkAllBlocks(DataNode datanode, File fromDir, File fromBbwDir,
-      File toDir) throws IOException {
+  private static void linkAllBlocks(File fromDir, File fromBbwDir, File toDir,
+      int diskLayoutVersion, Configuration conf) throws IOException {
     HardLink hardLink = new HardLink();
     // do the link
-    int diskLayoutVersion = this.getLayoutVersion();
     if (DataNodeLayoutVersion.supports(
         LayoutVersion.Feature.APPEND_RBW_DIR, diskLayoutVersion)) {
       // hardlink finalized blocks in tmpDir/finalized
-      linkBlocks(datanode, new File(fromDir, STORAGE_DIR_FINALIZED),
-          new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink);
+      linkBlocks(fromDir, toDir, STORAGE_DIR_FINALIZED,
+          diskLayoutVersion, hardLink, conf);
       // hardlink rbw blocks in tmpDir/rbw
-      linkBlocks(datanode, new File(fromDir, STORAGE_DIR_RBW),
-          new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
+      linkBlocks(fromDir, toDir, STORAGE_DIR_RBW,
+          diskLayoutVersion, hardLink, conf);
     } else { // pre-RBW version
       // hardlink finalized blocks in tmpDir
-      linkBlocks(datanode, fromDir, new File(toDir, STORAGE_DIR_FINALIZED),
-          diskLayoutVersion, hardLink);
+      linkBlocks(fromDir, new File(toDir, STORAGE_DIR_FINALIZED),
+          diskLayoutVersion, hardLink, conf);
       if (fromBbwDir.exists()) {
         /*
          * We need to put the 'blocksBeingWritten' from HDFS 1.x into the rbw
@@ -976,11 +990,12 @@ public class DataStorage extends Storage {
          * NOT underneath the 'current' directory in those releases. See
          * HDFS-3731 for details.
          */
-        linkBlocks(datanode, fromBbwDir,
-            new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
+        linkBlocks(fromBbwDir, new File(toDir, STORAGE_DIR_RBW),
+            diskLayoutVersion, hardLink, conf);
       }
     }
-    LOG.info( hardLink.linkStats.report() );
+    LOG.info("Linked blocks from " + fromDir + " to " + toDir + ". "
+        + hardLink.linkStats.report());
   }
 
   private static class LinkArgs {
@@ -993,8 +1008,15 @@ public class DataStorage extends Storage {
     }
   }
 
-  static void linkBlocks(DataNode datanode, File from, File to, int oldLV,
-      HardLink hl) throws IOException {
+  static void linkBlocks(File fromDir, File toDir, String subdir, int oldLV,
+      HardLink hl, Configuration conf) throws IOException {
+    linkBlocks(new File(fromDir, subdir), new File(toDir, subdir),
+        oldLV, hl, conf);
+  }
+
+  private static void linkBlocks(File from, File to, int oldLV,
+      HardLink hl, Configuration conf) throws IOException {
+    LOG.info("Start linking block files from " + from + " to " + to);
     boolean upgradeToIdBasedLayout = false;
     // If we are upgrading from a version older than the one where we introduced
     // block ID-based layout AND we're working with the finalized directory,
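The new linkBlocks overload takes the subdirectory name and composes the source and target paths itself, which is what lets both linkAllBlocks implementations shrink to symmetric calls. The equivalence, using only names from the diff:

    // These two calls do the same work after this commit:
    linkBlocks(fromDir, toDir, STORAGE_DIR_FINALIZED, oldLV, hl, conf);
    linkBlocks(new File(fromDir, STORAGE_DIR_FINALIZED),
        new File(toDir, STORAGE_DIR_FINALIZED), oldLV, hl, conf);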
@@ -1017,7 +1039,7 @@ public class DataStorage extends Storage {
       removeDuplicateEntries(idBasedLayoutSingleLinks, duplicates);
     }
 
-    int numLinkWorkers = datanode.getConf().getInt(
+    final int numLinkWorkers = conf.getInt(
         DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY,
         DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS);
     ExecutorService linkWorkers = Executors.newFixedThreadPool(numLinkWorkers);
@@ -1268,13 +1290,19 @@ public class DataStorage extends Storage {
   }
 
   /**
-   * Add bpStorage into bpStorageMap
+   * Get the BlockPoolSliceStorage from {@link bpStorageMap}.
+   * If the object is not found, create a new object and put it to the map.
    */
-  private void addBlockPoolStorage(String bpID, BlockPoolSliceStorage bpStorage
-      ) {
-    if (!this.bpStorageMap.containsKey(bpID)) {
-      this.bpStorageMap.put(bpID, bpStorage);
+  synchronized BlockPoolSliceStorage getBlockPoolSliceStorage(
+      final NamespaceInfo nsInfo) {
+    final String bpid = nsInfo.getBlockPoolID();
+    BlockPoolSliceStorage bpStorage = bpStorageMap.get(bpid);
+    if (bpStorage == null) {
+      bpStorage = new BlockPoolSliceStorage(nsInfo.getNamespaceID(), bpid,
+          nsInfo.getCTime(), nsInfo.getClusterID());
+      bpStorageMap.put(bpid, bpStorage);
     }
+    return bpStorage;
   }
 
   synchronized void removeBlockPoolStorage(String bpId) {
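getBlockPoolSliceStorage centralizes the create-if-absent logic that the call sites above used to duplicate, once with an explicit synchronized block and once without any locking. A self-contained illustration of the pattern, with Registry and getOrCreate as illustrative names that are not part of the commit:

    class Registry {
      private final java.util.Map<String, Object> map =
          new java.util.HashMap<>();

      // One synchronized accessor owns the map, so two callers can no
      // longer race to insert duplicate entries for the same key.
      synchronized Object getOrCreate(String key) {
        Object v = map.get(key);
        if (v == null) {
          v = new Object();  // stands in for new BlockPoolSliceStorage(...)
          map.put(key, v);
        }
        return v;
      }
    }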
StorageLocation.java

@@ -101,4 +101,19 @@ public class StorageLocation {
   public String toString() {
     return "[" + storageType + "]" + file.toURI();
   }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    } else if (obj == null || !(obj instanceof StorageLocation)) {
+      return false;
+    }
+    return toString().equals(obj.toString());
+  }
+
+  @Override
+  public int hashCode() {
+    return toString().hashCode();
+  }
 }
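Defining equals() and hashCode() through toString() makes two StorageLocation objects parsed from the same "[TYPE]uri" string compare equal, which is what the sorted-list assertion added to TestDataNodeHotSwapVolumes below relies on. A small sketch, assuming StorageLocation.parse accepts the bracketed form that toString() produces:

    StorageLocation a = StorageLocation.parse("[DISK]/data/dn1");
    StorageLocation b = StorageLocation.parse("[DISK]/data/dn1");
    // Equality now follows the string form.
    assert a.equals(b) && a.hashCode() == b.hashCode();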
TestReplication.java

@@ -517,7 +517,8 @@ public class TestReplication {
     Configuration conf = new HdfsConfiguration();
     conf.setLong(
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1);
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .storagesPerDatanode(1).build();
     FileSystem fs = cluster.getFileSystem();
     Path filePath = new Path("/test");
     FSDataOutputStream create = fs.create(filePath);
UpgradeUtilities.java

@@ -477,7 +477,7 @@ public class UpgradeUtilities {
     for (int i = 0; i < parent.length; i++) {
       File versionFile = new File(parent[i], "VERSION");
       StorageDirectory sd = new StorageDirectory(parent[i].getParentFile());
-      storage.createStorageID(sd, false);
+      DataStorage.createStorageID(sd, false);
       storage.writeProperties(versionFile, sd);
       versionFiles[i] = versionFile;
       File bpDir = BlockPoolSliceStorage.getBpRoot(bpid, parent[i]);
SimulatedFSDataset.java

@@ -538,7 +538,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     this.datanode = datanode;
     if (storage != null) {
       for (int i = 0; i < storage.getNumStorageDirs(); ++i) {
-        storage.createStorageID(storage.getStorageDir(i), false);
+        DataStorage.createStorageID(storage.getStorageDir(i), false);
       }
       this.datanodeUuid = storage.getDatanodeUuid();
     } else {
TestDataNodeHotSwapVolumes.java

@@ -51,10 +51,12 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -288,15 +290,27 @@ public class TestDataNodeHotSwapVolumes {
     String[] effectiveDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY).split(",");
     String[] expectDataDirs = newDataDir.split(",");
     assertEquals(expectDataDirs.length, effectiveDataDirs.length);
+    List<StorageLocation> expectedStorageLocations = new ArrayList<>();
+    List<StorageLocation> effectiveStorageLocations = new ArrayList<>();
     for (int i = 0; i < expectDataDirs.length; i++) {
       StorageLocation expectLocation = StorageLocation.parse(expectDataDirs[i]);
-      StorageLocation effectiveLocation =
-          StorageLocation.parse(effectiveDataDirs[i]);
-      assertEquals(expectLocation.getStorageType(),
-          effectiveLocation.getStorageType());
-      assertEquals(expectLocation.getFile().getCanonicalFile(),
-          effectiveLocation.getFile().getCanonicalFile());
+      StorageLocation effectiveLocation = StorageLocation
+          .parse(effectiveDataDirs[i]);
+      expectedStorageLocations.add(expectLocation);
+      effectiveStorageLocations.add(effectiveLocation);
     }
+    Comparator<StorageLocation> comparator = new Comparator<StorageLocation>() {
+
+      @Override
+      public int compare(StorageLocation o1, StorageLocation o2) {
+        return o1.toString().compareTo(o2.toString());
+      }
+    };
+    Collections.sort(expectedStorageLocations, comparator);
+    Collections.sort(effectiveStorageLocations, comparator);
+    assertEquals("Effective volumes doesnt match expected",
+        expectedStorageLocations, effectiveStorageLocations);
 
     // Check that all newly created volumes are appropriately formatted.
     for (File volumeDir : newVolumeDirs) {
@@ -473,11 +487,27 @@ public class TestDataNodeHotSwapVolumes {
 
     DataNode dn = cluster.getDataNodes().get(0);
     Collection<String> oldDirs = getDataDirs(dn);
-    String newDirs = oldDirs.iterator().next();  // Keep the first volume.
+    // Findout the storage with block and remove it
+    ExtendedBlock block =
+        DFSTestUtil.getAllBlocks(fs, testFile).get(1).getBlock();
+    FsVolumeSpi volumeWithBlock = dn.getFSDataset().getVolume(block);
+    String basePath = volumeWithBlock.getBasePath();
+    File storageDir = new File(basePath);
+    URI fileUri = storageDir.toURI();
+    String dirWithBlock =
+        "[" + volumeWithBlock.getStorageType() + "]" + fileUri;
+    String newDirs = dirWithBlock;
+    for (String dir : oldDirs) {
+      if (dirWithBlock.startsWith(dir)) {
+        continue;
+      }
+      newDirs = dir;
+      break;
+    }
     dn.reconfigurePropertyImpl(
         DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
-    assertFileLocksReleased(
-        new ArrayList<String>(oldDirs).subList(1, oldDirs.size()));
+    oldDirs.remove(newDirs);
+    assertFileLocksReleased(oldDirs);
 
     triggerDeleteReport(dn);
 
TestDataStorage.java

@@ -18,9 +18,12 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
@@ -54,11 +57,13 @@ public class TestDataStorage {
 
   @Before
   public void setUp() throws IOException {
+    Configuration conf = new HdfsConfiguration();
     storage = new DataStorage();
     nsInfo = new NamespaceInfo(0, CLUSTER_ID, DEFAULT_BPID, CTIME,
         BUILD_VERSION, SOFTWARE_VERSION);
     FileUtil.fullyDelete(TEST_DIR);
     assertTrue("Failed to make test dir.", TEST_DIR.mkdirs());
+    Mockito.when(mockDN.getConf()).thenReturn(conf);
   }
 
   @After
@@ -146,7 +151,7 @@ public class TestDataStorage {
     assertEquals(numLocations, storage.getNumStorageDirs());
 
     locations = createStorageLocations(numLocations);
-    List<StorageLocation> addedLocation =
+    List<StorageDirectory> addedLocation =
         storage.addStorageLocations(mockDN, namespaceInfos.get(0),
             locations, START_OPT);
     assertTrue(addedLocation.isEmpty());
TestFsDatasetImpl.java

@@ -110,7 +110,7 @@ public class TestFsDatasetImpl {
 
   private static Storage.StorageDirectory createStorageDirectory(File root) {
     Storage.StorageDirectory sd = new Storage.StorageDirectory(root);
-    dsForStorageUuid.createStorageID(sd, false);
+    DataStorage.createStorageID(sd, false);
     return sd;
   }
 