diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java index 58f49438cc7..96722fcfab3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java @@ -94,9 +94,7 @@ public class ConfiguredFailoverProxyProvider extends proxies.add(new AddressRpcProxyPair(address)); } // Randomize the list to prevent all clients pointing to the same one - boolean randomized = conf.getBoolean( - HdfsClientConfigKeys.Failover.RANDOM_ORDER, - HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT); + boolean randomized = getRandomOrder(conf, uri); if (randomized) { Collections.shuffle(proxies); } @@ -111,6 +109,31 @@ public class ConfiguredFailoverProxyProvider extends } } + /** + * Check whether random order is configured for failover proxy provider + * for the namenode/nameservice. + * + * @param conf Configuration + * @param nameNodeUri The URI of namenode/nameservice + * @return random order configuration + */ + private static boolean getRandomOrder( + Configuration conf, URI nameNodeUri) { + String host = nameNodeUri.getHost(); + String configKeyWithHost = HdfsClientConfigKeys.Failover.RANDOM_ORDER + + "." + host; + + if (conf.get(configKeyWithHost) != null) { + return conf.getBoolean( + configKeyWithHost, + HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT); + } + + return conf.getBoolean( + HdfsClientConfigKeys.Failover.RANDOM_ORDER, + HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT); + } + @Override public Class getInterface() { return xface; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConfiguredFailoverProxyProvider.java new file mode 100644 index 00000000000..d7a5db6b893 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConfiguredFailoverProxyProvider.java @@ -0,0 +1,264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.event.Level; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test {@link ConfiguredFailoverProxyProvider}. + * This manages failover logic for a given set of nameservices/namenodes + * (aka proxies). + */ +public class TestConfiguredFailoverProxyProvider { + private Configuration conf; + private int rpcPort = 8020; + private URI ns1Uri; + private URI ns2Uri; + private String ns1; + private String ns1nn1Hostname = "machine1.foo.bar"; + private InetSocketAddress ns1nn1 = + new InetSocketAddress(ns1nn1Hostname, rpcPort); + private String ns1nn2Hostname = "machine2.foo.bar"; + private InetSocketAddress ns1nn2 = + new InetSocketAddress(ns1nn2Hostname, rpcPort); + private String ns2; + private String ns2nn1Hostname = "router1.foo.bar"; + private InetSocketAddress ns2nn1 = + new InetSocketAddress(ns2nn1Hostname, rpcPort); + private String ns2nn2Hostname = "router2.foo.bar"; + private InetSocketAddress ns2nn2 = + new InetSocketAddress(ns2nn2Hostname, rpcPort); + private String ns2nn3Hostname = "router3.foo.bar"; + private InetSocketAddress ns2nn3 = + new InetSocketAddress(ns2nn3Hostname, rpcPort); + private static final int NUM_ITERATIONS = 50; + + @BeforeClass + public static void setupClass() throws Exception { + GenericTestUtils.setLogLevel(RequestHedgingProxyProvider.LOG, Level.TRACE); + } + + @Before + public void setup() throws URISyntaxException { + ns1 = "mycluster-1-" + Time.monotonicNow(); + ns1Uri = new URI("hdfs://" + ns1); + conf = new Configuration(); + conf.set( + HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns1, + "nn1,nn2,nn3"); + conf.set( + HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns1 + ".nn1", + ns1nn1Hostname + ":" + rpcPort); + conf.set( + HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns1 + ".nn2", + ns1nn2Hostname + ":" + rpcPort); + conf.set( + HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + ns1, + ConfiguredFailoverProxyProvider.class.getName()); + conf.setBoolean( + HdfsClientConfigKeys.Failover.RANDOM_ORDER + "." + ns1, + false); + + ns2 = "myroutercluster-2-" + Time.monotonicNow(); + ns2Uri = new URI("hdfs://" + ns2); + conf.set( + HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns2, + "nn1,nn2,nn3"); + conf.set( + HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns2 + ".nn1", + ns2nn1Hostname + ":" + rpcPort); + conf.set( + HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns2 + ".nn2", + ns2nn2Hostname + ":" + rpcPort); + conf.set( + HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns2 + ".nn3", + ns2nn3Hostname + ":" + rpcPort); + conf.set( + HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + ns2, + ConfiguredFailoverProxyProvider.class.getName()); + conf.setBoolean( + HdfsClientConfigKeys.Failover.RANDOM_ORDER + "." + ns2, + true); + + conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES, ns1 + "," + ns2); + conf.set("fs.defaultFS", "hdfs://" + ns1); + } + + /** + * Tests getProxy with random.order configuration set to false. + * This expects the proxy order to be consistent every time a new + * ConfiguredFailoverProxyProvider is created. + */ + @Test + public void testNonRandomGetProxy() throws Exception { + final AtomicInteger nn1Count = new AtomicInteger(0); + final AtomicInteger nn2Count = new AtomicInteger(0); + + Map proxyMap = new HashMap<>(); + + final ClientProtocol nn1Mock = mock(ClientProtocol.class); + when(nn1Mock.getStats()).thenAnswer(createAnswer(nn1Count, 1)); + proxyMap.put(ns1nn1, nn1Mock); + + final ClientProtocol nn2Mock = mock(ClientProtocol.class); + when(nn2Mock.getStats()).thenAnswer(createAnswer(nn2Count, 2)); + proxyMap.put(ns1nn2, nn2Mock); + + ConfiguredFailoverProxyProvider provider1 = + new ConfiguredFailoverProxyProvider<>(conf, ns1Uri, + ClientProtocol.class, createFactory(proxyMap)); + ClientProtocol proxy1 = provider1.getProxy().proxy; + proxy1.getStats(); + assertEquals(1, nn1Count.get()); + assertEquals(0, nn2Count.get()); + proxy1.getStats(); + assertEquals(2, nn1Count.get()); + assertEquals(0, nn2Count.get()); + nn1Count.set(0); + nn2Count.set(0); + + for (int i = 0; i < NUM_ITERATIONS; i++) { + ConfiguredFailoverProxyProvider provider2 = + new ConfiguredFailoverProxyProvider<>(conf, ns1Uri, + ClientProtocol.class, createFactory(proxyMap)); + ClientProtocol proxy2 = provider2.getProxy().proxy; + proxy2.getStats(); + } + assertEquals(NUM_ITERATIONS, nn1Count.get()); + assertEquals(0, nn2Count.get()); + } + + /** + * Tests getProxy with random.order configuration set to true. + * This expects the proxy order to be random every time a new + * ConfiguredFailoverProxyProvider is created. + */ + @Test + public void testRandomGetProxy() throws Exception { + final AtomicInteger nn1Count = new AtomicInteger(0); + final AtomicInteger nn2Count = new AtomicInteger(0); + final AtomicInteger nn3Count = new AtomicInteger(0); + + Map proxyMap = new HashMap<>(); + + final ClientProtocol nn1Mock = mock(ClientProtocol.class); + when(nn1Mock.getStats()).thenAnswer(createAnswer(nn1Count, 1)); + proxyMap.put(ns2nn1, nn1Mock); + + final ClientProtocol nn2Mock = mock(ClientProtocol.class); + when(nn2Mock.getStats()).thenAnswer(createAnswer(nn2Count, 2)); + proxyMap.put(ns2nn2, nn2Mock); + + final ClientProtocol nn3Mock = mock(ClientProtocol.class); + when(nn3Mock.getStats()).thenAnswer(createAnswer(nn3Count, 3)); + proxyMap.put(ns2nn3, nn3Mock); + + + for (int i = 0; i < NUM_ITERATIONS; i++) { + ConfiguredFailoverProxyProvider provider = + new ConfiguredFailoverProxyProvider<>(conf, ns2Uri, + ClientProtocol.class, createFactory(proxyMap)); + ClientProtocol proxy = provider.getProxy().proxy; + proxy.getStats(); + } + + assertTrue(nn1Count.get() < NUM_ITERATIONS && nn1Count.get() > 0); + assertTrue(nn2Count.get() < NUM_ITERATIONS && nn2Count.get() > 0); + assertTrue(nn3Count.get() < NUM_ITERATIONS && nn3Count.get() > 0); + assertEquals(NUM_ITERATIONS, + nn1Count.get() + nn2Count.get() + nn3Count.get()); + } + + /** + * createAnswer creates an Answer for using with the ClientProtocol mocks. + * @param counter counter to increment + * @param retVal return value from answer + * @return + */ + private Answer createAnswer(final AtomicInteger counter, + final long retVal) { + return new Answer() { + @Override + public long[] answer(InvocationOnMock invocation) throws Throwable { + counter.incrementAndGet(); + return new long[]{retVal}; + } + }; + } + + /** + * createFactory returns a HAProxyFactory for tests. + * This uses a map of name node address to ClientProtocol to route calls to + * different ClientProtocol objects. The tests could create ClientProtocol + * mocks and create name node mappings to use with + * ConfiguredFailoverProxyProvider. + */ + private HAProxyFactory createFactory( + final Map proxies) { + final Map proxyMap = proxies; + return new HAProxyFactory() { + @Override + public ClientProtocol createProxy(Configuration cfg, + InetSocketAddress nnAddr, Class xface, + UserGroupInformation ugi, boolean withRetries, + AtomicBoolean fallbackToSimpleAuth) throws IOException { + if (proxyMap.containsKey(nnAddr)) { + return proxyMap.get(nnAddr); + } else { + throw new IOException("Name node address not found"); + } + } + + @Override + public ClientProtocol createProxy(Configuration cfg, + InetSocketAddress nnAddr, Class xface, + UserGroupInformation ugi, boolean withRetries) throws IOException { + if (proxyMap.containsKey(nnAddr)) { + return proxyMap.get(nnAddr); + } else { + throw new IOException("Name node address not found"); + } + } + }; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index c64b2f159fb..18b52ab5a2d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -3631,6 +3631,18 @@ + + dfs.client.failover.random.order + false + + Determines if the failover proxies are picked in random order instead of the + configured order. The prefix can be used with an optional nameservice ID + (of form dfs.client.failover.random.order[.nameservice]) in case multiple + nameservices exist and random order should be enabled for specific + nameservices. + + + dfs.client.key.provider.cache.expiry 864000000 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java index 47db565e37f..5cae2fcd772 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java @@ -41,6 +41,7 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase { public void initializeMemberVariables() { xmlFilename = new String("hdfs-default.xml"); configurationClasses = new Class[] { HdfsClientConfigKeys.class, + HdfsClientConfigKeys.Failover.class, HdfsClientConfigKeys.StripedRead.class, DFSConfigKeys.class, HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.class };