From fd66a243bfffc8260bfd69058625d4d9509cafe6 Mon Sep 17 00:00:00 2001 From: Arpit Agarwal Date: Wed, 30 Aug 2017 10:29:42 -0700 Subject: [PATCH] HDFS-12356. Unit test for JournalNode sync during Rolling Upgrade. Contributed by Hanisha Koneru. --- .../qjournal/server/TestJournalNodeSync.java | 176 ++++++++++++++---- 1 file changed, 137 insertions(+), 39 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java index 2964f05c876..09ef3a57560 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java @@ -20,11 +20,13 @@ package org.apache.hadoop.hdfs.qjournal.server; import com.google.common.base.Supplier; import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; @@ -54,10 +56,11 @@ public class TestJournalNodeSync { private MiniQJMHACluster qjmhaCluster; private MiniDFSCluster dfsCluster; private MiniJournalCluster jCluster; - private FileSystem fs; private FSNamesystem namesystem; private int editsPerformed = 0; private final String jid = "ns1"; + private int activeNNindex=0; + private static final int DFS_HA_TAILEDITS_PERIOD_SECONDS=1; @Rule public TestName testName = new TestName(); @@ -71,13 +74,16 @@ public class TestJournalNodeSync { "testSyncAfterJNdowntimeWithoutQJournalQueue")) { conf.setInt(DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY, 0); } + if (testName.getMethodName().equals("testSyncDuringRollingUpgrade")) { + conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, + DFS_HA_TAILEDITS_PERIOD_SECONDS); + } qjmhaCluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(2) .build(); dfsCluster = qjmhaCluster.getDfsCluster(); jCluster = qjmhaCluster.getJournalCluster(); dfsCluster.transitionToActive(0); - fs = dfsCluster.getFileSystem(0); namesystem = dfsCluster.getNamesystem(0); } @@ -192,36 +198,7 @@ public class TestJournalNodeSync { // the journals. @Test(timeout=60000) public void testRandomJournalMissingLogs() throws Exception { - Random randomJournal = new Random(); - - List journalCurrentDirs = Lists.newArrayList(); - - for (int i = 0; i < 3; i++) { - journalCurrentDirs.add(new StorageDirectory(jCluster.getJournalDir(i, - jid)).getCurrentDir()); - } - - int count = 0; - long lastStartTxId; - int journalIndex; - List missingLogs = Lists.newArrayList(); - while (count < 5) { - lastStartTxId = generateEditLog(); - - // Delete the last edit log segment from randomly selected journal node - journalIndex = randomJournal.nextInt(3); - missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex), - lastStartTxId)); - - // Delete the last edit log segment from two journals for some logs - if (count % 2 == 0) { - journalIndex = (journalIndex + 1) % 3; - missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex), - lastStartTxId)); - } - - count++; - } + List missingLogs = deleteEditLogsFromRandomJN(); GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000); } @@ -277,7 +254,8 @@ public class TestJournalNodeSync { */ @Test (timeout=300_000) public void testSyncAfterJNdowntimeWithoutQJournalQueue() throws Exception{ - // Queuing is disabled during the cluster setup {@link #setUpMiniCluster()} + // QJournal Queuing is disabled during the cluster setup + // {@link #setUpMiniCluster()} File firstJournalDir = jCluster.getJournalDir(0, jid); File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) .getCurrentDir(); @@ -376,11 +354,88 @@ public class TestJournalNodeSync { GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000); } + // Test JournalNode Sync during a Rolling Upgrade of NN. + @Test (timeout=300_000) + public void testSyncDuringRollingUpgrade() throws Exception { + + DistributedFileSystem dfsActive; + int standbyNNindex; + + if (dfsCluster.getNameNode(0).isActiveState()) { + activeNNindex = 0; + standbyNNindex = 1; + } else { + activeNNindex = 1; + standbyNNindex = 0; + } + dfsActive = dfsCluster.getFileSystem(activeNNindex); + + // Prepare for rolling upgrade + final RollingUpgradeInfo info = dfsActive.rollingUpgrade( + HdfsConstants.RollingUpgradeAction.PREPARE); + + //query rolling upgrade + Assert.assertEquals(info, dfsActive.rollingUpgrade( + HdfsConstants.RollingUpgradeAction.QUERY)); + + // Restart the Standby NN with rollingUpgrade option + dfsCluster.restartNameNode(standbyNNindex, true, + "-rollingUpgrade", "started"); + Assert.assertEquals(info, dfsActive.rollingUpgrade( + HdfsConstants.RollingUpgradeAction.QUERY)); + + // Do some edits and delete some edit logs + List missingLogs = deleteEditLogsFromRandomJN(); + + GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000); + + // Transition the active NN to standby and standby to active + dfsCluster.transitionToStandby(activeNNindex); + + // Let Standby NN catch up tailing edit logs before transitioning it to + // active + Thread.sleep(30*DFS_HA_TAILEDITS_PERIOD_SECONDS*1000); + + dfsCluster.transitionToActive(standbyNNindex); + dfsCluster.waitActive(); + + activeNNindex=standbyNNindex; + standbyNNindex=((activeNNindex+1)%2); + dfsActive = dfsCluster.getFileSystem(activeNNindex); + + Assert.assertTrue(dfsCluster.getNameNode(activeNNindex).isActiveState()); + Assert.assertFalse(dfsCluster.getNameNode(standbyNNindex).isActiveState()); + + // Restart the current standby NN (previously active) + dfsCluster.restartNameNode(standbyNNindex, true, + "-rollingUpgrade", "started"); + Assert.assertEquals(info, dfsActive.rollingUpgrade( + HdfsConstants.RollingUpgradeAction.QUERY)); + dfsCluster.waitActive(); + + // Do some edits and delete some edit logs + missingLogs.addAll(deleteEditLogsFromRandomJN()); + + // Check that JNSync downloaded the edit logs rolled during rolling upgrade. + GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000); + + //finalize rolling upgrade + final RollingUpgradeInfo finalize = dfsActive.rollingUpgrade( + HdfsConstants.RollingUpgradeAction.FINALIZE); + Assert.assertTrue(finalize.isFinalized()); + + // Check the missing edit logs exist after finalizing rolling upgrade + for (File editLog : missingLogs) { + Assert.assertTrue("Edit log missing after finalizing rolling upgrade", + editLog.exists()); + } + } + private File deleteEditLog(File currentDir, long startTxId) throws IOException { EditLogFile logFile = getLogFile(currentDir, startTxId); while (logFile.isInProgress()) { - dfsCluster.getNameNode(0).getRpcServer().rollEditLog(); + dfsCluster.getNameNode(activeNNindex).getRpcServer().rollEditLog(); logFile = getLogFile(currentDir, startTxId); } File deleteFile = logFile.getFile(); @@ -389,13 +444,55 @@ public class TestJournalNodeSync { return deleteFile; } + private List deleteEditLogsFromRandomJN() throws IOException { + Random random = new Random(); + + List journalCurrentDirs = Lists.newArrayList(); + + for (int i = 0; i < 3; i++) { + journalCurrentDirs.add(new StorageDirectory(jCluster.getJournalDir(i, + jid)).getCurrentDir()); + } + + long[] startTxIds = new long[20]; + for (int i = 0; i < 20; i++) { + startTxIds[i] = generateEditLog(); + } + + int count = 0, startTxIdIndex; + long startTxId; + int journalIndex; + List missingLogs = Lists.newArrayList(); + List deletedStartTxIds = Lists.newArrayList(); + while (count < 5) { + // Select a random edit log to delete + startTxIdIndex = random.nextInt(20); + while (deletedStartTxIds.contains(startTxIdIndex)) { + startTxIdIndex = random.nextInt(20); + } + startTxId = startTxIds[startTxIdIndex]; + deletedStartTxIds.add(startTxIdIndex); + + // Delete the randomly selected edit log segment from randomly selected + // journal node + journalIndex = random.nextInt(3); + missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex), + startTxId)); + + count++; + } + + return missingLogs; + } + /** * Do a mutative metadata operation on the file system. * * @return true if the operation was successful, false otherwise. */ private boolean doAnEdit() throws IOException { - return fs.mkdirs(new Path("/tmp", Integer.toString(editsPerformed++))); + return dfsCluster.getFileSystem(activeNNindex).mkdirs( + new Path("/tmp", Integer.toString(editsPerformed++))); } /** @@ -414,12 +511,13 @@ public class TestJournalNodeSync { * @return the startTxId of next segment after rolling edits. */ private long generateEditLog(int numEdits) throws IOException { - long startTxId = namesystem.getFSImage().getEditLog().getLastWrittenTxId(); + long lastWrittenTxId = dfsCluster.getNameNode(activeNNindex).getFSImage() + .getEditLog().getLastWrittenTxId(); for (int i = 1; i <= numEdits; i++) { Assert.assertTrue("Failed to do an edit", doAnEdit()); } - dfsCluster.getNameNode(0).getRpcServer().rollEditLog(); - return startTxId; + dfsCluster.getNameNode(activeNNindex).getRpcServer().rollEditLog(); + return lastWrittenTxId; } private Supplier editLogExists(List editLogs) {