From 1bc282e0b3f74968c92751f0972746b012e72810 Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Thu, 7 Mar 2019 08:26:07 -0800 Subject: [PATCH] HDFS-14317. Ensure checkpoints are created when in-progress edit log tailing is enabled with a period shorter than the log roll period. Contributed by Ekanth Sethuramalingam. --- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 3 +- .../hdfs/server/namenode/FSEditLog.java | 3 +- .../server/namenode/ha/EditLogTailer.java | 12 ++- .../src/main/resources/hdfs-default.xml | 2 +- .../server/namenode/ha/TestEditLogTailer.java | 90 ++++++++++++++++++- .../namenode/ha/TestFailureToReadEdits.java | 12 +-- 6 files changed, 110 insertions(+), 12 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index fedfc5a7dce..b3cd0a4f857 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -331,7 +331,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_NAMENODE_QUOTA_INIT_THREADS_DEFAULT = 4; public static final String DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD = "dfs.namenode.edit.log.autoroll.multiplier.threshold"; - public static final float DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT = 2.0f; + public static final float + DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT = 0.5f; public static final String DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS = "dfs.namenode.edit.log.autoroll.check.interval.ms"; public static final int DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT = 5*60*1000; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index cc1dcba8e3b..5b348e59211 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -563,7 +563,8 @@ public class FSEditLog implements LogsPurgeable { /** * @return the first transaction ID in the current log segment */ - synchronized long getCurSegmentTxId() { + @VisibleForTesting + public synchronized long getCurSegmentTxId() { Preconditions.checkState(isSegmentOpen(), "Bad state: %s", state); return curSegmentTxId; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java index f4903931d54..0ab8ef19902 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java @@ -110,6 +110,11 @@ public class EditLogTailer { */ private long lastLoadTimeMs; + /** + * The last time we triggered a edit log roll on active namenode. + */ + private long lastRollTimeMs; + /** * How often the Standby should roll edit logs. Since the Standby only reads * from finalized log segments, the Standby will only be as up-to-date as how @@ -140,7 +145,8 @@ public class EditLogTailer { private int nnLoopCount = 0; /** - * maximum number of retries we should give each of the remote namenodes before giving up + * Maximum number of retries we should give each of the remote namenodes + * before giving up. */ private int maxRetries; @@ -166,6 +172,7 @@ public class EditLogTailer { this.editLog = namesystem.getEditLog(); lastLoadTimeMs = monotonicNow(); + lastRollTimeMs = monotonicNow(); logRollPeriodMs = conf.getTimeDuration( DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, @@ -354,7 +361,7 @@ public class EditLogTailer { */ private boolean tooLongSinceLastLoad() { return logRollPeriodMs >= 0 && - (monotonicNow() - lastLoadTimeMs) > logRollPeriodMs ; + (monotonicNow() - lastRollTimeMs) > logRollPeriodMs; } /** @@ -382,6 +389,7 @@ public class EditLogTailer { try { future = rollEditsRpcExecutor.submit(getNameNodeProxy()); future.get(rollEditsTimeoutMs, TimeUnit.MILLISECONDS); + lastRollTimeMs = monotonicNow(); lastRollTriggerTxId = lastLoadedTxnId; } catch (ExecutionException e) { LOG.warn("Unable to trigger a roll of the active NN", e); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 94d6512525d..e9b35be5012 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2614,7 +2614,7 @@ dfs.namenode.edit.log.autoroll.multiplier.threshold - 2.0 + 0.5 Determines when an active namenode will roll its own edit log. The actual threshold (in number of edits) is determined by multiplying diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java index b94cd2af4a6..cb37361a353 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java @@ -338,9 +338,97 @@ public class TestEditLogTailer { } } + @Test + public void testStandbyTriggersLogRollsWhenTailInProgressEdits() + throws Exception { + // Time in seconds to wait for standby to catch up to edits from active + final int standbyCatchupWaitTime = 2; + // Time in seconds to wait before checking if edit logs are rolled while + // expecting no edit log roll + final int noLogRollWaitTime = 2; + // Time in seconds to wait before checking if edit logs are rolled while + // expecting edit log roll + final int logRollWaitTime = 3; + + Configuration conf = getConf(); + conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, + standbyCatchupWaitTime + noLogRollWaitTime + 1); + conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); + conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true); + + MiniDFSCluster cluster = createMiniDFSCluster(conf, 2); + if (cluster == null) { + fail("failed to start mini cluster."); + } + + try { + int activeIndex = new Random().nextBoolean() ? 1 : 0; + int standbyIndex = (activeIndex == 0) ? 1 : 0; + cluster.transitionToActive(activeIndex); + NameNode active = cluster.getNameNode(activeIndex); + NameNode standby = cluster.getNameNode(standbyIndex); + + long origTxId = active.getNamesystem().getFSImage().getEditLog() + .getCurSegmentTxId(); + for (int i = 0; i < DIRS_TO_MAKE / 2; i++) { + NameNodeAdapter.mkdirs(active, getDirPath(i), + new PermissionStatus("test", "test", + new FsPermission((short)00755)), true); + } + + long activeTxId = active.getNamesystem().getFSImage().getEditLog() + .getLastWrittenTxId(); + waitForStandbyToCatchUpWithInProgressEdits(standby, activeTxId, + standbyCatchupWaitTime); + + for (int i = DIRS_TO_MAKE / 2; i < DIRS_TO_MAKE; i++) { + NameNodeAdapter.mkdirs(active, getDirPath(i), + new PermissionStatus("test", "test", + new FsPermission((short)00755)), true); + } + + boolean exceptionThrown = false; + try { + checkForLogRoll(active, origTxId, noLogRollWaitTime); + } catch (TimeoutException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + + checkForLogRoll(active, origTxId, logRollWaitTime); + } finally { + cluster.shutdown(); + } + } + + private static void waitForStandbyToCatchUpWithInProgressEdits( + final NameNode standby, final long activeTxId, + int maxWaitSec) throws Exception { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + long standbyTxId = standby.getNamesystem().getFSImage() + .getLastAppliedTxId(); + return (standbyTxId >= activeTxId); + } + }, 100, maxWaitSec * 1000); + } + + private static void checkForLogRoll(final NameNode active, + final long origTxId, int maxWaitSec) throws Exception { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + long curSegmentTxId = active.getNamesystem().getFSImage().getEditLog() + .getCurSegmentTxId(); + return (origTxId != curSegmentTxId); + } + }, 100, maxWaitSec * 1000); + } + private static MiniDFSCluster createMiniDFSCluster(Configuration conf, int nnCount) throws IOException { - int basePort = 10060 + new Random().nextInt(100) * 2; + int basePort = 10060 + new Random().nextInt(1000) * 2; // By passing in basePort, name node will have IPC port set, // which is needed for enabling roll log. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java index e233b631282..58efc0fa2e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java @@ -254,10 +254,10 @@ public class TestFailureToReadEdits { // Once the standby catches up, it should notice that it needs to // do a checkpoint and save one to its local directories. - HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(0, 3)); + HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(0, 5)); // It should also upload it back to the active. - HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 3)); + HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 5)); causeFailureOnEditLogRead(); @@ -272,15 +272,15 @@ public class TestFailureToReadEdits { } // 5 because we should get OP_START_LOG_SEGMENT and one successful OP_MKDIR - HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(0, 3, 5)); + HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(0, 5, 7)); // It should also upload it back to the active. - HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 3, 5)); + HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 5, 7)); // Restart the active NN cluster.restartNameNode(0); - HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 3, 5)); + HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 5, 7)); FileSystem fs0 = null; try { @@ -309,7 +309,7 @@ public class TestFailureToReadEdits { HATestUtil.waitForStandbyToCatchUp(nn0, nn1); // It should also upload it back to the active. - HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 3)); + HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 5)); causeFailureOnEditLogRead();