HDFS-13291. RBF: Implement available space based OrderResolver. Contributed by Yiqun Lin.

This commit is contained in:
Yiqun Lin 2018-03-26 18:33:07 +08:00
parent 28790b81ec
commit cfc3a1c8f0
11 changed files with 634 additions and 135 deletions

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.EnumMap; import java.util.EnumMap;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.resolver.order.AvailableSpaceResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.resolver.order.HashFirstResolver; import org.apache.hadoop.hdfs.server.federation.resolver.order.HashFirstResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.order.HashResolver; import org.apache.hadoop.hdfs.server.federation.resolver.order.HashResolver;
@ -77,6 +78,8 @@ public class MultipleDestinationMountTableResolver extends MountTableResolver {
addResolver(DestinationOrder.LOCAL, new LocalResolver(conf, router)); addResolver(DestinationOrder.LOCAL, new LocalResolver(conf, router));
addResolver(DestinationOrder.RANDOM, new RandomResolver()); addResolver(DestinationOrder.RANDOM, new RandomResolver());
addResolver(DestinationOrder.HASH_ALL, new HashResolver()); addResolver(DestinationOrder.HASH_ALL, new HashResolver());
addResolver(DestinationOrder.SPACE,
new AvailableSpaceResolver(conf, router));
} }
@Override @Override

View File

@ -0,0 +1,179 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.resolver.order;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.order.AvailableSpaceResolver.SubclusterAvailableSpace;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
/**
* Order the destinations based on available space. This resolver uses a
* higher probability (instead of "always") to choose the cluster with higher
* available space.
*/
public class AvailableSpaceResolver
extends RouterResolver<String, SubclusterAvailableSpace> {
private static final Logger LOG = LoggerFactory
.getLogger(AvailableSpaceResolver.class);
/** Increases chance of files on subcluster with more available space. */
public static final String BALANCER_PREFERENCE_KEY =
RBFConfigKeys.FEDERATION_ROUTER_PREFIX
+ "available-space-resolver.balanced-space-preference-fraction";
public static final float BALANCER_PREFERENCE_DEFAULT = 0.6f;
/** Random instance used in the subcluster comparison. */
private static final Random RAND = new Random();
/** Customized comparator for SubclusterAvailableSpace. */
private SubclusterSpaceComparator comparator;
public AvailableSpaceResolver(final Configuration conf,
final Router routerService) {
super(conf, routerService);
float balancedPreference = conf.getFloat(BALANCER_PREFERENCE_KEY,
BALANCER_PREFERENCE_DEFAULT);
if (balancedPreference < 0.5) {
LOG.warn("The balancer preference value is less than 0.5. That means more"
+ " files will be allocated in cluster with lower available space.");
}
this.comparator = new SubclusterSpaceComparator(balancedPreference);
}
/**
* Get the mapping from NamespaceId to subcluster space info. It gets this
* mapping from the subclusters through expensive calls (e.g., RPC) and uses
* caching to avoid too many calls. The cache might be updated asynchronously
* to reduce latency.
*
* @return NamespaceId -> {@link SubclusterAvailableSpace}
*/
@Override
protected Map<String, SubclusterAvailableSpace> getSubclusterInfo(
MembershipStore membershipStore) {
Map<String, SubclusterAvailableSpace> mapping = new HashMap<>();
try {
// Get the Namenode's available space info from the subclusters
// from the Membership store.
GetNamenodeRegistrationsRequest request = GetNamenodeRegistrationsRequest
.newInstance();
GetNamenodeRegistrationsResponse response = membershipStore
.getNamenodeRegistrations(request);
final List<MembershipState> nns = response.getNamenodeMemberships();
for (MembershipState nn : nns) {
try {
String nsId = nn.getNameserviceId();
long availableSpace = nn.getStats().getAvailableSpace();
mapping.put(nsId, new SubclusterAvailableSpace(nsId, availableSpace));
} catch (Exception e) {
LOG.error("Cannot get stats info for {}: {}.", nn, e.getMessage());
}
}
} catch (IOException ioe) {
LOG.error("Cannot get Namenodes from the State Store.", ioe);
}
return mapping;
}
@Override
protected String chooseFirstNamespace(String path, PathLocation loc) {
Map<String, SubclusterAvailableSpace> subclusterInfo =
getSubclusterMapping();
List<SubclusterAvailableSpace> subclusterList = new LinkedList<>(
subclusterInfo.values());
Collections.sort(subclusterList, comparator);
return subclusterList.size() > 0 ? subclusterList.get(0).getNameserviceId()
: null;
}
/**
* Inner class that stores cluster available space info.
*/
static class SubclusterAvailableSpace {
private final String nsId;
private final long availableSpace;
SubclusterAvailableSpace(String nsId, long availableSpace) {
this.nsId = nsId;
this.availableSpace = availableSpace;
}
public String getNameserviceId() {
return this.nsId;
}
public long getAvailableSpace() {
return this.availableSpace;
}
}
/**
* Customized comparator for SubclusterAvailableSpace. If more available
* space the one cluster has, the higher priority it will have. But this
* is not absolute, there is a balanced preference to make this use a higher
* probability (instead of "always") to compare by this way.
*/
static final class SubclusterSpaceComparator
implements Comparator<SubclusterAvailableSpace>, Serializable {
private int balancedPreference;
SubclusterSpaceComparator(float balancedPreference) {
Preconditions.checkArgument(
balancedPreference <= 1 && balancedPreference >= 0,
"The balancer preference value should be in the range 0.0 - 1.0");
this.balancedPreference = (int) (100 * balancedPreference);
}
@Override
public int compare(SubclusterAvailableSpace cluster1,
SubclusterAvailableSpace cluster2) {
int ret = cluster1.getAvailableSpace() > cluster2.getAvailableSpace() ? -1
: 1;
if (ret < 0) {
return (RAND.nextInt(100) < balancedPreference) ? -1 : 1;
} else {
return (RAND.nextInt(100) < balancedPreference) ? 1 : -1;
}
}
}
}

View File

@ -26,5 +26,6 @@ public enum DestinationOrder {
HASH, // Follow consistent hashing in the first folder level HASH, // Follow consistent hashing in the first folder level
LOCAL, // Local first LOCAL, // Local first
RANDOM, // Random order RANDOM, // Random order
HASH_ALL // Follow consistent hashing HASH_ALL, // Follow consistent hashing
SPACE // Available space based order
} }

View File

@ -17,11 +17,6 @@
*/ */
package org.apache.hadoop.hdfs.server.federation.resolver.order; package org.apache.hadoop.hdfs.server.federation.resolver.order;
import static org.apache.hadoop.util.Time.monotonicNow;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.net.HostAndPort;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
@ -30,17 +25,14 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.Router; import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
@ -50,40 +42,46 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.net.HostAndPort;
/** /**
* The local subcluster (where the writer is) should be tried first. The writer * The local subcluster (where the writer is) should be tried first. The writer
* is defined from the RPC query received in the RPC server. * is defined from the RPC query received in the RPC server.
*/ */
public class LocalResolver implements OrderedResolver { public class LocalResolver extends RouterResolver<String, String> {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(LocalResolver.class); LoggerFactory.getLogger(LocalResolver.class);
/** Configuration key to set the minimum time to update the local cache.*/
public static final String MIN_UPDATE_PERIOD_KEY =
RBFConfigKeys.FEDERATION_ROUTER_PREFIX + "local-resolver.update-period";
/** 10 seconds by default. */
private static final long MIN_UPDATE_PERIOD_DEFAULT =
TimeUnit.SECONDS.toMillis(10);
/** Router service. */
private final Router router;
/** Minimum update time. */
private final long minUpdateTime;
/** Node IP -> Subcluster. */
private Map<String, String> nodeSubcluster = null;
/** Last time the subcluster map was updated. */
private long lastUpdated;
public LocalResolver(final Configuration conf, final Router routerService) { public LocalResolver(final Configuration conf, final Router routerService) {
this.minUpdateTime = conf.getTimeDuration( super(conf, routerService);
MIN_UPDATE_PERIOD_KEY, MIN_UPDATE_PERIOD_DEFAULT, }
TimeUnit.MILLISECONDS);
this.router = routerService; /**
* Get the mapping from nodes to subcluster. It gets this mapping from the
* subclusters through expensive calls (e.g., RPC) and uses caching to avoid
* too many calls. The cache might be updated asynchronously to reduce
* latency.
*
* @return Node IP -> Subcluster.
*/
@Override
protected Map<String, String> getSubclusterInfo(
MembershipStore membershipStore) {
Map<String, String> mapping = new HashMap<>();
Map<String, String> dnSubcluster = getDatanodesSubcluster();
if (dnSubcluster != null) {
mapping.putAll(dnSubcluster);
}
Map<String, String> nnSubcluster = getNamenodesSubcluster(membershipStore);
if (nnSubcluster != null) {
mapping.putAll(nnSubcluster);
}
return mapping;
} }
/** /**
@ -98,12 +96,12 @@ public class LocalResolver implements OrderedResolver {
* @return Local name space. Null if we don't know about this machine. * @return Local name space. Null if we don't know about this machine.
*/ */
@Override @Override
public String getFirstNamespace(final String path, final PathLocation loc) { protected String chooseFirstNamespace(String path, PathLocation loc) {
String localSubcluster = null; String localSubcluster = null;
String clientAddr = getClientAddr(); String clientAddr = getClientAddr();
Map<String, String> nodeToSubcluster = getSubclusterMappings(); Map<String, String> subclusterInfo = getSubclusterMapping();
if (nodeToSubcluster != null) { if (subclusterInfo != null) {
localSubcluster = nodeToSubcluster.get(clientAddr); localSubcluster = subclusterInfo.get(clientAddr);
if (localSubcluster != null) { if (localSubcluster != null) {
LOG.debug("Local namespace for {} is {}", clientAddr, localSubcluster); LOG.debug("Local namespace for {} is {}", clientAddr, localSubcluster);
} else { } else {
@ -121,52 +119,6 @@ public class LocalResolver implements OrderedResolver {
return Server.getRemoteAddress(); return Server.getRemoteAddress();
} }
/**
* Get the mapping from nodes to subcluster. It gets this mapping from the
* subclusters through expensive calls (e.g., RPC) and uses caching to avoid
* too many calls. The cache might be updated asynchronously to reduce
* latency.
*
* @return Node IP -> Subcluster.
*/
@VisibleForTesting
synchronized Map<String, String> getSubclusterMappings() {
if (nodeSubcluster == null ||
(monotonicNow() - lastUpdated) > minUpdateTime) {
// Fetch the mapping asynchronously
Thread updater = new Thread(new Runnable() {
@Override
public void run() {
Map<String, String> mapping = new HashMap<>();
Map<String, String> dnSubcluster = getDatanodesSubcluster();
if (dnSubcluster != null) {
mapping.putAll(dnSubcluster);
}
Map<String, String> nnSubcluster = getNamenodesSubcluster();
if (nnSubcluster != null) {
mapping.putAll(nnSubcluster);
}
nodeSubcluster = mapping;
lastUpdated = monotonicNow();
}
});
updater.start();
// Wait until initialized
if (nodeSubcluster == null) {
try {
LOG.debug("Wait to get the mapping for the first time");
updater.join();
} catch (InterruptedException e) {
LOG.error("Cannot wait for the updater to finish");
}
}
}
return nodeSubcluster;
}
/** /**
* Get the Datanode mapping from the subclusters from the Namenodes. This * Get the Datanode mapping from the subclusters from the Namenodes. This
* needs to be done as a privileged action to use the user for the Router and * needs to be done as a privileged action to use the user for the Router and
@ -221,14 +173,8 @@ public class LocalResolver implements OrderedResolver {
* *
* @return NN IP -> Subcluster. * @return NN IP -> Subcluster.
*/ */
private Map<String, String> getNamenodesSubcluster() { private Map<String, String> getNamenodesSubcluster(
MembershipStore membershipStore) {
final MembershipStore membershipStore = getMembershipStore();
if (membershipStore == null) {
LOG.error("Cannot access the Membership store");
return null;
}
// Manage requests from this hostname (127.0.0.1) // Manage requests from this hostname (127.0.0.1)
String localIp = "127.0.0.1"; String localIp = "127.0.0.1";
String localHostname = localIp; String localHostname = localIp;
@ -269,29 +215,4 @@ public class LocalResolver implements OrderedResolver {
} }
return ret; return ret;
} }
/**
* Get the Router RPC server.
*
* @return Router RPC server. Null if not possible.
*/
private RouterRpcServer getRpcServer() {
if (this.router == null) {
return null;
}
return router.getRpcServer();
}
/**
* Get the Membership store.
*
* @return Membership store.
*/
private MembershipStore getMembershipStore() {
StateStoreService stateStore = router.getStateStore();
if (stateStore == null) {
return null;
}
return stateStore.getRegisteredRecordStore(MembershipStore.class);
}
} }

View File

@ -0,0 +1,160 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.resolver.order;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The order resolver that depends upon the Router service.
*
* @param <K> The key type of subcluster mapping info queried from Router.
* @param <V> The value type of subcluster mapping info queried from Router.
*/
public abstract class RouterResolver<K, V> implements OrderedResolver {
private static final Logger LOG =
LoggerFactory.getLogger(RouterResolver.class);
/** Configuration key to set the minimum time to update subcluster info. */
public static final String MIN_UPDATE_PERIOD_KEY =
RBFConfigKeys.FEDERATION_ROUTER_PREFIX + "router-resolver.update-period";
/** 10 seconds by default. */
private static final long MIN_UPDATE_PERIOD_DEFAULT = TimeUnit.SECONDS
.toMillis(10);
/** Router service. */
private final Router router;
/** Minimum update time. */
private final long minUpdateTime;
/** K -> T template mapping. */
private Map<K, V> subclusterMapping = null;
/** Last time the subcluster mapping was updated. */
private long lastUpdated;
public RouterResolver(final Configuration conf, final Router routerService) {
this.minUpdateTime = conf.getTimeDuration(MIN_UPDATE_PERIOD_KEY,
MIN_UPDATE_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
this.router = routerService;
}
@Override
public String getFirstNamespace(String path, PathLocation loc) {
updateSubclusterMapping();
return chooseFirstNamespace(path, loc);
}
/**
* The implementation for getting desired subcluster mapping info.
*
* @param membershipStore Membership store the resolver queried from.
* @return The map of desired type info.
*/
protected abstract Map<K, V> getSubclusterInfo(
MembershipStore membershipStore);
/**
* Choose the first namespace from queried subcluster mapping info.
*
* @param path Path to check.
* @param loc Federated location with multiple destinations.
* @return First namespace out of the locations.
*/
protected abstract String chooseFirstNamespace(String path, PathLocation loc);
/**
* Update <NamespaceId, Subcluster Info> mapping info periodically.
*/
private synchronized void updateSubclusterMapping() {
if (subclusterMapping == null
|| (monotonicNow() - lastUpdated) > minUpdateTime) {
// Fetch the mapping asynchronously
Thread updater = new Thread(new Runnable() {
@Override
public void run() {
final MembershipStore membershipStore = getMembershipStore();
if (membershipStore == null) {
LOG.error("Cannot access the Membership store.");
return;
}
subclusterMapping = getSubclusterInfo(membershipStore);
lastUpdated = monotonicNow();
}
});
updater.start();
// Wait until initialized
if (subclusterMapping == null) {
try {
LOG.debug("Wait to get the mapping for the first time");
updater.join();
} catch (InterruptedException e) {
LOG.error("Cannot wait for the updater to finish");
}
}
}
}
/**
* Get the Router RPC server.
*
* @return Router RPC server. Null if not possible.
*/
protected RouterRpcServer getRpcServer() {
if (this.router == null) {
return null;
}
return router.getRpcServer();
}
/**
* Get the Membership store.
*
* @return Membership store.
*/
protected MembershipStore getMembershipStore() {
StateStoreService stateStore = router.getStateStore();
if (stateStore == null) {
return null;
}
return stateStore.getRegisteredRecordStore(MembershipStore.class);
}
/**
* Get subcluster mapping info.
*
* @return The map of subcluster info.
*/
protected Map<K, V> getSubclusterMapping() {
return this.subclusterMapping;
}
}

View File

@ -534,31 +534,20 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
} }
/** /**
* Get the permissions for the parent of a child with given permissions. If * Get the permissions for the parent of a child with given permissions.
* the child has r--, we will set it to r-x. * Add implicit u+wx permission for parent. This is based on
* @{FSDirMkdirOp#addImplicitUwx}.
* @param mask The permission mask of the child. * @param mask The permission mask of the child.
* @return The permission mask of the parent. * @return The permission mask of the parent.
*/ */
private static FsPermission getParentPermission(final FsPermission mask) { private static FsPermission getParentPermission(final FsPermission mask) {
FsPermission ret = new FsPermission( FsPermission ret = new FsPermission(
applyExecute(mask.getUserAction()), mask.getUserAction().or(FsAction.WRITE_EXECUTE),
applyExecute(mask.getGroupAction()), mask.getGroupAction(),
applyExecute(mask.getOtherAction())); mask.getOtherAction());
return ret; return ret;
} }
/**
* Apply the execute permissions if it can be read.
* @param action Input permission.
* @return Output permission.
*/
private static FsAction applyExecute(final FsAction action) {
if (action.and(FsAction.READ) == FsAction.READ) {
return action.or(FsAction.EXECUTE);
}
return action;
}
/** /**
* Get the location to create a file. It checks if the file already existed * Get the location to create a file. It checks if the file already existed
* in one of the locations. * in one of the locations.

View File

@ -421,7 +421,8 @@ public abstract class MountTable extends BaseRecord {
public boolean isAll() { public boolean isAll() {
DestinationOrder order = getDestOrder(); DestinationOrder order = getDestOrder();
return order == DestinationOrder.HASH_ALL || return order == DestinationOrder.HASH_ALL ||
order == DestinationOrder.RANDOM; order == DestinationOrder.RANDOM ||
order == DestinationOrder.SPACE;
} }
/** /**

View File

@ -297,6 +297,8 @@ public class MountTablePBImpl extends MountTable implements PBRecord {
return DestinationOrder.RANDOM; return DestinationOrder.RANDOM;
case HASH_ALL: case HASH_ALL:
return DestinationOrder.HASH_ALL; return DestinationOrder.HASH_ALL;
case SPACE:
return DestinationOrder.SPACE;
default: default:
return DestinationOrder.HASH; return DestinationOrder.HASH;
} }
@ -310,6 +312,8 @@ public class MountTablePBImpl extends MountTable implements PBRecord {
return DestOrder.RANDOM; return DestOrder.RANDOM;
case HASH_ALL: case HASH_ALL:
return DestOrder.HASH_ALL; return DestOrder.HASH_ALL;
case SPACE:
return DestOrder.SPACE;
default: default:
return DestOrder.HASH; return DestOrder.HASH;
} }

View File

@ -130,6 +130,7 @@ message MountTableRecordProto {
LOCAL = 1; LOCAL = 1;
RANDOM = 2; RANDOM = 2;
HASH_ALL = 3; HASH_ALL = 3;
SPACE = 4;
} }
optional DestOrder destOrder = 6 [default = HASH]; optional DestOrder destOrder = 6 [default = HASH];

View File

@ -0,0 +1,232 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.resolver.order;
import static org.apache.hadoop.hdfs.server.federation.resolver.order.AvailableSpaceResolver.BALANCER_PREFERENCE_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.resolver.order.AvailableSpaceResolver.BALANCER_PREFERENCE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.order.AvailableSpaceResolver.SubclusterAvailableSpace;
import org.apache.hadoop.hdfs.server.federation.resolver.order.AvailableSpaceResolver.SubclusterSpaceComparator;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.MembershipStatsPBImpl;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;
/**
* Test the {@link AvailableSpaceResolver}.
*/
public class TestAvailableSpaceResolver {
private static final int SUBCLUSTER_NUM = 10;
@Test
public void testResolverWithNoPreference() throws IOException {
MultipleDestinationMountTableResolver mountTableResolver =
mockAvailableSpaceResolver(1.0f);
// Since we don't have any preference, it will
// always chose the maximum-available-space subcluster.
PathLocation loc = mountTableResolver.getDestinationForPath("/space");
assertEquals("subcluster9",
loc.getDestinations().get(0).getNameserviceId());
loc = mountTableResolver.getDestinationForPath("/space/subdir");
assertEquals("subcluster9",
loc.getDestinations().get(0).getNameserviceId());
}
@Test
public void testResolverWithDefaultPreference() throws IOException {
MultipleDestinationMountTableResolver mountTableResolver =
mockAvailableSpaceResolver(BALANCER_PREFERENCE_DEFAULT);
int retries = 10;
int retryTimes = 0;
// There is chance we won't always chose the
// maximum-available-space subcluster.
for (retryTimes = 0; retryTimes < retries; retryTimes++) {
PathLocation loc = mountTableResolver.getDestinationForPath("/space");
if (!"subcluster9"
.equals(loc.getDestinations().get(0).getNameserviceId())) {
break;
}
}
assertNotEquals(retries, retryTimes);
}
/**
* Mock the available space based resolver.
*
* @param balancerPreference The balancer preference for the resolver.
* @throws IOException
* @return MultipleDestinationMountTableResolver instance.
*/
@SuppressWarnings("unchecked")
private MultipleDestinationMountTableResolver mockAvailableSpaceResolver(
float balancerPreference) throws IOException {
Configuration conf = new Configuration();
conf.setFloat(BALANCER_PREFERENCE_KEY, balancerPreference);
Router router = mock(Router.class);
StateStoreService stateStore = mock(StateStoreService.class);
MembershipStore membership = mock(MembershipStore.class);
when(router.getStateStore()).thenReturn(stateStore);
when(stateStore.getRegisteredRecordStore(any(Class.class)))
.thenReturn(membership);
GetNamenodeRegistrationsResponse response =
GetNamenodeRegistrationsResponse.newInstance();
// Set the mapping for each client
List<MembershipState> records = new LinkedList<>();
for (int i = 0; i < SUBCLUSTER_NUM; i++) {
records.add(newMembershipState("subcluster" + i, i));
}
response.setNamenodeMemberships(records);
when(membership
.getNamenodeRegistrations(any(GetNamenodeRegistrationsRequest.class)))
.thenReturn(response);
// construct available space resolver
AvailableSpaceResolver resolver = new AvailableSpaceResolver(conf, router);
MultipleDestinationMountTableResolver mountTableResolver =
new MultipleDestinationMountTableResolver(conf, router);
mountTableResolver.addResolver(DestinationOrder.SPACE, resolver);
// We point /space to subclusters [0,..9] with the SPACE order
Map<String, String> destinations = new HashMap<>();
for (int i = 0; i < SUBCLUSTER_NUM; i++) {
destinations.put("subcluster" + i, "/space");
}
MountTable spaceEntry = MountTable.newInstance("/space", destinations);
spaceEntry.setDestOrder(DestinationOrder.SPACE);
mountTableResolver.addEntry(spaceEntry);
return mountTableResolver;
}
public static MembershipState newMembershipState(String nameservice,
long availableSpace) {
MembershipState record = MembershipState.newInstance();
record.setNameserviceId(nameservice);
MembershipStats stats = new MembershipStatsPBImpl();
stats.setAvailableSpace(availableSpace);
record.setStats(stats);
return record;
}
@Test
public void testSubclusterSpaceComparator() {
verifyRank(0.0f, true, false);
verifyRank(1.0f, true, true);
verifyRank(0.5f, false, false);
verifyRank(BALANCER_PREFERENCE_DEFAULT, false, false);
// test for illegal cases
try {
verifyRank(2.0f, false, false);
fail("Subcluster comparison should be failed.");
} catch (IllegalArgumentException e) {
GenericTestUtils.assertExceptionContains(
"The balancer preference value should be in the range 0.0 - 1.0", e);
}
try {
verifyRank(-1.0f, false, false);
fail("Subcluster comparison should be failed.");
} catch (IllegalArgumentException e) {
GenericTestUtils.assertExceptionContains(
"The balancer preference value should be in the range 0.0 - 1.0", e);
}
}
/**
* Verify result rank with {@link SubclusterSpaceComparator}.
* @param balancerPreference The balancer preference used
* in {@link SubclusterSpaceComparator}.
* @param shouldOrdered The result rank should be ordered.
* @param isDesc If the rank result is in a descending order.
*/
private void verifyRank(float balancerPreference, boolean shouldOrdered,
boolean isDesc) {
List<SubclusterAvailableSpace> subclusters = new LinkedList<>();
for (int i = 0; i < SUBCLUSTER_NUM; i++) {
subclusters.add(new SubclusterAvailableSpace("subcluster" + i, i));
}
// shuffle the cluster list if we expect rank to be ordered
if (shouldOrdered) {
Collections.shuffle(subclusters);
}
SubclusterSpaceComparator comparator = new SubclusterSpaceComparator(
balancerPreference);
Collections.sort(subclusters, comparator);
int i = SUBCLUSTER_NUM - 1;
for (; i >= 0; i--) {
SubclusterAvailableSpace cluster = subclusters
.get(SUBCLUSTER_NUM - 1 - i);
if (shouldOrdered) {
if (isDesc) {
assertEquals("subcluster" + i, cluster.getNameserviceId());
assertEquals(i, cluster.getAvailableSpace());
} else {
assertEquals("subcluster" + (SUBCLUSTER_NUM - 1 - i),
cluster.getNameserviceId());
assertEquals(SUBCLUSTER_NUM - 1 - i, cluster.getAvailableSpace());
}
} else {
// If catch one cluster is not in ordered, that's expected behavior.
if (!cluster.getNameserviceId().equals("subcluster" + i)
&& cluster.getAvailableSpace() != i) {
break;
}
}
}
// The var i won't reach to 0 since cluster list won't be completely
// ordered.
if (!shouldOrdered) {
assertNotEquals(0, i);
}
subclusters.clear();
}
}

View File

@ -60,8 +60,10 @@ public class TestRouterAllResolver {
/** Directory that will be in a HASH_ALL mount point. */ /** Directory that will be in a HASH_ALL mount point. */
private static final String TEST_DIR_HASH_ALL = "/hashall"; private static final String TEST_DIR_HASH_ALL = "/hashall";
/** Directory that will be in a HASH_ALL mount point. */ /** Directory that will be in a RANDOM mount point. */
private static final String TEST_DIR_RANDOM = "/random"; private static final String TEST_DIR_RANDOM = "/random";
/** Directory that will be in a SPACE mount point. */
private static final String TEST_DIR_SPACE = "/space";
/** Number of namespaces. */ /** Number of namespaces. */
private static final int NUM_NAMESPACES = 2; private static final int NUM_NAMESPACES = 2;
@ -103,6 +105,7 @@ public class TestRouterAllResolver {
// Setup the test mount point // Setup the test mount point
createMountTableEntry(TEST_DIR_HASH_ALL, DestinationOrder.HASH_ALL); createMountTableEntry(TEST_DIR_HASH_ALL, DestinationOrder.HASH_ALL);
createMountTableEntry(TEST_DIR_RANDOM, DestinationOrder.RANDOM); createMountTableEntry(TEST_DIR_RANDOM, DestinationOrder.RANDOM);
createMountTableEntry(TEST_DIR_SPACE, DestinationOrder.SPACE);
// Get filesystems for federated and each namespace // Get filesystems for federated and each namespace
routerFs = routerContext.getFileSystem(); routerFs = routerContext.getFileSystem();
@ -135,6 +138,11 @@ public class TestRouterAllResolver {
testAll(TEST_DIR_RANDOM); testAll(TEST_DIR_RANDOM);
} }
@Test
public void testSpaceAll() throws Exception {
testAll(TEST_DIR_SPACE);
}
/** /**
* Tests that the resolver spreads files across subclusters in the whole * Tests that the resolver spreads files across subclusters in the whole
* tree. * tree.