HDFS-14370. Add exponential backoff to the edit log tailer to avoid spinning on empty edit tail requests. Contributed by Erik Krogen.
(cherry picked from827dbb11e2
) (cherry picked from016aa13940
) (cherry picked fromf6ce2f4a50
) (cherry picked from5657e45fb2
)
This commit is contained in:
parent
885b7f4c98
commit
719214ff30
|
@ -818,6 +818,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final int DFS_HA_LOGROLL_PERIOD_DEFAULT = 2 * 60; // 2m
|
||||
public static final String DFS_HA_TAILEDITS_PERIOD_KEY = "dfs.ha.tail-edits.period";
|
||||
public static final int DFS_HA_TAILEDITS_PERIOD_DEFAULT = 60; // 1m
|
||||
public static final String DFS_HA_TAILEDITS_PERIOD_BACKOFF_MAX_KEY = "dfs.ha.tail-edits.period.backoff-max";
|
||||
public static final int DFS_HA_TAILEDITS_PERIOD_BACKOFF_MAX_DEFAULT = 0; // disabled
|
||||
public static final String DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY =
|
||||
"dfs.ha.tail-edits.rolledits.timeout";
|
||||
public static final int DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_DEFAULT = 60; // 1m
|
||||
|
|
|
@ -125,10 +125,17 @@ public class EditLogTailer {
|
|||
private final ExecutorService rollEditsRpcExecutor;
|
||||
|
||||
/**
|
||||
* How often the Standby should check if there are new finalized segment(s)
|
||||
* available to be read from.
|
||||
* How often the tailer should check if there are new edit log entries
|
||||
* ready to be consumed. This is the initial delay before any backoff.
|
||||
*/
|
||||
private final long sleepTimeMs;
|
||||
/**
|
||||
* The maximum time the tailer should wait between checking for new edit log
|
||||
* entries. Exponential backoff will be applied when an edit log tail is
|
||||
* performed but no edits are available to be read. If this is less than or
|
||||
* equal to 0, backoff is disabled.
|
||||
*/
|
||||
private final long maxSleepTimeMs;
|
||||
|
||||
private final int nnCount;
|
||||
private NamenodeProtocol cachedActiveProxy = null;
|
||||
|
@ -183,6 +190,20 @@ public class EditLogTailer {
|
|||
|
||||
sleepTimeMs = conf.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY,
|
||||
DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_DEFAULT) * 1000;
|
||||
long maxSleepTimeMsTemp = conf.getTimeDuration(
|
||||
DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_BACKOFF_MAX_KEY,
|
||||
DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_BACKOFF_MAX_DEFAULT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
if (maxSleepTimeMsTemp > 0 && maxSleepTimeMsTemp < sleepTimeMs) {
|
||||
LOG.warn(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_BACKOFF_MAX_KEY
|
||||
+ " was configured to be " + maxSleepTimeMsTemp
|
||||
+ " ms, but this is less than "
|
||||
+ DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY
|
||||
+ ". Disabling backoff when tailing edit logs.");
|
||||
maxSleepTimeMs = 0;
|
||||
} else {
|
||||
maxSleepTimeMs = maxSleepTimeMsTemp;
|
||||
}
|
||||
|
||||
maxRetries = conf.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY,
|
||||
DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT);
|
||||
|
@ -263,7 +284,7 @@ public class EditLogTailer {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void doTailEdits() throws IOException, InterruptedException {
|
||||
public long doTailEdits() throws IOException, InterruptedException {
|
||||
// Write lock needs to be interruptible here because the
|
||||
// transitionToActive RPC takes the write lock before calling
|
||||
// tailer.stop() -- so if we're not interruptible, it will
|
||||
|
@ -287,7 +308,7 @@ public class EditLogTailer {
|
|||
// edits file hasn't been started yet.
|
||||
LOG.warn("Edits tailer failed to find any streams. Will try again " +
|
||||
"later.", ioe);
|
||||
return;
|
||||
return 0;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("edit streams to load from: " + streams.size());
|
||||
|
@ -313,6 +334,7 @@ public class EditLogTailer {
|
|||
lastLoadTimeMs = monotonicNow();
|
||||
}
|
||||
lastLoadedTxnId = image.getLastAppliedTxId();
|
||||
return editsLoaded;
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
}
|
||||
|
@ -373,6 +395,11 @@ public class EditLogTailer {
|
|||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void sleep(long sleepTimeMillis) throws InterruptedException {
|
||||
Thread.sleep(sleepTimeMillis);
|
||||
}
|
||||
|
||||
/**
|
||||
* The thread which does the actual work of tailing edits journals and
|
||||
* applying the transactions to the FSNS.
|
||||
|
@ -401,7 +428,9 @@ public class EditLogTailer {
|
|||
}
|
||||
|
||||
private void doWork() {
|
||||
long currentSleepTimeMs = sleepTimeMs;
|
||||
while (shouldRun) {
|
||||
long editsTailed = 0;
|
||||
try {
|
||||
// There's no point in triggering a log roll if the Standby hasn't
|
||||
// read any more transactions since the last time a roll was
|
||||
|
@ -424,7 +453,7 @@ public class EditLogTailer {
|
|||
// state updates.
|
||||
namesystem.cpLockInterruptibly();
|
||||
try {
|
||||
doTailEdits();
|
||||
editsTailed = doTailEdits();
|
||||
} finally {
|
||||
namesystem.cpUnlock();
|
||||
}
|
||||
|
@ -442,7 +471,17 @@ public class EditLogTailer {
|
|||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(sleepTimeMs);
|
||||
if (editsTailed == 0 && maxSleepTimeMs > 0) {
|
||||
// If no edits were tailed, apply exponential backoff
|
||||
// before tailing again. Double the current sleep time on each
|
||||
// empty response, but don't exceed the max. If the sleep time
|
||||
// was configured as 0, start the backoff at 1 ms.
|
||||
currentSleepTimeMs = Math.min(maxSleepTimeMs,
|
||||
(currentSleepTimeMs == 0 ? 1 : currentSleepTimeMs) * 2);
|
||||
} else {
|
||||
currentSleepTimeMs = sleepTimeMs; // reset to initial sleep time
|
||||
}
|
||||
EditLogTailer.this.sleep(currentSleepTimeMs);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Edit log tailer interrupted", e);
|
||||
}
|
||||
|
|
|
@ -1564,8 +1564,26 @@
|
|||
<name>dfs.ha.tail-edits.period</name>
|
||||
<value>60</value>
|
||||
<description>
|
||||
How often, in seconds, the StandbyNode should check for new
|
||||
finalized log segments in the shared edits log.
|
||||
How often, the StandbyNode and ObserverNode should check if there are new
|
||||
edit log entries ready to be consumed. This is the minimum period between
|
||||
checking; exponential backoff will be applied if no edits are found and
|
||||
dfs.ha.tail-edits.period.backoff-max is configured. By default, no
|
||||
backoff is applied.
|
||||
Supports multiple time unit suffix (case insensitive), as described
|
||||
in dfs.heartbeat.interval.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.ha.tail-edits.period.backoff-max</name>
|
||||
<value>0</value>
|
||||
<description>
|
||||
The maximum time the tailer should wait between checking for new edit log
|
||||
entries. Exponential backoff will be applied when an edit log tail is
|
||||
performed but no edits are available to be read. Values less than or
|
||||
equal to zero disable backoff entirely; this is the default behavior.
|
||||
Supports multiple time unit suffix (case insensitive), as described
|
||||
in dfs.heartbeat.interval.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
|
|
|
@ -140,13 +140,33 @@ few configurations to your **hdfs-site.xml**:
|
|||
If too large, RPC time will increase as client requests will wait
|
||||
longer in the RPC queue before Observer tails edit logs and catches
|
||||
up the latest state of Active. The default value is 1min. It is
|
||||
**highly recommend** to configure this to a much lower value.
|
||||
**highly recommend** to configure this to a much lower value. It is also
|
||||
recommended to configure backoff to be enabled when using low values; please
|
||||
see below.
|
||||
|
||||
<property>
|
||||
<name>dfs.ha.tail-edits.period</name>
|
||||
<value>0ms</value>
|
||||
</property>
|
||||
|
||||
* **dfs.ha.tail-edits.period.backoff-max** - whether the Standby/Observer
|
||||
NameNodes should perform backoff when tailing edits.
|
||||
|
||||
This determines the behavior of a Standby/Observer when it attempts to
|
||||
tail edits from the JournalNodes and finds no edits available. This is a
|
||||
common situation when the edit tailing period is very low, but the cluster
|
||||
is not heavily loaded. Without this configuration, such a situation will
|
||||
cause high utilization on the Standby/Observer as it constantly attempts to
|
||||
read edits even though there are none available. With this configuration
|
||||
enabled, exponential backoff will be performed when an edit tail attempt
|
||||
returns 0 edits. This configuration specifies the maximum time to wait
|
||||
between edit tailing attempts.
|
||||
|
||||
<property>
|
||||
<name>dfs.ha.tail-edits.period</name>
|
||||
<value>10s</value>
|
||||
</property>
|
||||
|
||||
* **dfs.journalnode.edit-cache-size.bytes** - the in-memory cache size,
|
||||
in bytes, on the JournalNodes.
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.File;
|
||||
|
@ -27,9 +28,14 @@ import java.io.IOException;
|
|||
import java.net.BindException;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
|
@ -42,8 +48,10 @@ import org.apache.hadoop.hdfs.HAUtil;
|
|||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSImage;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||
|
@ -142,7 +150,51 @@ public class TestEditLogTailer {
|
|||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testTailerBackoff() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
NameNode.initMetrics(conf, HdfsServerConstants.NamenodeRole.NAMENODE);
|
||||
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); // seconds
|
||||
conf.setTimeDuration(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_BACKOFF_MAX_KEY,
|
||||
10, TimeUnit.SECONDS);
|
||||
FSNamesystem mockNamesystem = mock(FSNamesystem.class);
|
||||
FSImage mockImage = mock(FSImage.class);
|
||||
NNStorage mockStorage = mock(NNStorage.class);
|
||||
when(mockNamesystem.getFSImage()).thenReturn(mockImage);
|
||||
when(mockImage.getStorage()).thenReturn(mockStorage);
|
||||
final Queue<Long> sleepDurations = new ConcurrentLinkedQueue<>();
|
||||
final int zeroEditCount = 5;
|
||||
final AtomicInteger tailEditsCallCount = new AtomicInteger(0);
|
||||
EditLogTailer tailer = new EditLogTailer(mockNamesystem, conf) {
|
||||
@Override
|
||||
void sleep(long sleepTimeMs) {
|
||||
if (sleepDurations.size() <= zeroEditCount) {
|
||||
sleepDurations.add(sleepTimeMs);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long doTailEdits() {
|
||||
return tailEditsCallCount.getAndIncrement() < zeroEditCount ? 0 : 1;
|
||||
}
|
||||
};
|
||||
tailer.start();
|
||||
try {
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
return sleepDurations.size() > zeroEditCount;
|
||||
}
|
||||
}, 50, 10000);
|
||||
} finally {
|
||||
tailer.stop();
|
||||
}
|
||||
List<Long> expectedDurations = Arrays.asList(
|
||||
2000L, 4000L, 8000L, 10000L, 10000L, 1000L);
|
||||
assertEquals(expectedDurations, new ArrayList<>(sleepDurations));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNN0TriggersLogRolls() throws Exception {
|
||||
testStandbyTriggersLogRolls(0);
|
||||
|
|
Loading…
Reference in New Issue