diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 10dc2202d45..ae85c45479f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -961,6 +961,9 @@ Release 2.6.0 - UNRELEASED HDFS-7127. TestLeaseRecovery leaks MiniDFSCluster instances. (cnauroth) + HDFS-7131. During HA upgrade, JournalNode should create a new committedTxnId + file in the current directory. (jing9) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java index c79e269b178..9be56b822de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java @@ -990,7 +990,7 @@ public class Journal implements Closeable { public synchronized void doPreUpgrade() throws IOException { // Do not hold file lock on committedTxnId, because the containing // directory will be renamed. It will be reopened lazily on next access. - committedTxnId.close(); + IOUtils.cleanup(LOG, committedTxnId); storage.getJournalManager().doPreUpgrade(); } @@ -1015,14 +1015,25 @@ public class Journal implements Closeable { new File(previousDir, LAST_PROMISED_FILENAME), 0); PersistentLongFile prevLastWriterEpoch = new PersistentLongFile( new File(previousDir, LAST_WRITER_EPOCH), 0); - + BestEffortLongFile prevCommittedTxnId = new BestEffortLongFile( + new File(previousDir, COMMITTED_TXID_FILENAME), + HdfsConstants.INVALID_TXID); + lastPromisedEpoch = new PersistentLongFile( new File(currentDir, LAST_PROMISED_FILENAME), 0); lastWriterEpoch = new PersistentLongFile( new File(currentDir, LAST_WRITER_EPOCH), 0); - - lastPromisedEpoch.set(prevLastPromisedEpoch.get()); - lastWriterEpoch.set(prevLastWriterEpoch.get()); + committedTxnId = new BestEffortLongFile( + new File(currentDir, COMMITTED_TXID_FILENAME), + HdfsConstants.INVALID_TXID); + + try { + lastPromisedEpoch.set(prevLastPromisedEpoch.get()); + lastWriterEpoch.set(prevLastWriterEpoch.get()); + committedTxnId.set(prevCommittedTxnId.get()); + } finally { + IOUtils.cleanup(LOG, prevCommittedTxnId); + } } public synchronized void doFinalize() throws IOException { @@ -1043,7 +1054,7 @@ public class Journal implements Closeable { public synchronized void doRollback() throws IOException { // Do not hold file lock on committedTxnId, because the containing // directory will be renamed. It will be reopened lazily on next access. - committedTxnId.close(); + IOUtils.cleanup(LOG, committedTxnId); storage.getJournalManager().doRollback(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java index df8199402dd..588bc580ce5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java @@ -26,6 +26,7 @@ import java.util.Map; import javax.management.ObjectName; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -92,8 +93,9 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean { return journal; } - - Journal getOrCreateJournal(String jid) throws IOException { + + @VisibleForTesting + public Journal getOrCreateJournal(String jid) throws IOException { return getOrCreateJournal(jid, StartupOption.REGULAR); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSUpgradeWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSUpgradeWithHA.java index c3a86741caa..c7494316577 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSUpgradeWithHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSUpgradeWithHA.java @@ -38,19 +38,23 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster.Builder; import org.apache.hadoop.hdfs.qjournal.server.Journal; +import org.apache.hadoop.hdfs.qjournal.server.JournalNode; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.tools.DFSAdmin; +import org.apache.hadoop.hdfs.util.BestEffortLongFile; import org.apache.hadoop.hdfs.util.PersistentLongFile; import org.apache.hadoop.test.GenericTestUtils; import org.junit.Before; import org.junit.Test; import com.google.common.base.Joiner; +import org.mockito.internal.util.reflection.Whitebox; /** * Tests for upgrading with HA enabled. @@ -294,6 +298,16 @@ public class TestDFSUpgradeWithHA { } } + private long getCommittedTxnIdValue(MiniQJMHACluster qjCluster) + throws IOException { + Journal journal1 = qjCluster.getJournalCluster().getJournalNode(0) + .getOrCreateJournal(MiniQJMHACluster.NAMESERVICE); + BestEffortLongFile committedTxnId = (BestEffortLongFile) Whitebox + .getInternalState(journal1, "committedTxnId"); + return committedTxnId != null ? committedTxnId.get() : + HdfsConstants.INVALID_TXID; + } + /** * Make sure that an HA NN can successfully upgrade when configured using * JournalNodes. @@ -320,7 +334,10 @@ public class TestDFSUpgradeWithHA { cluster.transitionToActive(0); fs = HATestUtil.configureFailoverFs(cluster, conf); assertTrue(fs.mkdirs(new Path("/foo1"))); - + + // get the value of the committedTxnId in journal nodes + final long cidBeforeUpgrade = getCommittedTxnIdValue(qjCluster); + // Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade // flag. cluster.shutdownNameNode(1); @@ -330,6 +347,8 @@ public class TestDFSUpgradeWithHA { checkNnPreviousDirExistence(cluster, 0, true); checkNnPreviousDirExistence(cluster, 1, false); checkJnPreviousDirExistence(qjCluster, true); + + assertTrue(cidBeforeUpgrade <= getCommittedTxnIdValue(qjCluster)); // NN0 should come up in the active state when given the -upgrade option, // so no need to transition it to active. @@ -342,6 +361,8 @@ public class TestDFSUpgradeWithHA { // Make sure we can still do FS ops after upgrading. cluster.transitionToActive(0); assertTrue(fs.mkdirs(new Path("/foo3"))); + + assertTrue(getCommittedTxnIdValue(qjCluster) > cidBeforeUpgrade); // Now bootstrap the standby with the upgraded info. int rc = BootstrapStandby.run( @@ -388,15 +409,18 @@ public class TestDFSUpgradeWithHA { cluster.transitionToActive(0); fs = HATestUtil.configureFailoverFs(cluster, conf); assertTrue(fs.mkdirs(new Path("/foo1"))); + + final long cidBeforeUpgrade = getCommittedTxnIdValue(qjCluster); // Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade // flag. cluster.shutdownNameNode(1); cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE); cluster.restartNameNode(0, false); + assertTrue(cidBeforeUpgrade <= getCommittedTxnIdValue(qjCluster)); assertTrue(fs.mkdirs(new Path("/foo2"))); - + checkNnPreviousDirExistence(cluster, 0, true); checkNnPreviousDirExistence(cluster, 1, false); checkJnPreviousDirExistence(qjCluster, true); @@ -408,9 +432,13 @@ public class TestDFSUpgradeWithHA { assertEquals(0, rc); cluster.restartNameNode(1); - + + final long cidDuringUpgrade = getCommittedTxnIdValue(qjCluster); + assertTrue(cidDuringUpgrade > cidBeforeUpgrade); + runFinalizeCommand(cluster); - + + assertEquals(cidDuringUpgrade, getCommittedTxnIdValue(qjCluster)); checkClusterPreviousDirExistence(cluster, false); checkJnPreviousDirExistence(qjCluster, false); assertCTimesEqual(cluster); @@ -614,7 +642,9 @@ public class TestDFSUpgradeWithHA { cluster.transitionToActive(0); fs = HATestUtil.configureFailoverFs(cluster, conf); assertTrue(fs.mkdirs(new Path("/foo1"))); - + + final long cidBeforeUpgrade = getCommittedTxnIdValue(qjCluster); + // Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade // flag. cluster.shutdownNameNode(1); @@ -628,7 +658,10 @@ public class TestDFSUpgradeWithHA { // NN0 should come up in the active state when given the -upgrade option, // so no need to transition it to active. assertTrue(fs.mkdirs(new Path("/foo2"))); - + + final long cidDuringUpgrade = getCommittedTxnIdValue(qjCluster); + assertTrue(cidDuringUpgrade > cidBeforeUpgrade); + // Now bootstrap the standby with the upgraded info. int rc = BootstrapStandby.run( new String[]{"-force"}, @@ -649,6 +682,11 @@ public class TestDFSUpgradeWithHA { conf.setStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, Joiner.on(",").join(nn1NameDirs)); NameNode.doRollback(conf, false); + final long cidAfterRollback = getCommittedTxnIdValue(qjCluster); + assertTrue(cidBeforeUpgrade < cidAfterRollback); + // make sure the committedTxnId has been reset correctly after rollback + assertTrue(cidDuringUpgrade > cidAfterRollback); + // The rollback operation should have rolled back the first NN's local // dirs, and the shared dir, but not the other NN's dirs. Those have to be // done by bootstrapping the standby.