HDFS-2223. Untangle dependencies between NN components. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1166466 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2011-09-07 23:23:24 +00:00
parent 2ca9c8d926
commit 06e84a1bca
17 changed files with 245 additions and 235 deletions
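
The pattern applied throughout this change is to drop the FSNamesystem back-pointer that FSImage used to carry and instead pass the namesystem explicitly into each operation that needs it. A minimal sketch of the before/after shape, using invented stand-in classes rather than the real HDFS types:

// Stand-in classes for illustration only; not the actual HDFS implementations.
class Namesystem {
  void updateQuotaCounts() { }
}

// Before: the image held a mutable reference to the namesystem it served,
// wired up through a setter and consulted implicitly.
class ImageBefore {
  private Namesystem namesystem;
  void setNamesystem(Namesystem ns) { this.namesystem = ns; }
  void loadEdits(Iterable<String> streams) {
    namesystem.updateQuotaCounts();   // hidden dependency on the stored field
  }
}

// After: callers hand the target namesystem to each operation that needs it,
// so the image no longer has to know which namesystem it belongs to.
class ImageAfter {
  void loadEdits(Iterable<String> streams, Namesystem target) {
    target.updateQuotaCounts();       // dependency is explicit at the call site
  }
}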

View File

@ -16,6 +16,8 @@ Trunk (unreleased changes)
HDFS-2018. Move all journal stream management code into one place.
(Ivan Kelly via jitendra)
HDFS-2223. Untangle dependencies between NN components (todd)
BUG FIXES
HDFS-2287. TestParallelRead has a small off-by-one bug. (todd)

View File

@ -83,6 +83,8 @@ public class BackupImage extends FSImage {
*/
private boolean stopApplyingEditsOnNextRoll = false;
private FSNamesystem namesystem;
/**
* Construct a backup image.
* @param conf Configuration
@ -94,6 +96,10 @@ public class BackupImage extends FSImage {
bnState = BNState.DROP_UNTIL_NEXT_ROLL;
}
void setNamesystem(FSNamesystem fsn) {
this.namesystem = fsn;
}
/**
* Analyze backup storage directories for consistency.<br>
* Recover from incomplete checkpoints if required.<br>
@ -141,7 +147,7 @@ public class BackupImage extends FSImage {
* and create empty edits.
*/
void saveCheckpoint() throws IOException {
saveNamespace();
saveNamespace(namesystem);
}
/**
@ -224,7 +230,7 @@ public class BackupImage extends FSImage {
}
lastAppliedTxId += numTxns;
getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient!
namesystem.dir.updateCountForINodeWithQuota(); // inefficient!
} finally {
backupInputStream.clear();
}
@ -273,7 +279,7 @@ public class BackupImage extends FSImage {
editStreams.add(s);
}
}
loadEdits(editStreams);
loadEdits(editStreams, namesystem);
}
// now, need to load the in-progress file

View File

@ -122,6 +122,7 @@ public class BackupNode extends NameNode {
protected void loadNamesystem(Configuration conf) throws IOException {
BackupImage bnImage = new BackupImage(conf);
this.namesystem = new FSNamesystem(conf, bnImage);
bnImage.setNamesystem(namesystem);
bnImage.recoverCreateRead();
}

View File

@ -224,7 +224,7 @@ class Checkpointer extends Daemon {
LOG.info("Loading image with txid " + sig.mostRecentCheckpointTxId);
File file = bnStorage.findImageFile(sig.mostRecentCheckpointTxId);
bnImage.reloadFromImageFile(file);
bnImage.reloadFromImageFile(file, backupNode.getNamesystem());
}
lastApplied = bnImage.getLastAppliedTxId();
@ -238,11 +238,11 @@ class Checkpointer extends Daemon {
backupNode.nnHttpAddress, log, bnStorage);
}
rollForwardByApplyingLogs(manifest, bnImage);
rollForwardByApplyingLogs(manifest, bnImage, backupNode.getNamesystem());
}
long txid = bnImage.getLastAppliedTxId();
bnImage.saveFSImageInAllDirs(txid);
bnImage.saveFSImageInAllDirs(backupNode.getNamesystem(), txid);
bnStorage.writeAll();
if(cpCmd.needToReturnImage()) {
@ -272,7 +272,8 @@ class Checkpointer extends Daemon {
static void rollForwardByApplyingLogs(
RemoteEditLogManifest manifest,
FSImage dstImage) throws IOException {
FSImage dstImage,
FSNamesystem dstNamesystem) throws IOException {
NNStorage dstStorage = dstImage.getStorage();
List<EditLogInputStream> editsStreams = Lists.newArrayList();
@ -286,6 +287,6 @@ class Checkpointer extends Daemon {
}
LOG.info("Checkpointer about to load edits from " +
editsStreams.size() + " stream(s).");
dstImage.loadEdits(editsStreams);
dstImage.loadEdits(editsStreams, dstNamesystem);
}
}

View File

@ -56,9 +56,10 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.util.ByteArray;
import com.google.common.base.Preconditions;
/*************************************************
* FSDirectory stores the filesystem directory state.
* It handles writing/loading values to disk, and logging
@ -72,6 +73,7 @@ public class FSDirectory implements Closeable {
INodeDirectoryWithQuota rootDir;
FSImage fsImage;
private final FSNamesystem namesystem;
private volatile boolean ready = false;
private static final long UNKNOWN_DISK_SPACE = -1;
private final int maxComponentLength;
@ -113,15 +115,9 @@ public class FSDirectory implements Closeable {
*/
private final NameCache<ByteArray> nameCache;
/** Access an existing dfs name directory. */
FSDirectory(FSNamesystem ns, Configuration conf) throws IOException {
this(new FSImage(conf), ns, conf);
}
FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) {
this.dirLock = new ReentrantReadWriteLock(true); // fair
this.cond = dirLock.writeLock().newCondition();
fsImage.setFSNamesystem(ns);
rootDir = new INodeDirectoryWithQuota(INodeDirectory.ROOT_NAME,
ns.createFsOwnerPermissions(new FsPermission((short)0755)),
Integer.MAX_VALUE, UNKNOWN_DISK_SPACE);
@ -145,10 +141,11 @@ public class FSDirectory implements Closeable {
NameNode.LOG.info("Caching file names occuring more than " + threshold
+ " times ");
nameCache = new NameCache<ByteArray>(threshold);
namesystem = ns;
}
private FSNamesystem getFSNamesystem() {
return fsImage.getFSNamesystem();
return namesystem;
}
private BlockManager getBlockManager() {
@ -156,33 +153,11 @@ public class FSDirectory implements Closeable {
}
/**
* Load the filesystem image into memory.
*
* @param startOpt Startup type as specified by the user.
* @throws IOException If image or editlog cannot be read.
* Notify that loading of this FSDirectory is complete, and
* it is ready for use
*/
void loadFSImage(StartupOption startOpt)
throws IOException {
// format before starting up if requested
if (startOpt == StartupOption.FORMAT) {
fsImage.format(fsImage.getStorage().determineClusterId());// reuse current id
startOpt = StartupOption.REGULAR;
}
boolean success = false;
try {
if (fsImage.recoverTransitionRead(startOpt)) {
fsImage.saveNamespace();
}
fsImage.openEditLog();
fsImage.setCheckpointDirectories(null, null);
success = true;
} finally {
if (!success) {
fsImage.close();
}
}
void imageLoadComplete() {
Preconditions.checkState(!ready, "FSDirectory already loaded");
writeLock();
try {
setReady(true);
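
The ready flag above is part of a readiness gate: directory operations block until imageLoadComplete() marks the loaded image as usable. A sketch of that pattern under assumed names, not the exact FSDirectory internals:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative readiness gate; field and method names mirror the diff above,
// but the bodies are assumptions.
class ReadinessGate {
  private final ReentrantReadWriteLock dirLock = new ReentrantReadWriteLock(true); // fair
  private final Condition cond = dirLock.writeLock().newCondition();
  private volatile boolean ready = false;

  // Called once image loading finishes; wakes every caller parked in waitForReady().
  void imageLoadComplete() {
    dirLock.writeLock().lock();
    try {
      ready = true;
      cond.signalAll();
    } finally {
      dirLock.writeLock().unlock();
    }
  }

  // Directory operations call this before touching the namespace tree.
  void waitForReady() throws InterruptedException {
    dirLock.writeLock().lock();
    try {
      while (!ready) {
        cond.await();
      }
    } finally {
      dirLock.writeLock().unlock();
    }
  }
}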

View File

@ -70,7 +70,6 @@ import com.google.common.collect.Lists;
public class FSImage implements Closeable {
protected static final Log LOG = LogFactory.getLog(FSImage.class.getName());
protected FSNamesystem namesystem = null;
protected FSEditLog editLog = null;
private boolean isUpgradeFinalized = false;
@ -82,38 +81,20 @@ public class FSImage implements Closeable {
*/
protected long lastAppliedTxId = 0;
/**
* URIs for importing an image from a checkpoint. In the default case,
* URIs will represent directories.
*/
private Collection<URI> checkpointDirs;
private Collection<URI> checkpointEditsDirs;
final private Configuration conf;
private final NNStorageRetentionManager archivalManager;
/**
* Construct an FSImage.
* @param conf Configuration
* @see #FSImage(Configuration conf, FSNamesystem ns,
* Collection imageDirs, Collection editsDirs)
* @throws IOException if default directories are invalid.
*/
public FSImage(Configuration conf) throws IOException {
this(conf, (FSNamesystem)null);
}
/**
* Construct an FSImage
* @param conf Configuration
* @param ns The FSNamesystem using this image.
* @see #FSImage(Configuration conf, FSNamesystem ns,
* @see #FSImage(Configuration conf,
* Collection imageDirs, Collection editsDirs)
* @throws IOException if default directories are invalid.
*/
private FSImage(Configuration conf, FSNamesystem ns) throws IOException {
this(conf, ns,
protected FSImage(Configuration conf) throws IOException {
this(conf,
FSNamesystem.getNamespaceDirs(conf),
FSNamesystem.getNamespaceEditsDirs(conf));
}
@ -124,17 +105,14 @@ public class FSImage implements Closeable {
* Setup storage and initialize the edit log.
*
* @param conf Configuration
* @param ns The FSNamesystem using this image.
* @param imageDirs Directories the image can be stored in.
* @param editsDirs Directories the editlog can be stored in.
* @throws IOException if directories are invalid.
*/
protected FSImage(Configuration conf, FSNamesystem ns,
protected FSImage(Configuration conf,
Collection<URI> imageDirs, Collection<URI> editsDirs)
throws IOException {
this.conf = conf;
setCheckpointDirectories(FSImage.getCheckpointDirs(conf, null),
FSImage.getCheckpointEditsDirs(conf, null));
storage = new NNStorage(conf, imageDirs, editsDirs);
if(conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY,
@ -143,31 +121,18 @@ public class FSImage implements Closeable {
}
this.editLog = new FSEditLog(storage);
setFSNamesystem(ns);
archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
}
protected FSNamesystem getFSNamesystem() {
return namesystem;
}
void setFSNamesystem(FSNamesystem ns) {
namesystem = ns;
if (ns != null) {
storage.setUpgradeManager(ns.upgradeManager);
}
}
void setCheckpointDirectories(Collection<URI> dirs,
Collection<URI> editsDirs) {
checkpointDirs = dirs;
checkpointEditsDirs = editsDirs;
}
void format(String clusterId) throws IOException {
void format(FSNamesystem fsn, String clusterId) throws IOException {
long fileCount = fsn.getTotalFiles();
// Expect 1 file, which is the root inode
Preconditions.checkState(fileCount == 1,
"FSImage.format should be called with an uninitialized namesystem, has " +
fileCount + " files");
storage.format(clusterId);
saveFSImageInAllDirs(0);
saveFSImageInAllDirs(fsn, 0);
}
/**
@ -179,7 +144,7 @@ public class FSImage implements Closeable {
* @throws IOException
* @return true if the image needs to be saved or false otherwise
*/
boolean recoverTransitionRead(StartupOption startOpt)
boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target)
throws IOException {
assert startOpt != StartupOption.FORMAT :
"NameNode formatting should be performed before reading the image";
@ -187,21 +152,14 @@ public class FSImage implements Closeable {
Collection<URI> imageDirs = storage.getImageDirectories();
Collection<URI> editsDirs = storage.getEditsDirectories();
// none of the data dirs exist
if((imageDirs.size() == 0 || editsDirs.size() == 0)
&& startOpt != StartupOption.IMPORT)
throw new IOException(
"All specified directories are not accessible or do not exist.");
if(startOpt == StartupOption.IMPORT
&& (checkpointDirs == null || checkpointDirs.isEmpty()))
throw new IOException("Cannot import image from a checkpoint. "
+ "\"dfs.namenode.checkpoint.dir\" is not set." );
if(startOpt == StartupOption.IMPORT
&& (checkpointEditsDirs == null || checkpointEditsDirs.isEmpty()))
throw new IOException("Cannot import image from a checkpoint. "
+ "\"dfs.namenode.checkpoint.dir\" is not set." );
storage.setUpgradeManager(target.upgradeManager);
// 1. For each data directory calculate its state and
// check whether all is consistent before transitioning.
@ -261,10 +219,10 @@ public class FSImage implements Closeable {
// 3. Do transitions
switch(startOpt) {
case UPGRADE:
doUpgrade();
doUpgrade(target);
return false; // upgrade saved image already
case IMPORT:
doImportCheckpoint();
doImportCheckpoint(target);
return false; // import checkpoint saved image already
case ROLLBACK:
doRollback();
@ -273,7 +231,7 @@ public class FSImage implements Closeable {
// just load the image
}
return loadFSImage();
return loadFSImage(target);
}
/**
@ -324,11 +282,11 @@ public class FSImage implements Closeable {
return isFormatted;
}
private void doUpgrade() throws IOException {
private void doUpgrade(FSNamesystem target) throws IOException {
if(storage.getDistributedUpgradeState()) {
// only distributed upgrade need to continue
// don't do version upgrade
this.loadFSImage();
this.loadFSImage(target);
storage.initializeDistributedUpgrade();
return;
}
@ -343,7 +301,7 @@ public class FSImage implements Closeable {
}
// load the latest image
this.loadFSImage();
this.loadFSImage(target);
// Do upgrade for each directory
long oldCTime = storage.getCTime();
@ -385,7 +343,7 @@ public class FSImage implements Closeable {
storage.reportErrorsOnDirectories(errorSDs);
errorSDs.clear();
saveFSImageInAllDirs(editLog.getLastWrittenTxId());
saveFSImageInAllDirs(target, editLog.getLastWrittenTxId());
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
@ -422,7 +380,7 @@ public class FSImage implements Closeable {
// a previous fs states in at least one of the storage directories.
// Directories that don't have previous state do not rollback
boolean canRollback = false;
FSImage prevState = new FSImage(conf, getFSNamesystem());
FSImage prevState = new FSImage(conf);
prevState.getStorage().layoutVersion = HdfsConstants.LAYOUT_VERSION;
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
@ -504,19 +462,32 @@ public class FSImage implements Closeable {
/**
* Load image from a checkpoint directory and save it into the current one.
* @param target the NameSystem to import into
* @throws IOException
*/
void doImportCheckpoint() throws IOException {
FSNamesystem fsNamesys = getFSNamesystem();
FSImage ckptImage = new FSImage(conf, fsNamesys,
void doImportCheckpoint(FSNamesystem target) throws IOException {
Collection<URI> checkpointDirs =
FSImage.getCheckpointDirs(conf, null);
Collection<URI> checkpointEditsDirs =
FSImage.getCheckpointEditsDirs(conf, null);
if (checkpointDirs == null || checkpointDirs.isEmpty()) {
throw new IOException("Cannot import image from a checkpoint. "
+ "\"dfs.namenode.checkpoint.dir\" is not set." );
}
if (checkpointEditsDirs == null || checkpointEditsDirs.isEmpty()) {
throw new IOException("Cannot import image from a checkpoint. "
+ "\"dfs.namenode.checkpoint.dir\" is not set." );
}
FSImage realImage = target.getFSImage();
FSImage ckptImage = new FSImage(conf,
checkpointDirs, checkpointEditsDirs);
// replace real image with the checkpoint image
FSImage realImage = fsNamesys.getFSImage();
assert realImage == this;
fsNamesys.dir.fsImage = ckptImage;
target.dir.fsImage = ckptImage;
// load from the checkpoint dirs
try {
ckptImage.recoverTransitionRead(StartupOption.REGULAR);
ckptImage.recoverTransitionRead(StartupOption.REGULAR, target);
} finally {
ckptImage.close();
}
@ -524,10 +495,11 @@ public class FSImage implements Closeable {
realImage.getStorage().setStorageInfo(ckptImage.getStorage());
realImage.getEditLog().setNextTxId(ckptImage.getEditLog().getLastWrittenTxId()+1);
fsNamesys.dir.fsImage = realImage;
target.dir.fsImage = realImage;
realImage.getStorage().setBlockPoolID(ckptImage.getBlockPoolID());
// and save it but keep the same checkpointTime
saveNamespace();
saveNamespace(target);
getStorage().writeAll();
}
@ -558,11 +530,11 @@ public class FSImage implements Closeable {
* Toss the current image and namesystem, reloading from the specified
* file.
*/
void reloadFromImageFile(File file) throws IOException {
namesystem.dir.reset();
void reloadFromImageFile(File file, FSNamesystem target) throws IOException {
target.dir.reset();
LOG.debug("Reloading namespace from " + file);
loadFSImage(file);
loadFSImage(file, target);
}
/**
@ -580,7 +552,7 @@ public class FSImage implements Closeable {
* @return whether the image should be saved
* @throws IOException
*/
boolean loadFSImage() throws IOException {
boolean loadFSImage(FSNamesystem target) throws IOException {
FSImageStorageInspector inspector = storage.readAndInspectDirs();
isUpgradeFinalized = inspector.isUpgradeFinalized();
@ -615,7 +587,7 @@ public class FSImage implements Closeable {
getLayoutVersion())) {
// For txid-based layout, we should have a .md5 file
// next to the image file
loadFSImage(imageFile.getFile());
loadFSImage(imageFile.getFile(), target);
} else if (LayoutVersion.supports(Feature.FSIMAGE_CHECKSUM,
getLayoutVersion())) {
// In 0.22, we have the checksum stored in the VERSION file.
@ -627,17 +599,17 @@ public class FSImage implements Closeable {
NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY +
" not set for storage directory " + sdForProperties.getRoot());
}
loadFSImage(imageFile.getFile(), new MD5Hash(md5));
loadFSImage(imageFile.getFile(), new MD5Hash(md5), target);
} else {
// We don't have any record of the md5sum
loadFSImage(imageFile.getFile(), null);
loadFSImage(imageFile.getFile(), null, target);
}
} catch (IOException ioe) {
FSEditLog.closeAllStreams(editStreams);
throw new IOException("Failed to load image from " + imageFile, ioe);
}
long numLoaded = loadEdits(editStreams);
long numLoaded = loadEdits(editStreams, target);
needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
numLoaded);
@ -671,14 +643,15 @@ public class FSImage implements Closeable {
* Load the specified list of edit files into the image.
* @return the number of transactions loaded
*/
protected long loadEdits(Iterable<EditLogInputStream> editStreams) throws IOException {
protected long loadEdits(Iterable<EditLogInputStream> editStreams,
FSNamesystem target) throws IOException {
LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editStreams));
long startingTxId = getLastAppliedTxId() + 1;
int numLoaded = 0;
try {
FSEditLogLoader loader = new FSEditLogLoader(namesystem);
FSEditLogLoader loader = new FSEditLogLoader(target);
// Load latest edits
for (EditLogInputStream editIn : editStreams) {
@ -693,7 +666,7 @@ public class FSImage implements Closeable {
}
// update the counts
getFSNamesystem().dir.updateCountForINodeWithQuota();
target.dir.updateCountForINodeWithQuota();
return numLoaded;
}
@ -702,13 +675,14 @@ public class FSImage implements Closeable {
* Load the image namespace from the given image file, verifying
* it against the MD5 sum stored in its associated .md5 file.
*/
private void loadFSImage(File imageFile) throws IOException {
private void loadFSImage(File imageFile, FSNamesystem target)
throws IOException {
MD5Hash expectedMD5 = MD5FileUtils.readStoredMd5ForFile(imageFile);
if (expectedMD5 == null) {
throw new IOException("No MD5 file found corresponding to image file "
+ imageFile);
}
loadFSImage(imageFile, expectedMD5);
loadFSImage(imageFile, expectedMD5, target);
}
/**
@ -716,11 +690,12 @@ public class FSImage implements Closeable {
* filenames and blocks. Return whether we should
* "re-save" and consolidate the edit-logs
*/
private void loadFSImage(File curFile, MD5Hash expectedMd5) throws IOException {
private void loadFSImage(File curFile, MD5Hash expectedMd5,
FSNamesystem target) throws IOException {
FSImageFormat.Loader loader = new FSImageFormat.Loader(
conf, getFSNamesystem());
conf, target);
loader.load(curFile);
namesystem.setBlockPoolId(this.getBlockPoolID());
target.setBlockPoolId(this.getBlockPoolID());
// Check that the image digest we loaded matches up with what
// we expected
@ -741,13 +716,14 @@ public class FSImage implements Closeable {
/**
* Save the contents of the FS image to the file.
*/
void saveFSImage(StorageDirectory sd, long txid) throws IOException {
void saveFSImage(FSNamesystem source, StorageDirectory sd, long txid)
throws IOException {
File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
File dstFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE, txid);
FSImageFormat.Saver saver = new FSImageFormat.Saver();
FSImageCompression compression = FSImageCompression.createCompression(conf);
saver.save(newFile, txid, getFSNamesystem(), compression);
saver.save(newFile, txid, source, compression);
MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
storage.setMostRecentCheckpointTxId(txid);
@ -768,8 +744,11 @@ public class FSImage implements Closeable {
private StorageDirectory sd;
private List<StorageDirectory> errorSDs;
private final long txid;
private final FSNamesystem source;
FSImageSaver(StorageDirectory sd, List<StorageDirectory> errorSDs, long txid) {
FSImageSaver(FSNamesystem source, StorageDirectory sd,
List<StorageDirectory> errorSDs, long txid) {
this.source = source;
this.sd = sd;
this.errorSDs = errorSDs;
this.txid = txid;
@ -777,7 +756,7 @@ public class FSImage implements Closeable {
public void run() {
try {
saveFSImage(sd, txid);
saveFSImage(source, sd, txid);
} catch (Throwable t) {
LOG.error("Unable to save image for " + sd.getRoot(), t);
errorSDs.add(sd);
@ -806,7 +785,7 @@ public class FSImage implements Closeable {
* Save the contents of the FS image to a new image file in each of the
* current storage directories.
*/
void saveNamespace() throws IOException {
void saveNamespace(FSNamesystem source) throws IOException {
assert editLog != null : "editLog must be initialized";
storage.attemptRestoreRemovedStorage();
@ -817,7 +796,7 @@ public class FSImage implements Closeable {
}
long imageTxId = editLog.getLastWrittenTxId();
try {
saveFSImageInAllDirs(imageTxId);
saveFSImageInAllDirs(source, imageTxId);
storage.writeAll();
} finally {
if (editLogWasOpen) {
@ -829,7 +808,8 @@ public class FSImage implements Closeable {
}
protected void saveFSImageInAllDirs(long txid) throws IOException {
protected void saveFSImageInAllDirs(FSNamesystem source, long txid)
throws IOException {
if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
throw new IOException("No image directories available!");
}
@ -842,7 +822,7 @@ public class FSImage implements Closeable {
for (Iterator<StorageDirectory> it
= storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
StorageDirectory sd = it.next();
FSImageSaver saver = new FSImageSaver(sd, errorSDs, txid);
FSImageSaver saver = new FSImageSaver(source, sd, errorSDs, txid);
Thread saveThread = new Thread(saver, saver.toString());
saveThreads.add(saveThread);
saveThread.start();

View File

@ -556,8 +556,13 @@ class FSImageFormat {
DataOutputStream out = new DataOutputStream(fos);
try {
out.writeInt(HdfsConstants.LAYOUT_VERSION);
out.writeInt(sourceNamesystem.getFSImage()
.getStorage().getNamespaceID()); // TODO bad dependency
// We use the non-locked version of getNamespaceInfo here since
// the coordinating thread of saveNamespace already has read-locked
// the namespace for us. If we attempt to take another readlock
// from the actual saver thread, there's a potential of a
// fairness-related deadlock. See the comments on HDFS-2223.
out.writeInt(sourceNamesystem.unprotectedGetNamespaceInfo()
.getNamespaceID());
out.writeLong(fsDir.rootDir.numItemsInTree());
out.writeLong(sourceNamesystem.getGenerationStamp());
out.writeLong(txid);
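
The deadlock the comment above warns about comes from the fair ReentrantReadWriteLock guarding the namesystem: the saveNamespace coordinator holds the read lock and waits for its saver threads, a saver that tries to read-lock again queues behind any waiting writer, and all three threads block. A self-contained, hypothetical demonstration (not HDFS code) that hangs when run:

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class FairReadLockDeadlock {
  // Fair mode, like the namesystem's fsLock.
  static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

  public static void main(String[] args) throws InterruptedException {
    lock.readLock().lock();                 // "coordinator" holds the read lock

    Thread writer = new Thread(() -> {
      lock.writeLock().lock();              // queues behind the held read lock
      lock.writeLock().unlock();
    });
    writer.start();
    Thread.sleep(200);                      // give the writer time to enqueue

    Thread saver = new Thread(() -> {
      // Under fair ordering this second reader waits behind the queued writer,
      // even though another reader is already active.
      lock.readLock().lock();
      lock.readLock().unlock();
    });
    saver.start();

    saver.join();                           // coordinator waits on the saver: deadlock
    lock.readLock().unlock();               // never reached
    writer.join();
  }
}

Reading the namespace ID through unprotectedGetNamespaceInfo() avoids the second read-lock acquisition entirely, which is why the saver thread uses it here.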

View File

@ -134,6 +134,8 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
import com.google.common.base.Preconditions;
/***************************************************
* FSNamesystem does the actual bookkeeping work for the
* DataNode.
@ -258,12 +260,43 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// lock to protect FSNamesystem.
private ReentrantReadWriteLock fsLock;
/**
* FSNamesystem constructor.
* Instantiates an FSNamesystem loaded from the image and edits
* directories specified in the passed Configuration.
*
* @param conf the Configuration which specifies the storage directories
* from which to load
* @return an FSNamesystem which contains the loaded namespace
* @throws IOException if loading fails
*/
FSNamesystem(Configuration conf) throws IOException {
public static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
FSImage fsImage = new FSImage(conf);
FSNamesystem namesystem = new FSNamesystem(conf, fsImage);
long loadStart = now();
StartupOption startOpt = NameNode.getStartupOption(conf);
namesystem.loadFSImage(startOpt, fsImage);
long timeTakenToLoadFSImage = now() - loadStart;
LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
NameNode.getNameNodeMetrics().setFsImageLoadTime(
(int) timeTakenToLoadFSImage);
return namesystem;
}
/**
* Create an FSNamesystem associated with the specified image.
*
* Note that this does not load any data off of disk -- if you would
* like that behavior, use {@link #loadFromDisk(Configuration)}
* @param fsImage The FSImage to associate with
* @param conf configuration
* @throws IOException on bad configuration
*/
FSNamesystem(Configuration conf, FSImage fsImage) throws IOException {
try {
initialize(conf, null);
initialize(conf, fsImage);
} catch(IOException e) {
LOG.error(getClass().getSimpleName() + " initialization failed.", e);
close();
@ -279,29 +312,41 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
resourceRecheckInterval = conf.getLong(
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT);
nnResourceChecker = new NameNodeResourceChecker(conf);
checkAvailableResources();
this.systemStart = now();
this.blockManager = new BlockManager(this, conf);
this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
this.fsLock = new ReentrantReadWriteLock(true); // fair locking
setConfigurationParameters(conf);
dtSecretManager = createDelegationTokenSecretManager(conf);
this.registerMBean(); // register the MBean for the FSNamesystemState
if(fsImage == null) {
this.dir = new FSDirectory(this, conf);
StartupOption startOpt = NameNode.getStartupOption(conf);
this.dir.loadFSImage(startOpt);
long timeTakenToLoadFSImage = now() - systemStart;
LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
NameNode.getNameNodeMetrics().setFsImageLoadTime(
(int) timeTakenToLoadFSImage);
} else {
this.dir = new FSDirectory(fsImage, this, conf);
}
this.dir = new FSDirectory(fsImage, this, conf);
this.safeMode = new SafeModeInfo(conf);
}
void loadFSImage(StartupOption startOpt, FSImage fsImage)
throws IOException {
// format before starting up if requested
if (startOpt == StartupOption.FORMAT) {
fsImage.format(this, fsImage.getStorage().determineClusterId());// reuse current id
startOpt = StartupOption.REGULAR;
}
boolean success = false;
try {
if (fsImage.recoverTransitionRead(startOpt, this)) {
fsImage.saveNamespace(this);
}
fsImage.openEditLog();
success = true;
} finally {
if (!success) {
fsImage.close();
}
}
dir.imageLoadComplete();
}
void activateSecretManager() throws IOException {
if (dtSecretManager != null) {
dtSecretManager.startThreads();
@ -312,8 +357,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* Activate FSNamesystem daemons.
*/
void activate(Configuration conf) throws IOException {
this.registerMBean(); // register the MBean for the FSNamesystemState
writeLock();
try {
nnResourceChecker = new NameNodeResourceChecker(conf);
checkAvailableResources();
setBlockTotal();
blockManager.activate(conf);
@ -396,36 +446,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return hasReadLock() || hasWriteLock();
}
/**
* dirs is a list of directories where the filesystem directory state
* is stored
*/
FSNamesystem(FSImage fsImage, Configuration conf) throws IOException {
this.fsLock = new ReentrantReadWriteLock(true);
this.blockManager = new BlockManager(this, conf);
setConfigurationParameters(conf);
this.dir = new FSDirectory(fsImage, this, conf);
dtSecretManager = createDelegationTokenSecretManager(conf);
}
/**
* Create FSNamesystem for {@link BackupNode}.
* Should do everything that would be done for the NameNode,
* except for loading the image.
*
* @param bnImage {@link BackupImage}
* @param conf configuration
* @throws IOException
*/
FSNamesystem(Configuration conf, BackupImage bnImage) throws IOException {
try {
initialize(conf, bnImage);
} catch(IOException e) {
LOG.error(getClass().getSimpleName() + " initialization failed.", e);
close();
throw e;
}
}
/**
* Initializes some of the members from configuration
@ -475,15 +495,22 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
NamespaceInfo getNamespaceInfo() {
readLock();
try {
return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
getClusterId(), getBlockPoolId(),
dir.fsImage.getStorage().getCTime(),
upgradeManager.getUpgradeVersion());
return unprotectedGetNamespaceInfo();
} finally {
readUnlock();
}
}
/**
* Version of {@link #getNamespaceInfo()} that is not protected by a lock.
*/
NamespaceInfo unprotectedGetNamespaceInfo() {
return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
getClusterId(), getBlockPoolId(),
dir.fsImage.getStorage().getCTime(),
upgradeManager.getUpgradeVersion());
}
/**
* Close down this file system manager.
* Causes heartbeat and lease daemons to stop; waits briefly for
@ -2537,6 +2564,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* @throws IOException
*/
private void checkAvailableResources() throws IOException {
Preconditions.checkState(nnResourceChecker != null,
"nnResourceChecker not initialized");
hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();
}
@ -2697,7 +2726,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new IOException("Safe mode should be turned ON " +
"in order to create namespace image.");
}
getFSImage().saveNamespace();
getFSImage().saveNamespace(this);
LOG.info("New namespace image has been created.");
} finally {
readUnlock();
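
With these changes the namesystem has two explicit construction paths, used elsewhere in this commit: load from disk for a regular NameNode, or inject an FSImage directly for format, BackupNode, and SecondaryNameNode use. A sketch assuming the signatures shown in this diff, with configuration plumbing and error handling omitted:

// Regular startup: build the image from the configured dirs and load the
// namespace from disk.
static FSNamesystem startNameNode(Configuration conf) throws IOException {
  return FSNamesystem.loadFromDisk(conf);
}

// Format path: create the image and an empty namesystem explicitly, then pass
// the namesystem into the operation that needs it.
static void formatNameNode(Configuration conf, Collection<URI> dirsToFormat,
    Collection<URI> editDirsToFormat, String clusterId) throws IOException {
  FSImage fsImage = new FSImage(conf, dirsToFormat, editDirsToFormat);
  FSNamesystem fsn = new FSNamesystem(conf, fsImage);
  fsImage.format(fsn, clusterId);
}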

View File

@ -289,7 +289,7 @@ public class NameNode {
}
protected void loadNamesystem(Configuration conf) throws IOException {
this.namesystem = new FSNamesystem(conf);
this.namesystem = FSNamesystem.loadFromDisk(conf);
}
NamenodeRegistration getRegistration() {
@ -592,16 +592,16 @@ public class NameNode {
}
System.out.println("Formatting using clusterid: " + clusterId);
FSImage fsImage = new FSImage(conf, null, dirsToFormat, editDirsToFormat);
FSNamesystem nsys = new FSNamesystem(fsImage, conf);
nsys.dir.fsImage.format(clusterId);
FSImage fsImage = new FSImage(conf, dirsToFormat, editDirsToFormat);
FSNamesystem fsn = new FSNamesystem(conf, fsImage);
fsImage.format(fsn, clusterId);
return false;
}
private static boolean finalize(Configuration conf,
boolean isConfirmationNeeded
) throws IOException {
FSNamesystem nsys = new FSNamesystem(new FSImage(conf), conf);
FSNamesystem nsys = new FSNamesystem(conf, new FSImage(conf));
System.err.print(
"\"finalize\" will remove the previous state of the files system.\n"
+ "Recent upgrade will become permanent.\n"

View File

@ -122,6 +122,8 @@ public class SecondaryNameNode implements Runnable {
/** checkpoint once every this many transactions, regardless of time */
private long checkpointTxnCount;
private FSNamesystem namesystem;
/** {@inheritDoc} */
public String toString() {
@ -221,6 +223,8 @@ public class SecondaryNameNode implements Runnable {
checkpointImage = new CheckpointStorage(conf, checkpointDirs, checkpointEditsDirs);
checkpointImage.recoverCreate(commandLineOpts.shouldFormat());
namesystem = new FSNamesystem(conf, checkpointImage);
// Initialize other scheduling parameters from the configuration
checkpointCheckPeriod = conf.getLong(
DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY,
@ -520,7 +524,7 @@ public class SecondaryNameNode implements Runnable {
boolean loadImage = downloadCheckpointFiles(
fsName, checkpointImage, sig, manifest); // Fetch fsimage and edits
doMerge(sig, manifest, loadImage, checkpointImage);
doMerge(sig, manifest, loadImage, checkpointImage, namesystem);
//
// Upload the new image into the NameNode. Then tell the Namenode
@ -750,8 +754,7 @@ public class SecondaryNameNode implements Runnable {
CheckpointStorage(Configuration conf,
Collection<URI> imageDirs,
Collection<URI> editsDirs) throws IOException {
super(conf, (FSNamesystem)null, imageDirs, editsDirs);
setFSNamesystem(new FSNamesystem(this, conf));
super(conf, imageDirs, editsDirs);
// the 2NN never writes edits -- it only downloads them. So
// we shouldn't have any editLog instance. Setting to null
@ -837,7 +840,8 @@ public class SecondaryNameNode implements Runnable {
static void doMerge(
CheckpointSignature sig, RemoteEditLogManifest manifest,
boolean loadImage, FSImage dstImage) throws IOException {
boolean loadImage, FSImage dstImage, FSNamesystem dstNamesystem)
throws IOException {
NNStorage dstStorage = dstImage.getStorage();
dstStorage.setStorageInfo(sig);
@ -848,11 +852,11 @@ public class SecondaryNameNode implements Runnable {
sig.mostRecentCheckpointTxId + " even though it should have " +
"just been downloaded");
}
dstImage.reloadFromImageFile(file);
dstImage.reloadFromImageFile(file, dstNamesystem);
}
Checkpointer.rollForwardByApplyingLogs(manifest, dstImage);
dstImage.saveFSImageInAllDirs(dstImage.getLastAppliedTxId());
Checkpointer.rollForwardByApplyingLogs(manifest, dstImage, dstNamesystem);
dstImage.saveFSImageInAllDirs(dstNamesystem, dstImage.getLastAppliedTxId());
dstStorage.writeAll();
}
}

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.server.namenode.UpgradeObjectNamenode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.test.GenericTestUtils;
/**
*/
@ -60,7 +61,8 @@ public class TestDistributedUpgrade extends TestCase {
* Attempts to start a NameNode with the given operation. Starting
* the NameNode should throw an exception.
*/
void startNameNodeShouldFail(StartupOption operation) {
void startNameNodeShouldFail(StartupOption operation,
String exceptionSubstring) {
try {
//cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).startupOption(operation).build(); // should fail
// we set manage dirs to true as NN has to start from untar'ed image with
@ -72,8 +74,8 @@ public class TestDistributedUpgrade extends TestCase {
.build(); // should fail
throw new AssertionError("NameNode should have failed to start");
} catch (Exception expected) {
expected = null;
// expected
GenericTestUtils.assertExceptionContains(
exceptionSubstring, expected);
}
}
@ -115,7 +117,7 @@ public class TestDistributedUpgrade extends TestCase {
conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); // block scanning off
log("NameNode start in regular mode when dustributed upgrade is required", numDirs);
startNameNodeShouldFail(StartupOption.REGULAR);
startNameNodeShouldFail(StartupOption.REGULAR, "contains an old layout version");
log("Start NameNode only distributed upgrade", numDirs);
// cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).format(false)
@ -128,10 +130,12 @@ public class TestDistributedUpgrade extends TestCase {
cluster.shutdown();
log("NameNode start in regular mode when dustributed upgrade has been started", numDirs);
startNameNodeShouldFail(StartupOption.REGULAR);
startNameNodeShouldFail(StartupOption.REGULAR,
"Previous distributed upgrade was not completed");
log("NameNode rollback to the old version that require a dustributed upgrade", numDirs);
startNameNodeShouldFail(StartupOption.ROLLBACK);
startNameNodeShouldFail(StartupOption.ROLLBACK,
"Cannot rollback to storage version -7 using this version");
log("Normal distributed upgrade for the cluster", numDirs);
cluster = new MiniDFSCluster.Builder(conf)

View File

@ -1216,7 +1216,7 @@ public class TestCheckpoint extends TestCase {
CheckpointStorage spyImage1 = spyOnSecondaryImage(secondary1);
DelayAnswer delayer = new DelayAnswer(LOG);
Mockito.doAnswer(delayer).when(spyImage1)
.saveFSImageInAllDirs(Mockito.anyLong());
.saveFSImageInAllDirs(Mockito.<FSNamesystem>any(), Mockito.anyLong());
// Set up a thread to do a checkpoint from the first 2NN
DoCheckpointThread checkpointThread = new DoCheckpointThread(secondary1);

View File

@ -50,7 +50,7 @@ public class TestClusterId {
// see if cluster id not empty.
Collection<URI> dirsToFormat = FSNamesystem.getNamespaceDirs(config);
Collection<URI> editsToFormat = new ArrayList<URI>(0);
FSImage fsImage = new FSImage(config, null, dirsToFormat, editsToFormat);
FSImage fsImage = new FSImage(config, dirsToFormat, editsToFormat);
Iterator<StorageDirectory> sdit =
fsImage.getStorage().dirIterator(NNStorage.NameNodeDirType.IMAGE);

View File

@ -350,7 +350,7 @@ public class TestEditLogRace {
Configuration conf = getConf();
NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
DFSTestUtil.formatNameNode(conf);
final FSNamesystem namesystem = new FSNamesystem(conf);
final FSNamesystem namesystem = FSNamesystem.loadFromDisk(conf);
try {
FSImage fsimage = namesystem.getFSImage();
@ -448,7 +448,7 @@ public class TestEditLogRace {
Configuration conf = getConf();
NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
DFSTestUtil.formatNameNode(conf);
final FSNamesystem namesystem = new FSNamesystem(conf);
final FSNamesystem namesystem = FSNamesystem.loadFromDisk(conf);
try {
FSImage fsimage = namesystem.getFSImage();

View File

@ -79,7 +79,7 @@ public class TestSaveNamespace {
public Void answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
StorageDirectory sd = (StorageDirectory)args[0];
StorageDirectory sd = (StorageDirectory)args[1];
if (count++ == 1) {
LOG.info("Injecting fault for sd: " + sd);
@ -106,7 +106,7 @@ public class TestSaveNamespace {
Configuration conf = getConf();
NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
DFSTestUtil.formatNameNode(conf);
FSNamesystem fsn = new FSNamesystem(conf);
FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
// Replace the FSImage with a spy
FSImage originalImage = fsn.dir.fsImage;
@ -124,19 +124,22 @@ public class TestSaveNamespace {
case SAVE_SECOND_FSIMAGE_RTE:
// The spy throws a RuntimeException when writing to the second directory
doAnswer(new FaultySaveImage(true)).
when(spyImage).saveFSImage((StorageDirectory)anyObject(), anyLong());
when(spyImage).saveFSImage(Mockito.eq(fsn),
(StorageDirectory)anyObject(), anyLong());
shouldFail = false;
break;
case SAVE_SECOND_FSIMAGE_IOE:
// The spy throws an IOException when writing to the second directory
doAnswer(new FaultySaveImage(false)).
when(spyImage).saveFSImage((StorageDirectory)anyObject(), anyLong());
when(spyImage).saveFSImage(Mockito.eq(fsn),
(StorageDirectory)anyObject(), anyLong());
shouldFail = false;
break;
case SAVE_ALL_FSIMAGES:
// The spy throws IOException in all directories
doThrow(new RuntimeException("Injected")).
when(spyImage).saveFSImage((StorageDirectory)anyObject(), anyLong());
when(spyImage).saveFSImage(Mockito.eq(fsn),
(StorageDirectory)anyObject(), anyLong());
shouldFail = true;
break;
case WRITE_STORAGE_ALL:
@ -184,7 +187,7 @@ public class TestSaveNamespace {
// Start a new namesystem, which should be able to recover
// the namespace from the previous incarnation.
fsn = new FSNamesystem(conf);
fsn = FSNamesystem.loadFromDisk(conf);
// Make sure the image loaded including our edits.
checkEditExists(fsn, 1);
@ -209,7 +212,7 @@ public class TestSaveNamespace {
NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
DFSTestUtil.formatNameNode(conf);
FSNamesystem fsn = new FSNamesystem(conf);
FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
// Replace the FSImage with a spy
FSImage originalImage = fsn.dir.fsImage;
@ -263,7 +266,7 @@ public class TestSaveNamespace {
// Start a new namesystem, which should be able to recover
// the namespace from the previous incarnation.
LOG.info("Loading new FSmage from disk.");
fsn = new FSNamesystem(conf);
fsn = FSNamesystem.loadFromDisk(conf);
// Make sure the image loaded including our edit.
LOG.info("Checking reloaded image.");
@ -344,7 +347,7 @@ public class TestSaveNamespace {
Configuration conf = getConf();
NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
DFSTestUtil.formatNameNode(conf);
FSNamesystem fsn = new FSNamesystem(conf);
FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
// Replace the FSImage with a spy
final FSImage originalImage = fsn.dir.fsImage;
@ -360,8 +363,9 @@ public class TestSaveNamespace {
FSNamesystem.getNamespaceEditsDirs(conf));
doThrow(new IOException("Injected fault: saveFSImage")).
when(spyImage).saveFSImage((StorageDirectory)anyObject(),
Mockito.anyLong());
when(spyImage).saveFSImage(
Mockito.eq(fsn), (StorageDirectory)anyObject(),
Mockito.anyLong());
try {
doAnEdit(fsn, 1);
@ -390,7 +394,7 @@ public class TestSaveNamespace {
// Start a new namesystem, which should be able to recover
// the namespace from the previous incarnation.
fsn = new FSNamesystem(conf);
fsn = FSNamesystem.loadFromDisk(conf);
// Make sure the image loaded including our edits.
checkEditExists(fsn, 1);
@ -406,7 +410,7 @@ public class TestSaveNamespace {
Configuration conf = getConf();
NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
DFSTestUtil.formatNameNode(conf);
FSNamesystem fsn = new FSNamesystem(conf);
FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
try {
doAnEdit(fsn, 1);
@ -425,7 +429,7 @@ public class TestSaveNamespace {
// Start a new namesystem, which should be able to recover
// the namespace from the previous incarnation.
fsn = new FSNamesystem(conf);
fsn = FSNamesystem.loadFromDisk(conf);
// Make sure the image loaded including our edits.
checkEditExists(fsn, 1);
@ -442,7 +446,7 @@ public class TestSaveNamespace {
Configuration conf = getConf();
NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
DFSTestUtil.formatNameNode(conf);
FSNamesystem fsn = new FSNamesystem(conf);
FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
try {
// We have a BEGIN_LOG_SEGMENT txn to start
@ -464,7 +468,7 @@ public class TestSaveNamespace {
assertEquals(5, fsn.getEditLog().getLastWrittenTxId());
fsn = null;
fsn = new FSNamesystem(conf);
fsn = FSNamesystem.loadFromDisk(conf);
// 1 more txn to start new segment on restart
assertEquals(6, fsn.getEditLog().getLastWrittenTxId());

View File

@ -84,7 +84,7 @@ public class TestNNLeaseRecovery {
FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
DFSTestUtil.formatNameNode(conf);
fsn = spy(new FSNamesystem(conf));
fsn = spy(FSNamesystem.loadFromDisk(conf));
}
/**
@ -428,7 +428,6 @@ public class TestNNLeaseRecovery {
when(fsn.getFSImage()).thenReturn(fsImage);
when(fsn.getFSImage().getEditLog()).thenReturn(editLog);
fsn.getFSImage().setFSNamesystem(fsn);
switch (fileBlocksNumber) {
case 0: