diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java index ac4b1e7a7eb..66ba128ea58 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java @@ -81,6 +81,12 @@ public class ObserverReadProxyProvider /** Client-side context for syncing with the NameNode server side. */ private final AlignmentContext alignmentContext; + /** Configuration key for {@link #observerProbeRetryPeriodMs}. */ + static final String OBSERVER_PROBE_RETRY_PERIOD_KEY = + "dfs.client.failover.observer.probe.retry.period"; + /** Default observer probe retry period: 10 minutes, in milliseconds. */ + static final long OBSERVER_PROBE_RETRY_PERIOD_DEFAULT = 60 * 10 * 1000; + /** The inner proxy provider used for active/standby failover. */ private final AbstractNNFailoverProxyProvider failoverProxy; /** List of all NameNode proxies. */ @@ -139,6 +145,21 @@ public class ObserverReadProxyProvider /** The last proxy that has been used. Only used for testing. */ private volatile ProxyInfo lastProxy = null; + /** + * When there is no Observer node, every read call makes the client loop + * through all Standby nodes in vain before falling back to the Active. + * Since there is no guarantee on when an Observer node will be enabled, + * this can be very inefficient. + * This value specifies how often to retry probing all Standby nodes. + */ + private long observerProbeRetryPeriodMs; + + /** + * The last time a probe found zero Observers. If there was an Observer, + * or on initialization, this is set to 0. 
+ */ + private long lastObserverProbeTime; + /** * By default ObserverReadProxyProvider uses * {@link ConfiguredFailoverProxyProvider} for failover. @@ -157,6 +178,7 @@ public class ObserverReadProxyProvider this.failoverProxy = failoverProxy; this.alignmentContext = new ClientGSIContext(); factory.setAlignmentContext(alignmentContext); + this.lastObserverProbeTime = 0; // Don't bother configuring the number of retries and such on the retry // policy since it is mainly only used for determining whether or not an @@ -187,6 +209,9 @@ public class ObserverReadProxyProvider // The host of the URI is the nameservice ID AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + uri.getHost(), AUTO_MSYNC_PERIOD_DEFAULT, TimeUnit.MILLISECONDS); + observerProbeRetryPeriodMs = conf.getTimeDuration( + OBSERVER_PROBE_RETRY_PERIOD_KEY, + OBSERVER_PROBE_RETRY_PERIOD_DEFAULT, TimeUnit.MILLISECONDS); // TODO : make this configurable or remove this variable if (wrappedProxy instanceof ClientProtocol) { @@ -322,6 +347,27 @@ public class ObserverReadProxyProvider lastMsyncTimeMs = Time.monotonicNow(); } + /** + * Check whether the client should try to find an Observer proxy. + * If the previous probe found no Observer, reads stay on the Active and + * probing is postponed for {@link #observerProbeRetryPeriodMs} ms. When + * that period expires the client tries to find an Observer again. + * + * @return true if the client should look for an Observer (no failed probe + * is pending or the retry period has elapsed), or false otherwise. + */ + private boolean shouldFindObserver() { + // lastObserverProbeTime > 0 means we tried, but did not find any + // Observers yet + // If lastObserverProbeTime <= 0, previous check found observer, so + // we should not skip observer read. 
+ if (lastObserverProbeTime > 0) { + return Time.monotonicNow() - lastObserverProbeTime + >= observerProbeRetryPeriodMs; + } + return true; + } + /** * This will call {@link ClientProtocol#msync()} on the active NameNode * (via the {@link #failoverProxy}) to update the state of this client, only @@ -369,7 +415,7 @@ public class ObserverReadProxyProvider lastProxy = null; Object retVal; - if (observerReadEnabled && isRead(method)) { + if (observerReadEnabled && shouldFindObserver() && isRead(method)) { if (!msynced) { // An msync() must first be performed to ensure that this client is // up-to-date with the active's state. This will only be done once. @@ -446,11 +492,13 @@ public class ObserverReadProxyProvider + "{} standby, {} active, and {} unreachable. Falling back " + "to active.", failedObserverCount, method.getName(), standbyCount, activeCount, unreachableCount); + lastObserverProbeTime = 0; } else { if (LOG.isDebugEnabled()) { LOG.debug("Read falling back to active without observer read " + "fail, is there no observer node running?"); } + lastObserverProbeTime = Time.monotonicNow(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java index 2cf6dacfc5a..e2cc20a27b3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY; import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState; +import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.*; import static org.junit.Assert.assertEquals; import 
static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -33,7 +34,9 @@ import java.net.URI; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; @@ -53,6 +56,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.TestFsck; import org.apache.hadoop.hdfs.tools.GetGroups; import org.apache.hadoop.ipc.ObserverRetryOnActiveException; +import org.apache.hadoop.util.Time; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -80,6 +84,10 @@ public class TestObserverNode { public static void startUpCluster() throws Exception { conf = new Configuration(); conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true); + // Set observer probe retry period to 0. Required by the tests that restart + // Observer and immediately try to read from it. + conf.setTimeDuration( + OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS); qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 0, true); dfsCluster = qjmhaCluster.getDfsCluster(); } @@ -413,6 +421,54 @@ public class TestObserverNode { assertSentTo(0); } + + /** + * Test that after falling back to Active a client postpones Observer probes. + * NOTE(review): assertSentTo() inspects dfs, not newFs - confirm intent. + */ + @Test + public void testStickyActive() throws Exception { + Path testFile = new Path(testPath, "testStickyActive"); + Configuration newConf = new Configuration(conf); + // Observer probe retry period set to 5 sec + newConf.setLong(OBSERVER_PROBE_RETRY_PERIOD_KEY, 5000); + // Disable cache, so that a new client actually gets created with new conf. 
+ newConf.setBoolean("fs.hdfs.impl.disable.cache", true); + DistributedFileSystem newFs = (DistributedFileSystem) FileSystem.get(newConf); + newFs.create(testFile, (short)1).close(); + assertSentTo(0); + dfsCluster.rollEditLogAndTail(0); + // Demote NN[2] so that no Observers are present; read should go to Active + dfsCluster.transitionToStandby(2); + assertEquals("NN[2] should be standby", HAServiceState.STANDBY, + getServiceState(dfsCluster.getNameNode(2))); + newFs.open(testFile).close(); + assertSentTo(0); + // Restore Observer + int newObserver = 1; + dfsCluster.transitionToObserver(newObserver); + assertEquals("NN[" + newObserver + "] should be observer", + HAServiceState.OBSERVER, + getServiceState(dfsCluster.getNameNode(newObserver))); + long startTime = Time.monotonicNow(); + try { + while(Time.monotonicNow() - startTime <= 5000) { + newFs.open(testFile).close(); + // Client should still talk to Active + assertSentTo(0); + Thread.sleep(200); + } + } catch(AssertionError ae) { + if(Time.monotonicNow() - startTime <= 5000) { + throw ae; + } + assertSentTo(newObserver); + } finally { + dfsCluster.transitionToStandby(1); + dfsCluster.transitionToObserver(2); + } + } + private void assertSentTo(int nnIdx) throws IOException { assertTrue("Request was not sent to the expected namenode " + nnIdx, HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java index 13b57744237..e23bb24ef56 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java @@ -24,6 +24,7 @@ import java.net.URI; import java.util.HashMap; import java.util.List; 
import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.FsAction; @@ -43,6 +44,7 @@ import org.mockito.stubbing.Answer; import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.*; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -72,6 +74,10 @@ public class TestObserverReadProxyProvider { nnURI = URI.create("hdfs://" + ns); conf = new Configuration(); conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES, ns); + // Set observer probe retry period to 0. Required by the tests that + // transition observer back and forth + conf.setTimeDuration( + OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS); } private void setupProxyProvider(int namenodeCount) throws Exception {