HDFS-11714. Newly added NN storage directory won't get initialized and cause space exhaustion. Contributed by Kihwal Lee.

Kihwal Lee 2017-05-01 17:29:25 -05:00
parent 343948ca79
commit 4cfc866436
2 changed files with 91 additions and 1 deletion
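To make the failure scenario concrete: an operator adds a second directory to dfs.namenode.name.dir and restarts the NameNode. A hedged sketch of that setup, using the same configuration key and URI helper the new test uses (the path below is hypothetical, not from this commit):

    // Sketch only; "/data/disk2/name" is a hypothetical new volume.
    Configuration conf = new Configuration();
    String existingDirs = conf.get(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
    File newNameDir = new File("/data/disk2/name");
    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
        existingDirs + "," + Util.fileAsURI(newNameDir).toString());
    // Restart the NameNode with this configuration. Before this fix, an HA
    // NameNode would create the new directory's "current" dir but never its
    // current/VERSION file, leaving the directory permanently uninitialized.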

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -98,6 +98,16 @@ public class FSImage implements Closeable {
protected NNStorageRetentionManager archivalManager;
/**
* The collection of newly added storage directories. These are partially
* formatted then later fully populated along with a VERSION file.
* For HA, the second part is done when the next checkpoint is saved.
* This set will be cleared once a VERSION file is created.
* For non-HA, a new fsimage will be locally generated along with a new
* VERSION file. This set is not used for non-HA mode.
*/
private Set<StorageDirectory> newDirs = null;
/* Used to make sure there are no concurrent checkpoints for a given txid
 * The checkpoint here could be one of the following operations.
 * a. checkpoint when NN is in standby.
@@ -261,9 +271,26 @@ public class FSImage implements Closeable {
throw new IOException(StorageState.NON_EXISTENT +
    " state cannot be here");
case NOT_FORMATTED:
// Create a dir structure, but not the VERSION file. The presence of
// VERSION is checked in the inspector's needToSave() method and
// saveNamespace is triggered if it is absent. This will bring
// the storage state up to date along with a new VERSION file.
// If HA is enabled, NNs start up as standby so saveNamespace is not
// triggered.
LOG.info("Storage directory " + sd.getRoot() + " is not formatted."); LOG.info("Storage directory " + sd.getRoot() + " is not formatted.");
LOG.info("Formatting ..."); LOG.info("Formatting ...");
sd.clearDirectory(); // create empty currrent dir sd.clearDirectory(); // create empty currrent dir
// For non-HA, no further action is needed here, as saveNamespace will
// take care of the rest.
if (!target.isHaEnabled()) {
continue;
}
// If HA is enabled, save the dirs to create a VERSION file later when
// a checkpoint image is saved.
if (newDirs == null) {
newDirs = new HashSet<StorageDirectory>();
}
newDirs.add(sd);
break;
default:
break;
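For context, the needToSave() check that the NOT_FORMATTED comment above refers to lives in the fsimage storage inspector. A rough paraphrase of that check (reconstructed from memory, not part of this diff):

    // In the transactional storage inspector: a directory without a VERSION
    // file was just (partially) formatted, so a namespace save is required.
    if (!sd.getVersionFile().exists()) {
      needToSave = true;
      return;
    }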
@@ -289,7 +316,27 @@ public class FSImage implements Closeable {
return loadFSImage(target, startOpt, recovery);
}
/**
* Create a VERSION file in the newly added storage directories.
*/
private void initNewDirs() {
if (newDirs == null) {
return;
}
for (StorageDirectory sd : newDirs) {
try {
storage.writeProperties(sd);
LOG.info("Wrote VERSION in the new storage, " + sd.getCurrentDir());
} catch (IOException e) {
// Failed to create a VERSION file. Report the error.
storage.reportErrorOnFile(sd.getVersionFile());
}
}
newDirs.clear();
newDirs = null;
}
/**
 * For each storage directory, performs recovery of incomplete transitions
 * (eg. upgrade, rollback, checkpoint) and inserts the directory's storage
@@ -1350,6 +1397,9 @@ public class FSImage implements Closeable {
if (txid > storage.getMostRecentCheckpointTxId()) {
  storage.setMostRecentCheckpointInfo(txid, Time.now());
}
// Create a VERSION file in any new storage directory.
initNewDirs();
}

@Override
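Taken together, the hunks above implement a two-phase initialization for HA. A comment-form summary of the flow (the step framing is ours; the method names are from the diff):

    // Phase 1, at startup (recovery of storage dirs):
    //   NOT_FORMATTED -> sd.clearDirectory() creates an empty "current" dir,
    //   and the directory is remembered in newDirs (no VERSION file yet).
    // Phase 2, at the next checkpoint save on the standby:
    //   after storage.setMostRecentCheckpointInfo(txid, ...), initNewDirs()
    //   writes current/VERSION via storage.writeProperties(sd) and clears
    //   newDirs, completing the directory's initialization.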

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.*;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.util.Canceler;
@@ -41,6 +42,7 @@ import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.ThreadUtil;
import org.junit.After;
import org.junit.Before;
@@ -175,6 +177,44 @@ public class TestStandbyCheckpoints {
purgeLogsOlderThan(Mockito.anyLong());
}
@Test
public void testNewDirInitAfterCheckpointing() throws Exception {
File hdfsDir = new File(PathUtils.getTestDir(TestStandbyCheckpoints.class),
"testNewDirInitAfterCheckpointing");
File nameDir = new File(hdfsDir, "name1");
assert nameDir.mkdirs();
// Restart nn0 with an additional name dir.
String existingDir = cluster.getConfiguration(0).
get(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
cluster.getConfiguration(0).set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
existingDir + "," + Util.fileAsURI(nameDir).toString());
cluster.restartNameNode(0);
nns[0] = cluster.getNameNode(0);
cluster.transitionToActive(0);
// "current" is created, but current/VERSION isn't.
File currDir = new File(nameDir, "current");
File versionFile = new File(currDir, "VERSION");
assert currDir.exists();
assert !versionFile.exists();
// Trigger a checkpoint and upload.
doEdits(0, 10);
HATestUtil.waitForStandbyToCatchUp(nns[0], nns[1]);
// The VERSION file will be created when a checkpoint is uploaded.
// Wait up to 10 seconds for that to happen.
for (int i = 0; i < 20; i++) {
if (versionFile.exists()) {
break;
}
Thread.sleep(500);
}
// VERSION must have been created.
assert versionFile.exists();
}
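A side note on the poll loop: the same wait could be expressed with GenericTestUtils.waitFor, which this test class already imports. A hedged sketch, assuming the Guava Supplier-based signature used in Hadoop at the time (requires importing com.google.common.base.Supplier, and versionFile must be effectively final):

    // Equivalent to the sleep loop above: poll every 500 ms, up to 10 s.
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        return versionFile.exists();
      }
    }, 500, 10000);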
/**
 * Test for the case when both of the NNs in the cluster are
 * in the standby state, and thus are both creating checkpoints