From bbc6d254c8a953abba69415d80edeede3ee6269d Mon Sep 17 00:00:00 2001 From: Arpit Agarwal Date: Fri, 4 Aug 2017 12:51:33 -0700 Subject: [PATCH] HDFS-12224. Add tests to TestJournalNodeSync for sync after JN downtime. Contributed by Hanisha Koneru. --- .../hadoop/hdfs/qjournal/server/Journal.java | 3 +- .../hdfs/qjournal/server/JournalMetrics.java | 11 ++ .../qjournal/server/JournalNodeSyncer.java | 4 + .../hdfs/qjournal/server/TestJournalNode.java | 6 +- .../{ => server}/TestJournalNodeSync.java | 182 +++++++++++++++++- 5 files changed, 197 insertions(+), 9 deletions(-) rename hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/{ => server}/TestJournalNodeSync.java (58%) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java index 0041d5eda7b..0f4091dcb23 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java @@ -286,8 +286,7 @@ private void updateHighestWrittenTxId(long val) { fjm.setLastReadableTxId(val); } - @VisibleForTesting - JournalMetrics getMetricsForTests() { + JournalMetrics getMetrics() { return metrics; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java index cffe2c1f55a..fcfd9016cd1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java @@ -45,6 +45,9 @@ class JournalMetrics { @Metric("Number of batches written where this node was lagging") MutableCounterLong 
batchesWrittenWhileLagging; + + @Metric("Number of edit logs downloaded by JournalNodeSyncer") + private MutableCounterLong numEditLogsSynced; private final int[] QUANTILE_INTERVALS = new int[] { 1*60, // 1m @@ -120,4 +123,12 @@ void addSync(long us) { q.add(us); } } + + public MutableCounterLong getNumEditLogsSynced() { + return numEditLogsSynced; + } + + public void incrNumEditLogsSynced() { + numEditLogsSynced.incr(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java index 479f6a00e2c..537ba0a0fd3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java @@ -77,6 +77,7 @@ public class JournalNodeSyncer { private final long journalSyncInterval; private final int logSegmentTransferTimeout; private final DataTransferThrottler throttler; + private final JournalMetrics metrics; JournalNodeSyncer(JournalNode jouranlNode, Journal journal, String jid, Configuration conf) { @@ -93,6 +94,7 @@ public class JournalNodeSyncer { DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY, DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT); throttler = getThrottler(conf); + metrics = journal.getMetrics(); } void stopSync() { @@ -411,6 +413,8 @@ private boolean downloadMissingLogSegment(URL url, RemoteEditLog log) LOG.warn("Deleting " + tmpEditsFile + " has failed"); } return false; + } else { + metrics.incrNumEditLogsSynced(); } return true; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java index 9dd6846d7fa..28ec7082537 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java @@ -102,7 +102,7 @@ public void teardown() throws Exception { @Test(timeout=100000) public void testJournal() throws Exception { MetricsRecordBuilder metrics = MetricsAsserts.getMetrics( - journal.getMetricsForTests().getName()); + journal.getMetrics().getName()); MetricsAsserts.assertCounter("BatchesWritten", 0L, metrics); MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics); MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics); @@ -117,7 +117,7 @@ public void testJournal() throws Exception { ch.sendEdits(1L, 1, 1, "hello".getBytes(Charsets.UTF_8)).get(); metrics = MetricsAsserts.getMetrics( - journal.getMetricsForTests().getName()); + journal.getMetrics().getName()); MetricsAsserts.assertCounter("BatchesWritten", 1L, metrics); MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics); MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics); @@ -130,7 +130,7 @@ public void testJournal() throws Exception { ch.sendEdits(1L, 2, 1, "goodbye".getBytes(Charsets.UTF_8)).get(); metrics = MetricsAsserts.getMetrics( - journal.getMetricsForTests().getName()); + journal.getMetrics().getName()); MetricsAsserts.assertCounter("BatchesWritten", 2L, metrics); MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 1L, metrics); MetricsAsserts.assertGauge("CurrentLagTxns", 98L, metrics); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java similarity index 58% rename from hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java rename to 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java index 8415a6f54e9..2964f05c876 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.qjournal; +package org.apache.hadoop.hdfs.qjournal.server; import com.google.common.base.Supplier; import com.google.common.collect.Lists; @@ -25,17 +25,21 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster; +import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager .getLogFile; - +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; import java.io.File; import java.io.IOException; @@ -46,6 +50,7 @@ * Unit test for Journal Node formatting upon re-installation and syncing. 
*/ public class TestJournalNodeSync { + private Configuration conf; private MiniQJMHACluster qjmhaCluster; private MiniDFSCluster dfsCluster; private MiniJournalCluster jCluster; @@ -54,11 +59,18 @@ public class TestJournalNodeSync { private int editsPerformed = 0; private final String jid = "ns1"; + @Rule + public TestName testName = new TestName(); + @Before public void setUpMiniCluster() throws IOException { - final Configuration conf = new HdfsConfiguration(); + conf = new HdfsConfiguration(); conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, true); conf.setLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY, 1000L); + if (testName.getMethodName().equals( + "testSyncAfterJNdowntimeWithoutQJournalQueue")) { + conf.setInt(DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY, 0); + } qjmhaCluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(2) .build(); dfsCluster = qjmhaCluster.getDfsCluster(); @@ -214,6 +226,156 @@ public void testRandomJournalMissingLogs() throws Exception { GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000); } + + // Test JournalNode Sync when a JN is down while NN is actively writing + // logs and comes back up after some time. 
+ @Test (timeout=300_000) + public void testSyncAfterJNdowntime() throws Exception { + File firstJournalDir = jCluster.getJournalDir(0, jid); + File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) + .getCurrentDir(); + File secondJournalDir = jCluster.getJournalDir(1, jid); + File secondJournalCurrentDir = new StorageDirectory(secondJournalDir) + .getCurrentDir(); + + long[] startTxIds = new long[10]; + + startTxIds[0] = generateEditLog(); + startTxIds[1] = generateEditLog(); + + // Stop the first JN + jCluster.getJournalNode(0).stop(0); + + // Roll some more edits while the first JN is down + for (int i = 2; i < 10; i++) { + startTxIds[i] = generateEditLog(5); + } + + // Re-start the first JN + jCluster.restartJournalNode(0); + + // Roll an edit to update the committed tx id of the first JN + generateEditLog(); + + // List the edit logs rolled during JN down time. + List missingLogs = Lists.newArrayList(); + for (int i = 2; i < 10; i++) { + EditLogFile logFile = getLogFile(secondJournalCurrentDir, startTxIds[i], + false); + missingLogs.add(new File(firstJournalCurrentDir, + logFile.getFile().getName())); + } + + // Check that JNSync downloaded the edit logs rolled during JN down time. + GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000); + } + + /** + * Test JournalNode Sync when a JN is down while NN is actively writing + * logs and comes back up after some time with no edit log queueing. 
+ * Queuing is disabled during the cluster setup {@link #setUpMiniCluster()} + * @throws Exception + */ + @Test (timeout=300_000) + public void testSyncAfterJNdowntimeWithoutQJournalQueue() throws Exception{ + // Queuing is disabled during the cluster setup {@link #setUpMiniCluster()} + File firstJournalDir = jCluster.getJournalDir(0, jid); + File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) + .getCurrentDir(); + File secondJournalDir = jCluster.getJournalDir(1, jid); + File secondJournalCurrentDir = new StorageDirectory(secondJournalDir) + .getCurrentDir(); + + long[] startTxIds = new long[10]; + + startTxIds[0] = generateEditLog(); + startTxIds[1] = generateEditLog(2); + + // Stop the first JN + jCluster.getJournalNode(0).stop(0); + + // Roll some more edits while the first JN is down + for (int i = 2; i < 10; i++) { + startTxIds[i] = generateEditLog(5); + } + + // Re-start the first JN + jCluster.restartJournalNode(0); + + // After JN restart and before rolling another edit, the missing edit + // logs will not be synced as the committed tx id of the JN will be + // less than the start tx id's of the missing edit logs and edit log queuing + // has been disabled. + // Roll an edit to update the committed tx id of the first JN + generateEditLog(2); + + // List the edit logs rolled during JN down time. + List missingLogs = Lists.newArrayList(); + for (int i = 2; i < 10; i++) { + EditLogFile logFile = getLogFile(secondJournalCurrentDir, startTxIds[i], + false); + missingLogs.add(new File(firstJournalCurrentDir, + logFile.getFile().getName())); + } + + // Check that JNSync downloaded the edit logs rolled during JN down time. 
+ GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000); + + // Check that all the missing edit logs have been downloaded via + // JournalNodeSyncer alone (as the edit log queueing has been disabled) + long numEditLogsSynced = jCluster.getJournalNode(0).getOrCreateJournal(jid) + .getMetrics().getNumEditLogsSynced().value(); + Assert.assertTrue("Edit logs downloaded outside syncer. Expected 8 or " + + "more downloads, got " + numEditLogsSynced + " downloads instead", + numEditLogsSynced >= 8); + } + + // Test JournalNode Sync when a JN is formatted while NN is actively writing + // logs. + @Test (timeout=300_000) + public void testSyncAfterJNformat() throws Exception{ + File firstJournalDir = jCluster.getJournalDir(0, jid); + File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) + .getCurrentDir(); + File secondJournalDir = jCluster.getJournalDir(1, jid); + File secondJournalCurrentDir = new StorageDirectory(secondJournalDir) + .getCurrentDir(); + + long[] startTxIds = new long[10]; + + startTxIds[0] = generateEditLog(1); + startTxIds[1] = generateEditLog(2); + startTxIds[2] = generateEditLog(4); + startTxIds[3] = generateEditLog(6); + + Journal journal1 = jCluster.getJournalNode(0).getOrCreateJournal(jid); + NamespaceInfo nsInfo = journal1.getStorage().getNamespaceInfo(); + + // Delete contents of current directory of one JN + for (File file : firstJournalCurrentDir.listFiles()) { + file.delete(); + } + + // Format the JN + journal1.format(nsInfo); + + // Roll some more edits + for (int i = 4; i < 10; i++) { + startTxIds[i] = generateEditLog(5); + } + + // List the edit logs rolled during JN down time. + List missingLogs = Lists.newArrayList(); + for (int i = 0; i < 10; i++) { + EditLogFile logFile = getLogFile(secondJournalCurrentDir, startTxIds[i], + false); + missingLogs.add(new File(firstJournalCurrentDir, + logFile.getFile().getName())); + } + + // Check that the formatted JN has all the edit logs. 
+ GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000); + } + private File deleteEditLog(File currentDir, long startTxId) throws IOException { EditLogFile logFile = getLogFile(currentDir, startTxId); @@ -242,8 +404,20 @@ private boolean doAnEdit() throws IOException { * @return the startTxId of next segment after rolling edits. */ private long generateEditLog() throws IOException { + return generateEditLog(1); + } + + /** + * Does specified number of edits and rolls the Edit Log. + * + * @param numEdits number of Edits to perform + * @return the startTxId of next segment after rolling edits. + */ + private long generateEditLog(int numEdits) throws IOException { long startTxId = namesystem.getFSImage().getEditLog().getLastWrittenTxId(); - Assert.assertTrue("Failed to do an edit", doAnEdit()); + for (int i = 1; i <= numEdits; i++) { + Assert.assertTrue("Failed to do an edit", doAnEdit()); + } dfsCluster.getNameNode(0).getRpcServer().rollEditLog(); return startTxId; }