[SBN Read] Slow clients when Observer reads are enabled but there are no Observers on the cluster. Contributed by Chen Liang.

This commit is contained in:
Chen Liang 2020-01-29 11:15:07 -08:00
parent d4ecb65629
commit 5e1d89b74d
3 changed files with 111 additions and 1 deletions

View File

@ -81,6 +81,12 @@ public class ObserverReadProxyProvider<T>
/** Client-side context for syncing with the NameNode server side. */ /** Client-side context for syncing with the NameNode server side. */
private final AlignmentContext alignmentContext; private final AlignmentContext alignmentContext;
/**
* Configuration key for {@link #observerProbeRetryPeriodMs}.
* The key is read as-is, without a per-nameservice suffix, and its value is
* interpreted as a time duration in milliseconds.
*/
static final String OBSERVER_PROBE_RETRY_PERIOD_KEY =
"dfs.client.failover.observer.probe.retry.period";
/** Default observer probe retry period: 10 minutes, in milliseconds. */
static final long OBSERVER_PROBE_RETRY_PERIOD_DEFAULT = 60 * 10 * 1000;
/** The inner proxy provider used for active/standby failover. */ /** The inner proxy provider used for active/standby failover. */
private final AbstractNNFailoverProxyProvider<T> failoverProxy; private final AbstractNNFailoverProxyProvider<T> failoverProxy;
/** List of all NameNode proxies. */ /** List of all NameNode proxies. */
@ -139,6 +145,21 @@ public class ObserverReadProxyProvider<T>
/** The last proxy that has been used. Only used for testing. */ /** The last proxy that has been used. Only used for testing. */
private volatile ProxyInfo<T> lastProxy = null; private volatile ProxyInfo<T> lastProxy = null;
/**
* How long, in milliseconds, to wait before probing for Observers again.
* When there is no Observer node, every read call would otherwise make the
* client loop through all Standby nodes and eventually fail. Since there is
* no guarantee on when an Observer node will be enabled, this can be very
* inefficient.
* Loaded from {@link #OBSERVER_PROBE_RETRY_PERIOD_KEY}, falling back to
* {@link #OBSERVER_PROBE_RETRY_PERIOD_DEFAULT}.
*/
private long observerProbeRetryPeriodMs;
/**
* The monotonic time, in milliseconds, of the last probe that found no
* Observer. It is 0 right after initialization and is reset to 0 when a
* read falls back to Active after Observers were tried and failed.
*/
private long lastObserverProbeTime;
/** /**
* By default ObserverReadProxyProvider uses * By default ObserverReadProxyProvider uses
* {@link ConfiguredFailoverProxyProvider} for failover. * {@link ConfiguredFailoverProxyProvider} for failover.
@ -157,6 +178,7 @@ public class ObserverReadProxyProvider<T>
this.failoverProxy = failoverProxy; this.failoverProxy = failoverProxy;
this.alignmentContext = new ClientGSIContext(); this.alignmentContext = new ClientGSIContext();
factory.setAlignmentContext(alignmentContext); factory.setAlignmentContext(alignmentContext);
this.lastObserverProbeTime = 0;
// Don't bother configuring the number of retries and such on the retry // Don't bother configuring the number of retries and such on the retry
// policy since it is mainly only used for determining whether or not an // policy since it is mainly only used for determining whether or not an
@ -187,6 +209,9 @@ public class ObserverReadProxyProvider<T>
// The host of the URI is the nameservice ID // The host of the URI is the nameservice ID
AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + uri.getHost(), AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + uri.getHost(),
AUTO_MSYNC_PERIOD_DEFAULT, TimeUnit.MILLISECONDS); AUTO_MSYNC_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
observerProbeRetryPeriodMs = conf.getTimeDuration(
OBSERVER_PROBE_RETRY_PERIOD_KEY,
OBSERVER_PROBE_RETRY_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
// TODO : make this configurable or remove this variable // TODO : make this configurable or remove this variable
if (wrappedProxy instanceof ClientProtocol) { if (wrappedProxy instanceof ClientProtocol) {
@ -322,6 +347,27 @@ public class ObserverReadProxyProvider<T>
lastMsyncTimeMs = Time.monotonicNow(); lastMsyncTimeMs = Time.monotonicNow();
} }
/**
* Check if client need to find an Observer proxy.
* If current proxy is Active then we should stick to it and postpone probing
* for Observers for a period of time. When this time expires the client will
* try to find an Observer again.
* *
* @return true if the client should try to find an Observer, i.e. no failed
* probe is recorded or the probe retry period has elapsed; false otherwise.
*/
private boolean shouldFindObserver() {
  // A positive timestamp marks a probe that came back without any Observer.
  // Zero (or less) means no such failed probe is on record, so Observer
  // reads must not be skipped.
  final boolean noFailedProbeRecorded = lastObserverProbeTime <= 0;
  if (noFailedProbeRecorded) {
    return true;
  }
  // A failed probe is on record: look for Observers again only once the
  // configured retry period has fully elapsed since that probe.
  final long elapsedSinceProbeMs =
      Time.monotonicNow() - lastObserverProbeTime;
  return elapsedSinceProbeMs >= observerProbeRetryPeriodMs;
}
/** /**
* This will call {@link ClientProtocol#msync()} on the active NameNode * This will call {@link ClientProtocol#msync()} on the active NameNode
* (via the {@link #failoverProxy}) to update the state of this client, only * (via the {@link #failoverProxy}) to update the state of this client, only
@ -369,7 +415,7 @@ public class ObserverReadProxyProvider<T>
lastProxy = null; lastProxy = null;
Object retVal; Object retVal;
if (observerReadEnabled && isRead(method)) { if (observerReadEnabled && shouldFindObserver() && isRead(method)) {
if (!msynced) { if (!msynced) {
// An msync() must first be performed to ensure that this client is // An msync() must first be performed to ensure that this client is
// up-to-date with the active's state. This will only be done once. // up-to-date with the active's state. This will only be done once.
@ -446,11 +492,13 @@ public class ObserverReadProxyProvider<T>
+ "{} standby, {} active, and {} unreachable. Falling back " + "{} standby, {} active, and {} unreachable. Falling back "
+ "to active.", failedObserverCount, method.getName(), + "to active.", failedObserverCount, method.getName(),
standbyCount, activeCount, unreachableCount); standbyCount, activeCount, unreachableCount);
lastObserverProbeTime = 0;
} else { } else {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Read falling back to active without observer read " LOG.debug("Read falling back to active without observer read "
+ "fail, is there no observer node running?"); + "fail, is there no observer node running?");
} }
lastObserverProbeTime = Time.monotonicNow();
} }
} }

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState; import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState;
import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.*;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -33,7 +34,9 @@ import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
@ -53,6 +56,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.TestFsck; import org.apache.hadoop.hdfs.server.namenode.TestFsck;
import org.apache.hadoop.hdfs.tools.GetGroups; import org.apache.hadoop.hdfs.tools.GetGroups;
import org.apache.hadoop.ipc.ObserverRetryOnActiveException; import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
import org.apache.hadoop.util.Time;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
@ -80,6 +84,10 @@ public class TestObserverNode {
public static void startUpCluster() throws Exception { public static void startUpCluster() throws Exception {
conf = new Configuration(); conf = new Configuration();
conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true); conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true);
// Set observer probe retry period to 0. Required by the tests that restart
// Observer and immediately try to read from it.
conf.setTimeDuration(
OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS);
qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 0, true); qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 0, true);
dfsCluster = qjmhaCluster.getDfsCluster(); dfsCluster = qjmhaCluster.getDfsCluster();
} }
@ -413,6 +421,54 @@ public class TestObserverNode {
assertSentTo(0); assertSentTo(0);
} }
/**
 * Test that if client connects to Active it does not try to find Observer
 * on next calls during some period of time.
 *
 * A dedicated client is created with a 5 second observer probe retry period.
 * Because that client bypasses the FileSystem cache, the test owns it and
 * closes it on exit so the instance is not leaked.
 */
@Test
public void testStickyActive() throws Exception {
  // Observer probe retry period for the dedicated client: 5 sec
  final long probeRetryPeriodMs = 5000;
  Path testFile = new Path(testPath, "testStickyActive");
  Configuration newConf = new Configuration(conf);
  newConf.setLong(OBSERVER_PROBE_RETRY_PERIOD_KEY, probeRetryPeriodMs);
  // Disable cache, so that a new client actually gets created with new conf.
  newConf.setBoolean("fs.hdfs.impl.disable.cache", true);
  // With the cache disabled nobody else will close this instance, so close
  // it here once the test is done (after the cluster state is restored).
  try (DistributedFileSystem newFs =
      (DistributedFileSystem) FileSystem.get(newConf)) {
    // NOTE(review): assertSentTo() inspects the shared `dfs` client rather
    // than newFs; confirm the assertions below observe the client under test.
    newFs.create(testFile, (short)1).close();
    assertSentTo(0);
    dfsCluster.rollEditLogAndTail(0);
    // No Observers present, should still go to Active
    dfsCluster.transitionToStandby(2);
    assertEquals("NN[2] should be standby", HAServiceState.STANDBY,
        getServiceState(dfsCluster.getNameNode(2)));
    newFs.open(testFile).close();
    assertSentTo(0);
    // Restore Observer
    int newObserver = 1;
    dfsCluster.transitionToObserver(newObserver);
    assertEquals("NN[" + newObserver + "] should be observer",
        HAServiceState.OBSERVER,
        getServiceState(dfsCluster.getNameNode(newObserver)));
    long startTime = Time.monotonicNow();
    try {
      while (Time.monotonicNow() - startTime <= probeRetryPeriodMs) {
        newFs.open(testFile).close();
        // Client should still talk to Active
        assertSentTo(0);
        Thread.sleep(200);
      }
    } catch (AssertionError ae) {
      // A failure inside the retry period is a real failure. After the
      // period has expired the client is expected to have moved to the
      // Observer instead.
      if (Time.monotonicNow() - startTime <= probeRetryPeriodMs) {
        throw ae;
      }
      assertSentTo(newObserver);
    } finally {
      // Put the shared cluster back into its original role layout.
      dfsCluster.transitionToStandby(1);
      dfsCluster.transitionToObserver(2);
    }
  }
}
private void assertSentTo(int nnIdx) throws IOException { private void assertSentTo(int nnIdx) throws IOException {
assertTrue("Request was not sent to the expected namenode " + nnIdx, assertTrue("Request was not sent to the expected namenode " + nnIdx,
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx)); HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));

View File

@ -24,6 +24,7 @@ import java.net.URI;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
@ -43,6 +44,7 @@ import org.mockito.stubbing.Answer;
import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.*;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -72,6 +74,10 @@ public class TestObserverReadProxyProvider {
nnURI = URI.create("hdfs://" + ns); nnURI = URI.create("hdfs://" + ns);
conf = new Configuration(); conf = new Configuration();
conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES, ns); conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES, ns);
// Set observer probe retry period to 0. Required by the tests that
// transition observer back and forth
conf.setTimeDuration(
OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS);
} }
private void setupProxyProvider(int namenodeCount) throws Exception { private void setupProxyProvider(int namenodeCount) throws Exception {