Revert "[SBN Read] Slow clients when Observer reads are enabled but there are no Observers on the cluster. Contributed by Chen Liang."
This reverts commit ff8ff0f7e5
.
This commit is contained in:
parent
ff8ff0f7e5
commit
3e86807802
|
@@ -82,12 +82,6 @@ public class ObserverReadProxyProvider<T>
   /** Client-side context for syncing with the NameNode server side. */
   private final AlignmentContext alignmentContext;
 
-  /** Configuration key for {@link #observerProbeRetryPeriodMs}. */
-  static final String OBSERVER_PROBE_RETRY_PERIOD_KEY =
-      "dfs.client.failover.observer.probe.retry.period";
-  /** Observer probe retry period default to 10 min. */
-  static final long OBSERVER_PROBE_RETRY_PERIOD_DEFAULT = 60 * 10 * 1000;
-
   /** The inner proxy provider used for active/standby failover. */
   private final AbstractNNFailoverProxyProvider<T> failoverProxy;
   /** List of all NameNode proxies. */
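For context on the lines removed above: before this revert, a client could tune how often ObserverReadProxyProvider re-probed for Observers after falling back to the Active NameNode, using the configuration key and 10-minute default deleted here. Below is a minimal, hedged sketch of that client-side tuning; the example class name is hypothetical, while setTimeDuration/getTimeDuration are the same org.apache.hadoop.conf.Configuration calls used by the removed code. Once this revert lands, the property is simply no longer read.

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;

/** Hypothetical example class; not part of HDFS. */
public final class ObserverProbeRetryConfExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Key and the 10-minute default mirror the constants removed above.
    conf.setTimeDuration(
        "dfs.client.failover.observer.probe.retry.period",
        10, TimeUnit.MINUTES);
    // Read the value back the same way the removed provider code did.
    long periodMs = conf.getTimeDuration(
        "dfs.client.failover.observer.probe.retry.period",
        60 * 10 * 1000, TimeUnit.MILLISECONDS);
    System.out.println("observer probe retry period = " + periodMs + " ms");
  }
}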
@@ -146,21 +140,6 @@ public class ObserverReadProxyProvider<T>
   /** The last proxy that has been used. Only used for testing. */
   private volatile ProxyInfo<T> lastProxy = null;
-
-  /**
-   * In case there is no Observer node, for every read call, client will try
-   * to loop through all Standby nodes and fail eventually. Since there is no
-   * guarantee on when Observer node will be enabled. This can be very
-   * inefficient.
-   * The following value specify the period on how often to retry all Standby.
-   */
-  private long observerProbeRetryPeriodMs;
-
-  /**
-   * The previous time where zero observer were found. If there was observer,
-   * or it is initialization, this is set to 0.
-   */
-  private long lastObserverProbeTime;
 
   /**
    * By default ObserverReadProxyProvider uses
    * {@link ConfiguredFailoverProxyProvider} for failover.
@@ -179,7 +158,6 @@ public class ObserverReadProxyProvider<T>
     this.failoverProxy = failoverProxy;
     this.alignmentContext = new ClientGSIContext();
     factory.setAlignmentContext(alignmentContext);
-    this.lastObserverProbeTime = 0;
 
     // Don't bother configuring the number of retries and such on the retry
     // policy since it is mainly only used for determining whether or not an
@@ -210,9 +188,6 @@ public class ObserverReadProxyProvider<T>
         // The host of the URI is the nameservice ID
         AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + uri.getHost(),
         AUTO_MSYNC_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
-    observerProbeRetryPeriodMs = conf.getTimeDuration(
-        OBSERVER_PROBE_RETRY_PERIOD_KEY,
-        OBSERVER_PROBE_RETRY_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
 
     // TODO : make this configurable or remove this variable
     if (wrappedProxy instanceof ClientProtocol) {
@@ -348,27 +323,6 @@ public class ObserverReadProxyProvider<T>
         lastMsyncTimeMs = Time.monotonicNow();
       }
-
-  /**
-   * Check if client need to find an Observer proxy.
-   * If current proxy is Active then we should stick to it and postpone probing
-   * for Observers for a period of time. When this time expires the client will
-   * try to find an Observer again.
-   * *
-   * @return true if we did not reach the threshold
-   * to start looking for Observer, or false otherwise.
-   */
-  private boolean shouldFindObserver() {
-    // lastObserverProbeTime > 0 means we tried, but did not find any
-    // Observers yet
-    // If lastObserverProbeTime <= 0, previous check found observer, so
-    // we should not skip observer read.
-    if (lastObserverProbeTime > 0) {
-      return Time.monotonicNow() - lastObserverProbeTime
-          >= observerProbeRetryPeriodMs;
-    }
-    return true;
-  }
 
   /**
    * This will call {@link ClientProtocol#msync()} on the active NameNode
   * (via the {@link #failoverProxy}) to update the state of this client, only
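To make the removed gating logic easier to follow, here is a standalone sketch of the idea that shouldFindObserver() implemented and that this revert removes. It is not the HDFS implementation: the class, method, and field names below are illustrative, and only the time comparison mirrors the deleted code.

// Illustrative sketch; not HDFS code.
final class ObserverProbeGate {
  private final long retryPeriodMs;
  // 0 means the previous probe found an Observer (or no probe has failed yet).
  private long lastEmptyProbeTimeMs;

  ObserverProbeGate(long retryPeriodMs) {
    this.retryPeriodMs = retryPeriodMs;
  }

  /** Should the client look for an Observer on this read? */
  synchronized boolean shouldProbe(long nowMs) {
    return lastEmptyProbeTimeMs == 0
        || nowMs - lastEmptyProbeTimeMs >= retryPeriodMs;
  }

  /** Record the probe outcome so later reads can skip pointless retries. */
  synchronized void recordProbeResult(boolean foundObserver, long nowMs) {
    lastEmptyProbeTimeMs = foundObserver ? 0 : nowMs;
  }
}

With the revert applied, invoke() goes back to checking only observerReadEnabled && isRead(method) (see the hunk below), so every read walks the full proxy list even when the cluster has no Observers, which is the slowdown the reverted change had addressed.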
@@ -416,7 +370,7 @@ public class ObserverReadProxyProvider<T>
       lastProxy = null;
       Object retVal;
 
-      if (observerReadEnabled && shouldFindObserver() && isRead(method)) {
+      if (observerReadEnabled && isRead(method)) {
         if (!msynced) {
           // An msync() must first be performed to ensure that this client is
           // up-to-date with the active's state. This will only be done once.
@@ -500,13 +454,11 @@ public class ObserverReadProxyProvider<T>
             + "also found {} standby, {} active, and {} unreachable. "
             + "Falling back to active.", failedObserverCount,
             method.getName(), standbyCount, activeCount, unreachableCount);
-        lastObserverProbeTime = 0;
       } else {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Read falling back to active without observer read "
               + "fail, is there no observer node running?");
         }
-        lastObserverProbeTime = Time.monotonicNow();
       }
     }
 
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState;
-import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -34,9 +33,7 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 
-import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -56,7 +53,6 @@ import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.TestFsck;
 import org.apache.hadoop.hdfs.tools.GetGroups;
 import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
-import org.apache.hadoop.util.Time;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -84,10 +80,6 @@ public class TestObserverNode {
   public static void startUpCluster() throws Exception {
     conf = new Configuration();
     conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true);
-    // Set observer probe retry period to 0. Required by the tests that restart
-    // Observer and immediately try to read from it.
-    conf.setTimeDuration(
-        OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS);
     qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 0, true);
     dfsCluster = qjmhaCluster.getDfsCluster();
   }
@@ -421,54 +413,6 @@ public class TestObserverNode {
     assertSentTo(0);
   }
 
-
-  /**
-   * Test that if client connects to Active it does not try to find Observer
-   * on next calls during some period of time.
-   */
-  @Test
-  public void testStickyActive() throws Exception {
-    Path testFile = new Path(testPath, "testStickyActive");
-    Configuration newConf = new Configuration(conf);
-    // Observer probe retry period set to 5 sec
-    newConf.setLong(OBSERVER_PROBE_RETRY_PERIOD_KEY, 5000);
-    // Disable cache, so that a new client actually gets created with new conf.
-    newConf.setBoolean("fs.hdfs.impl.disable.cache", true);
-    DistributedFileSystem newFs = (DistributedFileSystem) FileSystem.get(newConf);
-    newFs.create(testFile, (short)1).close();
-    assertSentTo(0);
-    dfsCluster.rollEditLogAndTail(0);
-    // No Observers present, should still go to Active
-    dfsCluster.transitionToStandby(2);
-    assertEquals("NN[2] should be standby", HAServiceState.STANDBY,
-        getServiceState(dfsCluster.getNameNode(2)));
-    newFs.open(testFile).close();
-    assertSentTo(0);
-    // Restore Observer
-    int newObserver = 1;
-    dfsCluster.transitionToObserver(newObserver);
-    assertEquals("NN[" + newObserver + "] should be observer",
-        HAServiceState.OBSERVER,
-        getServiceState(dfsCluster.getNameNode(newObserver)));
-    long startTime = Time.monotonicNow();
-    try {
-      while(Time.monotonicNow() - startTime <= 5000) {
-        newFs.open(testFile).close();
-        // Client should still talk to Active
-        assertSentTo(0);
-        Thread.sleep(200);
-      }
-    } catch(AssertionError ae) {
-      if(Time.monotonicNow() - startTime <= 5000) {
-        throw ae;
-      }
-      assertSentTo(newObserver);
-    } finally {
-      dfsCluster.transitionToStandby(1);
-      dfsCluster.transitionToObserver(2);
-    }
-  }
-
   private void assertSentTo(int nnIdx) throws IOException {
     assertTrue("Request was not sent to the expected namenode " + nnIdx,
         HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
@@ -24,7 +24,6 @@ import java.net.URI;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -44,7 +43,6 @@ import org.mockito.stubbing.Answer;
 
 import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 
-import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.*;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -74,10 +72,6 @@ public class TestObserverReadProxyProvider {
     nnURI = URI.create("hdfs://" + ns);
     conf = new Configuration();
     conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES, ns);
-    // Set observer probe retry period to 0. Required by the tests that
-    // transition observer back and forth
-    conf.setTimeDuration(
-        OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS);
   }
 
   private void setupProxyProvider(int namenodeCount) throws Exception {
@@ -1,6 +0,0 @@
-[INFO - 2020-01-28T23:13:43.723Z] GhostDriver - Main - running on port 35401
-[INFO - 2020-01-28T23:13:44.162Z] Session [d450b8d0-4223-11ea-891f-8fa5006f053d] - page.settings - {"XSSAuditingEnabled":false,"javascriptCanCloseWindows":true,"javascriptCanOpenWindows":true,"javascriptEnabled":true,"loadImages":true,"localToRemoteUrlAccessEnabled":false,"userAgent":"Mozilla/5.0 (Macintosh; Intel Mac OS X) AppleWebKit/538.1 (KHTML, like Gecko) PhantomJS/2.1.1 Safari/538.1","webSecurityEnabled":true}
-[INFO - 2020-01-28T23:13:44.162Z] Session [d450b8d0-4223-11ea-891f-8fa5006f053d] - page.customHeaders: - {}
-[INFO - 2020-01-28T23:13:44.162Z] Session [d450b8d0-4223-11ea-891f-8fa5006f053d] - Session.negotiatedCapabilities - {"browserName":"phantomjs","version":"2.1.1","driverName":"ghostdriver","driverVersion":"1.2.0","platform":"mac-unknown-64bit","javascriptEnabled":true,"takesScreenshot":true,"handlesAlerts":false,"databaseEnabled":false,"locationContextEnabled":false,"applicationCacheEnabled":false,"browserConnectionEnabled":false,"cssSelectorsEnabled":true,"webStorageEnabled":false,"rotatable":false,"acceptSslCerts":false,"nativeEvents":true,"proxy":{"proxyType":"direct"}}
-[INFO - 2020-01-28T23:13:44.162Z] SessionManagerReqHand - _postNewSessionCommand - New Session Created: d450b8d0-4223-11ea-891f-8fa5006f053d
-[INFO - 2020-01-28T23:13:44.990Z] ShutdownReqHand - _handle - About to shutdown