HDFS-7131. During HA upgrade, JournalNode should create a new committedTxnId file in the current directory. Contributed by Jing Zhao.
Conflicts: hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
This commit is contained in:
parent
e4d46e5ff6
commit
9ad9e51cd9
|
@ -622,6 +622,9 @@ Release 2.6.0 - UNRELEASED
|
|||
|
||||
HDFS-7127. TestLeaseRecovery leaks MiniDFSCluster instances. (cnauroth)
|
||||
|
||||
HDFS-7131. During HA upgrade, JournalNode should create a new committedTxnId
|
||||
file in the current directory. (jing9)
|
||||
|
||||
BREAKDOWN OF HDFS-6584 ARCHIVAL STORAGE
|
||||
|
||||
HDFS-6677. Change INodeFile and FSImage to support storage policy ID.
|
||||
|
|
|
@ -996,7 +996,7 @@ public class Journal implements Closeable {
|
|||
public synchronized void doPreUpgrade() throws IOException {
|
||||
// Do not hold file lock on committedTxnId, because the containing
|
||||
// directory will be renamed. It will be reopened lazily on next access.
|
||||
committedTxnId.close();
|
||||
IOUtils.cleanup(LOG, committedTxnId);
|
||||
storage.getJournalManager().doPreUpgrade();
|
||||
}
|
||||
|
||||
|
@ -1021,14 +1021,25 @@ public class Journal implements Closeable {
|
|||
new File(previousDir, LAST_PROMISED_FILENAME), 0);
|
||||
PersistentLongFile prevLastWriterEpoch = new PersistentLongFile(
|
||||
new File(previousDir, LAST_WRITER_EPOCH), 0);
|
||||
BestEffortLongFile prevCommittedTxnId = new BestEffortLongFile(
|
||||
new File(previousDir, COMMITTED_TXID_FILENAME),
|
||||
HdfsConstants.INVALID_TXID);
|
||||
|
||||
lastPromisedEpoch = new PersistentLongFile(
|
||||
new File(currentDir, LAST_PROMISED_FILENAME), 0);
|
||||
lastWriterEpoch = new PersistentLongFile(
|
||||
new File(currentDir, LAST_WRITER_EPOCH), 0);
|
||||
committedTxnId = new BestEffortLongFile(
|
||||
new File(currentDir, COMMITTED_TXID_FILENAME),
|
||||
HdfsConstants.INVALID_TXID);
|
||||
|
||||
lastPromisedEpoch.set(prevLastPromisedEpoch.get());
|
||||
lastWriterEpoch.set(prevLastWriterEpoch.get());
|
||||
try {
|
||||
lastPromisedEpoch.set(prevLastPromisedEpoch.get());
|
||||
lastWriterEpoch.set(prevLastWriterEpoch.get());
|
||||
committedTxnId.set(prevCommittedTxnId.get());
|
||||
} finally {
|
||||
IOUtils.cleanup(LOG, prevCommittedTxnId);
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void doFinalize() throws IOException {
|
||||
|
@ -1049,7 +1060,7 @@ public class Journal implements Closeable {
|
|||
public synchronized void doRollback() throws IOException {
|
||||
// Do not hold file lock on committedTxnId, because the containing
|
||||
// directory will be renamed. It will be reopened lazily on next access.
|
||||
committedTxnId.close();
|
||||
IOUtils.cleanup(LOG, committedTxnId);
|
||||
storage.getJournalManager().doRollback();
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.util.Map;
|
|||
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
@ -93,7 +94,8 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
|
|||
return journal;
|
||||
}
|
||||
|
||||
Journal getOrCreateJournal(String jid) throws IOException {
|
||||
@VisibleForTesting
|
||||
public Journal getOrCreateJournal(String jid) throws IOException {
|
||||
return getOrCreateJournal(jid, StartupOption.REGULAR);
|
||||
}
|
||||
|
||||
|
|
|
@ -38,19 +38,23 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
|
||||
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster.Builder;
|
||||
import org.apache.hadoop.hdfs.qjournal.server.Journal;
|
||||
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
||||
import org.apache.hadoop.hdfs.util.BestEffortLongFile;
|
||||
import org.apache.hadoop.hdfs.util.PersistentLongFile;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
|
||||
/**
|
||||
* Tests for upgrading with HA enabled.
|
||||
|
@ -294,6 +298,16 @@ public class TestDFSUpgradeWithHA {
|
|||
}
|
||||
}
|
||||
|
||||
private long getCommittedTxnIdValue(MiniQJMHACluster qjCluster)
|
||||
throws IOException {
|
||||
Journal journal1 = qjCluster.getJournalCluster().getJournalNode(0)
|
||||
.getOrCreateJournal(MiniQJMHACluster.NAMESERVICE);
|
||||
BestEffortLongFile committedTxnId = (BestEffortLongFile) Whitebox
|
||||
.getInternalState(journal1, "committedTxnId");
|
||||
return committedTxnId != null ? committedTxnId.get() :
|
||||
HdfsConstants.INVALID_TXID;
|
||||
}
|
||||
|
||||
/**
|
||||
* Make sure that an HA NN can successfully upgrade when configured using
|
||||
* JournalNodes.
|
||||
|
@ -321,6 +335,9 @@ public class TestDFSUpgradeWithHA {
|
|||
fs = HATestUtil.configureFailoverFs(cluster, conf);
|
||||
assertTrue(fs.mkdirs(new Path("/foo1")));
|
||||
|
||||
// get the value of the committedTxnId in journal nodes
|
||||
final long cidBeforeUpgrade = getCommittedTxnIdValue(qjCluster);
|
||||
|
||||
// Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
|
||||
// flag.
|
||||
cluster.shutdownNameNode(1);
|
||||
|
@ -331,6 +348,8 @@ public class TestDFSUpgradeWithHA {
|
|||
checkNnPreviousDirExistence(cluster, 1, false);
|
||||
checkJnPreviousDirExistence(qjCluster, true);
|
||||
|
||||
assertTrue(cidBeforeUpgrade <= getCommittedTxnIdValue(qjCluster));
|
||||
|
||||
// NN0 should come up in the active state when given the -upgrade option,
|
||||
// so no need to transition it to active.
|
||||
assertTrue(fs.mkdirs(new Path("/foo2")));
|
||||
|
@ -343,6 +362,8 @@ public class TestDFSUpgradeWithHA {
|
|||
cluster.transitionToActive(0);
|
||||
assertTrue(fs.mkdirs(new Path("/foo3")));
|
||||
|
||||
assertTrue(getCommittedTxnIdValue(qjCluster) > cidBeforeUpgrade);
|
||||
|
||||
// Now bootstrap the standby with the upgraded info.
|
||||
int rc = BootstrapStandby.run(
|
||||
new String[]{"-force"},
|
||||
|
@ -389,11 +410,14 @@ public class TestDFSUpgradeWithHA {
|
|||
fs = HATestUtil.configureFailoverFs(cluster, conf);
|
||||
assertTrue(fs.mkdirs(new Path("/foo1")));
|
||||
|
||||
final long cidBeforeUpgrade = getCommittedTxnIdValue(qjCluster);
|
||||
|
||||
// Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
|
||||
// flag.
|
||||
cluster.shutdownNameNode(1);
|
||||
cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
|
||||
cluster.restartNameNode(0, false);
|
||||
assertTrue(cidBeforeUpgrade <= getCommittedTxnIdValue(qjCluster));
|
||||
|
||||
assertTrue(fs.mkdirs(new Path("/foo2")));
|
||||
|
||||
|
@ -409,8 +433,12 @@ public class TestDFSUpgradeWithHA {
|
|||
|
||||
cluster.restartNameNode(1);
|
||||
|
||||
final long cidDuringUpgrade = getCommittedTxnIdValue(qjCluster);
|
||||
assertTrue(cidDuringUpgrade > cidBeforeUpgrade);
|
||||
|
||||
runFinalizeCommand(cluster);
|
||||
|
||||
assertEquals(cidDuringUpgrade, getCommittedTxnIdValue(qjCluster));
|
||||
checkClusterPreviousDirExistence(cluster, false);
|
||||
checkJnPreviousDirExistence(qjCluster, false);
|
||||
assertCTimesEqual(cluster);
|
||||
|
@ -615,6 +643,8 @@ public class TestDFSUpgradeWithHA {
|
|||
fs = HATestUtil.configureFailoverFs(cluster, conf);
|
||||
assertTrue(fs.mkdirs(new Path("/foo1")));
|
||||
|
||||
final long cidBeforeUpgrade = getCommittedTxnIdValue(qjCluster);
|
||||
|
||||
// Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
|
||||
// flag.
|
||||
cluster.shutdownNameNode(1);
|
||||
|
@ -629,6 +659,9 @@ public class TestDFSUpgradeWithHA {
|
|||
// so no need to transition it to active.
|
||||
assertTrue(fs.mkdirs(new Path("/foo2")));
|
||||
|
||||
final long cidDuringUpgrade = getCommittedTxnIdValue(qjCluster);
|
||||
assertTrue(cidDuringUpgrade > cidBeforeUpgrade);
|
||||
|
||||
// Now bootstrap the standby with the upgraded info.
|
||||
int rc = BootstrapStandby.run(
|
||||
new String[]{"-force"},
|
||||
|
@ -649,6 +682,11 @@ public class TestDFSUpgradeWithHA {
|
|||
conf.setStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, Joiner.on(",").join(nn1NameDirs));
|
||||
NameNode.doRollback(conf, false);
|
||||
|
||||
final long cidAfterRollback = getCommittedTxnIdValue(qjCluster);
|
||||
assertTrue(cidBeforeUpgrade < cidAfterRollback);
|
||||
// make sure the committedTxnId has been reset correctly after rollback
|
||||
assertTrue(cidDuringUpgrade > cidAfterRollback);
|
||||
|
||||
// The rollback operation should have rolled back the first NN's local
|
||||
// dirs, and the shared dir, but not the other NN's dirs. Those have to be
|
||||
// done by bootstrapping the standby.
|
||||
|
|
Loading…
Reference in New Issue