From 69b0c513a9b11cc7f795747732173b36aacbe794 Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Thu, 20 Dec 2018 17:49:22 -0800 Subject: [PATCH] HDFS-14162. [SBN read] Allow Balancer to work with Observer node. Add a new ProxyCombiner allowing for multiple related protocols to be combined. Allow AlignmentContext to be passed in NameNodeProxyFactory. Contributed by Erik Krogen. (cherry picked from 64f28f9efa2ef3cd9dd54a6c5009029721e030ed) --- .../org/apache/hadoop/ipc/ProxyCombiner.java | 137 ++++++++++++++++++ .../server/namenode/ha/HAProxyFactory.java | 9 ++ .../ha/ObserverReadProxyProvider.java | 2 +- .../apache/hadoop/hdfs/NameNodeProxies.java | 117 +++++++++------ .../server/balancer/NameNodeConnector.java | 11 +- .../namenode/ha/NameNodeHAProxyFactory.java | 9 +- .../server/protocol/BalancerProtocols.java | 30 ++++ .../balancer/TestBalancerWithHANameNodes.java | 101 ++++++++----- .../hdfs/server/namenode/ha/HATestUtil.java | 12 +- 9 files changed, 343 insertions(+), 85 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProxyCombiner.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BalancerProtocols.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProxyCombiner.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProxyCombiner.java new file mode 100644 index 00000000000..fbafabcde4a --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProxyCombiner.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ipc; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + +import org.apache.hadoop.io.MultipleIOException; +import org.apache.hadoop.ipc.Client.ConnectionId; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A utility class used to combine two protocol proxies. + * See {@link #combine(Class, Object...)}. + */ +public final class ProxyCombiner { + + private static final Logger LOG = + LoggerFactory.getLogger(ProxyCombiner.class); + + private ProxyCombiner() { } + + /** + * Combine two or more proxies which together comprise a single proxy + * interface. This can be used for a protocol interface which {@code extends} + * multiple other protocol interfaces. The returned proxy will implement + * all of the methods of the combined proxy interface, delegating calls + * to which proxy implements that method. If multiple proxies implement the + * same method, the first in the list will be used for delegation. + * + *

This will check that every method on the combined interface is + * implemented by at least one of the supplied proxy objects. + * + * @param combinedProxyInterface The interface of the combined proxy. + * @param proxies The proxies which should be used as delegates. + * @param The type of the proxy that will be returned. + * @return The combined proxy. + */ + @SuppressWarnings("unchecked") + public static T combine(Class combinedProxyInterface, + Object... proxies) { + methodLoop: + for (Method m : combinedProxyInterface.getMethods()) { + for (Object proxy : proxies) { + try { + proxy.getClass().getMethod(m.getName(), m.getParameterTypes()); + continue methodLoop; // go to the next method + } catch (NoSuchMethodException nsme) { + // Continue to try the next proxy + } + } + throw new IllegalStateException("The proxies specified for " + + combinedProxyInterface + " do not cover method " + m); + } + + InvocationHandler handler = new CombinedProxyInvocationHandler(proxies); + return (T) Proxy.newProxyInstance(combinedProxyInterface.getClassLoader(), + new Class[] {combinedProxyInterface}, handler); + } + + private static final class CombinedProxyInvocationHandler + implements RpcInvocationHandler { + + private final Object[] proxies; + + private CombinedProxyInvocationHandler(Object[] proxies) { + this.proxies = proxies; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) + throws Throwable { + Exception lastException = null; + for (Object underlyingProxy : proxies) { + try { + return method.invoke(underlyingProxy, args); + } catch (IllegalAccessException|IllegalArgumentException e) { + lastException = e; + } + } + // This shouldn't happen since the method coverage was verified in build() + LOG.error("BUG: Method {} was unable to be found on any of the " + + "underlying proxies for {}", method, proxy.getClass()); + throw new IllegalArgumentException("Method " + method + " not supported", + lastException); + } + + /** + * Since this is incapable of returning multiple connection IDs, simply + * return the first one. In most cases, the connection ID should be the same + * for all proxies. + */ + @Override + public ConnectionId getConnectionId() { + return RPC.getConnectionIdForProxy(proxies[0]); + } + + @Override + public void close() throws IOException { + MultipleIOException.Builder exceptionBuilder = + new MultipleIOException.Builder(); + for (Object proxy : proxies) { + if (proxy instanceof Closeable) { + try { + ((Closeable) proxy).close(); + } catch (IOException ioe) { + exceptionBuilder.add(ioe); + } + } + } + if (!exceptionBuilder.isEmpty()) { + throw exceptionBuilder.build(); + } + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java index f92a74ff7ce..9364780800d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.security.UserGroupInformation; import java.io.IOException; @@ -41,4 +42,12 @@ public interface HAProxyFactory { T createProxy(Configuration conf, InetSocketAddress nnAddr, Class xface, UserGroupInformation ugi, boolean withRetries) throws IOException; + /** + * Set the alignment context to be used when creating new proxies using + * this factory. Not all implementations will use this alignment context. + */ + default void setAlignmentContext(AlignmentContext alignmentContext) { + // noop + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java index 3cccecfa6c4..2ccf8856e58 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java @@ -156,7 +156,7 @@ public class ObserverReadProxyProvider super(conf, uri, xface, factory); this.failoverProxy = failoverProxy; this.alignmentContext = new ClientGSIContext(); - ((ClientHAProxyFactory) factory).setAlignmentContext(alignmentContext); + factory.setAlignmentContext(alignmentContext); // Don't bother configuring the number of retries and such on the retry // policy since it is mainly only used for determining whether or not an diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java index bb555ef2592..3063083db88 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java @@ -40,13 +40,16 @@ import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB; import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMapProtocol; import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider; import org.apache.hadoop.hdfs.server.namenode.ha.NameNodeHAProxyFactory; +import org.apache.hadoop.hdfs.server.protocol.BalancerProtocols; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.ProxyCombiner; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RefreshCallQueueProtocol; import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolClientSideTranslatorPB; @@ -122,7 +125,7 @@ public class NameNodeProxies { if (failoverProxyProvider == null) { return createNonHAProxy(conf, DFSUtilClient.getNNAddress(nameNodeUri), xface, UserGroupInformation.getCurrentUser(), true, - fallbackToSimpleAuth); + fallbackToSimpleAuth, null); } else { return NameNodeProxiesClient.createHAProxy(conf, nameNodeUri, xface, failoverProxyProvider); @@ -145,7 +148,7 @@ public class NameNodeProxies { public static ProxyAndInfo createNonHAProxy( Configuration conf, InetSocketAddress nnAddr, Class xface, UserGroupInformation ugi, boolean withRetries) throws IOException { - return createNonHAProxy(conf, nnAddr, xface, ugi, withRetries, null); + return createNonHAProxy(conf, nnAddr, xface, ugi, withRetries, null, null); } /** @@ -167,29 +170,39 @@ public class NameNodeProxies { public static ProxyAndInfo createNonHAProxy( Configuration conf, InetSocketAddress nnAddr, Class xface, UserGroupInformation ugi, boolean withRetries, - AtomicBoolean fallbackToSimpleAuth) throws IOException { + AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext) + throws IOException { Text dtService = SecurityUtil.buildTokenService(nnAddr); T proxy; if (xface == ClientProtocol.class) { - proxy = (T) NameNodeProxiesClient.createNonHAProxyWithClientProtocol( - nnAddr, conf, ugi, withRetries, fallbackToSimpleAuth); + proxy = (T) NameNodeProxiesClient.createProxyWithAlignmentContext( + nnAddr, conf, ugi, withRetries, fallbackToSimpleAuth, + alignmentContext); } else if (xface == JournalProtocol.class) { - proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi); + proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi, + alignmentContext); } else if (xface == NamenodeProtocol.class) { proxy = (T) createNNProxyWithNamenodeProtocol(nnAddr, conf, ugi, - withRetries); + withRetries, alignmentContext); } else if (xface == GetUserMappingsProtocol.class) { - proxy = (T) createNNProxyWithGetUserMappingsProtocol(nnAddr, conf, ugi); + proxy = (T) createNNProxyWithGetUserMappingsProtocol(nnAddr, conf, ugi, + alignmentContext); } else if (xface == RefreshUserMappingsProtocol.class) { - proxy = (T) createNNProxyWithRefreshUserMappingsProtocol(nnAddr, conf, ugi); + proxy = (T) createNNProxyWithRefreshUserMappingsProtocol(nnAddr, conf, + ugi, alignmentContext); } else if (xface == RefreshAuthorizationPolicyProtocol.class) { proxy = (T) createNNProxyWithRefreshAuthorizationPolicyProtocol(nnAddr, - conf, ugi); + conf, ugi, alignmentContext); } else if (xface == RefreshCallQueueProtocol.class) { - proxy = (T) createNNProxyWithRefreshCallQueueProtocol(nnAddr, conf, ugi); + proxy = (T) createNNProxyWithRefreshCallQueueProtocol(nnAddr, conf, ugi, + alignmentContext); } else if (xface == InMemoryAliasMapProtocol.class) { - proxy = (T) createNNProxyWithInMemoryAliasMapProtocol(nnAddr, conf, ugi); + proxy = (T) createNNProxyWithInMemoryAliasMapProtocol(nnAddr, conf, ugi, + alignmentContext); + } else if (xface == BalancerProtocols.class) { + proxy = (T) createNNProxyWithBalancerProtocol(nnAddr, conf, ugi, + withRetries, fallbackToSimpleAuth, alignmentContext); } else { String message = "Unsupported protocol found when creating the proxy " + "connection to NameNode: " + @@ -202,58 +215,63 @@ public class NameNodeProxies { } private static InMemoryAliasMapProtocol createNNProxyWithInMemoryAliasMapProtocol( - InetSocketAddress address, Configuration conf, UserGroupInformation ugi) - throws IOException { - AliasMapProtocolPB proxy = (AliasMapProtocolPB) createNameNodeProxy( - address, conf, ugi, AliasMapProtocolPB.class, 30000); + InetSocketAddress address, Configuration conf, UserGroupInformation ugi, + AlignmentContext alignmentContext) throws IOException { + AliasMapProtocolPB proxy = createNameNodeProxy( + address, conf, ugi, AliasMapProtocolPB.class, 30000, alignmentContext); return new InMemoryAliasMapProtocolClientSideTranslatorPB(proxy); } private static JournalProtocol createNNProxyWithJournalProtocol( - InetSocketAddress address, Configuration conf, UserGroupInformation ugi) - throws IOException { - JournalProtocolPB proxy = (JournalProtocolPB) createNameNodeProxy(address, - conf, ugi, JournalProtocolPB.class, 30000); + InetSocketAddress address, Configuration conf, UserGroupInformation ugi, + AlignmentContext alignmentContext) throws IOException { + JournalProtocolPB proxy = createNameNodeProxy(address, + conf, ugi, JournalProtocolPB.class, 30000, alignmentContext); return new JournalProtocolTranslatorPB(proxy); } private static RefreshAuthorizationPolicyProtocol createNNProxyWithRefreshAuthorizationPolicyProtocol(InetSocketAddress address, - Configuration conf, UserGroupInformation ugi) throws IOException { - RefreshAuthorizationPolicyProtocolPB proxy = (RefreshAuthorizationPolicyProtocolPB) - createNameNodeProxy(address, conf, ugi, RefreshAuthorizationPolicyProtocolPB.class, 0); + Configuration conf, UserGroupInformation ugi, + AlignmentContext alignmentContext) throws IOException { + RefreshAuthorizationPolicyProtocolPB proxy = createNameNodeProxy(address, + conf, ugi, RefreshAuthorizationPolicyProtocolPB.class, 0, + alignmentContext); return new RefreshAuthorizationPolicyProtocolClientSideTranslatorPB(proxy); } private static RefreshUserMappingsProtocol createNNProxyWithRefreshUserMappingsProtocol(InetSocketAddress address, - Configuration conf, UserGroupInformation ugi) throws IOException { - RefreshUserMappingsProtocolPB proxy = (RefreshUserMappingsProtocolPB) - createNameNodeProxy(address, conf, ugi, RefreshUserMappingsProtocolPB.class, 0); + Configuration conf, UserGroupInformation ugi, + AlignmentContext alignmentContext) throws IOException { + RefreshUserMappingsProtocolPB proxy = createNameNodeProxy(address, conf, + ugi, RefreshUserMappingsProtocolPB.class, 0, alignmentContext); return new RefreshUserMappingsProtocolClientSideTranslatorPB(proxy); } private static RefreshCallQueueProtocol createNNProxyWithRefreshCallQueueProtocol(InetSocketAddress address, - Configuration conf, UserGroupInformation ugi) throws IOException { - RefreshCallQueueProtocolPB proxy = (RefreshCallQueueProtocolPB) - createNameNodeProxy(address, conf, ugi, RefreshCallQueueProtocolPB.class, 0); + Configuration conf, UserGroupInformation ugi, + AlignmentContext alignmentContext) throws IOException { + RefreshCallQueueProtocolPB proxy = createNameNodeProxy(address, conf, ugi, + RefreshCallQueueProtocolPB.class, 0, alignmentContext); return new RefreshCallQueueProtocolClientSideTranslatorPB(proxy); } private static GetUserMappingsProtocol createNNProxyWithGetUserMappingsProtocol( - InetSocketAddress address, Configuration conf, UserGroupInformation ugi) - throws IOException { - GetUserMappingsProtocolPB proxy = (GetUserMappingsProtocolPB) - createNameNodeProxy(address, conf, ugi, GetUserMappingsProtocolPB.class, 0); + InetSocketAddress address, Configuration conf, UserGroupInformation ugi, + AlignmentContext alignmentContext) throws IOException { + GetUserMappingsProtocolPB proxy = createNameNodeProxy(address, conf, ugi, + GetUserMappingsProtocolPB.class, 0, alignmentContext); return new GetUserMappingsProtocolClientSideTranslatorPB(proxy); } private static NamenodeProtocol createNNProxyWithNamenodeProtocol( InetSocketAddress address, Configuration conf, UserGroupInformation ugi, - boolean withRetries) throws IOException { - NamenodeProtocolPB proxy = (NamenodeProtocolPB) createNameNodeProxy( - address, conf, ugi, NamenodeProtocolPB.class, 0); + boolean withRetries, AlignmentContext alignmentContext) + throws IOException { + NamenodeProtocolPB proxy = createNameNodeProxy( + address, conf, ugi, NamenodeProtocolPB.class, 0, alignmentContext); if (withRetries) { // create the proxy with retries RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(5, 200, TimeUnit.MILLISECONDS); @@ -270,13 +288,28 @@ public class NameNodeProxies { } } - private static Object createNameNodeProxy(InetSocketAddress address, - Configuration conf, UserGroupInformation ugi, Class xface, - int rpcTimeout) throws IOException { + private static BalancerProtocols createNNProxyWithBalancerProtocol( + InetSocketAddress address, Configuration conf, UserGroupInformation ugi, + boolean withRetries, AtomicBoolean fallbackToSimpleAuth, + AlignmentContext alignmentContext) throws IOException { + NamenodeProtocol namenodeProtocol = createNNProxyWithNamenodeProtocol( + address, conf, ugi, withRetries, alignmentContext); + ClientProtocol clientProtocol = + NameNodeProxiesClient.createProxyWithAlignmentContext(address, + conf, ugi, withRetries, fallbackToSimpleAuth, alignmentContext); + + return ProxyCombiner.combine(BalancerProtocols.class, + namenodeProtocol, clientProtocol); + } + + private static T createNameNodeProxy(InetSocketAddress address, + Configuration conf, UserGroupInformation ugi, Class xface, + int rpcTimeout, AlignmentContext alignmentContext) throws IOException { RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class); - Object proxy = RPC.getProxy(xface, RPC.getProtocolVersion(xface), address, - ugi, conf, NetUtils.getDefaultSocketFactory(conf), rpcTimeout); - return proxy; + return RPC.getProtocolProxy(xface, + RPC.getProtocolVersion(xface), address, ugi, conf, + NetUtils.getDefaultSocketFactory(conf), rpcTimeout, null, null, + alignmentContext).getProxy(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index 114167ca444..3be7530e767 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -43,11 +43,11 @@ import org.apache.hadoop.fs.StreamCapabilities.StreamCapability; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; +import org.apache.hadoop.hdfs.server.protocol.BalancerProtocols; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; @@ -111,8 +111,7 @@ public class NameNodeConnector implements Closeable { private final URI nameNodeUri; private final String blockpoolID; - private final NamenodeProtocol namenode; - private final ClientProtocol client; + private final BalancerProtocols namenode; private final KeyManager keyManager; final AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false); @@ -136,9 +135,7 @@ public class NameNodeConnector implements Closeable { this.maxNotChangedIterations = maxNotChangedIterations; this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri, - NamenodeProtocol.class).getProxy(); - this.client = NameNodeProxies.createProxy(conf, nameNodeUri, - ClientProtocol.class, fallbackToSimpleAuth).getProxy(); + BalancerProtocols.class, fallbackToSimpleAuth).getProxy(); this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf); final NamespaceInfo namespaceinfo = namenode.versionRequest(); @@ -194,7 +191,7 @@ public class NameNodeConnector implements Closeable { /** @return live datanode storage reports. */ public DatanodeStorageReport[] getLiveDatanodeStorageReport() throws IOException { - return client.getDatanodeStorageReport(DatanodeReportType.LIVE); + return namenode.getDatanodeStorageReport(DatanodeReportType.LIVE); } /** @return the key manager */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java index 036b6eb367d..1aaaa388d38 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.NameNodeProxies; +import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.security.UserGroupInformation; import java.io.IOException; @@ -27,12 +28,14 @@ import java.util.concurrent.atomic.AtomicBoolean; public class NameNodeHAProxyFactory implements HAProxyFactory { + private AlignmentContext alignmentContext; + @Override public T createProxy(Configuration conf, InetSocketAddress nnAddr, Class xface, UserGroupInformation ugi, boolean withRetries, AtomicBoolean fallbackToSimpleAuth) throws IOException { return NameNodeProxies.createNonHAProxy(conf, nnAddr, xface, - ugi, withRetries, fallbackToSimpleAuth).getProxy(); + ugi, withRetries, fallbackToSimpleAuth, alignmentContext).getProxy(); } @Override @@ -42,4 +45,8 @@ public class NameNodeHAProxyFactory implements HAProxyFactory { return NameNodeProxies.createNonHAProxy(conf, nnAddr, xface, ugi, withRetries).getProxy(); } + + public void setAlignmentContext(AlignmentContext alignmentContext) { + this.alignmentContext = alignmentContext; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BalancerProtocols.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BalancerProtocols.java new file mode 100644 index 00000000000..d23f6cb5fd2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BalancerProtocols.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.security.KerberosInfo; + + +/** The full set of protocols used by the Balancer. */ +@InterfaceAudience.Private +@KerberosInfo( + serverPrincipal = DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY) +public interface BalancerProtocols extends ClientProtocol, NamenodeProtocol { } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java index 14441931fe9..4a398dbf923 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java @@ -18,14 +18,13 @@ package org.apache.hadoop.hdfs.server.balancer; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - import java.net.URI; import java.util.Collection; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; @@ -33,7 +32,9 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf; import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; +import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider; import org.junit.Test; /** @@ -43,6 +44,13 @@ public class TestBalancerWithHANameNodes { private MiniDFSCluster cluster; ClientProtocol client; + // array of racks for original nodes in cluster + private static final String[] TEST_RACKS = + {TestBalancer.RACK0, TestBalancer.RACK1}; + // array of capacities for original nodes in cluster + private static final long[] TEST_CAPACITIES = + {TestBalancer.CAPACITY, TestBalancer.CAPACITY}; + static { TestBalancer.initTestSetup(); } @@ -57,52 +65,79 @@ public class TestBalancerWithHANameNodes { public void testBalancerWithHANameNodes() throws Exception { Configuration conf = new HdfsConfiguration(); TestBalancer.initConf(conf); - long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity - String newNodeRack = TestBalancer.RACK2; // new node's rack - // array of racks for original nodes in cluster - String[] racks = new String[] { TestBalancer.RACK0, TestBalancer.RACK1 }; - // array of capacities of original nodes in cluster - long[] capacities = new long[] { TestBalancer.CAPACITY, - TestBalancer.CAPACITY }; - assertEquals(capacities.length, racks.length); - int numOfDatanodes = capacities.length; + assertEquals(TEST_CAPACITIES.length, TEST_RACKS.length); NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1"); nn1Conf.setIpcPort(HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT); Configuration copiedConf = new Configuration(conf); cluster = new MiniDFSCluster.Builder(copiedConf) .nnTopology(MiniDFSNNTopology.simpleHATopology()) - .numDataNodes(capacities.length) - .racks(racks) - .simulatedCapacities(capacities) + .numDataNodes(TEST_CAPACITIES.length) + .racks(TEST_RACKS) + .simulatedCapacities(TEST_CAPACITIES) .build(); HATestUtil.setFailoverConfigurations(cluster, conf); try { cluster.waitActive(); - cluster.transitionToActive(1); + cluster.transitionToActive(0); Thread.sleep(500); client = NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf), ClientProtocol.class).getProxy(); - long totalCapacity = TestBalancer.sum(capacities); - // fill up the cluster to be 30% full - long totalUsedSpace = totalCapacity * 3 / 10; - TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace - / numOfDatanodes, (short) numOfDatanodes, 1); - // start up an empty node with the same capacity and on the same rack - cluster.startDataNodes(conf, 1, true, null, new String[] { newNodeRack }, - new long[] { newNodeCapacity }); - totalCapacity += newNodeCapacity; - TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client, - cluster); - Collection namenodes = DFSUtil.getInternalNsRpcUris(conf); - assertEquals(1, namenodes.size()); - assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster))); - final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf); - assertEquals(ExitStatus.SUCCESS.getExitCode(), r); - TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client, - cluster, BalancerParameters.DEFAULT); + doTest(conf); } finally { cluster.shutdown(); } } + + void doTest(Configuration conf) throws Exception { + int numOfDatanodes = TEST_CAPACITIES.length; + long totalCapacity = TestBalancer.sum(TEST_CAPACITIES); + // fill up the cluster to be 30% full + long totalUsedSpace = totalCapacity * 3 / 10; + TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace + / numOfDatanodes, (short) numOfDatanodes, 0); + + // start up an empty node with the same capacity and on the same rack + long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity + String newNodeRack = TestBalancer.RACK2; // new node's rack + cluster.startDataNodes(conf, 1, true, null, new String[] {newNodeRack}, + new long[] {newNodeCapacity}); + totalCapacity += newNodeCapacity; + TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client, + cluster); + Collection namenodes = DFSUtil.getInternalNsRpcUris(conf); + assertEquals(1, namenodes.size()); + final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf); + assertEquals(ExitStatus.SUCCESS.getExitCode(), r); + TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client, + cluster, BalancerParameters.DEFAULT); + } + + /** + * Test Balancer with ObserverNodes. + */ + @Test(timeout = 60000) + public void testBalancerWithObserver() throws Exception { + final Configuration conf = new HdfsConfiguration(); + TestBalancer.initConf(conf); + + MiniQJMHACluster qjmhaCluster = null; + try { + qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 2, + TEST_CAPACITIES.length, true, TEST_CAPACITIES, TEST_RACKS); + cluster = qjmhaCluster.getDfsCluster(); + cluster.waitClusterUp(); + cluster.waitActive(); + + DistributedFileSystem dfs = HATestUtil.configureObserverReadFs( + cluster, conf, ObserverReadProxyProvider.class, true); + client = dfs.getClient().getNamenode(); + + doTest(conf); + } finally { + if (qjmhaCluster != null) { + qjmhaCluster.shutdown(); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java index ddbe248d193..261bf8cf6af 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java @@ -210,6 +210,14 @@ public abstract class HATestUtil { public static MiniQJMHACluster setUpObserverCluster( Configuration conf, int numObservers, int numDataNodes, boolean fastTailing) throws IOException { + return setUpObserverCluster(conf, numObservers, numDataNodes, + fastTailing, null, null); + } + + public static MiniQJMHACluster setUpObserverCluster( + Configuration conf, int numObservers, int numDataNodes, + boolean fastTailing, long[] simulatedCapacities, + String[] racks) throws IOException { // disable block scanner conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); @@ -225,7 +233,9 @@ public abstract class HATestUtil { MiniQJMHACluster.Builder qjmBuilder = new MiniQJMHACluster.Builder(conf) .setNumNameNodes(2 + numObservers); - qjmBuilder.getDfsBuilder().numDataNodes(numDataNodes); + qjmBuilder.getDfsBuilder().numDataNodes(numDataNodes) + .simulatedCapacities(simulatedCapacities) + .racks(racks); MiniQJMHACluster qjmhaCluster = qjmBuilder.build(); MiniDFSCluster dfsCluster = qjmhaCluster.getDfsCluster();