diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java index 9f01c390a6d..64824a15cd8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java @@ -448,4 +448,9 @@ public class RetryInvocationHandler implements RpcInvocationHandler { public ConnectionId getConnectionId() { return RPC.getConnectionIdForProxy(proxyDescriptor.getProxy()); } + + @VisibleForTesting + public FailoverProxyProvider getProxyProvider() { + return proxyDescriptor.fpp; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java index 6f69eedf041..241ec059c21 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java @@ -34,7 +34,7 @@ import java.util.concurrent.atomic.LongAccumulator; */ @InterfaceAudience.Private @InterfaceStability.Stable -class ClientGSIContext implements AlignmentContext { +public class ClientGSIContext implements AlignmentContext { private final LongAccumulator lastSeenStateId = new LongAccumulator(Math::max, Long.MIN_VALUE); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java new file mode 100644 index 00000000000..25035ab5af3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import java.io.IOException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.hdfs.ClientGSIContext; +import org.apache.hadoop.hdfs.NameNodeProxiesClient; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.ipc.AlignmentContext; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.StandbyException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link org.apache.hadoop.io.retry.FailoverProxyProvider} implementation + * that supports reading from observer namenode(s). + * + * This constructs a wrapper proxy that sends the request to observer + * namenode(s), if observer read is enabled. In case there are multiple + * observer namenodes, it will try them one by one in case the RPC failed. It + * will fail back to the active namenode after it has exhausted all the + * observer namenodes. + * + * Read and write requests will still be sent to active NN if reading from + * observer is turned off. + */ +public class ObserverReadProxyProvider + extends ConfiguredFailoverProxyProvider { + private static final Logger LOG = LoggerFactory.getLogger( + ObserverReadProxyProvider.class); + + /** Client-side context for syncing with the NameNode server side */ + private AlignmentContext alignmentContext; + + /** Proxies for the observer namenodes */ + private final List> observerProxies = + new ArrayList<>(); + + /** + * Whether reading from observer is enabled. If this is false, all read + * requests will still go to active NN. + */ + private boolean observerReadEnabled; + + /** + * Thread-local index to record the current index in the observer list. + */ + private static final ThreadLocal currentIndex = + ThreadLocal.withInitial(() -> 0); + + /** The last proxy that has been used. Only used for testing */ + private volatile ProxyInfo lastProxy = null; + + @SuppressWarnings("unchecked") + public ObserverReadProxyProvider( + Configuration conf, URI uri, Class xface, HAProxyFactory factory) + throws IOException { + super(conf, uri, xface, factory); + alignmentContext = new ClientGSIContext(); + ((ClientHAProxyFactory) factory).setAlignmentContext(alignmentContext); + + // Find out all the observer proxies + for (AddressRpcProxyPair ap : this.proxies) { + ap.namenode = (T) NameNodeProxiesClient.createProxyWithAlignmentContext( + ap.address, conf, ugi, false, getFallbackToSimpleAuth(), + alignmentContext); + if (isObserverState(ap)) { + observerProxies.add(ap); + } + } + + if (observerProxies.isEmpty()) { + throw new RuntimeException("Couldn't find any namenode proxy in " + + "OBSERVER state"); + } + + // Randomize the list to prevent all clients pointing to the same one + boolean randomized = conf.getBoolean( + HdfsClientConfigKeys.Failover.RANDOM_ORDER, + HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT); + if (randomized) { + Collections.shuffle(observerProxies); + } + } + + @Override + public synchronized AlignmentContext getAlignmentContext() { + return alignmentContext; + } + + @SuppressWarnings("unchecked") + @Override + public synchronized ProxyInfo getProxy() { + // We just create a wrapped proxy containing all the proxies + List> observerProxies = new ArrayList<>(); + StringBuilder combinedInfo = new StringBuilder("["); + + for (int i = 0; i < this.observerProxies.size(); i++) { + if (i > 0) { + combinedInfo.append(","); + } + AddressRpcProxyPair p = this.observerProxies.get(i); + ProxyInfo pInfo = getProxy(p); + observerProxies.add(pInfo); + combinedInfo.append(pInfo.proxyInfo); + } + + combinedInfo.append(']'); + T wrappedProxy = (T) Proxy.newProxyInstance( + ObserverReadInvocationHandler.class.getClassLoader(), + new Class[]{xface}, + new ObserverReadInvocationHandler(observerProxies)); + return new ProxyInfo<>(wrappedProxy, combinedInfo.toString()); + } + + /** + * Check if a method is read-only. + * + * @return whether the 'method' is a read-only operation. + */ + private boolean isRead(Method method) { + return method.isAnnotationPresent(ReadOnly.class); + } + + @VisibleForTesting + void setObserverReadEnabled(boolean flag) { + this.observerReadEnabled = flag; + } + + /** + * After getting exception 'ex', whether we should retry the current request + * on a different observer. + */ + private boolean shouldRetry(Exception ex) throws Exception { + // TODO: implement retry policy + return true; + } + + @VisibleForTesting + ProxyInfo getLastProxy() { + return lastProxy; + } + + boolean isObserverState(AddressRpcProxyPair ap) { + // TODO: should introduce new ClientProtocol method to verify the + // underlying service state, which does not require superuser access + // The is a workaround + IOException ioe = null; + try { + // Verify write access first + ap.namenode.reportBadBlocks(new LocatedBlock[0]); + return false; // Only active NameNode allows write + } catch (RemoteException re) { + IOException sbe = re.unwrapRemoteException(StandbyException.class); + if (!(sbe instanceof StandbyException)) { + ioe = re; + } + } catch (IOException e) { + ioe = e; + } + if (ioe != null) { + LOG.error("Failed to connect to {}", ap.address, ioe); + return false; + } + // Verify read access + // For now we assume only Observer nodes allow reads + // Stale reads on StandbyNode should be turned off + try { + ap.namenode.checkAccess("/", FsAction.READ); + return true; + } catch (RemoteException re) { + IOException sbe = re.unwrapRemoteException(StandbyException.class); + if (!(sbe instanceof StandbyException)) { + ioe = re; + } + } catch (IOException e) { + ioe = e; + } + if (ioe != null) { + LOG.error("Failed to connect to {}", ap.address, ioe); + } + return false; + } + + + class ObserverReadInvocationHandler implements InvocationHandler { + final List> observerProxies; + final ProxyInfo activeProxy; + + ObserverReadInvocationHandler(List> observerProxies) { + this.observerProxies = observerProxies; + this.activeProxy = ObserverReadProxyProvider.super.getProxy(); + } + + /** + * Sends read operations to the observer (if enabled) specified by the + * current index, and send write operations to the active. If a observer + * fails, we increment the index and retry the next one. If all observers + * fail, the request is forwarded to the active. + * + * Write requests are always forwarded to the active. + */ + @Override + public Object invoke(Object proxy, final Method method, final Object[] args) + throws Throwable { + lastProxy = null; + Object retVal; + + if (observerReadEnabled && isRead(method)) { + // Loop through all the proxies, starting from the current index. + for (int i = 0; i < observerProxies.size(); i++) { + ProxyInfo current = observerProxies.get(currentIndex.get()); + try { + retVal = method.invoke(current.proxy, args); + lastProxy = current; + return retVal; + } catch (Exception e) { + if (!shouldRetry(e)) { + throw e; + } + currentIndex.set((currentIndex.get() + 1) % observerProxies.size()); + LOG.warn("Invocation returned exception on [{}]", + current.proxyInfo, e.getCause()); + } + } + + // If we get here, it means all observers have failed. + LOG.warn("All observers have failed for read request {}. " + + "Fall back on active: {}", method.getName(), activeProxy); + } + + // Either all observers have failed, or that it is a write request. + // In either case, we'll forward the request to active NameNode. + try { + retVal = method.invoke(activeProxy.proxy, args); + } catch (Exception e) { + throw e.getCause(); + } + lastProxy = activeProxy; + return retVal; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index cb335e17f87..d8bdc0d82c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -2598,6 +2598,12 @@ public class MiniDFSCluster implements AutoCloseable { getNameNode(nnIndex).getRpcServer().transitionToStandby( new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER_FORCED)); } + + public void transitionToObserver(int nnIndex) throws IOException, + ServiceFailedException { + getNameNode(nnIndex).getRpcServer().transitionToObserver( + new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER_FORCED)); + } public void triggerBlockReports() diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java index 00ed6bcf1ac..4fcfd8c5df1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java @@ -90,7 +90,7 @@ public class TestStateAlignmentContextWithHA { // Create and set AlignmentContext in HAProxyFactory. // All proxies by factory will now have AlignmentContext assigned. this.alignmentContext = (spy != null ? spy : new ClientGSIContext()); - ((ClientHAProxyFactory) factory).setAlignmentContext(alignmentContext); + ((ClientHAProxyFactory) factory).setAlignmentContext(alignmentContext); AC_LIST.add(alignmentContext); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java index bbe29cf1348..cc5b3d4d8bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java @@ -159,6 +159,18 @@ public abstract class HATestUtil { return (DistributedFileSystem)fs; } + public static DistributedFileSystem configureObserverReadFs( + MiniDFSCluster cluster, Configuration conf, + int nsIndex) throws IOException, URISyntaxException { + conf = new Configuration(conf); + String logicalName = getLogicalHostname(cluster); + setFailoverConfigurations(cluster, conf, logicalName, nsIndex); + conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + + logicalName, ObserverReadProxyProvider.class.getName()); + FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf); + return (DistributedFileSystem) fs; + } + public static void setFailoverConfigurations(MiniDFSCluster cluster, Configuration conf) { setFailoverConfigurations(cluster, conf, getLogicalHostname(cluster)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java new file mode 100644 index 00000000000..98ffefd1050 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java @@ -0,0 +1,337 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.io.retry.FailoverProxyProvider; +import org.apache.hadoop.io.retry.RetryInvocationHandler; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.lang.reflect.Proxy; +import java.net.URI; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +// Main unit tests for ObserverNode +public class TestObserverNode { + private Configuration conf; + private MiniQJMHACluster qjmhaCluster; + private MiniDFSCluster dfsCluster; + private NameNode[] namenodes; + private Path testPath; + private Path testPath2; + private Path testPath3; + + /** These are set in each individual test case */ + private DistributedFileSystem dfs; + private ObserverReadProxyProvider provider; + + @Before + public void setUp() throws Exception { + conf = new Configuration(); + setUpCluster(1); + + testPath = new Path("/test"); + testPath2 = new Path("/test2"); + testPath3 = new Path("/test3"); + } + + @After + public void cleanUp() throws IOException { + if (qjmhaCluster != null) { + qjmhaCluster.shutdown(); + } + } + + @Test + public void testSimpleRead() throws Exception { + setObserverRead(true); + + dfs.mkdir(testPath, FsPermission.getDefault()); + assertSentTo(0); + + try { + dfs.getFileStatus(testPath); + fail("Should throw FileNotFoundException"); + } catch (FileNotFoundException e) { + // Pass + } + + rollEditLogAndTail(0); + dfs.getFileStatus(testPath); + assertSentTo(2); + + dfs.mkdir(testPath2, FsPermission.getDefault()); + assertSentTo(0); + } + + @Test + public void testFailover() throws Exception { + setObserverRead(false); + + dfs.mkdir(testPath, FsPermission.getDefault()); + assertSentTo(0); + dfs.getFileStatus(testPath); + assertSentTo(0); + + dfsCluster.transitionToStandby(0); + dfsCluster.transitionToActive(1); + dfsCluster.waitActive(); + + dfs.mkdir(testPath2, FsPermission.getDefault()); + assertSentTo(1); + dfs.getFileStatus(testPath); + assertSentTo(1); + } + + @Test + public void testDoubleFailover() throws Exception { + setObserverRead(true); + + dfs.mkdir(testPath, FsPermission.getDefault()); + assertSentTo(0); + + rollEditLogAndTail(0); + dfs.getFileStatus(testPath); + assertSentTo(2); + dfs.mkdir(testPath2, FsPermission.getDefault()); + assertSentTo(0); + + dfsCluster.transitionToStandby(0); + dfsCluster.transitionToActive(1); + dfsCluster.waitActive(1); + + rollEditLogAndTail(1); + dfs.getFileStatus(testPath2); + assertSentTo(2); + dfs.mkdir(testPath3, FsPermission.getDefault()); + assertSentTo(1); + + dfsCluster.transitionToStandby(1); + dfsCluster.transitionToActive(0); + dfsCluster.waitActive(0); + + rollEditLogAndTail(0); + dfs.getFileStatus(testPath3); + assertSentTo(2); + dfs.delete(testPath3, false); + assertSentTo(0); + } + + @Test + public void testObserverFailover() throws Exception { + setUpCluster(2); + setObserverRead(true); + + dfs.mkdir(testPath, FsPermission.getDefault()); + rollEditLogAndTail(0); + dfs.getFileStatus(testPath); + assertSentToAny(2, 3); + + // Transition observer #2 to standby, request should go to the #3. + dfsCluster.transitionToStandby(2); + dfs.getFileStatus(testPath); + assertSentTo(3); + + // Transition observer #3 to standby, request should go to active + dfsCluster.transitionToStandby(3); + dfs.getFileStatus(testPath); + assertSentTo(0); + + // Transition #2 back to observer, request should go to #2 + dfsCluster.transitionToObserver(2); + dfs.getFileStatus(testPath); + assertSentTo(2); + + // Transition #3 back to observer, request should go to either #2 or #3 + dfsCluster.transitionToObserver(3); + dfs.getFileStatus(testPath); + assertSentToAny(2, 3); + } + + @Test + public void testObserverShutdown() throws Exception { + setObserverRead(true); + + dfs.mkdir(testPath, FsPermission.getDefault()); + rollEditLogAndTail(0); + dfs.getFileStatus(testPath); + assertSentTo(2); + + // Shutdown the observer - requests should go to active + dfsCluster.shutdownNameNode(2); + dfs.getFileStatus(testPath); + assertSentTo(0); + + // Start the observer again - requests should go to observer + dfsCluster.restartNameNode(2); + dfsCluster.transitionToObserver(2); + dfs.getFileStatus(testPath); + assertSentTo(2); + } + + @Test + public void testObserverFailOverAndShutdown() throws Exception { + // Test the case when there is a failover before ONN shutdown + setObserverRead(true); + + dfs.mkdir(testPath, FsPermission.getDefault()); + rollEditLogAndTail(0); + dfs.getFileStatus(testPath); + assertSentTo(2); + + dfsCluster.transitionToStandby(0); + dfsCluster.transitionToActive(1); + dfsCluster.waitActive(); + + // Shutdown the observer - requests should go to active + dfsCluster.shutdownNameNode(2); + dfs.getFileStatus(testPath); + assertSentTo(1); + + // Start the observer again - requests should go to observer + dfsCluster.restartNameNode(2); + dfs.getFileStatus(testPath); + assertSentTo(1); + + dfsCluster.transitionToObserver(2); + dfs.getFileStatus(testPath); + assertSentTo(2); + } + + @Test + public void testMultiObserver() throws Exception { + setUpCluster(2); + setObserverRead(true); + + dfs.mkdir(testPath, FsPermission.getDefault()); + assertSentTo(0); + + rollEditLogAndTail(0); + dfs.getFileStatus(testPath); + assertSentToAny(2, 3); + + dfs.mkdir(testPath2, FsPermission.getDefault()); + rollEditLogAndTail(0); + + // Shutdown first observer, request should go to the second one + dfsCluster.shutdownNameNode(2); + dfs.listStatus(testPath2); + assertSentTo(3); + + // Restart the first observer + dfsCluster.restartNameNode(2); + dfs.listStatus(testPath); + assertSentTo(3); + + dfsCluster.transitionToObserver(2); + dfs.listStatus(testPath); + assertSentToAny(2, 3); + + dfs.mkdir(testPath3, FsPermission.getDefault()); + rollEditLogAndTail(0); + + // Now shutdown the second observer, request should go to the first one + dfsCluster.shutdownNameNode(3); + dfs.listStatus(testPath3); + assertSentTo(2); + + // Shutdown both, request should go to active + dfsCluster.shutdownNameNode(2); + dfs.listStatus(testPath3); + assertSentTo(0); + } + + @Test + public void testBootstrap() throws Exception { + for (URI u : dfsCluster.getNameDirs(2)) { + File dir = new File(u.getPath()); + assertTrue(FileUtil.fullyDelete(dir)); + } + int rc = BootstrapStandby.run( + new String[]{"-nonInteractive"}, + dfsCluster.getConfiguration(2) + ); + assertEquals(0, rc); + } + + private void setUpCluster(int numObservers) throws Exception { + qjmhaCluster = new MiniQJMHACluster.Builder(conf) + .setNumNameNodes(2 + numObservers) + .build(); + dfsCluster = qjmhaCluster.getDfsCluster(); + + namenodes = new NameNode[2 + numObservers]; + for (int i = 0; i < namenodes.length; i++) { + namenodes[i] = dfsCluster.getNameNode(i); + } + + dfsCluster.transitionToActive(0); + dfsCluster.waitActive(0); + + for (int i = 0; i < numObservers; i++) { + dfsCluster.transitionToObserver(2 + i); + } + } + + private void assertSentTo(int nnIdx) { + assertSentToAny(nnIdx); + } + + private void assertSentToAny(int... nnIndices) { + FailoverProxyProvider.ProxyInfo pi = provider.getLastProxy(); + for (int nnIdx : nnIndices) { + if (pi.proxyInfo.equals( + dfsCluster.getNameNode(nnIdx).getNameNodeAddress().toString())) { + return; + } + } + fail("Request was not sent to any of the expected namenodes"); + } + + private void setObserverRead(boolean flag) throws Exception { + dfs = HATestUtil.configureObserverReadFs(dfsCluster, conf, 0); + RetryInvocationHandler handler = + (RetryInvocationHandler) Proxy.getInvocationHandler( + dfs.getClient().getNamenode()); + provider = (ObserverReadProxyProvider) handler.getProxyProvider(); + provider.setObserverReadEnabled(flag); + } + + private void rollEditLogAndTail(int indexForActiveNN) throws Exception { + dfsCluster.getNameNode(indexForActiveNN).getRpcServer().rollEditLog(); + for (int i = 2; i < namenodes.length; i++) { + dfsCluster.getNameNode(i).getNamesystem().getEditLogTailer() + .doTailEdits(); + } + } +}