From 6630c9b75d65deefb5550e355eef7783909a57bc Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Wed, 17 Apr 2019 14:38:24 -0700 Subject: [PATCH] HDFS-14245. [SBN read] Enable ObserverReadProxyProvider to work with non-ClientProtocol proxy types. Contributed by Erik Krogen. (cherry picked from 5847e0014343f60f853cb796781ca1fa03a72efd) --- .../ha/AbstractNNFailoverProxyProvider.java | 3 +- .../ha/ObserverReadProxyProvider.java | 54 +++++++++++++------ .../ha/TestDelegationTokensWithHA.java | 2 +- .../server/namenode/ha/TestObserverNode.java | 12 +++++ .../ha/TestObserverReadProxyProvider.java | 29 ++++++++++ 5 files changed, 83 insertions(+), 17 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java index 572cb1ccd37..646b100333f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java @@ -115,7 +115,8 @@ public abstract class AbstractNNFailoverProxyProvider implements /** * The currently known state of the NameNode represented by this ProxyInfo. * This may be out of date if the NameNode has changed state since the last - * time the state was checked. + * time the state was checked. If the NameNode could not be contacted, this + * will store null to indicate an unknown state. */ private HAServiceState cachedState; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java index 2ccf8856e58..ac4b1e7a7eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java @@ -66,7 +66,7 @@ import com.google.common.annotations.VisibleForTesting; */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class ObserverReadProxyProvider +public class ObserverReadProxyProvider extends AbstractNNFailoverProxyProvider { @VisibleForTesting static final Logger LOG = LoggerFactory.getLogger( @@ -189,7 +189,13 @@ public class ObserverReadProxyProvider AUTO_MSYNC_PERIOD_DEFAULT, TimeUnit.MILLISECONDS); // TODO : make this configurable or remove this variable - this.observerReadEnabled = true; + if (wrappedProxy instanceof ClientProtocol) { + this.observerReadEnabled = true; + } else { + LOG.info("Disabling observer reads for {} because the requested proxy " + + "class does not implement {}", uri, ClientProtocol.class.getName()); + this.observerReadEnabled = false; + } } public AlignmentContext getAlignmentContext() { @@ -267,7 +273,7 @@ public class ObserverReadProxyProvider private HAServiceState getHAServiceState(NNProxyInfo proxyInfo) { IOException ioe; try { - return proxyInfo.proxy.getHAServiceState(); + return getProxyAsClientProtocol(proxyInfo.proxy).getHAServiceState(); } catch (RemoteException re) { // Though a Standby will allow a getHAServiceState call, it won't allow // delegation token lookup, so if DT is used it throws StandbyException @@ -284,7 +290,19 @@ public class ObserverReadProxyProvider LOG.debug("Failed to connect to {} while fetching HAServiceState", proxyInfo.getAddress(), ioe); } - return HAServiceState.STANDBY; + return null; + } + + /** + * Return the input proxy, cast as a {@link ClientProtocol}. This catches any + * {@link ClassCastException} and wraps it in a more helpful message. This + * should ONLY be called if the caller is certain that the proxy is, in fact, + * a {@link ClientProtocol}. + */ + private ClientProtocol getProxyAsClientProtocol(T proxy) { + assert proxy instanceof ClientProtocol : "BUG: Attempted to use proxy " + + "of class " + proxy.getClass() + " as if it was a ClientProtocol."; + return (ClientProtocol) proxy; } /** @@ -299,7 +317,7 @@ public class ObserverReadProxyProvider if (msynced) { return; // No need for an msync } - failoverProxy.getProxy().proxy.msync(); + getProxyAsClientProtocol(failoverProxy.getProxy().proxy).msync(); msynced = true; lastMsyncTimeMs = Time.monotonicNow(); } @@ -315,7 +333,7 @@ public class ObserverReadProxyProvider private void autoMsyncIfNecessary() throws IOException { if (autoMsyncPeriodMs == 0) { // Always msync - failoverProxy.getProxy().proxy.msync(); + getProxyAsClientProtocol(failoverProxy.getProxy().proxy).msync(); } else if (autoMsyncPeriodMs > 0) { if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) { synchronized (this) { @@ -324,7 +342,7 @@ public class ObserverReadProxyProvider // Re-check the entry criterion since the status may have changed // while waiting for the lock. if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) { - failoverProxy.getProxy().proxy.msync(); + getProxyAsClientProtocol(failoverProxy.getProxy().proxy).msync(); lastMsyncTimeMs = Time.monotonicNow(); } } @@ -363,6 +381,7 @@ public class ObserverReadProxyProvider int failedObserverCount = 0; int activeCount = 0; int standbyCount = 0; + int unreachableCount = 0; for (int i = 0; i < nameNodeProxies.size(); i++) { NNProxyInfo current = getCurrentProxy(); HAServiceState currState = current.getCachedState(); @@ -371,9 +390,12 @@ public class ObserverReadProxyProvider activeCount++; } else if (currState == HAServiceState.STANDBY) { standbyCount++; + } else if (currState == null) { + unreachableCount++; } LOG.debug("Skipping proxy {} for {} because it is in state {}", - current.proxyInfo, method.getName(), currState); + current.proxyInfo, method.getName(), + currState == null ? "unreachable" : currState); changeProxy(current); continue; } @@ -420,10 +442,10 @@ public class ObserverReadProxyProvider // be that there is simply no Observer node running at all. if (failedObserverCount > 0) { // If we get here, it means all observers have failed. - LOG.warn("{} observers have failed for read request {}; " - + "also found {} standby, {} active. " - + "Falling back to active.", failedObserverCount, - method.getName(), standbyCount, activeCount); + LOG.warn("{} observers have failed for read request {}; also found " + + "{} standby, {} active, and {} unreachable. Falling back " + + "to active.", failedObserverCount, method.getName(), + standbyCount, activeCount, unreachableCount); } else { if (LOG.isDebugEnabled()) { LOG.debug("Read falling back to active without observer read " @@ -432,8 +454,9 @@ public class ObserverReadProxyProvider } } - // Either all observers have failed, or that it is a write request. - // In either case, we'll forward the request to active NameNode. + // Either all observers have failed, observer reads are disabled, + // or this is a write request. In any case, forward the request to + // the active NameNode. LOG.debug("Using failoverProxy to service {}", method.getName()); ProxyInfo activeProxy = failoverProxy.getProxy(); try { @@ -455,7 +478,8 @@ public class ObserverReadProxyProvider @Override public ConnectionId getConnectionId() { - return RPC.getConnectionIdForProxy(getCurrentProxy().proxy); + return RPC.getConnectionIdForProxy(observerReadEnabled + ? getCurrentProxy().proxy : failoverProxy.getProxy().proxy); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java index 718d13f124b..fb3cc34794d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java @@ -152,7 +152,7 @@ public class TestDelegationTokensWithHA { cluster.shutdownNameNode(0); logCapture.clearOutput(); dfs.access(new Path("/"), FsAction.READ); - assertTrue(logCapture.getOutput().contains("Assuming Standby state")); + assertTrue(logCapture.getOutput().contains("Failed to connect to")); } finally { logCapture.stopCapturing(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java index b9234735700..f1eb5a8d07a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.TestFsck; +import org.apache.hadoop.hdfs.tools.GetGroups; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -115,6 +116,17 @@ public class TestObserverNode { fail("active cannot be transitioned to observer"); } + /** + * Test that non-ClientProtocol proxies such as + * {@link org.apache.hadoop.tools.GetUserMappingsProtocol} still work + * when run in an environment with observers. + */ + @Test + public void testGetGroups() throws Exception { + GetGroups getGroups = new GetGroups(conf); + assertEquals(0, getGroups.run(new String[0])); + } + @Test public void testNoObserverToActive() throws Exception { try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java index caf7d003eac..13b57744237 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java @@ -34,6 +34,7 @@ import org.apache.hadoop.ipc.ObserverRetryOnActiveException; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.tools.GetUserMappingsProtocol; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -42,9 +43,12 @@ import org.mockito.stubbing.Answer; import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** * Tests for {@link ObserverReadProxyProvider} under various configurations of @@ -115,6 +119,31 @@ public class TestObserverReadProxyProvider { proxyProvider.setObserverReadEnabled(true); } + @Test + public void testWithNonClientProxy() throws Exception { + setupProxyProvider(2); // This will initialize all of the instance fields + final String fakeUser = "fakeUser"; + final String[] fakeGroups = {"fakeGroup"}; + HAProxyFactory proxyFactory = + new NameNodeHAProxyFactory() { + @Override + public GetUserMappingsProtocol createProxy(Configuration config, + InetSocketAddress addr, Class xface, + UserGroupInformation ugi, boolean withRetries, + AtomicBoolean fallbackToSimpleAuth) throws IOException { + GetUserMappingsProtocol proxy = + mock(GetUserMappingsProtocol.class); + when(proxy.getGroupsForUser(fakeUser)).thenReturn(fakeGroups); + return proxy; + } + }; + ObserverReadProxyProvider userProxyProvider = + new ObserverReadProxyProvider<>(conf, nnURI, + GetUserMappingsProtocol.class, proxyFactory); + assertArrayEquals(fakeGroups, + userProxyProvider.getProxy().proxy.getGroupsForUser(fakeUser)); + } + @Test public void testReadOperationOnObserver() throws Exception { setupProxyProvider(3);