HDFS-14162. [SBN read] Allow Balancer to work with Observer node. Add a new ProxyCombiner allowing for multiple related protocols to be combined. Allow AlignmentContext to be passed in NameNodeProxyFactory. Contributed by Erik Krogen.

This commit is contained in:
Erik Krogen 2018-12-20 17:49:22 -08:00
parent b66d5ae9e2
commit 64f28f9efa
9 changed files with 343 additions and 85 deletions

View File

@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A utility class used to combine two protocol proxies.
* See {@link #combine(Class, Object...)}.
*/
public final class ProxyCombiner {
private static final Logger LOG =
LoggerFactory.getLogger(ProxyCombiner.class);
private ProxyCombiner() { }
/**
* Combine two or more proxies which together comprise a single proxy
* interface. This can be used for a protocol interface which {@code extends}
* multiple other protocol interfaces. The returned proxy will implement
* all of the methods of the combined proxy interface, delegating calls
* to which proxy implements that method. If multiple proxies implement the
* same method, the first in the list will be used for delegation.
*
* <p/>This will check that every method on the combined interface is
* implemented by at least one of the supplied proxy objects.
*
* @param combinedProxyInterface The interface of the combined proxy.
* @param proxies The proxies which should be used as delegates.
* @param <T> The type of the proxy that will be returned.
* @return The combined proxy.
*/
@SuppressWarnings("unchecked")
public static <T> T combine(Class<T> combinedProxyInterface,
Object... proxies) {
methodLoop:
for (Method m : combinedProxyInterface.getMethods()) {
for (Object proxy : proxies) {
try {
proxy.getClass().getMethod(m.getName(), m.getParameterTypes());
continue methodLoop; // go to the next method
} catch (NoSuchMethodException nsme) {
// Continue to try the next proxy
}
}
throw new IllegalStateException("The proxies specified for "
+ combinedProxyInterface + " do not cover method " + m);
}
InvocationHandler handler = new CombinedProxyInvocationHandler(proxies);
return (T) Proxy.newProxyInstance(combinedProxyInterface.getClassLoader(),
new Class[] {combinedProxyInterface}, handler);
}
private static final class CombinedProxyInvocationHandler
implements RpcInvocationHandler {
private final Object[] proxies;
private CombinedProxyInvocationHandler(Object[] proxies) {
this.proxies = proxies;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
Exception lastException = null;
for (Object underlyingProxy : proxies) {
try {
return method.invoke(underlyingProxy, args);
} catch (IllegalAccessException|IllegalArgumentException e) {
lastException = e;
}
}
// This shouldn't happen since the method coverage was verified in build()
LOG.error("BUG: Method {} was unable to be found on any of the "
+ "underlying proxies for {}", method, proxy.getClass());
throw new IllegalArgumentException("Method " + method + " not supported",
lastException);
}
/**
* Since this is incapable of returning multiple connection IDs, simply
* return the first one. In most cases, the connection ID should be the same
* for all proxies.
*/
@Override
public ConnectionId getConnectionId() {
return RPC.getConnectionIdForProxy(proxies[0]);
}
@Override
public void close() throws IOException {
MultipleIOException.Builder exceptionBuilder =
new MultipleIOException.Builder();
for (Object proxy : proxies) {
if (proxy instanceof Closeable) {
try {
((Closeable) proxy).close();
} catch (IOException ioe) {
exceptionBuilder.add(ioe);
}
}
}
if (!exceptionBuilder.isEmpty()) {
throw exceptionBuilder.build();
}
}
}
}

View File

@ -19,6 +19,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
@ -41,4 +42,12 @@ T createProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
T createProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
UserGroupInformation ugi, boolean withRetries) throws IOException;
/**
* Set the alignment context to be used when creating new proxies using
* this factory. Not all implementations will use this alignment context.
*/
default void setAlignmentContext(AlignmentContext alignmentContext) {
// noop
}
}

View File

@ -120,7 +120,7 @@ public ObserverReadProxyProvider(
super(conf, uri, xface, factory);
this.failoverProxy = failoverProxy;
this.alignmentContext = new ClientGSIContext();
((ClientHAProxyFactory<T>) factory).setAlignmentContext(alignmentContext);
factory.setAlignmentContext(alignmentContext);
// Don't bother configuring the number of retries and such on the retry
// policy since it is mainly only used for determining whether or not an

View File

@ -40,13 +40,16 @@
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMapProtocol;
import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.NameNodeHAProxyFactory;
import org.apache.hadoop.hdfs.server.protocol.BalancerProtocols;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProxyCombiner;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolClientSideTranslatorPB;
@ -122,7 +125,7 @@ public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
if (failoverProxyProvider == null) {
return createNonHAProxy(conf, DFSUtilClient.getNNAddress(nameNodeUri),
xface, UserGroupInformation.getCurrentUser(), true,
fallbackToSimpleAuth);
fallbackToSimpleAuth, null);
} else {
return NameNodeProxiesClient.createHAProxy(conf, nameNodeUri, xface,
failoverProxyProvider);
@ -145,7 +148,7 @@ public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
public static <T> ProxyAndInfo<T> createNonHAProxy(
Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
UserGroupInformation ugi, boolean withRetries) throws IOException {
return createNonHAProxy(conf, nnAddr, xface, ugi, withRetries, null);
return createNonHAProxy(conf, nnAddr, xface, ugi, withRetries, null, null);
}
/**
@ -167,29 +170,39 @@ public static <T> ProxyAndInfo<T> createNonHAProxy(
public static <T> ProxyAndInfo<T> createNonHAProxy(
Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
UserGroupInformation ugi, boolean withRetries,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
throws IOException {
Text dtService = SecurityUtil.buildTokenService(nnAddr);
T proxy;
if (xface == ClientProtocol.class) {
proxy = (T) NameNodeProxiesClient.createNonHAProxyWithClientProtocol(
nnAddr, conf, ugi, withRetries, fallbackToSimpleAuth);
proxy = (T) NameNodeProxiesClient.createProxyWithAlignmentContext(
nnAddr, conf, ugi, withRetries, fallbackToSimpleAuth,
alignmentContext);
} else if (xface == JournalProtocol.class) {
proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi);
proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi,
alignmentContext);
} else if (xface == NamenodeProtocol.class) {
proxy = (T) createNNProxyWithNamenodeProtocol(nnAddr, conf, ugi,
withRetries);
withRetries, alignmentContext);
} else if (xface == GetUserMappingsProtocol.class) {
proxy = (T) createNNProxyWithGetUserMappingsProtocol(nnAddr, conf, ugi);
proxy = (T) createNNProxyWithGetUserMappingsProtocol(nnAddr, conf, ugi,
alignmentContext);
} else if (xface == RefreshUserMappingsProtocol.class) {
proxy = (T) createNNProxyWithRefreshUserMappingsProtocol(nnAddr, conf, ugi);
proxy = (T) createNNProxyWithRefreshUserMappingsProtocol(nnAddr, conf,
ugi, alignmentContext);
} else if (xface == RefreshAuthorizationPolicyProtocol.class) {
proxy = (T) createNNProxyWithRefreshAuthorizationPolicyProtocol(nnAddr,
conf, ugi);
conf, ugi, alignmentContext);
} else if (xface == RefreshCallQueueProtocol.class) {
proxy = (T) createNNProxyWithRefreshCallQueueProtocol(nnAddr, conf, ugi);
proxy = (T) createNNProxyWithRefreshCallQueueProtocol(nnAddr, conf, ugi,
alignmentContext);
} else if (xface == InMemoryAliasMapProtocol.class) {
proxy = (T) createNNProxyWithInMemoryAliasMapProtocol(nnAddr, conf, ugi);
proxy = (T) createNNProxyWithInMemoryAliasMapProtocol(nnAddr, conf, ugi,
alignmentContext);
} else if (xface == BalancerProtocols.class) {
proxy = (T) createNNProxyWithBalancerProtocol(nnAddr, conf, ugi,
withRetries, fallbackToSimpleAuth, alignmentContext);
} else {
String message = "Unsupported protocol found when creating the proxy " +
"connection to NameNode: " +
@ -202,58 +215,63 @@ public static <T> ProxyAndInfo<T> createNonHAProxy(
}
private static InMemoryAliasMapProtocol createNNProxyWithInMemoryAliasMapProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
throws IOException {
AliasMapProtocolPB proxy = (AliasMapProtocolPB) createNameNodeProxy(
address, conf, ugi, AliasMapProtocolPB.class, 30000);
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
AlignmentContext alignmentContext) throws IOException {
AliasMapProtocolPB proxy = createNameNodeProxy(
address, conf, ugi, AliasMapProtocolPB.class, 30000, alignmentContext);
return new InMemoryAliasMapProtocolClientSideTranslatorPB(proxy);
}
private static JournalProtocol createNNProxyWithJournalProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
throws IOException {
JournalProtocolPB proxy = (JournalProtocolPB) createNameNodeProxy(address,
conf, ugi, JournalProtocolPB.class, 30000);
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
AlignmentContext alignmentContext) throws IOException {
JournalProtocolPB proxy = createNameNodeProxy(address,
conf, ugi, JournalProtocolPB.class, 30000, alignmentContext);
return new JournalProtocolTranslatorPB(proxy);
}
private static RefreshAuthorizationPolicyProtocol
createNNProxyWithRefreshAuthorizationPolicyProtocol(InetSocketAddress address,
Configuration conf, UserGroupInformation ugi) throws IOException {
RefreshAuthorizationPolicyProtocolPB proxy = (RefreshAuthorizationPolicyProtocolPB)
createNameNodeProxy(address, conf, ugi, RefreshAuthorizationPolicyProtocolPB.class, 0);
Configuration conf, UserGroupInformation ugi,
AlignmentContext alignmentContext) throws IOException {
RefreshAuthorizationPolicyProtocolPB proxy = createNameNodeProxy(address,
conf, ugi, RefreshAuthorizationPolicyProtocolPB.class, 0,
alignmentContext);
return new RefreshAuthorizationPolicyProtocolClientSideTranslatorPB(proxy);
}
private static RefreshUserMappingsProtocol
createNNProxyWithRefreshUserMappingsProtocol(InetSocketAddress address,
Configuration conf, UserGroupInformation ugi) throws IOException {
RefreshUserMappingsProtocolPB proxy = (RefreshUserMappingsProtocolPB)
createNameNodeProxy(address, conf, ugi, RefreshUserMappingsProtocolPB.class, 0);
Configuration conf, UserGroupInformation ugi,
AlignmentContext alignmentContext) throws IOException {
RefreshUserMappingsProtocolPB proxy = createNameNodeProxy(address, conf,
ugi, RefreshUserMappingsProtocolPB.class, 0, alignmentContext);
return new RefreshUserMappingsProtocolClientSideTranslatorPB(proxy);
}
private static RefreshCallQueueProtocol
createNNProxyWithRefreshCallQueueProtocol(InetSocketAddress address,
Configuration conf, UserGroupInformation ugi) throws IOException {
RefreshCallQueueProtocolPB proxy = (RefreshCallQueueProtocolPB)
createNameNodeProxy(address, conf, ugi, RefreshCallQueueProtocolPB.class, 0);
Configuration conf, UserGroupInformation ugi,
AlignmentContext alignmentContext) throws IOException {
RefreshCallQueueProtocolPB proxy = createNameNodeProxy(address, conf, ugi,
RefreshCallQueueProtocolPB.class, 0, alignmentContext);
return new RefreshCallQueueProtocolClientSideTranslatorPB(proxy);
}
private static GetUserMappingsProtocol createNNProxyWithGetUserMappingsProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
throws IOException {
GetUserMappingsProtocolPB proxy = (GetUserMappingsProtocolPB)
createNameNodeProxy(address, conf, ugi, GetUserMappingsProtocolPB.class, 0);
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
AlignmentContext alignmentContext) throws IOException {
GetUserMappingsProtocolPB proxy = createNameNodeProxy(address, conf, ugi,
GetUserMappingsProtocolPB.class, 0, alignmentContext);
return new GetUserMappingsProtocolClientSideTranslatorPB(proxy);
}
private static NamenodeProtocol createNNProxyWithNamenodeProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
boolean withRetries) throws IOException {
NamenodeProtocolPB proxy = (NamenodeProtocolPB) createNameNodeProxy(
address, conf, ugi, NamenodeProtocolPB.class, 0);
boolean withRetries, AlignmentContext alignmentContext)
throws IOException {
NamenodeProtocolPB proxy = createNameNodeProxy(
address, conf, ugi, NamenodeProtocolPB.class, 0, alignmentContext);
if (withRetries) { // create the proxy with retries
RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(5, 200,
TimeUnit.MILLISECONDS);
@ -270,13 +288,28 @@ private static NamenodeProtocol createNNProxyWithNamenodeProtocol(
}
}
private static Object createNameNodeProxy(InetSocketAddress address,
Configuration conf, UserGroupInformation ugi, Class<?> xface,
int rpcTimeout) throws IOException {
private static BalancerProtocols createNNProxyWithBalancerProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
AlignmentContext alignmentContext) throws IOException {
NamenodeProtocol namenodeProtocol = createNNProxyWithNamenodeProtocol(
address, conf, ugi, withRetries, alignmentContext);
ClientProtocol clientProtocol =
NameNodeProxiesClient.createProxyWithAlignmentContext(address,
conf, ugi, withRetries, fallbackToSimpleAuth, alignmentContext);
return ProxyCombiner.combine(BalancerProtocols.class,
namenodeProtocol, clientProtocol);
}
private static <T> T createNameNodeProxy(InetSocketAddress address,
Configuration conf, UserGroupInformation ugi, Class<T> xface,
int rpcTimeout, AlignmentContext alignmentContext) throws IOException {
RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class);
Object proxy = RPC.getProxy(xface, RPC.getProtocolVersion(xface), address,
ugi, conf, NetUtils.getDefaultSocketFactory(conf), rpcTimeout);
return proxy;
return RPC.getProtocolProxy(xface,
RPC.getProtocolVersion(xface), address, ugi, conf,
NetUtils.getDefaultSocketFactory(conf), rpcTimeout, null, null,
alignmentContext).getProxy();
}
}

View File

@ -43,11 +43,11 @@
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.server.protocol.BalancerProtocols;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@ -111,8 +111,7 @@ public static void checkOtherInstanceRunning(boolean toCheck) {
private final URI nameNodeUri;
private final String blockpoolID;
private final NamenodeProtocol namenode;
private final ClientProtocol client;
private final BalancerProtocols namenode;
private final KeyManager keyManager;
final AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
@ -136,9 +135,7 @@ public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
this.maxNotChangedIterations = maxNotChangedIterations;
this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
NamenodeProtocol.class).getProxy();
this.client = NameNodeProxies.createProxy(conf, nameNodeUri,
ClientProtocol.class, fallbackToSimpleAuth).getProxy();
BalancerProtocols.class, fallbackToSimpleAuth).getProxy();
this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf);
final NamespaceInfo namespaceinfo = namenode.versionRequest();
@ -194,7 +191,7 @@ public boolean isUpgrading() throws IOException {
/** @return live datanode storage reports. */
public DatanodeStorageReport[] getLiveDatanodeStorageReport()
throws IOException {
return client.getDatanodeStorageReport(DatanodeReportType.LIVE);
return namenode.getDatanodeStorageReport(DatanodeReportType.LIVE);
}
/** @return the key manager */

View File

@ -19,6 +19,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
@ -27,12 +28,14 @@
public class NameNodeHAProxyFactory<T> implements HAProxyFactory<T> {
private AlignmentContext alignmentContext;
@Override
public T createProxy(Configuration conf, InetSocketAddress nnAddr,
Class<T> xface, UserGroupInformation ugi, boolean withRetries,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
return NameNodeProxies.createNonHAProxy(conf, nnAddr, xface,
ugi, withRetries, fallbackToSimpleAuth).getProxy();
ugi, withRetries, fallbackToSimpleAuth, alignmentContext).getProxy();
}
@Override
@ -42,4 +45,8 @@ public T createProxy(Configuration conf, InetSocketAddress nnAddr,
return NameNodeProxies.createNonHAProxy(conf, nnAddr, xface,
ugi, withRetries).getProxy();
}
public void setAlignmentContext(AlignmentContext alignmentContext) {
this.alignmentContext = alignmentContext;
}
}

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.security.KerberosInfo;
/** The full set of protocols used by the Balancer. */
@InterfaceAudience.Private
@KerberosInfo(
serverPrincipal = DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY)
public interface BalancerProtocols extends ClientProtocol, NamenodeProtocol { }

View File

@ -18,14 +18,13 @@
package org.apache.hadoop.hdfs.server.balancer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.net.URI;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
@ -33,7 +32,9 @@
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
import org.junit.Test;
/**
@ -43,6 +44,13 @@ public class TestBalancerWithHANameNodes {
private MiniDFSCluster cluster;
ClientProtocol client;
// array of racks for original nodes in cluster
private static final String[] TEST_RACKS =
{TestBalancer.RACK0, TestBalancer.RACK1};
// array of capacities for original nodes in cluster
private static final long[] TEST_CAPACITIES =
{TestBalancer.CAPACITY, TestBalancer.CAPACITY};
static {
TestBalancer.initTestSetup();
}
@ -57,52 +65,79 @@ public class TestBalancerWithHANameNodes {
public void testBalancerWithHANameNodes() throws Exception {
Configuration conf = new HdfsConfiguration();
TestBalancer.initConf(conf);
long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity
String newNodeRack = TestBalancer.RACK2; // new node's rack
// array of racks for original nodes in cluster
String[] racks = new String[] { TestBalancer.RACK0, TestBalancer.RACK1 };
// array of capacities of original nodes in cluster
long[] capacities = new long[] { TestBalancer.CAPACITY,
TestBalancer.CAPACITY };
assertEquals(capacities.length, racks.length);
int numOfDatanodes = capacities.length;
assertEquals(TEST_CAPACITIES.length, TEST_RACKS.length);
NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
nn1Conf.setIpcPort(HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
Configuration copiedConf = new Configuration(conf);
cluster = new MiniDFSCluster.Builder(copiedConf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(capacities.length)
.racks(racks)
.simulatedCapacities(capacities)
.numDataNodes(TEST_CAPACITIES.length)
.racks(TEST_RACKS)
.simulatedCapacities(TEST_CAPACITIES)
.build();
HATestUtil.setFailoverConfigurations(cluster, conf);
try {
cluster.waitActive();
cluster.transitionToActive(1);
cluster.transitionToActive(0);
Thread.sleep(500);
client = NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf),
ClientProtocol.class).getProxy();
long totalCapacity = TestBalancer.sum(capacities);
// fill up the cluster to be 30% full
long totalUsedSpace = totalCapacity * 3 / 10;
TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace
/ numOfDatanodes, (short) numOfDatanodes, 1);
// start up an empty node with the same capacity and on the same rack
cluster.startDataNodes(conf, 1, true, null, new String[] { newNodeRack },
new long[] { newNodeCapacity });
totalCapacity += newNodeCapacity;
TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
cluster);
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
assertEquals(1, namenodes.size());
assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster)));
final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
cluster, BalancerParameters.DEFAULT);
doTest(conf);
} finally {
cluster.shutdown();
}
}
void doTest(Configuration conf) throws Exception {
int numOfDatanodes = TEST_CAPACITIES.length;
long totalCapacity = TestBalancer.sum(TEST_CAPACITIES);
// fill up the cluster to be 30% full
long totalUsedSpace = totalCapacity * 3 / 10;
TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace
/ numOfDatanodes, (short) numOfDatanodes, 0);
// start up an empty node with the same capacity and on the same rack
long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity
String newNodeRack = TestBalancer.RACK2; // new node's rack
cluster.startDataNodes(conf, 1, true, null, new String[] {newNodeRack},
new long[] {newNodeCapacity});
totalCapacity += newNodeCapacity;
TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
cluster);
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
assertEquals(1, namenodes.size());
final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
cluster, BalancerParameters.DEFAULT);
}
/**
* Test Balancer with ObserverNodes.
*/
@Test(timeout = 60000)
public void testBalancerWithObserver() throws Exception {
final Configuration conf = new HdfsConfiguration();
TestBalancer.initConf(conf);
MiniQJMHACluster qjmhaCluster = null;
try {
qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 2,
TEST_CAPACITIES.length, true, TEST_CAPACITIES, TEST_RACKS);
cluster = qjmhaCluster.getDfsCluster();
cluster.waitClusterUp();
cluster.waitActive();
DistributedFileSystem dfs = HATestUtil.configureObserverReadFs(
cluster, conf, ObserverReadProxyProvider.class, true);
client = dfs.getClient().getNamenode();
doTest(conf);
} finally {
if (qjmhaCluster != null) {
qjmhaCluster.shutdown();
}
}
}
}

View File

@ -210,6 +210,14 @@ public static boolean isSentToAnyOfNameNodes(
public static MiniQJMHACluster setUpObserverCluster(
Configuration conf, int numObservers, int numDataNodes,
boolean fastTailing) throws IOException {
return setUpObserverCluster(conf, numObservers, numDataNodes,
fastTailing, null, null);
}
public static MiniQJMHACluster setUpObserverCluster(
Configuration conf, int numObservers, int numDataNodes,
boolean fastTailing, long[] simulatedCapacities,
String[] racks) throws IOException {
// disable block scanner
conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
@ -225,7 +233,9 @@ public static MiniQJMHACluster setUpObserverCluster(
MiniQJMHACluster.Builder qjmBuilder = new MiniQJMHACluster.Builder(conf)
.setNumNameNodes(2 + numObservers);
qjmBuilder.getDfsBuilder().numDataNodes(numDataNodes);
qjmBuilder.getDfsBuilder().numDataNodes(numDataNodes)
.simulatedCapacities(simulatedCapacities)
.racks(racks);
MiniQJMHACluster qjmhaCluster = qjmBuilder.build();
MiniDFSCluster dfsCluster = qjmhaCluster.getDfsCluster();