diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index b839e517d36..32db6a55899 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -1004,6 +1004,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_HA_LOGROLL_PERIOD_DEFAULT = 2 * 60; // 2m public static final String DFS_HA_TAILEDITS_PERIOD_KEY = "dfs.ha.tail-edits.period"; public static final int DFS_HA_TAILEDITS_PERIOD_DEFAULT = 60; // 1m + public static final String DFS_HA_TAILEDITS_PERIOD_BACKOFF_MAX_KEY = "dfs.ha.tail-edits.period.backoff-max"; + public static final int DFS_HA_TAILEDITS_PERIOD_BACKOFF_MAX_DEFAULT = 0; // disabled public static final String DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY = "dfs.ha.tail-edits.namenode-retries"; public static final int DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT = 3; public static final String DFS_HA_TAILEDITS_INPROGRESS_KEY = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java index 11e05a218b0..536986152d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java @@ -134,10 +134,17 @@ public class EditLogTailer { private final ExecutorService rollEditsRpcExecutor; /** - * How often the Standby should check if there are new finalized segment(s) - * available to be read from. + * How often the tailer should check if there are new edit log entries + * ready to be consumed. This is the initial delay before any backoff. */ private final long sleepTimeMs; + /** + * The maximum time the tailer should wait between checking for new edit log + * entries. Exponential backoff will be applied when an edit log tail is + * performed but no edits are available to be read. If this is less than or + * equal to 0, backoff is disabled. + */ + private final long maxSleepTimeMs; private final int nnCount; private NamenodeProtocol cachedActiveProxy = null; @@ -206,6 +213,19 @@ public class EditLogTailer { DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_DEFAULT, TimeUnit.SECONDS, TimeUnit.MILLISECONDS); + long maxSleepTimeMsTemp = conf.getTimeDuration( + DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_BACKOFF_MAX_KEY, + DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_BACKOFF_MAX_DEFAULT, + TimeUnit.SECONDS, TimeUnit.MILLISECONDS); + if (maxSleepTimeMsTemp > 0 && maxSleepTimeMsTemp < sleepTimeMs) { + LOG.warn("{} was configured to be {} ms, but this is less than {}." + + "Disabling backoff when tailing edit logs.", + DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_BACKOFF_MAX_KEY, + maxSleepTimeMsTemp, DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY); + maxSleepTimeMs = 0; + } else { + maxSleepTimeMs = maxSleepTimeMsTemp; + } rollEditsTimeoutMs = conf.getTimeDuration( DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY, @@ -291,7 +311,7 @@ public class EditLogTailer { } @VisibleForTesting - public void doTailEdits() throws IOException, InterruptedException { + public long doTailEdits() throws IOException, InterruptedException { // Write lock needs to be interruptible here because the // transitionToActive RPC takes the write lock before calling // tailer.stop() -- so if we're not interruptible, it will @@ -316,7 +336,7 @@ public class EditLogTailer { // edits file hasn't been started yet. LOG.warn("Edits tailer failed to find any streams. Will try again " + "later.", ioe); - return; + return 0; } finally { NameNode.getNameNodeMetrics().addEditLogFetchTime( Time.monotonicNow() - startTime); @@ -347,6 +367,7 @@ public class EditLogTailer { lastLoadTimeMs = monotonicNow(); } lastLoadedTxnId = image.getLastAppliedTxId(); + return editsLoaded; } finally { namesystem.writeUnlock(); } @@ -407,6 +428,11 @@ public class EditLogTailer { } } + @VisibleForTesting + void sleep(long sleepTimeMillis) throws InterruptedException { + Thread.sleep(sleepTimeMillis); + } + /** * The thread which does the actual work of tailing edits journals and * applying the transactions to the FSNS. @@ -435,7 +461,9 @@ public class EditLogTailer { } private void doWork() { + long currentSleepTimeMs = sleepTimeMs; while (shouldRun) { + long editsTailed = 0; try { // There's no point in triggering a log roll if the Standby hasn't // read any more transactions since the last time a roll was @@ -461,7 +489,7 @@ public class EditLogTailer { try { NameNode.getNameNodeMetrics().addEditLogTailInterval( startTime - lastLoadTimeMs); - doTailEdits(); + editsTailed = doTailEdits(); } finally { namesystem.cpUnlock(); NameNode.getNameNodeMetrics().addEditLogTailTime( @@ -481,7 +509,17 @@ public class EditLogTailer { } try { - Thread.sleep(sleepTimeMs); + if (editsTailed == 0 && maxSleepTimeMs > 0) { + // If no edits were tailed, apply exponential backoff + // before tailing again. Double the current sleep time on each + // empty response, but don't exceed the max. If the sleep time + // was configured as 0, start the backoff at 1 ms. + currentSleepTimeMs = Math.min(maxSleepTimeMs, + (currentSleepTimeMs == 0 ? 1 : currentSleepTimeMs) * 2); + } else { + currentSleepTimeMs = sleepTimeMs; // reset to initial sleep time + } + EditLogTailer.this.sleep(currentSleepTimeMs); } catch (InterruptedException e) { LOG.warn("Edit log tailer interrupted: {}", e.getMessage()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 72715037fba..2f7a4ad3a9e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1662,9 +1662,25 @@ dfs.ha.tail-edits.period 60s - How often, in seconds, the StandbyNode should check for new - finalized log segments in the shared edits log. - Support multiple time unit suffix(case insensitive), as described + How often, the StandbyNode and ObserverNode should check if there are new + edit log entries ready to be consumed. This is the minimum period between + checking; exponential backoff will be applied if no edits are found and + dfs.ha.tail-edits.period.backoff-max is configured. By default, no + backoff is applied. + Supports multiple time unit suffix (case insensitive), as described + in dfs.heartbeat.interval. + + + + + dfs.ha.tail-edits.period.backoff-max + 0 + + The maximum time the tailer should wait between checking for new edit log + entries. Exponential backoff will be applied when an edit log tail is + performed but no edits are available to be read. Values less than or + equal to zero disable backoff entirely; this is the default behavior. + Supports multiple time unit suffix (case insensitive), as described in dfs.heartbeat.interval. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ObserverNameNode.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ObserverNameNode.md index b212f00b1b1..07c384c1bcf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ObserverNameNode.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ObserverNameNode.md @@ -140,13 +140,33 @@ few configurations to your **hdfs-site.xml**: If too large, RPC time will increase as client requests will wait longer in the RPC queue before Observer tails edit logs and catches up the latest state of Active. The default value is 1min. It is - **highly recommend** to configure this to a much lower value. + **highly recommend** to configure this to a much lower value. It is also + recommended to configure backoff to be enabled when using low values; please + see below. dfs.ha.tail-edits.period 0ms +* **dfs.ha.tail-edits.period.backoff-max** - whether the Standby/Observer + NameNodes should perform backoff when tailing edits. + + This determines the behavior of a Standby/Observer when it attempts to + tail edits from the JournalNodes and finds no edits available. This is a + common situation when the edit tailing period is very low, but the cluster + is not heavily loaded. Without this configuration, such a situation will + cause high utilization on the Standby/Observer as it constantly attempts to + read edits even though there are none available. With this configuration + enabled, exponential backoff will be performed when an edit tail attempt + returns 0 edits. This configuration specifies the maximum time to wait + between edit tailing attempts. + + + dfs.ha.tail-edits.period + 10s + + * **dfs.journalnode.edit-cache-size.bytes** - the in-memory cache size, in bytes, on the JournalNodes. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java index cb37361a353..42b9660c611 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.File; @@ -27,9 +28,14 @@ import java.io.IOException; import java.net.BindException; import java.net.URI; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.List; +import java.util.Queue; import java.util.Random; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -42,8 +48,10 @@ import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.FSEditLog; import org.apache.hadoop.hdfs.server.namenode.FSImage; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NNStorage; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; @@ -150,7 +158,47 @@ public class TestEditLogTailer { cluster.shutdown(); } } - + + @Test + public void testTailerBackoff() throws Exception { + Configuration conf = new Configuration(); + NameNode.initMetrics(conf, HdfsServerConstants.NamenodeRole.NAMENODE); + conf.setTimeDuration(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, + 1, TimeUnit.MILLISECONDS); + conf.setTimeDuration(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_BACKOFF_MAX_KEY, + 10, TimeUnit.MILLISECONDS); + FSNamesystem mockNamesystem = mock(FSNamesystem.class); + FSImage mockImage = mock(FSImage.class); + NNStorage mockStorage = mock(NNStorage.class); + when(mockNamesystem.getFSImage()).thenReturn(mockImage); + when(mockImage.getStorage()).thenReturn(mockStorage); + final Queue sleepDurations = new ConcurrentLinkedQueue<>(); + final int zeroEditCount = 5; + final AtomicInteger tailEditsCallCount = new AtomicInteger(0); + EditLogTailer tailer = new EditLogTailer(mockNamesystem, conf) { + @Override + void sleep(long sleepTimeMs) { + if (sleepDurations.size() <= zeroEditCount) { + sleepDurations.add(sleepTimeMs); + } + } + + @Override + public long doTailEdits() { + return tailEditsCallCount.getAndIncrement() < zeroEditCount ? 0 : 1; + } + }; + tailer.start(); + try { + GenericTestUtils.waitFor( + () -> sleepDurations.size() > zeroEditCount, 50, 10000); + } finally { + tailer.stop(); + } + List expectedDurations = Arrays.asList(2L, 4L, 8L, 10L, 10L, 1L); + assertEquals(expectedDurations, new ArrayList<>(sleepDurations)); + } + @Test public void testNN0TriggersLogRolls() throws Exception { testStandbyTriggersLogRolls(0);