HDFS-11714. Newly added NN storage directory won't get initialized and cause space exhaustion. Contributed by Kihwal Lee.
This commit is contained in:
parent
d1e6b6db7c
commit
c9bf21b0f3
|
@ -97,6 +97,16 @@ public class FSImage implements Closeable {
|
||||||
|
|
||||||
protected NNStorageRetentionManager archivalManager;
|
protected NNStorageRetentionManager archivalManager;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The collection of newly added storage directories. These are partially
|
||||||
|
* formatted then later fully populated along with a VERSION file.
|
||||||
|
* For HA, the second part is done when the next checkpoint is saved.
|
||||||
|
* This set will be cleared once a VERSION file is created.
|
||||||
|
* For non-HA, a new fsimage will be locally generated along with a new
|
||||||
|
* VERSION file. This set is not used for non-HA mode.
|
||||||
|
*/
|
||||||
|
private Set<StorageDirectory> newDirs = null;
|
||||||
|
|
||||||
/* Used to make sure there are no concurrent checkpoints for a given txid
|
/* Used to make sure there are no concurrent checkpoints for a given txid
|
||||||
* The checkpoint here could be one of the following operations.
|
* The checkpoint here could be one of the following operations.
|
||||||
* a. checkpoint when NN is in standby.
|
* a. checkpoint when NN is in standby.
|
||||||
|
@ -260,9 +270,26 @@ public class FSImage implements Closeable {
|
||||||
throw new IOException(StorageState.NON_EXISTENT +
|
throw new IOException(StorageState.NON_EXISTENT +
|
||||||
" state cannot be here");
|
" state cannot be here");
|
||||||
case NOT_FORMATTED:
|
case NOT_FORMATTED:
|
||||||
|
// Create a dir structure, but not the VERSION file. The presence of
|
||||||
|
// VERSION is checked in the inspector's needToSave() method and
|
||||||
|
// saveNamespace is triggered if it is absent. This will bring
|
||||||
|
// the storage state uptodate along with a new VERSION file.
|
||||||
|
// If HA is enabled, NNs start up as standby so saveNamespace is not
|
||||||
|
// triggered.
|
||||||
LOG.info("Storage directory " + sd.getRoot() + " is not formatted.");
|
LOG.info("Storage directory " + sd.getRoot() + " is not formatted.");
|
||||||
LOG.info("Formatting ...");
|
LOG.info("Formatting ...");
|
||||||
sd.clearDirectory(); // create empty currrent dir
|
sd.clearDirectory(); // create empty currrent dir
|
||||||
|
// For non-HA, no further action is needed here, as saveNamespace will
|
||||||
|
// take care of the rest.
|
||||||
|
if (!target.isHaEnabled()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// If HA is enabled, save the dirs to create a version file later when
|
||||||
|
// a checkpoint image is saved.
|
||||||
|
if (newDirs == null) {
|
||||||
|
newDirs = new HashSet<StorageDirectory>();
|
||||||
|
}
|
||||||
|
newDirs.add(sd);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
|
@ -289,6 +316,26 @@ public class FSImage implements Closeable {
|
||||||
return loadFSImage(target, startOpt, recovery);
|
return loadFSImage(target, startOpt, recovery);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a VERSION file in the newly added storage directories.
|
||||||
|
*/
|
||||||
|
private void initNewDirs() {
|
||||||
|
if (newDirs == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (StorageDirectory sd : newDirs) {
|
||||||
|
try {
|
||||||
|
storage.writeProperties(sd);
|
||||||
|
LOG.info("Wrote VERSION in the new storage, " + sd.getCurrentDir());
|
||||||
|
} catch (IOException e) {
|
||||||
|
// Failed to create a VERSION file. Report the error.
|
||||||
|
storage.reportErrorOnFile(sd.getVersionFile());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
newDirs.clear();
|
||||||
|
newDirs = null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* For each storage directory, performs recovery of incomplete transitions
|
* For each storage directory, performs recovery of incomplete transitions
|
||||||
* (eg. upgrade, rollback, checkpoint) and inserts the directory's storage
|
* (eg. upgrade, rollback, checkpoint) and inserts the directory's storage
|
||||||
|
@ -1329,6 +1376,9 @@ public class FSImage implements Closeable {
|
||||||
if (txid > storage.getMostRecentCheckpointTxId()) {
|
if (txid > storage.getMostRecentCheckpointTxId()) {
|
||||||
storage.setMostRecentCheckpointInfo(txid, Time.now());
|
storage.setMostRecentCheckpointInfo(txid, Time.now());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create a version file in any new storage directory.
|
||||||
|
initNewDirs();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.Util;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.*;
|
import org.apache.hadoop.hdfs.server.namenode.*;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
|
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
|
||||||
import org.apache.hadoop.hdfs.util.Canceler;
|
import org.apache.hadoop.hdfs.util.Canceler;
|
||||||
|
@ -41,6 +42,7 @@ import org.apache.hadoop.io.compress.GzipCodec;
|
||||||
import org.apache.hadoop.ipc.StandbyException;
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
|
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
|
||||||
|
import org.apache.hadoop.test.PathUtils;
|
||||||
import org.apache.hadoop.util.ThreadUtil;
|
import org.apache.hadoop.util.ThreadUtil;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -168,6 +170,44 @@ public class TestStandbyCheckpoints {
|
||||||
purgeLogsOlderThan(Mockito.anyLong());
|
purgeLogsOlderThan(Mockito.anyLong());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNewDirInitAfterCheckpointing() throws Exception {
|
||||||
|
File hdfsDir = new File(PathUtils.getTestDir(TestStandbyCheckpoints.class),
|
||||||
|
"testNewDirInitAfterCheckpointing");
|
||||||
|
File nameDir = new File(hdfsDir, "name1");
|
||||||
|
assert nameDir.mkdirs();
|
||||||
|
|
||||||
|
// Restart nn0 with an additional name dir.
|
||||||
|
String existingDir = cluster.getConfiguration(0).
|
||||||
|
get(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
|
||||||
|
cluster.getConfiguration(0).set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
|
||||||
|
existingDir + "," + Util.fileAsURI(nameDir).toString());
|
||||||
|
cluster.restartNameNode(0);
|
||||||
|
nn0 = cluster.getNameNode(0);
|
||||||
|
cluster.transitionToActive(0);
|
||||||
|
|
||||||
|
// "current" is created, but current/VERSION isn't.
|
||||||
|
File currDir = new File(nameDir, "current");
|
||||||
|
File versionFile = new File(currDir, "VERSION");
|
||||||
|
assert currDir.exists();
|
||||||
|
assert !versionFile.exists();
|
||||||
|
|
||||||
|
// Trigger a checkpointing and upload.
|
||||||
|
doEdits(0, 10);
|
||||||
|
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
|
||||||
|
|
||||||
|
// The version file will be created if a checkpoint is uploaded.
|
||||||
|
// Wait for it to happen up to 10 seconds.
|
||||||
|
for (int i = 0; i < 20; i++) {
|
||||||
|
if (versionFile.exists()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Thread.sleep(500);
|
||||||
|
}
|
||||||
|
// VERSION must have been created.
|
||||||
|
assert versionFile.exists();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test for the case when both of the NNs in the cluster are
|
* Test for the case when both of the NNs in the cluster are
|
||||||
* in the standby state, and thus are both creating checkpoints
|
* in the standby state, and thus are both creating checkpoints
|
||||||
|
|
Loading…
Reference in New Issue