diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java index 252b70dde44..32edb36f31e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.HAUtilClient; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; @@ -111,6 +112,12 @@ public abstract class AbstractNNFailoverProxyProvider implements */ public static class NNProxyInfo extends ProxyInfo { private InetSocketAddress address; + /** + * The currently known state of the NameNode represented by this ProxyInfo. + * This may be out of date if the NameNode has changed state since the last + * time the state was checked. + */ + private HAServiceState cachedState; public NNProxyInfo(InetSocketAddress address) { super(null, address.toString()); @@ -120,6 +127,15 @@ public abstract class AbstractNNFailoverProxyProvider implements public InetSocketAddress getAddress() { return address; } + + public void setCachedState(HAServiceState state) { + cachedState = state; + } + + public HAServiceState getCachedState() { + return cachedState; + } + } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java index dcae2db32d4..e8192821aa0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java @@ -20,18 +20,24 @@ package org.apache.hadoop.hdfs.server.namenode.ha; import java.io.Closeable; import java.io.IOException; import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.URI; -import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.hdfs.ClientGSIContext; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.io.retry.AtMostOnce; +import org.apache.hadoop.io.retry.Idempotent; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; @@ -59,16 +65,18 @@ public class ObserverReadProxyProvider private static final Logger LOG = LoggerFactory.getLogger( ObserverReadProxyProvider.class); - /** Client-side context for syncing with the NameNode server side */ - private AlignmentContext alignmentContext; + /** Client-side context for syncing with the NameNode server side. */ + private final AlignmentContext alignmentContext; - private AbstractNNFailoverProxyProvider failoverProxy; - /** All NameNdoe proxies */ - private List> nameNodeProxies = - new ArrayList>(); - /** Proxies for the observer namenodes */ - private final List> observerProxies = - new ArrayList>(); + /** The inner proxy provider used for active/standby failover. */ + private final AbstractNNFailoverProxyProvider failoverProxy; + /** List of all NameNode proxies. */ + private final List> nameNodeProxies; + + /** The policy used to determine if an exception is fatal or retriable. */ + private final RetryPolicy observerRetryPolicy; + /** The combined proxy which redirects to other proxies as necessary. */ + private final ProxyInfo combinedProxy; /** * Whether reading from observer is enabled. If this is false, all read @@ -77,12 +85,19 @@ public class ObserverReadProxyProvider private boolean observerReadEnabled; /** - * Thread-local index to record the current index in the observer list. + * The index into the nameNodeProxies list currently being used. Should only + * be accessed in synchronized methods. */ - private static final ThreadLocal currentIndex = - ThreadLocal.withInitial(() -> 0); + private int currentIndex = -1; + /** + * The proxy being used currently; this will match with currentIndex above. + * This field is volatile to allow reads without synchronization; updates + * should still be performed synchronously to maintain consistency between + * currentIndex and this field. + */ + private volatile NNProxyInfo currentProxy; - /** The last proxy that has been used. Only used for testing */ + /** The last proxy that has been used. Only used for testing. */ private volatile ProxyInfo lastProxy = null; /** @@ -90,63 +105,53 @@ public class ObserverReadProxyProvider * {@link ConfiguredFailoverProxyProvider} for failover. */ public ObserverReadProxyProvider( - Configuration conf, URI uri, Class xface, HAProxyFactory factory) - throws IOException { + Configuration conf, URI uri, Class xface, HAProxyFactory factory) { this(conf, uri, xface, factory, - new ConfiguredFailoverProxyProvider(conf, uri, xface,factory)); + new ConfiguredFailoverProxyProvider<>(conf, uri, xface,factory)); } + @SuppressWarnings("unchecked") public ObserverReadProxyProvider( Configuration conf, URI uri, Class xface, HAProxyFactory factory, - AbstractNNFailoverProxyProvider failoverProxy) - throws IOException { + AbstractNNFailoverProxyProvider failoverProxy) { super(conf, uri, xface, factory); this.failoverProxy = failoverProxy; this.alignmentContext = new ClientGSIContext(); ((ClientHAProxyFactory) factory).setAlignmentContext(alignmentContext); + // Don't bother configuring the number of retries and such on the retry + // policy since it is mainly only used for determining whether or not an + // exception is retriable or fatal + observerRetryPolicy = RetryPolicies.failoverOnNetworkException( + RetryPolicies.TRY_ONCE_THEN_FAIL, 1); + // Get all NameNode proxies nameNodeProxies = getProxyAddresses(uri, HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY); - // Find out all the observer proxies - for (NNProxyInfo pi : nameNodeProxies) { - createProxyIfNeeded(pi); - if (isObserverState(pi)) { - observerProxies.add(pi); - } - } - // TODO: No observers is not an error - // Just direct all reads go to the active NameNode - if (observerProxies.isEmpty()) { - throw new RuntimeException("Couldn't find any namenode proxy in " + - "OBSERVER state"); - } - } - - public synchronized AlignmentContext getAlignmentContext() { - return alignmentContext; - } - - @SuppressWarnings("unchecked") - @Override - public synchronized ProxyInfo getProxy() { - // We just create a wrapped proxy containing all the proxies + // Create a wrapped proxy containing all the proxies. Since this combined + // proxy is just redirecting to other proxies, all invocations can share it. StringBuilder combinedInfo = new StringBuilder("["); - - for (int i = 0; i < this.observerProxies.size(); i++) { + for (int i = 0; i < nameNodeProxies.size(); i++) { if (i > 0) { combinedInfo.append(","); } - combinedInfo.append(observerProxies.get(i).proxyInfo); + combinedInfo.append(nameNodeProxies.get(i).proxyInfo); } - combinedInfo.append(']'); T wrappedProxy = (T) Proxy.newProxyInstance( ObserverReadInvocationHandler.class.getClassLoader(), - new Class[]{xface}, - new ObserverReadInvocationHandler(observerProxies)); - return new ProxyInfo<>(wrappedProxy, combinedInfo.toString()); + new Class[] { xface }, new ObserverReadInvocationHandler()); + combinedProxy = new ProxyInfo<>(wrappedProxy, combinedInfo.toString()); + } + + public AlignmentContext getAlignmentContext() { + return alignmentContext; + } + + @Override + public ProxyInfo getProxy() { + return combinedProxy; } @Override @@ -159,8 +164,11 @@ public class ObserverReadProxyProvider * * @return whether the 'method' is a read-only operation. */ - private boolean isRead(Method method) { - return method.isAnnotationPresent(ReadOnly.class); + private static boolean isRead(Method method) { + if (!method.isAnnotationPresent(ReadOnly.class)) { + return false; + } + return !method.getAnnotationsByType(ReadOnly.class)[0].activeOnly(); } @VisibleForTesting @@ -168,21 +176,13 @@ public class ObserverReadProxyProvider this.observerReadEnabled = flag; } - /** - * After getting exception 'ex', whether we should retry the current request - * on a different observer. - */ - private boolean shouldRetry(Exception ex) throws Exception { - // TODO: implement retry policy - return true; - } - @VisibleForTesting ProxyInfo getLastProxy() { return lastProxy; } - boolean isObserverState(NNProxyInfo pi) { + private static HAServiceState getServiceState( + NNProxyInfo pi) { // TODO: should introduce new ClientProtocol method to verify the // underlying service state, which does not require superuser access // The is a workaround @@ -190,7 +190,7 @@ public class ObserverReadProxyProvider try { // Verify write access first pi.proxy.reportBadBlocks(new LocatedBlock[0]); - return false; // Only active NameNode allows write + return HAServiceState.ACTIVE; // Only active NameNode allows write } catch (RemoteException re) { IOException sbe = re.unwrapRemoteException(StandbyException.class); if (!(sbe instanceof StandbyException)) { @@ -200,15 +200,16 @@ public class ObserverReadProxyProvider ioe = e; } if (ioe != null) { - LOG.error("Failed to connect to {}", pi.getAddress(), ioe); - return false; + LOG.warn("Failed to connect to {}", pi.getAddress(), ioe); + return HAServiceState.STANDBY; // Just assume standby in this case + // Anything besides observer is fine } // Verify read access // For now we assume only Observer nodes allow reads // Stale reads on StandbyNode should be turned off try { pi.proxy.checkAccess("/", FsAction.READ); - return true; + return HAServiceState.OBSERVER; } catch (RemoteException re) { IOException sbe = re.unwrapRemoteException(StandbyException.class); if (!(sbe instanceof StandbyException)) { @@ -218,29 +219,60 @@ public class ObserverReadProxyProvider ioe = e; } if (ioe != null) { - LOG.error("Failed to connect to {}", pi.getAddress(), ioe); + LOG.warn("Failed to connect to {}", pi.getAddress(), ioe); } - return false; + return HAServiceState.STANDBY; } - - class ObserverReadInvocationHandler implements InvocationHandler { - final List> observerProxies; - final ProxyInfo activeProxy; - - ObserverReadInvocationHandler(List> observerProxies) { - this.observerProxies = observerProxies; - this.activeProxy = failoverProxy.getProxy(); + /** + * Return the currently used proxy. If there is none, first calls + * {@link #changeProxy(NNProxyInfo)} to initialize one. + */ + private NNProxyInfo getCurrentProxy() { + if (currentProxy == null) { + changeProxy(null); } + return currentProxy; + } + + /** + * Move to the next proxy in the proxy list. If the NNProxyInfo supplied by + * the caller does not match the current proxy, the call is ignored; this is + * to handle concurrent calls (to avoid changing the proxy multiple times). + * The service state of the newly selected proxy will be updated before + * returning. + * + * @param initial The expected current proxy + */ + private synchronized void changeProxy(NNProxyInfo initial) { + if (currentProxy != initial) { + // Must have been a concurrent modification; ignore the move request + return; + } + // Attempt to force concurrent callers of getCurrentProxy to wait for the + // new proxy; best-effort by setting currentProxy to null + currentProxy = null; + currentIndex = (currentIndex + 1) % nameNodeProxies.size(); + currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex)); + currentProxy.setCachedState(getServiceState(currentProxy)); + LOG.debug("Changed current proxy from {} to {}", + initial == null ? "none" : initial.proxyInfo, + currentProxy.proxyInfo); + } + + /** + * An InvocationHandler to handle incoming requests. This class's invoke + * method contains the primary logic for redirecting to observers. + * + * If observer reads are enabled, attempt to send read operations to the + * current proxy. If it is not an observer, or the observer fails, adjust + * the current proxy and retry on the next one. If all proxies are tried + * without success, the request is forwarded to the active. + * + * Write requests are always forwarded to the active. + */ + private class ObserverReadInvocationHandler implements InvocationHandler { - /** - * Sends read operations to the observer (if enabled) specified by the - * current index, and send write operations to the active. If a observer - * fails, we increment the index and retry the next one. If all observers - * fail, the request is forwarded to the active. - * - * Write requests are always forwarded to the active. - */ @Override public Object invoke(Object proxy, final Method method, final Object[] args) throws Throwable { @@ -248,33 +280,65 @@ public class ObserverReadProxyProvider Object retVal; if (observerReadEnabled && isRead(method)) { - // Loop through all the proxies, starting from the current index. - for (int i = 0; i < observerProxies.size(); i++) { - NNProxyInfo current = observerProxies.get(currentIndex.get()); + int failedObserverCount = 0; + int activeCount = 0; + int standbyCount = 0; + for (int i = 0; i < nameNodeProxies.size(); i++) { + NNProxyInfo current = getCurrentProxy(); + HAServiceState currState = current.getCachedState(); + if (currState != HAServiceState.OBSERVER) { + if (currState == HAServiceState.ACTIVE) { + activeCount++; + } else if (currState == HAServiceState.STANDBY) { + standbyCount++; + } + LOG.debug("Skipping proxy {} for {} because it is in state {}", + current.proxyInfo, method.getName(), currState); + changeProxy(current); + continue; + } + LOG.debug("Attempting to service {} using proxy {}", + method.getName(), current.proxyInfo); try { retVal = method.invoke(current.proxy, args); lastProxy = current; + LOG.debug("Invocation of {} using {} was successful", + method.getName(), current.proxyInfo); return retVal; - } catch (Exception e) { - if (!shouldRetry(e)) { - throw e; + } catch (InvocationTargetException ite) { + if (!(ite.getCause() instanceof Exception)) { + throw ite.getCause(); + } + Exception e = (Exception) ite.getCause(); + RetryAction retryInfo = observerRetryPolicy.shouldRetry(e, 0, 0, + method.isAnnotationPresent(Idempotent.class) + || method.isAnnotationPresent(AtMostOnce.class)); + if (retryInfo.action == RetryAction.RetryDecision.FAIL) { + throw e; + } else { + failedObserverCount++; + LOG.warn( + "Invocation returned exception on [{}]; {} failure(s) so far", + current.proxyInfo, failedObserverCount, e); + changeProxy(current); } - currentIndex.set((currentIndex.get() + 1) % observerProxies.size()); - LOG.warn("Invocation returned exception on [{}]", - current.proxyInfo, e.getCause()); } } // If we get here, it means all observers have failed. - LOG.warn("All observers have failed for read request {}. " + - "Fall back on active: {}", method.getName(), activeProxy); + LOG.warn("{} observers have failed for read request {}; also found " + + "{} standby and {} active. Falling back to active.", + failedObserverCount, standbyCount, activeCount, method.getName()); } // Either all observers have failed, or that it is a write request. // In either case, we'll forward the request to active NameNode. + LOG.debug("Using failoverProxy to service {}", method.getName()); + ProxyInfo activeProxy = failoverProxy.getProxy(); try { retVal = method.invoke(activeProxy.proxy, args); - } catch (Exception e) { + } catch (InvocationTargetException e) { + // This exception will be handled by higher layers throw e.getCause(); } lastProxy = activeProxy; @@ -284,7 +348,6 @@ public class ObserverReadProxyProvider @Override public synchronized void close() throws IOException { - failoverProxy.close(); for (ProxyInfo pi : nameNodeProxies) { if (pi.proxy != null) { if (pi.proxy instanceof Closeable) { @@ -292,8 +355,12 @@ public class ObserverReadProxyProvider } else { RPC.stopProxy(pi.proxy); } + // Set to null to avoid the failoverProxy having to re-do the close + // if it is sharing a proxy instance + pi.proxy = null; } } + failoverProxy.close(); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java index de34454e38d..16371b10308 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java @@ -27,22 +27,23 @@ import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.io.retry.FailoverProxyProvider; import org.apache.hadoop.io.retry.RetryInvocationHandler; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import java.io.File; import java.io.IOException; import java.lang.reflect.Proxy; import java.net.URI; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -200,6 +201,9 @@ public class TestObserverNode { // Start the observer again - requests should go to observer dfsCluster.restartNameNode(2); dfsCluster.transitionToObserver(2); + // The first request goes to the active because it has not refreshed yet; + // the second will properly go to the observer + dfs.getFileStatus(testPath); dfs.getFileStatus(testPath); assertSentTo(2); } @@ -231,6 +235,9 @@ public class TestObserverNode { dfsCluster.transitionToObserver(2); dfs.getFileStatus(testPath); + // The first request goes to the active because it has not refreshed yet; + // the second will properly go to the observer + dfs.getFileStatus(testPath); assertSentTo(2); } @@ -291,6 +298,10 @@ public class TestObserverNode { assertEquals(0, rc); } + // TODO this does not currently work because fetching the service state from + // e.g. the StandbyNameNode also waits for the transaction ID to catch up. + // This is disabled pending HDFS-13872 and HDFS-13749. + @Ignore("Disabled until HDFS-13872 and HDFS-13749 are committed") @Test public void testMsyncSimple() throws Exception { // disable fast path here because this test's assertions are based on the @@ -304,7 +315,8 @@ public class TestObserverNode { setUpCluster(1); setObserverRead(true); - AtomicBoolean readSucceed = new AtomicBoolean(false); + // 0 == not completed, 1 == succeeded, -1 == failed + AtomicInteger readStatus = new AtomicInteger(0); dfs.mkdir(testPath, FsPermission.getDefault()); assertSentTo(0); @@ -313,20 +325,21 @@ public class TestObserverNode { try { // this read will block until roll and tail edits happen. dfs.getFileStatus(testPath); - readSucceed.set(true); + readStatus.set(1); } catch (IOException e) { e.printStackTrace(); + readStatus.set(-1); } }); reader.start(); // the reader is still blocking, not succeeded yet. - assertFalse(readSucceed.get()); + assertEquals(0, readStatus.get()); rollEditLogAndTail(0); // wait a while for all the change to be done - Thread.sleep(100); + GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000); // the reader should have succeed. - assertTrue(readSucceed.get()); + assertEquals(1, readStatus.get()); } private void setUpCluster(int numObservers) throws Exception { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java new file mode 100644 index 00000000000..4d5bc13d60e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java @@ -0,0 +1,335 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import com.google.common.base.Joiner; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + + +/** + * Tests for {@link ObserverReadProxyProvider} under various configurations of + * NameNode states. Mainly testing that the proxy provider picks the correct + * NameNode to communicate with. + */ +public class TestObserverReadProxyProvider { + + private static final LocatedBlock[] EMPTY_BLOCKS = new LocatedBlock[0]; + private String ns; + private URI nnURI; + private Configuration conf; + + private ObserverReadProxyProvider proxyProvider; + private ClientProtocolAnswer[] namenodeAnswers; + private String[] namenodeAddrs; + + @Before + public void setup() throws Exception { + ns = "testcluster"; + nnURI = URI.create("hdfs://" + ns); + conf = new Configuration(); + conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES, ns); + } + + private void setupProxyProvider(int namenodeCount) throws Exception { + String[] namenodeIDs = new String[namenodeCount]; + namenodeAddrs = new String[namenodeCount]; + namenodeAnswers = new ClientProtocolAnswer[namenodeCount]; + ClientProtocol[] proxies = new ClientProtocol[namenodeCount]; + Map proxyMap = new HashMap<>(); + for (int i = 0; i < namenodeCount; i++) { + namenodeIDs[i] = "nn" + i; + namenodeAddrs[i] = "namenode" + i + ".test:8020"; + conf.set(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + + "." + namenodeIDs[i], namenodeAddrs[i]); + namenodeAnswers[i] = new ClientProtocolAnswer(); + proxies[i] = mock(ClientProtocol.class); + doWrite(Mockito.doAnswer(namenodeAnswers[i]).when(proxies[i])); + doRead(Mockito.doAnswer(namenodeAnswers[i]).when(proxies[i])); + proxyMap.put(namenodeAddrs[i], proxies[i]); + } + conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, + Joiner.on(",").join(namenodeIDs)); + proxyProvider = new ObserverReadProxyProvider<>(conf, nnURI, + ClientProtocol.class, new ClientHAProxyFactory() { + @Override + public ClientProtocol createProxy(Configuration conf, + InetSocketAddress nnAddr, Class xface, + UserGroupInformation ugi, boolean withRetries, + AtomicBoolean fallbackToSimpleAuth) { + return proxyMap.get(nnAddr.toString()); + } + }); + proxyProvider.setObserverReadEnabled(true); + } + + @Test + public void testReadOperationOnObserver() throws Exception { + setupProxyProvider(3); + namenodeAnswers[0].setActiveState(); + namenodeAnswers[2].setObserverState(); + + doRead(); + assertHandledBy(2); + } + + @Test + public void testWriteOperationOnActive() throws Exception { + setupProxyProvider(3); + namenodeAnswers[0].setActiveState(); + namenodeAnswers[2].setObserverState(); + + doWrite(); + assertHandledBy(0); + } + + @Test + public void testUnreachableObserverWithNoBackup() throws Exception { + setupProxyProvider(2); + namenodeAnswers[0].setActiveState(); + namenodeAnswers[1].setObserverState(); + + namenodeAnswers[1].setUnreachable(true); + // Confirm that read still succeeds even though observer is not available + doRead(); + assertHandledBy(0); + } + + @Test + public void testUnreachableObserverWithMultiple() throws Exception { + setupProxyProvider(4); + namenodeAnswers[0].setActiveState(); + namenodeAnswers[2].setObserverState(); + namenodeAnswers[3].setObserverState(); + + doRead(); + assertHandledBy(2); + + namenodeAnswers[2].setUnreachable(true); + doRead(); + // Fall back to the second observer node + assertHandledBy(3); + + namenodeAnswers[2].setUnreachable(false); + doRead(); + // Current index has changed, so although the first observer is back, + // it should continue requesting from the second observer + assertHandledBy(3); + + namenodeAnswers[3].setUnreachable(true); + doRead(); + // Now that second is unavailable, go back to using the first observer + assertHandledBy(2); + + namenodeAnswers[2].setUnreachable(true); + doRead(); + // Both observers are now unavailable, so it should fall back to active + assertHandledBy(0); + } + + @Test + public void testObserverToActive() throws Exception { + setupProxyProvider(3); + namenodeAnswers[0].setActiveState(); + namenodeAnswers[1].setObserverState(); + namenodeAnswers[2].setObserverState(); + + doWrite(); + assertHandledBy(0); + + // Transition an observer to active + namenodeAnswers[0].setStandbyState(); + namenodeAnswers[1].setActiveState(); + try { + doWrite(); + fail("Write should fail; failover required"); + } catch (RemoteException re) { + assertEquals(re.getClassName(), + StandbyException.class.getCanonicalName()); + } + proxyProvider.performFailover(proxyProvider.getProxy().proxy); + doWrite(); + // After failover, previous observer is now active + assertHandledBy(1); + doRead(); + assertHandledBy(2); + + // Transition back to original state but second observer not available + namenodeAnswers[0].setActiveState(); + namenodeAnswers[1].setObserverState(); + namenodeAnswers[2].setUnreachable(true); + for (int i = 0; i < 2; i++) { + try { + doWrite(); + fail("Should have failed"); + } catch (IOException ioe) { + proxyProvider.performFailover(proxyProvider.getProxy().proxy); + } + } + doWrite(); + assertHandledBy(0); + + doRead(); + assertHandledBy(1); + } + + @Test + public void testObserverToStandby() throws Exception { + setupProxyProvider(3); + namenodeAnswers[0].setActiveState(); + namenodeAnswers[1].setObserverState(); + namenodeAnswers[2].setObserverState(); + + doRead(); + assertHandledBy(1); + + namenodeAnswers[1].setStandbyState(); + doRead(); + assertHandledBy(2); + + namenodeAnswers[2].setStandbyState(); + doRead(); + assertHandledBy(0); + + namenodeAnswers[1].setObserverState(); + doRead(); + assertHandledBy(1); + } + + @Test + public void testSingleObserverToStandby() throws Exception { + setupProxyProvider(2); + namenodeAnswers[0].setActiveState(); + namenodeAnswers[1].setObserverState(); + + doRead(); + assertHandledBy(1); + + namenodeAnswers[1].setStandbyState(); + doRead(); + assertHandledBy(0); + + namenodeAnswers[1].setObserverState(); + // The proxy provider still thinks the second NN is in observer state, + // so it will take a second call for it to notice the new observer + doRead(); + doRead(); + assertHandledBy(1); + } + + private void doRead() throws Exception { + doRead(proxyProvider.getProxy().proxy); + } + + private void doWrite() throws Exception { + doWrite(proxyProvider.getProxy().proxy); + } + + private void assertHandledBy(int namenodeIdx) { + assertEquals(namenodeAddrs[namenodeIdx], + proxyProvider.getLastProxy().proxyInfo); + } + + private static void doWrite(ClientProtocol client) throws Exception { + client.reportBadBlocks(EMPTY_BLOCKS); + } + + private static void doRead(ClientProtocol client) throws Exception { + client.checkAccess("/", FsAction.READ); + } + + /** + * An {@link Answer} used for mocking of a {@link ClientProtocol}. Setting + * the state or unreachability of this Answer will make the linked + * ClientProtocol respond as if it was communicating with a NameNode of + * the corresponding state. It is in Standby state by default. + */ + private static class ClientProtocolAnswer implements Answer { + + private volatile boolean unreachable = false; + // Standby state by default + private volatile boolean allowWrites = false; + private volatile boolean allowReads = false; + + @Override + public Void answer(InvocationOnMock invocationOnMock) throws Throwable { + if (unreachable) { + throw new IOException("Unavailable"); + } + switch (invocationOnMock.getMethod().getName()) { + case "reportBadBlocks": + if (!allowWrites) { + throw new RemoteException(StandbyException.class.getCanonicalName(), + "No writes!"); + } + return null; + case "checkAccess": + if (!allowReads) { + throw new RemoteException(StandbyException.class.getCanonicalName(), + "No reads!"); + } + return null; + default: + throw new IllegalArgumentException( + "Only reportBadBlocks and checkAccess supported!"); + } + } + + void setUnreachable(boolean unreachable) { + this.unreachable = unreachable; + } + + void setActiveState() { + allowReads = true; + allowWrites = true; + } + + void setStandbyState() { + allowReads = false; + allowWrites = false; + } + + void setObserverState() { + allowReads = true; + allowWrites = false; + } + + } + +}