HDFS-10425. Clean up NNStorage and TestSaveNamespace. Contributed by Andras Bokor.
This commit is contained in:
parent
1c9d2ab503
commit
38128baff4
|
@ -69,9 +69,9 @@ public class NNStorage extends Storage implements Closeable,
|
|||
static final String DEPRECATED_MESSAGE_DIGEST_PROPERTY = "imageMD5Digest";
|
||||
static final String LOCAL_URI_SCHEME = "file";
|
||||
|
||||
//
|
||||
// The filenames used for storing the images
|
||||
//
|
||||
/**
|
||||
* The filenames used for storing the images.
|
||||
*/
|
||||
public enum NameNodeFile {
|
||||
IMAGE ("fsimage"),
|
||||
TIME ("fstime"), // from "old" pre-HDFS-1073 format
|
||||
|
@ -85,9 +85,14 @@ public enum NameNodeFile {
|
|||
IMAGE_LEGACY_OIV ("fsimage_legacy_oiv"); // For pre-PB format
|
||||
|
||||
private String fileName = null;
|
||||
private NameNodeFile(String name) { this.fileName = name; }
|
||||
NameNodeFile(String name) {
|
||||
this.fileName = name;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public String getName() { return fileName; }
|
||||
public String getName() {
|
||||
return fileName;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -97,7 +102,7 @@ public enum NameNodeFile {
|
|||
* stores both fsimage and edits.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public static enum NameNodeDirType implements StorageDirType {
|
||||
public enum NameNodeDirType implements StorageDirType {
|
||||
UNDEFINED,
|
||||
IMAGE,
|
||||
EDITS,
|
||||
|
@ -110,16 +115,15 @@ public StorageDirType getStorageDirType() {
|
|||
|
||||
@Override
|
||||
public boolean isOfType(StorageDirType type) {
|
||||
if ((this == IMAGE_AND_EDITS) && (type == IMAGE || type == EDITS))
|
||||
return true;
|
||||
return this == type;
|
||||
return (this == IMAGE_AND_EDITS) && (type == IMAGE || type == EDITS) ||
|
||||
this == type;
|
||||
}
|
||||
}
|
||||
|
||||
protected String blockpoolID = ""; // id of the block pool
|
||||
|
||||
/**
|
||||
* flag that controls if we try to restore failed storages
|
||||
* Flag that controls if we try to restore failed storages.
|
||||
*/
|
||||
private boolean restoreFailedStorage = false;
|
||||
private final Object restorationLock = new Object();
|
||||
|
@ -131,7 +135,8 @@ public boolean isOfType(StorageDirType type) {
|
|||
* recent fsimage file. This does not include any transactions
|
||||
* that have since been written to the edit log.
|
||||
*/
|
||||
protected volatile long mostRecentCheckpointTxId = HdfsServerConstants.INVALID_TXID;
|
||||
protected volatile long mostRecentCheckpointTxId =
|
||||
HdfsServerConstants.INVALID_TXID;
|
||||
|
||||
/**
|
||||
* Time of the last checkpoint, in milliseconds since the epoch.
|
||||
|
@ -139,10 +144,10 @@ public boolean isOfType(StorageDirType type) {
|
|||
private long mostRecentCheckpointTime = 0;
|
||||
|
||||
/**
|
||||
* list of failed (and thus removed) storages
|
||||
* List of failed (and thus removed) storages.
|
||||
*/
|
||||
final protected List<StorageDirectory> removedStorageDirs
|
||||
= new CopyOnWriteArrayList<StorageDirectory>();
|
||||
= new CopyOnWriteArrayList<>();
|
||||
|
||||
/**
|
||||
* Properties from old layout versions that may be needed
|
||||
|
@ -167,7 +172,7 @@ public NNStorage(Configuration conf,
|
|||
throws IOException {
|
||||
super(NodeType.NAME_NODE);
|
||||
|
||||
storageDirs = new CopyOnWriteArrayList<StorageDirectory>();
|
||||
storageDirs = new CopyOnWriteArrayList<>();
|
||||
|
||||
// this may modify the editsDirs, so copy before passing in
|
||||
setStorageDirectories(imageDirs,
|
||||
|
@ -195,8 +200,9 @@ public boolean isPreUpgradableLayout(StorageDirectory sd) throws IOException {
|
|||
int oldVersion = oldFile.readInt();
|
||||
oldFile.close();
|
||||
oldFile = null;
|
||||
if (oldVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION)
|
||||
if (oldVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION) {
|
||||
return false;
|
||||
}
|
||||
} finally {
|
||||
IOUtils.cleanup(LOG, oldFile);
|
||||
}
|
||||
|
@ -233,21 +239,19 @@ boolean getRestoreFailedStorage() {
|
|||
*/
|
||||
void attemptRestoreRemovedStorage() {
|
||||
// if directory is "alive" - copy the images there...
|
||||
if(!restoreFailedStorage || removedStorageDirs.size() == 0)
|
||||
if(!restoreFailedStorage || removedStorageDirs.size() == 0) {
|
||||
return; //nothing to restore
|
||||
|
||||
}
|
||||
/* We don't want more than one thread trying to restore at a time */
|
||||
synchronized (this.restorationLock) {
|
||||
LOG.info("NNStorage.attemptRestoreRemovedStorage: check removed(failed) "+
|
||||
"storarge. removedStorages size = " + removedStorageDirs.size());
|
||||
for(Iterator<StorageDirectory> it
|
||||
= this.removedStorageDirs.iterator(); it.hasNext();) {
|
||||
StorageDirectory sd = it.next();
|
||||
for (StorageDirectory sd : this.removedStorageDirs) {
|
||||
File root = sd.getRoot();
|
||||
LOG.info("currently disabled dir " + root.getAbsolutePath() +
|
||||
"; type="+sd.getStorageDirType()
|
||||
+ ";canwrite="+FileUtil.canWrite(root));
|
||||
if(root.exists() && FileUtil.canWrite(root)) {
|
||||
"; type=" + sd.getStorageDirType()
|
||||
+ ";canwrite=" + FileUtil.canWrite(root));
|
||||
if (root.exists() && FileUtil.canWrite(root)) {
|
||||
LOG.info("restoring dir " + sd.getRoot().getAbsolutePath());
|
||||
this.addStorageDir(sd); // restore
|
||||
this.removedStorageDirs.remove(sd);
|
||||
|
@ -264,13 +268,13 @@ List<StorageDirectory> getRemovedStorageDirs() {
|
|||
}
|
||||
|
||||
/**
|
||||
* See {@link NNStorage#setStorageDirectories(Collection, Collection, Collection)}
|
||||
* See {@link NNStorage#setStorageDirectories(Collection, Collection, Collection)}.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
synchronized void setStorageDirectories(Collection<URI> fsNameDirs,
|
||||
Collection<URI> fsEditsDirs)
|
||||
throws IOException {
|
||||
setStorageDirectories(fsNameDirs, fsEditsDirs, new ArrayList<URI>());
|
||||
setStorageDirectories(fsNameDirs, fsEditsDirs, new ArrayList<>());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -322,14 +326,15 @@ synchronized void setStorageDirectories(Collection<URI> fsNameDirs,
|
|||
checkSchemeConsistency(dirName);
|
||||
// Add to the list of storage directories, only if the
|
||||
// URI is of type file://
|
||||
if(dirName.getScheme().compareTo("file") == 0)
|
||||
if(dirName.getScheme().compareTo("file") == 0) {
|
||||
this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
|
||||
NameNodeDirType.EDITS, sharedEditsDirs.contains(dirName)));
|
||||
NameNodeDirType.EDITS, sharedEditsDirs.contains(dirName)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the storage directory corresponding to the passed URI
|
||||
* Return the storage directory corresponding to the passed URI.
|
||||
* @param uri URI of a storage directory
|
||||
* @return The matching storage directory or null if none found
|
||||
*/
|
||||
|
@ -337,7 +342,7 @@ StorageDirectory getStorageDirectory(URI uri) {
|
|||
try {
|
||||
uri = Util.fileAsURI(new File(uri));
|
||||
Iterator<StorageDirectory> it = dirIterator();
|
||||
for (; it.hasNext(); ) {
|
||||
while (it.hasNext()) {
|
||||
StorageDirectory sd = it.next();
|
||||
if (Util.fileAsURI(sd.getRoot()).equals(uri)) {
|
||||
return sd;
|
||||
|
@ -351,7 +356,7 @@ StorageDirectory getStorageDirectory(URI uri) {
|
|||
|
||||
/**
|
||||
* Checks the consistency of a URI, in particular if the scheme
|
||||
* is specified
|
||||
* is specified.
|
||||
* @param u URI whose consistency is being checked.
|
||||
*/
|
||||
private static void checkSchemeConsistency(URI u) throws IOException {
|
||||
|
@ -363,7 +368,7 @@ private static void checkSchemeConsistency(URI u) throws IOException {
|
|||
}
|
||||
|
||||
/**
|
||||
* Retrieve current directories of type IMAGE
|
||||
* Retrieve current directories of type IMAGE.
|
||||
* @return Collection of URI representing image directories
|
||||
* @throws IOException in case of URI processing error
|
||||
*/
|
||||
|
@ -372,7 +377,7 @@ Collection<URI> getImageDirectories() throws IOException {
|
|||
}
|
||||
|
||||
/**
|
||||
* Retrieve current directories of type EDITS
|
||||
* Retrieve current directories of type EDITS.
|
||||
* @return Collection of URI representing edits directories
|
||||
* @throws IOException in case of URI processing error
|
||||
*/
|
||||
|
@ -386,12 +391,14 @@ Collection<URI> getEditsDirectories() throws IOException {
|
|||
* @return number of storage directories of type dirType
|
||||
*/
|
||||
int getNumStorageDirs(NameNodeDirType dirType) {
|
||||
if(dirType == null)
|
||||
if(dirType == null) {
|
||||
return getNumStorageDirs();
|
||||
}
|
||||
Iterator<StorageDirectory> it = dirIterator(dirType);
|
||||
int numDirs = 0;
|
||||
for(; it.hasNext(); it.next())
|
||||
for(; it.hasNext(); it.next()) {
|
||||
numDirs++;
|
||||
}
|
||||
return numDirs;
|
||||
}
|
||||
|
||||
|
@ -404,10 +411,10 @@ int getNumStorageDirs(NameNodeDirType dirType) {
|
|||
*/
|
||||
Collection<URI> getDirectories(NameNodeDirType dirType)
|
||||
throws IOException {
|
||||
ArrayList<URI> list = new ArrayList<URI>();
|
||||
ArrayList<URI> list = new ArrayList<>();
|
||||
Iterator<StorageDirectory> it = (dirType == null) ? dirIterator() :
|
||||
dirIterator(dirType);
|
||||
for ( ;it.hasNext(); ) {
|
||||
for ( ; it.hasNext();) {
|
||||
StorageDirectory sd = it.next();
|
||||
try {
|
||||
list.add(Util.fileAsURI(sd.getRoot()));
|
||||
|
@ -440,7 +447,8 @@ static long readTransactionIdFile(StorageDirectory sd) throws IOException {
|
|||
* @param sd storage directory
|
||||
* @throws IOException
|
||||
*/
|
||||
void writeTransactionIdFile(StorageDirectory sd, long txid) throws IOException {
|
||||
void writeTransactionIdFile(StorageDirectory sd, long txid)
|
||||
throws IOException {
|
||||
Preconditions.checkArgument(txid >= 0, "bad txid: " + txid);
|
||||
|
||||
File txIdFile = getStorageFile(sd, NameNodeFile.SEEN_TXID);
|
||||
|
@ -448,7 +456,7 @@ void writeTransactionIdFile(StorageDirectory sd, long txid) throws IOException {
|
|||
}
|
||||
|
||||
/**
|
||||
* Set the transaction ID and time of the last checkpoint
|
||||
* Set the transaction ID and time of the last checkpoint.
|
||||
*
|
||||
* @param txid transaction id of the last checkpoint
|
||||
* @param time time of the last checkpoint, in millis since the epoch
|
||||
|
@ -513,12 +521,12 @@ public void writeTransactionIdFileToStorage(long txid,
|
|||
|
||||
/**
|
||||
* Return the name of the image file that is uploaded by periodic
|
||||
* checkpointing
|
||||
* checkpointing.
|
||||
*
|
||||
* @return List of filenames to save checkpoints to.
|
||||
*/
|
||||
public File[] getFsImageNameCheckpoint(long txid) {
|
||||
ArrayList<File> list = new ArrayList<File>();
|
||||
ArrayList<File> list = new ArrayList<>();
|
||||
for (Iterator<StorageDirectory> it =
|
||||
dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
|
||||
list.add(getStorageFile(it.next(), NameNodeFile.IMAGE_NEW, txid));
|
||||
|
@ -583,7 +591,8 @@ private void format(StorageDirectory sd) throws IOException {
|
|||
*/
|
||||
public void format(NamespaceInfo nsInfo) throws IOException {
|
||||
Preconditions.checkArgument(nsInfo.getLayoutVersion() == 0 ||
|
||||
nsInfo.getLayoutVersion() == HdfsServerConstants.NAMENODE_LAYOUT_VERSION,
|
||||
nsInfo.getLayoutVersion() ==
|
||||
HdfsServerConstants.NAMENODE_LAYOUT_VERSION,
|
||||
"Bad layout version: %s", nsInfo.getLayoutVersion());
|
||||
|
||||
this.setStorageInfo(nsInfo);
|
||||
|
@ -624,8 +633,9 @@ public void format() throws IOException {
|
|||
*/
|
||||
private static int newNamespaceID() {
|
||||
int newID = 0;
|
||||
while(newID == 0)
|
||||
while(newID == 0) {
|
||||
newID = ThreadLocalRandom.current().nextInt(0x7FFFFFFF); // use 31 bits
|
||||
}
|
||||
return newID;
|
||||
}
|
||||
|
||||
|
@ -650,8 +660,8 @@ LayoutVersion.Feature.FEDERATION, getLayoutVersion())) {
|
|||
void readProperties(StorageDirectory sd, StartupOption startupOption)
|
||||
throws IOException {
|
||||
Properties props = readPropertiesFile(sd.getVersionFile());
|
||||
if (HdfsServerConstants.RollingUpgradeStartupOption.ROLLBACK.matches
|
||||
(startupOption)) {
|
||||
if (HdfsServerConstants.RollingUpgradeStartupOption.ROLLBACK
|
||||
.matches(startupOption)) {
|
||||
int lv = Integer.parseInt(getProperty(props, sd, "layoutVersion"));
|
||||
if (lv > getServiceLayoutVersion()) {
|
||||
// we should not use a newer version for rollingUpgrade rollback
|
||||
|
@ -669,7 +679,7 @@ void readProperties(StorageDirectory sd, StartupOption startupOption)
|
|||
* versions of HDFS and only necessary during upgrade.
|
||||
*/
|
||||
private void setDeprecatedPropertiesForUpgrade(Properties props) {
|
||||
deprecatedProperties = new HashMap<String, String>();
|
||||
deprecatedProperties = new HashMap<>();
|
||||
String md5 = props.getProperty(DEPRECATED_MESSAGE_DIGEST_PROPERTY);
|
||||
if (md5 != null) {
|
||||
deprecatedProperties.put(DEPRECATED_MESSAGE_DIGEST_PROPERTY, md5);
|
||||
|
@ -700,8 +710,7 @@ assert getLayoutVersion() > HdfsServerConstants.NAMENODE_LAYOUT_VERSION :
|
|||
*/
|
||||
@Override // Storage
|
||||
protected void setPropertiesFromFields(Properties props,
|
||||
StorageDirectory sd
|
||||
) throws IOException {
|
||||
StorageDirectory sd) throws IOException {
|
||||
super.setPropertiesFromFields(props, sd);
|
||||
// Set blockpoolID in version with federation support
|
||||
if (NameNodeLayoutVersion.supports(
|
||||
|
@ -710,14 +719,15 @@ LayoutVersion.Feature.FEDERATION, getLayoutVersion())) {
|
|||
}
|
||||
}
|
||||
|
||||
static File getStorageFile(StorageDirectory sd, NameNodeFile type, long imageTxId) {
|
||||
static File getStorageFile(StorageDirectory sd, NameNodeFile type,
|
||||
long imageTxId) {
|
||||
return new File(sd.getCurrentDir(),
|
||||
String.format("%s_%019d", type.getName(), imageTxId));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a storage file for one of the files that doesn't need a txid associated
|
||||
* (e.g version, seen_txid)
|
||||
* (e.g version, seen_txid).
|
||||
*/
|
||||
static File getStorageFile(StorageDirectory sd, NameNodeFile type) {
|
||||
return new File(sd.getCurrentDir(), type.getName());
|
||||
|
@ -779,8 +789,8 @@ public static String getFinalizedEditsFileName(long startTxId, long endTxId) {
|
|||
|
||||
public static String getTemporaryEditsFileName(long startTxId, long endTxId,
|
||||
long timestamp) {
|
||||
return String.format("%s_%019d-%019d_%019d", NameNodeFile.EDITS_TMP.getName(),
|
||||
startTxId, endTxId, timestamp);
|
||||
return String.format("%s_%019d-%019d_%019d",
|
||||
NameNodeFile.EDITS_TMP.getName(), startTxId, endTxId, timestamp);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -799,7 +809,7 @@ File findFinalizedEditsFile(long startTxId, long endTxId)
|
|||
|
||||
/**
|
||||
* Return the first readable image file for the given txid and image type, or
|
||||
* null if no such image can be found
|
||||
* null if no such image can be found.
|
||||
*/
|
||||
File findImageFile(NameNodeFile nnf, long txid) {
|
||||
return findFile(NameNodeDirType.IMAGE,
|
||||
|
@ -878,9 +888,10 @@ private void reportErrorsOnDirectory(StorageDirectory sd) {
|
|||
* @param layoutVersion Layout version for the upgrade
|
||||
* @throws IOException
|
||||
*/
|
||||
void processStartupOptionsForUpgrade(StartupOption startOpt, int layoutVersion)
|
||||
throws IOException {
|
||||
if (startOpt == StartupOption.UPGRADE || startOpt == StartupOption.UPGRADEONLY) {
|
||||
void processStartupOptionsForUpgrade(StartupOption startOpt,
|
||||
int layoutVersion) throws IOException {
|
||||
if (startOpt == StartupOption.UPGRADE ||
|
||||
startOpt == StartupOption.UPGRADEONLY) {
|
||||
// If upgrade from a release that does not support federation,
|
||||
// if clusterId is provided in the startupOptions use it.
|
||||
// Else generate a new cluster ID
|
||||
|
@ -935,10 +946,10 @@ public void reportErrorOnFile(File f) {
|
|||
*
|
||||
* clusterID is a persistent attribute of the cluster.
|
||||
* It is generated when the cluster is created and remains the same
|
||||
* during the life cycle of the cluster. When a new name node is formated, if
|
||||
* this is a new cluster, a new clusterID is geneated and stored. Subsequent
|
||||
* name node must be given the same ClusterID during its format to be in the
|
||||
* same cluster.
|
||||
* during the life cycle of the cluster. When a new name node is formated,
|
||||
* if this is a new cluster, a new clusterID is geneated and stored.
|
||||
* Subsequent name node must be given the same ClusterID during its format to
|
||||
* be in the same cluster.
|
||||
* When a datanode register it receive the clusterID and stick with it.
|
||||
* If at any point, name node or data node tries to join another cluster, it
|
||||
* will be rejected.
|
||||
|
@ -954,13 +965,13 @@ void setClusterID(String cid) {
|
|||
}
|
||||
|
||||
/**
|
||||
* try to find current cluster id in the VERSION files
|
||||
* Try to find current cluster id in the VERSION files.
|
||||
* returns first cluster id found in any VERSION file
|
||||
* null in case none found
|
||||
* @return clusterId or null in case no cluster id found
|
||||
*/
|
||||
public String determineClusterId() {
|
||||
String cid = null;
|
||||
String cid;
|
||||
Iterator<StorageDirectory> sdit = dirIterator(NameNodeDirType.IMAGE);
|
||||
while(sdit.hasNext()) {
|
||||
StorageDirectory sd = sdit.next();
|
||||
|
@ -970,8 +981,9 @@ public String determineClusterId() {
|
|||
LOG.info("current cluster id for sd="+sd.getCurrentDir() +
|
||||
";lv=" + layoutVersion + ";cid=" + cid);
|
||||
|
||||
if(cid != null && !cid.equals(""))
|
||||
if(cid != null && !cid.equals("")) {
|
||||
return cid;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("this sd not available: " + e.getLocalizedMessage());
|
||||
} //ignore
|
||||
|
@ -986,7 +998,7 @@ public String determineClusterId() {
|
|||
* @return new blockpoolID
|
||||
*/
|
||||
static String newBlockPoolID() throws UnknownHostException{
|
||||
String ip = "unknownIP";
|
||||
String ip;
|
||||
try {
|
||||
ip = DNS.getDefaultIP("default");
|
||||
} catch (UnknownHostException e) {
|
||||
|
@ -995,16 +1007,15 @@ static String newBlockPoolID() throws UnknownHostException{
|
|||
}
|
||||
|
||||
int rand = DFSUtil.getSecureRandom().nextInt(Integer.MAX_VALUE);
|
||||
String bpid = "BP-" + rand + "-"+ ip + "-" + Time.now();
|
||||
return bpid;
|
||||
return "BP-" + rand + "-"+ ip + "-" + Time.now();
|
||||
}
|
||||
|
||||
/** Validate and set block pool ID */
|
||||
/** Validate and set block pool ID. */
|
||||
public void setBlockPoolID(String bpid) {
|
||||
blockpoolID = bpid;
|
||||
}
|
||||
|
||||
/** Validate and set block pool ID */
|
||||
/** Validate and set block pool ID. */
|
||||
private void setBlockPoolID(File storage, String bpid)
|
||||
throws InconsistentFSStateException {
|
||||
if (bpid == null || bpid.equals("")) {
|
||||
|
@ -1044,7 +1055,8 @@ void inspectStorageDirs(FSImageStorageInspector inspector)
|
|||
* inspected each directory.
|
||||
*
|
||||
* <b>Note:</b> this can mutate the storage info fields (ctime, version, etc).
|
||||
* @throws IOException if no valid storage dirs are found or no valid layout version
|
||||
* @throws IOException if no valid storage dirs are found or no valid layout
|
||||
* version
|
||||
*/
|
||||
FSImageStorageInspector readAndInspectDirs(EnumSet<NameNodeFile> fileTypes,
|
||||
StartupOption startupOption) throws IOException {
|
||||
|
@ -1057,21 +1069,24 @@ FSImageStorageInspector readAndInspectDirs(EnumSet<NameNodeFile> fileTypes,
|
|||
it.hasNext();) {
|
||||
StorageDirectory sd = it.next();
|
||||
if (!sd.getVersionFile().exists()) {
|
||||
FSImage.LOG.warn("Storage directory " + sd + " contains no VERSION file. Skipping...");
|
||||
FSImage.LOG.warn("Storage directory " + sd +
|
||||
" contains no VERSION file. Skipping...");
|
||||
continue;
|
||||
}
|
||||
readProperties(sd, startupOption); // sets layoutVersion
|
||||
int lv = getLayoutVersion();
|
||||
if (layoutVersion == null) {
|
||||
layoutVersion = Integer.valueOf(lv);
|
||||
layoutVersion = lv;
|
||||
} else if (!layoutVersion.equals(lv)) {
|
||||
multipleLV = true;
|
||||
}
|
||||
layoutVersions.append("(").append(sd.getRoot()).append(", ").append(lv).append(") ");
|
||||
layoutVersions.append("(").append(sd.getRoot()).append(", ").append(lv)
|
||||
.append(") ");
|
||||
}
|
||||
|
||||
if (layoutVersion == null) {
|
||||
throw new IOException("No storage directories contained VERSION information");
|
||||
throw new IOException("No storage directories contained VERSION" +
|
||||
" information");
|
||||
}
|
||||
if (multipleLV) {
|
||||
throw new IOException(
|
||||
|
|
|
@ -85,8 +85,8 @@ public class TestSaveNamespace {
|
|||
private static final Log LOG = LogFactory.getLog(TestSaveNamespace.class);
|
||||
|
||||
private static class FaultySaveImage implements Answer<Void> {
|
||||
int count = 0;
|
||||
boolean throwRTE = true;
|
||||
private int count = 0;
|
||||
private boolean throwRTE = true;
|
||||
|
||||
// generate either a RuntimeException or IOException
|
||||
public FaultySaveImage(boolean throwRTE) {
|
||||
|
@ -140,7 +140,7 @@ private enum Fault {
|
|||
SAVE_ALL_FSIMAGES,
|
||||
WRITE_STORAGE_ALL,
|
||||
WRITE_STORAGE_ONE
|
||||
};
|
||||
}
|
||||
|
||||
private void saveNamespaceWithInjectedFault(Fault fault) throws Exception {
|
||||
Configuration conf = getConf();
|
||||
|
@ -164,39 +164,41 @@ private void saveNamespaceWithInjectedFault(Fault fault) throws Exception {
|
|||
case SAVE_SECOND_FSIMAGE_RTE:
|
||||
// The spy throws a RuntimeException when writing to the second directory
|
||||
doAnswer(new FaultySaveImage(true)).
|
||||
when(spyImage).saveFSImage(
|
||||
(SaveNamespaceContext)anyObject(),
|
||||
(StorageDirectory)anyObject(), (NameNodeFile) anyObject());
|
||||
when(spyImage).saveFSImage(
|
||||
anyObject(),
|
||||
anyObject(), anyObject());
|
||||
shouldFail = false;
|
||||
break;
|
||||
case SAVE_SECOND_FSIMAGE_IOE:
|
||||
// The spy throws an IOException when writing to the second directory
|
||||
doAnswer(new FaultySaveImage(false)).
|
||||
when(spyImage).saveFSImage(
|
||||
(SaveNamespaceContext)anyObject(),
|
||||
(StorageDirectory)anyObject(), (NameNodeFile) anyObject());
|
||||
when(spyImage).saveFSImage(
|
||||
anyObject(),
|
||||
anyObject(), anyObject());
|
||||
shouldFail = false;
|
||||
break;
|
||||
case SAVE_ALL_FSIMAGES:
|
||||
// The spy throws IOException in all directories
|
||||
doThrow(new RuntimeException("Injected")).
|
||||
when(spyImage).saveFSImage(
|
||||
(SaveNamespaceContext)anyObject(),
|
||||
(StorageDirectory)anyObject(), (NameNodeFile) anyObject());
|
||||
when(spyImage).saveFSImage(
|
||||
anyObject(),
|
||||
anyObject(), anyObject());
|
||||
shouldFail = true;
|
||||
break;
|
||||
case WRITE_STORAGE_ALL:
|
||||
// The spy throws an exception before writing any VERSION files
|
||||
doAnswer(new FaultyWriteProperties(Fault.WRITE_STORAGE_ALL))
|
||||
.when(spyStorage).writeProperties((StorageDirectory)anyObject());
|
||||
.when(spyStorage).writeProperties(anyObject());
|
||||
shouldFail = true;
|
||||
break;
|
||||
case WRITE_STORAGE_ONE:
|
||||
// The spy throws on exception on one particular storage directory
|
||||
doAnswer(new FaultyWriteProperties(Fault.WRITE_STORAGE_ONE))
|
||||
.when(spyStorage).writeProperties((StorageDirectory)anyObject());
|
||||
.when(spyStorage).writeProperties(anyObject());
|
||||
shouldFail = false;
|
||||
break;
|
||||
default: fail("Unknown fail type");
|
||||
break;
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -210,7 +212,7 @@ private void saveNamespaceWithInjectedFault(Fault fault) throws Exception {
|
|||
fail("Did not fail!");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
if (! shouldFail) {
|
||||
if (!shouldFail) {
|
||||
throw e;
|
||||
} else {
|
||||
LOG.info("Test caught expected exception", e);
|
||||
|
@ -392,21 +394,20 @@ public void doTestFailedSaveNamespace(boolean restoreStorageAfterFailure)
|
|||
// Replace the FSImage with a spy
|
||||
final FSImage originalImage = fsn.getFSImage();
|
||||
NNStorage storage = originalImage.getStorage();
|
||||
storage.close(); // unlock any directories that FSNamesystem's initialization may have locked
|
||||
// unlock any directories that
|
||||
// FSNamesystem's initialization may have locked
|
||||
storage.close();
|
||||
|
||||
NNStorage spyStorage = spy(storage);
|
||||
originalImage.storage = spyStorage;
|
||||
FSImage spyImage = spy(originalImage);
|
||||
Whitebox.setInternalState(fsn, "fsImage", spyImage);
|
||||
|
||||
spyImage.storage.setStorageDirectories(
|
||||
FSNamesystem.getNamespaceDirs(conf),
|
||||
spyImage.storage.setStorageDirectories(FSNamesystem.getNamespaceDirs(conf),
|
||||
FSNamesystem.getNamespaceEditsDirs(conf));
|
||||
|
||||
doThrow(new IOException("Injected fault: saveFSImage")).
|
||||
when(spyImage).saveFSImage(
|
||||
(SaveNamespaceContext)anyObject(),
|
||||
(StorageDirectory)anyObject(), (NameNodeFile) anyObject());
|
||||
when(spyImage).saveFSImage(anyObject(), anyObject(), anyObject());
|
||||
|
||||
try {
|
||||
doAnEdit(fsn, 1);
|
||||
|
@ -530,7 +531,9 @@ public void testCancelSaveNamespace() throws Exception {
|
|||
// Replace the FSImage with a spy
|
||||
final FSImage image = fsn.getFSImage();
|
||||
NNStorage storage = image.getStorage();
|
||||
storage.close(); // unlock any directories that FSNamesystem's initialization may have locked
|
||||
// unlock any directories that
|
||||
// FSNamesystem's initialization may have locked
|
||||
storage.close();
|
||||
storage.setStorageDirectories(
|
||||
FSNamesystem.getNamespaceDirs(conf),
|
||||
FSNamesystem.getNamespaceEditsDirs(conf));
|
||||
|
@ -539,7 +542,8 @@ public void testCancelSaveNamespace() throws Exception {
|
|||
final FSNamesystem finalFsn = spyFsn;
|
||||
DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG);
|
||||
BlockIdManager bid = spy(spyFsn.getBlockManager().getBlockIdManager());
|
||||
Whitebox.setInternalState(finalFsn.getBlockManager(), "blockIdManager", bid);
|
||||
Whitebox.setInternalState(finalFsn.getBlockManager(),
|
||||
"blockIdManager", bid);
|
||||
doAnswer(delayer).when(bid).getGenerationStamp();
|
||||
|
||||
ExecutorService pool = Executors.newFixedThreadPool(2);
|
||||
|
@ -572,8 +576,8 @@ public Void call() throws Exception {
|
|||
// give the cancel call time to run
|
||||
Thread.sleep(500);
|
||||
|
||||
// allow saveNamespace to proceed - it should check the cancel flag after
|
||||
// this point and throw an exception
|
||||
// allow saveNamespace to proceed - it should check the cancel flag
|
||||
// after this point and throw an exception
|
||||
delayer.proceed();
|
||||
|
||||
cancelFuture.get();
|
||||
|
@ -622,9 +626,7 @@ public void testSaveNamespaceWithRenamedLease() throws Exception {
|
|||
fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
|
||||
} finally {
|
||||
IOUtils.cleanup(LOG, out, fs);
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -641,9 +643,7 @@ public void testSaveNamespaceWithDanglingLease() throws Exception {
|
|||
cluster.getNameNodeRpc().saveNamespace(0, 0);
|
||||
fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -697,11 +697,8 @@ public void testSaveNamespaceBeforeShutdown() throws Exception {
|
|||
|
||||
private void doAnEdit(FSNamesystem fsn, int id) throws IOException {
|
||||
// Make an edit
|
||||
fsn.mkdirs(
|
||||
"/test" + id,
|
||||
new PermissionStatus("test", "Test",
|
||||
new FsPermission((short)0777)),
|
||||
true);
|
||||
fsn.mkdirs("/test" + id, new PermissionStatus("test", "Test",
|
||||
new FsPermission((short)0777)), true);
|
||||
}
|
||||
|
||||
private void checkEditExists(FSNamesystem fsn, int id) throws IOException {
|
||||
|
@ -719,7 +716,8 @@ private Configuration getConf() throws IOException {
|
|||
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, nameDirs);
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, nameDirs);
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, "0.0.0.0:0");
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY,
|
||||
"0.0.0.0:0");
|
||||
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
|
||||
return conf;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue