HDFS-14316. RBF: Support unavailable subclusters for mount points with multiple destinations. Contributed by Inigo Goiri.

This commit is contained in:
Ayush Saxena 2019-03-30 07:15:41 +05:30 committed by Brahma Reddy Battula
parent 0dbd87874a
commit 6c42d40504
26 changed files with 1371 additions and 78 deletions

View File

@ -270,6 +270,7 @@ public class FederationMetrics implements FederationMBean {
innerInfo.put("order", "");
}
innerInfo.put("readonly", entry.isReadOnly());
innerInfo.put("faulttolerant", entry.isFaultTolerant());
info.add(Collections.unmodifiableMap(innerInfo));
}
} catch (IOException e) {

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.federation.resolver.order;
import java.util.EnumSet;
/**
* Order of the destinations when we have multiple of them. When the resolver
* of files to subclusters (FileSubclusterResolver) has multiple destinations,
@ -27,5 +29,11 @@ public enum DestinationOrder {
LOCAL, // Local first
RANDOM, // Random order
HASH_ALL, // Follow consistent hashing
SPACE // Available space based order
SPACE; // Available space based order
/** Approaches that write folders in all subclusters. */
public static final EnumSet<DestinationOrder> FOLDER_ALL = EnumSet.of(
HASH_ALL,
RANDOM,
SPACE);
}

View File

@ -135,7 +135,17 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
public static final String DFS_ROUTER_ALLOW_PARTIAL_LIST =
FEDERATION_ROUTER_PREFIX + "client.allow-partial-listing";
public static final boolean DFS_ROUTER_ALLOW_PARTIAL_LIST_DEFAULT = true;
public static final String DFS_ROUTER_CLIENT_MOUNT_TIME_OUT =
FEDERATION_ROUTER_PREFIX + "client.mount-status.time-out";
public static final long DFS_ROUTER_CLIENT_MOUNT_TIME_OUT_DEFAULT =
TimeUnit.SECONDS.toMillis(1);
public static final String DFS_ROUTER_CLIENT_MAX_RETRIES_TIME_OUT =
FEDERATION_ROUTER_PREFIX + "connect.max.retries.on.timeouts";
public static final int DFS_ROUTER_CLIENT_MAX_RETRIES_TIME_OUT_DEFAULT = 0;
public static final String DFS_ROUTER_CLIENT_CONNECT_TIMEOUT =
FEDERATION_ROUTER_PREFIX + "connect.timeout";
public static final long DFS_ROUTER_CLIENT_CONNECT_TIMEOUT_DEFAULT =
TimeUnit.SECONDS.toMillis(2);
// HDFS Router State Store connection
public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS =

View File

@ -210,7 +210,13 @@ public class RemoteMethod {
@Override
public String toString() {
return this.protocol.getSimpleName() + "#" + this.methodName + " " +
Arrays.toString(this.params);
return new StringBuilder()
.append(this.protocol.getSimpleName())
.append("#")
.append(this.methodName)
.append("(")
.append(Arrays.deepToString(this.params))
.append(")")
.toString();
}
}

View File

@ -68,4 +68,13 @@ public class RemoteParam {
return context.getDest();
}
}
@Override
public String toString() {
return new StringBuilder()
.append("RemoteParam(")
.append(this.paramMap)
.append(")")
.toString();
}
}

View File

@ -84,6 +84,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.slf4j.Logger;
@ -93,6 +94,8 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
@ -103,6 +106,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
/**
* Module that implements all the RPC calls in {@link ClientProtocol} in the
@ -119,6 +123,8 @@ public class RouterClientProtocol implements ClientProtocol {
/** If it requires response from all subclusters. */
private final boolean allowPartialList;
/** Time out when getting the mount statistics. */
private long mountStatusTimeOut;
/** Identifier for the super user. */
private String superUser;
@ -140,6 +146,10 @@ public class RouterClientProtocol implements ClientProtocol {
this.allowPartialList = conf.getBoolean(
RBFConfigKeys.DFS_ROUTER_ALLOW_PARTIAL_LIST,
RBFConfigKeys.DFS_ROUTER_ALLOW_PARTIAL_LIST_DEFAULT);
this.mountStatusTimeOut = conf.getTimeDuration(
RBFConfigKeys.DFS_ROUTER_CLIENT_MOUNT_TIME_OUT,
RBFConfigKeys.DFS_ROUTER_CLIENT_MOUNT_TIME_OUT_DEFAULT,
TimeUnit.SECONDS);
// User and group for reporting
try {
@ -234,15 +244,92 @@ public class RouterClientProtocol implements ClientProtocol {
}
}
RemoteLocation createLocation = rpcServer.getCreateLocation(src);
RemoteMethod method = new RemoteMethod("create",
new Class<?>[] {String.class, FsPermission.class, String.class,
EnumSetWritable.class, boolean.class, short.class,
long.class, CryptoProtocolVersion[].class,
String.class, String.class},
createLocation.getDest(), masked, clientName, flag, createParent,
new RemoteParam(), masked, clientName, flag, createParent,
replication, blockSize, supportedVersions, ecPolicyName, storagePolicy);
final List<RemoteLocation> locations =
rpcServer.getLocationsForPath(src, true);
RemoteLocation createLocation = null;
try {
createLocation = rpcServer.getCreateLocation(src);
return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method);
} catch (IOException ioe) {
final List<RemoteLocation> newLocations = checkFaultTolerantRetry(
method, src, ioe, createLocation, locations);
return rpcClient.invokeSequential(
newLocations, method, HdfsFileStatus.class, null);
}
}
/**
* Check if an exception is caused by an unavailable subcluster or not. It
* also checks the causes.
* @param ioe IOException to check.
* @return If caused by an unavailable subcluster. False if the should not be
* retried (e.g., NSQuotaExceededException).
*/
private static boolean isUnavailableSubclusterException(
final IOException ioe) {
if (ioe instanceof ConnectException ||
ioe instanceof ConnectTimeoutException ||
ioe instanceof NoNamenodesAvailableException) {
return true;
}
if (ioe.getCause() instanceof IOException) {
IOException cause = (IOException)ioe.getCause();
return isUnavailableSubclusterException(cause);
}
return false;
}
/**
* Check if a remote method can be retried in other subclusters when it
* failed in the original destination. This method returns the list of
* locations to retry in. This is used by fault tolerant mount points.
* @param method Method that failed and might be retried.
* @param src Path where the method was invoked.
* @param e Exception that was triggered.
* @param excludeLoc Location that failed and should be excluded.
* @param locations All the locations to retry.
* @return The locations where we should retry (excluding the failed ones).
* @throws IOException If this path is not fault tolerant or the exception
* should not be retried (e.g., NSQuotaExceededException).
*/
private List<RemoteLocation> checkFaultTolerantRetry(
final RemoteMethod method, final String src, final IOException ioe,
final RemoteLocation excludeLoc, final List<RemoteLocation> locations)
throws IOException {
if (!isUnavailableSubclusterException(ioe)) {
LOG.debug("{} exception cannot be retried",
ioe.getClass().getSimpleName());
throw ioe;
}
if (!rpcServer.isPathFaultTolerant(src)) {
LOG.debug("{} does not allow retrying a failed subcluster", src);
throw ioe;
}
final List<RemoteLocation> newLocations;
if (excludeLoc == null) {
LOG.error("Cannot invoke {} for {}: {}", method, src, ioe.getMessage());
newLocations = locations;
} else {
LOG.error("Cannot invoke {} for {} in {}: {}",
method, src, excludeLoc, ioe.getMessage());
newLocations = new ArrayList<>();
for (final RemoteLocation loc : locations) {
if (!loc.equals(excludeLoc)) {
newLocations.add(loc);
}
}
}
LOG.info("{} allows retrying failed subclusters in {}", src, newLocations);
return newLocations;
}
@Override
@ -604,13 +691,20 @@ public class RouterClientProtocol implements ClientProtocol {
}
} catch (IOException ioe) {
// Can't query if this file exists or not.
LOG.error("Error requesting file info for path {} while proxing mkdirs",
src, ioe);
LOG.error("Error getting file info for {} while proxying mkdirs: {}",
src, ioe.getMessage());
}
}
RemoteLocation firstLocation = locations.get(0);
final RemoteLocation firstLocation = locations.get(0);
try {
return (boolean) rpcClient.invokeSingle(firstLocation, method);
} catch (IOException ioe) {
final List<RemoteLocation> newLocations = checkFaultTolerantRetry(
method, src, ioe, firstLocation, locations);
return rpcClient.invokeSequential(
newLocations, method, Boolean.class, Boolean.TRUE);
}
}
@Override
@ -1702,10 +1796,26 @@ public class RouterClientProtocol implements ClientProtocol {
*/
private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
final RemoteMethod method) throws IOException {
return getFileInfoAll(locations, method, -1);
}
/**
* Get the file info from all the locations.
*
* @param locations Locations to check.
* @param method The file information method to run.
* @param timeOutMs Time out for the operation in milliseconds.
* @return The first file info if it's a file, the directory if it's
* everywhere.
* @throws IOException If all the locations throw an exception.
*/
private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
final RemoteMethod method, long timeOutMs) throws IOException {
// Get the file info from everybody
Map<RemoteLocation, HdfsFileStatus> results =
rpcClient.invokeConcurrent(locations, method, HdfsFileStatus.class);
rpcClient.invokeConcurrent(locations, method, false, false, timeOutMs,
HdfsFileStatus.class);
int children = 0;
// We return the first file
HdfsFileStatus dirStatus = null;
@ -1762,9 +1872,10 @@ public class RouterClientProtocol implements ClientProtocol {
MountTableResolver mountTable = (MountTableResolver) subclusterResolver;
MountTable entry = mountTable.getMountPoint(mName);
if (entry != null) {
HdfsFileStatus fInfo = getFileInfoAll(entry.getDestinations(),
new RemoteMethod("getFileInfo", new Class<?>[] {String.class},
new RemoteParam()));
RemoteMethod method = new RemoteMethod("getFileInfo",
new Class<?>[] {String.class}, new RemoteParam());
HdfsFileStatus fInfo = getFileInfoAll(
entry.getDestinations(), method, mountStatusTimeOut);
if (fInfo != null) {
permission = fInfo.getPermission();
owner = fInfo.getOwner();

View File

@ -18,11 +18,15 @@
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
@ -62,6 +66,7 @@ import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -126,7 +131,8 @@ public class RouterRpcClient {
this.namenodeResolver = resolver;
this.connectionManager = new ConnectionManager(conf);
Configuration clientConf = getClientConfiguration(conf);
this.connectionManager = new ConnectionManager(clientConf);
this.connectionManager.start();
int numThreads = conf.getInt(
@ -165,6 +171,31 @@ public class RouterRpcClient {
failoverSleepBaseMillis, failoverSleepMaxMillis);
}
/**
* Get the configuration for the RPC client. It takes the Router
* configuration and transforms it into regular RPC Client configuration.
* @param conf Input configuration.
* @return Configuration for the RPC client.
*/
private Configuration getClientConfiguration(final Configuration conf) {
Configuration clientConf = new Configuration(conf);
int maxRetries = conf.getInt(
RBFConfigKeys.DFS_ROUTER_CLIENT_MAX_RETRIES_TIME_OUT,
RBFConfigKeys.DFS_ROUTER_CLIENT_MAX_RETRIES_TIME_OUT_DEFAULT);
if (maxRetries >= 0) {
clientConf.setInt(
IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, maxRetries);
}
long connectTimeOut = conf.getTimeDuration(
RBFConfigKeys.DFS_ROUTER_CLIENT_CONNECT_TIMEOUT,
RBFConfigKeys.DFS_ROUTER_CLIENT_CONNECT_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
if (connectTimeOut >= 0) {
clientConf.setLong(IPC_CLIENT_CONNECT_TIMEOUT_KEY, connectTimeOut);
}
return clientConf;
}
/**
* Get the active namenode resolver used by this client.
* @return Active namenode resolver.
@ -341,17 +372,19 @@ public class RouterRpcClient {
* @param method Remote ClientProtcol method to invoke.
* @param params Variable list of parameters matching the method.
* @return The result of invoking the method.
* @throws IOException
* @throws ConnectException If it cannot connect to any Namenode.
* @throws StandbyException If all Namenodes are in Standby.
* @throws IOException If it cannot invoke the method.
*/
private Object invokeMethod(
final UserGroupInformation ugi,
final List<? extends FederationNamenodeContext> namenodes,
final Class<?> protocol, final Method method, final Object... params)
throws IOException {
throws ConnectException, StandbyException, IOException {
if (namenodes == null || namenodes.isEmpty()) {
throw new IOException("No namenodes to invoke " + method.getName() +
" with params " + Arrays.toString(params) + " from "
" with params " + Arrays.deepToString(params) + " from "
+ router.getRouterId());
}
@ -388,6 +421,12 @@ public class RouterRpcClient {
this.rpcMonitor.proxyOpFailureStandby();
}
failover = true;
} else if (ioe instanceof ConnectException ||
ioe instanceof ConnectTimeoutException) {
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpFailureCommunicate();
}
failover = true;
} else if (ioe instanceof RemoteException) {
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpComplete(true);
@ -408,7 +447,7 @@ public class RouterRpcClient {
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpNoNamenodes();
}
LOG.error("Can not get available namenode for {} {} error: {}",
LOG.error("Cannot get available namenode for {} {} error: {}",
nsId, rpcAddress, ioe.getMessage());
// Throw RetriableException so that client can retry
throw new RetriableException(ioe);
@ -433,25 +472,34 @@ public class RouterRpcClient {
// All namenodes were unavailable or in standby
String msg = "No namenode available to invoke " + method.getName() + " " +
Arrays.toString(params);
Arrays.deepToString(params) + " in " + namenodes + " from " +
router.getRouterId();
LOG.error(msg);
int exConnect = 0;
for (Entry<FederationNamenodeContext, IOException> entry :
ioes.entrySet()) {
FederationNamenodeContext namenode = entry.getKey();
String nsId = namenode.getNameserviceId();
String nnId = namenode.getNamenodeId();
String nnKey = namenode.getNamenodeKey();
String addr = namenode.getRpcAddress();
IOException ioe = entry.getValue();
if (ioe instanceof StandbyException) {
LOG.error("{} {} at {} is in Standby: {}", nsId, nnId, addr,
ioe.getMessage());
LOG.error("{} at {} is in Standby: {}",
nnKey, addr, ioe.getMessage());
} else if (ioe instanceof ConnectException ||
ioe instanceof ConnectTimeoutException) {
exConnect++;
LOG.error("{} at {} cannot be reached: {}",
nnKey, addr, ioe.getMessage());
} else {
LOG.error("{} {} at {} error: \"{}\"",
nsId, nnId, addr, ioe.getMessage());
LOG.error("{} at {} error: \"{}\"", nnKey, addr, ioe.getMessage());
}
}
if (exConnect == ioes.size()) {
throw new ConnectException(msg);
} else {
throw new StandbyException(msg);
}
}
/**
* Invokes a method on the designated object. Catches exceptions specific to
@ -497,6 +545,9 @@ public class RouterRpcClient {
// failover, invoker looks for standby exceptions for failover.
if (ioe instanceof StandbyException) {
throw ioe;
} else if (ioe instanceof ConnectException ||
ioe instanceof ConnectTimeoutException) {
throw ioe;
} else {
throw new StandbyException(ioe.getMessage());
}
@ -1043,7 +1094,7 @@ public class RouterRpcClient {
if (locations.isEmpty()) {
throw new IOException("No remote locations available");
} else if (locations.size() == 1) {
} else if (locations.size() == 1 && timeOutMs <= 0) {
// Shortcut, just one call
T location = locations.iterator().next();
String ns = location.getNameserviceId();

View File

@ -30,6 +30,7 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Array;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
@ -133,6 +134,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.AccessControlException;
@ -294,7 +296,9 @@ public class RouterRpcServer extends AbstractService
AccessControlException.class,
LeaseExpiredException.class,
NotReplicatedYetException.class,
IOException.class);
IOException.class,
ConnectException.class,
RetriableException.class);
this.rpcServer.addSuppressedLoggingExceptions(
StandbyException.class);
@ -520,7 +524,7 @@ public class RouterRpcServer extends AbstractService
// If default Ns is not present return result from first namespace.
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
if (nss.isEmpty()) {
throw new IOException("No namespace availaible.");
throw new IOException("No namespace available.");
}
nsId = nss.iterator().next().getNameserviceId();
return rpcClient.invokeSingle(nsId, method, clazz);
@ -566,6 +570,7 @@ public class RouterRpcServer extends AbstractService
replication, blockSize, supportedVersions, ecPolicyName, storagePolicy);
}
/**
* Get the location to create a file. It checks if the file already existed
* in one of the locations.
@ -574,10 +579,24 @@ public class RouterRpcServer extends AbstractService
* @return The remote location for this file.
* @throws IOException If the file has no creation location.
*/
RemoteLocation getCreateLocation(final String src)
RemoteLocation getCreateLocation(final String src) throws IOException {
final List<RemoteLocation> locations = getLocationsForPath(src, true);
return getCreateLocation(src, locations);
}
/**
* Get the location to create a file. It checks if the file already existed
* in one of the locations.
*
* @param src Path of the file to check.
* @param locations Prefetched locations for the file.
* @return The remote location for this file.
* @throws IOException If the file has no creation location.
*/
RemoteLocation getCreateLocation(
final String src, final List<RemoteLocation> locations)
throws IOException {
final List<RemoteLocation> locations = getLocationsForPath(src, true);
if (locations == null || locations.isEmpty()) {
throw new IOException("Cannot get locations to create " + src);
}
@ -1568,6 +1587,27 @@ public class RouterRpcServer extends AbstractService
return false;
}
/**
* Check if a path supports failed subclusters.
*
* @param path Path to check.
* @return If a path should support failed subclusters.
*/
boolean isPathFaultTolerant(final String path) {
if (subclusterResolver instanceof MountTableResolver) {
try {
MountTableResolver mountTable = (MountTableResolver) subclusterResolver;
MountTable entry = mountTable.getMountPoint(path);
if (entry != null) {
return entry.isFaultTolerant();
}
} catch (IOException e) {
LOG.error("Cannot get mount point", e);
}
}
return false;
}
/**
* Check if call needs to be invoked to all the locations. The call is
* supposed to be invoked in all the locations in case the order of the mount

View File

@ -66,6 +66,7 @@ public class MountTableStoreImpl extends MountTableStore {
if (pc != null) {
pc.checkPermission(mountTable, FsAction.WRITE);
}
mountTable.validate();
}
boolean status = getDriver().put(mountTable, false, true);
@ -85,6 +86,7 @@ public class MountTableStoreImpl extends MountTableStore {
if (pc != null) {
pc.checkPermission(mountTable, FsAction.WRITE);
}
mountTable.validate();
}
boolean status = getDriver().put(mountTable, true, true);

View File

@ -26,6 +26,7 @@ import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
@ -59,6 +60,10 @@ public abstract class MountTable extends BaseRecord {
"Invalid entry, invalid destination path ";
public static final String ERROR_MSG_ALL_DEST_MUST_START_WITH_BACK_SLASH =
"Invalid entry, all destination must start with / ";
private static final String ERROR_MSG_FAULT_TOLERANT_MULTI_DEST =
"Invalid entry, fault tolerance requires multiple destinations ";
private static final String ERROR_MSG_FAULT_TOLERANT_ALL =
"Invalid entry, fault tolerance only supported for ALL order ";
/** Comparator for paths which considers the /. */
public static final Comparator<String> PATH_COMPARATOR =
@ -228,6 +233,20 @@ public abstract class MountTable extends BaseRecord {
*/
public abstract void setDestOrder(DestinationOrder order);
/**
* Check if the mount point supports a failed destination.
*
* @return If it supports failures.
*/
public abstract boolean isFaultTolerant();
/**
* Set if the mount point supports failed destinations.
*
* @param faultTolerant If it supports failures.
*/
public abstract void setFaultTolerant(boolean faultTolerant);
/**
* Get owner name of this mount table entry.
*
@ -321,11 +340,14 @@ public abstract class MountTable extends BaseRecord {
List<RemoteLocation> destinations = this.getDestinations();
sb.append(destinations);
if (destinations != null && destinations.size() > 1) {
sb.append("[" + this.getDestOrder() + "]");
sb.append("[").append(this.getDestOrder()).append("]");
}
if (this.isReadOnly()) {
sb.append("[RO]");
}
if (this.isFaultTolerant()) {
sb.append("[FT]");
}
if (this.getOwnerName() != null) {
sb.append("[owner:").append(this.getOwnerName()).append("]");
@ -383,6 +405,16 @@ public abstract class MountTable extends BaseRecord {
ERROR_MSG_ALL_DEST_MUST_START_WITH_BACK_SLASH + this);
}
}
if (isFaultTolerant()) {
if (getDestinations().size() < 2) {
throw new IllegalArgumentException(
ERROR_MSG_FAULT_TOLERANT_MULTI_DEST + this);
}
if (!isAll()) {
throw new IllegalArgumentException(
ERROR_MSG_FAULT_TOLERANT_ALL + this);
}
}
}
@Override
@ -397,6 +429,7 @@ public abstract class MountTable extends BaseRecord {
.append(this.getDestinations())
.append(this.isReadOnly())
.append(this.getDestOrder())
.append(this.isFaultTolerant())
.toHashCode();
}
@ -404,16 +437,13 @@ public abstract class MountTable extends BaseRecord {
public boolean equals(Object obj) {
if (obj instanceof MountTable) {
MountTable other = (MountTable)obj;
if (!this.getSourcePath().equals(other.getSourcePath())) {
return false;
} else if (!this.getDestinations().equals(other.getDestinations())) {
return false;
} else if (this.isReadOnly() != other.isReadOnly()) {
return false;
} else if (!this.getDestOrder().equals(other.getDestOrder())) {
return false;
}
return true;
return new EqualsBuilder()
.append(this.getSourcePath(), other.getSourcePath())
.append(this.getDestinations(), other.getDestinations())
.append(this.isReadOnly(), other.isReadOnly())
.append(this.getDestOrder(), other.getDestOrder())
.append(this.isFaultTolerant(), other.isFaultTolerant())
.isEquals();
}
return false;
}
@ -424,9 +454,7 @@ public abstract class MountTable extends BaseRecord {
*/
public boolean isAll() {
DestinationOrder order = getDestOrder();
return order == DestinationOrder.HASH_ALL ||
order == DestinationOrder.RANDOM ||
order == DestinationOrder.SPACE;
return DestinationOrder.FOLDER_ALL.contains(order);
}
/**

View File

@ -195,6 +195,20 @@ public class MountTablePBImpl extends MountTable implements PBRecord {
}
}
@Override
public boolean isFaultTolerant() {
MountTableRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder();
if (!proto.hasFaultTolerant()) {
return false;
}
return proto.getFaultTolerant();
}
@Override
public void setFaultTolerant(boolean faultTolerant) {
this.translator.getBuilder().setFaultTolerant(faultTolerant);
}
@Override
public String getOwnerName() {
MountTableRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder();

View File

@ -135,12 +135,12 @@ public class RouterAdmin extends Configured implements Tool {
}
if (cmd.equals("-add")) {
return "\t[-add <source> <nameservice1, nameservice2, ...> <destination> "
+ "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ "[-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ "-owner <owner> -group <group> -mode <mode>]";
} else if (cmd.equals("-update")) {
return "\t[-update <source> <nameservice1, nameservice2, ...> "
+ "<destination> "
+ "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ "[-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ "-owner <owner> -group <group> -mode <mode>]";
} else if (cmd.equals("-rm")) {
return "\t[-rm <source>]";
@ -415,6 +415,7 @@ public class RouterAdmin extends Configured implements Tool {
// Optional parameters
boolean readOnly = false;
boolean faultTolerant = false;
String owner = null;
String group = null;
FsPermission mode = null;
@ -422,6 +423,8 @@ public class RouterAdmin extends Configured implements Tool {
while (i < parameters.length) {
if (parameters[i].equals("-readonly")) {
readOnly = true;
} else if (parameters[i].equals("-faulttolerant")) {
faultTolerant = true;
} else if (parameters[i].equals("-order")) {
i++;
try {
@ -447,7 +450,7 @@ public class RouterAdmin extends Configured implements Tool {
i++;
}
return addMount(mount, nss, dest, readOnly, order,
return addMount(mount, nss, dest, readOnly, faultTolerant, order,
new ACLEntity(owner, group, mode));
}
@ -464,7 +467,8 @@ public class RouterAdmin extends Configured implements Tool {
* @throws IOException Error adding the mount point.
*/
public boolean addMount(String mount, String[] nss, String dest,
boolean readonly, DestinationOrder order, ACLEntity aclInfo)
boolean readonly, boolean faultTolerant, DestinationOrder order,
ACLEntity aclInfo)
throws IOException {
mount = normalizeFileSystemPath(mount);
// Get the existing entry
@ -491,6 +495,9 @@ public class RouterAdmin extends Configured implements Tool {
if (readonly) {
newEntry.setReadOnly(true);
}
if (faultTolerant) {
newEntry.setFaultTolerant(true);
}
if (order != null) {
newEntry.setDestOrder(order);
}
@ -508,6 +515,8 @@ public class RouterAdmin extends Configured implements Tool {
newEntry.setMode(aclInfo.getMode());
}
newEntry.validate();
AddMountTableEntryRequest request =
AddMountTableEntryRequest.newInstance(newEntry);
AddMountTableEntryResponse addResponse =
@ -527,6 +536,9 @@ public class RouterAdmin extends Configured implements Tool {
if (readonly) {
existingEntry.setReadOnly(true);
}
if (faultTolerant) {
existingEntry.setFaultTolerant(true);
}
if (order != null) {
existingEntry.setDestOrder(order);
}
@ -544,6 +556,8 @@ public class RouterAdmin extends Configured implements Tool {
existingEntry.setMode(aclInfo.getMode());
}
existingEntry.validate();
UpdateMountTableEntryRequest updateRequest =
UpdateMountTableEntryRequest.newInstance(existingEntry);
UpdateMountTableEntryResponse updateResponse =
@ -572,6 +586,7 @@ public class RouterAdmin extends Configured implements Tool {
// Optional parameters
boolean readOnly = false;
boolean faultTolerant = false;
String owner = null;
String group = null;
FsPermission mode = null;
@ -579,6 +594,8 @@ public class RouterAdmin extends Configured implements Tool {
while (i < parameters.length) {
if (parameters[i].equals("-readonly")) {
readOnly = true;
} else if (parameters[i].equals("-faulttolerant")) {
faultTolerant = true;
} else if (parameters[i].equals("-order")) {
i++;
try {
@ -604,7 +621,7 @@ public class RouterAdmin extends Configured implements Tool {
i++;
}
return updateMount(mount, nss, dest, readOnly, order,
return updateMount(mount, nss, dest, readOnly, faultTolerant, order,
new ACLEntity(owner, group, mode));
}
@ -621,7 +638,8 @@ public class RouterAdmin extends Configured implements Tool {
* @throws IOException Error updating the mount point.
*/
public boolean updateMount(String mount, String[] nss, String dest,
boolean readonly, DestinationOrder order, ACLEntity aclInfo)
boolean readonly, boolean faultTolerant,
DestinationOrder order, ACLEntity aclInfo)
throws IOException {
mount = normalizeFileSystemPath(mount);
MountTableManager mountTable = client.getMountTableManager();
@ -634,6 +652,7 @@ public class RouterAdmin extends Configured implements Tool {
MountTable newEntry = MountTable.newInstance(mount, destMap);
newEntry.setReadOnly(readonly);
newEntry.setFaultTolerant(faultTolerant);
if (order != null) {
newEntry.setDestOrder(order);

View File

@ -143,6 +143,8 @@ message MountTableRecordProto {
optional int32 mode = 12;
optional QuotaUsageProto quota = 13;
optional bool faultTolerant = 14 [default = false];
}
message AddMountTableEntryRequestProto {

View File

@ -503,6 +503,36 @@
</description>
</property>
<property>
<name>dfs.federation.router.client.mount-status.time-out</name>
<value>1s</value>
<description>
Set a timeout for the Router when listing folders containing mount
points. In this process, the Router checks the mount table and then it
checks permissions in the subcluster. After the time out, we return the
default values.
</description>
</property>
<property>
<name>dfs.federation.router.connect.max.retries.on.timeouts</name>
<value>0</value>
<description>
Maximum number of retries for the IPC Client when connecting to the
subclusters. By default, it doesn't let the IPC retry and the Router
handles it.
</description>
</property>
<property>
<name>dfs.federation.router.connect.timeout</name>
<value>2s</value>
<description>
Time out for the IPC client connecting to the subclusters. This should be
short as the Router has knowledge of the state of the Routers.
</description>
</property>
<property>
<name>dfs.federation.router.keytab.file</name>
<value></value>

View File

@ -393,6 +393,7 @@
<th>Target path</th>
<th>Order</th>
<th>Read only</th>
<th>Fault tolerant</th>
<th>Owner</th>
<th>Group</th>
<th>Permission</th>
@ -409,6 +410,7 @@
<td>{path}</td>
<td>{order}</td>
<td align="center" class="mount-table-icon mount-table-read-only-{readonly}" title="{status}"/>
<td align="center" class="mount-table-icon mount-table-fault-tolerant-{faulttolerant}" title="{ftStatus}"></td>
<td>{ownerName}</td>
<td>{groupName}</td>
<td>{mode}</td>

View File

@ -324,8 +324,20 @@
}
}
function augment_fault_tolerant(mountTable) {
for (var i = 0, e = mountTable.length; i < e; ++i) {
if (mountTable[i].faulttolerant == true) {
mountTable[i].faulttolerant = "true"
mountTable[i].ftStatus = "Fault tolerant"
} else {
mountTable[i].faulttolerant = "false"
}
}
}
resource.MountTable = JSON.parse(resource.MountTable)
augment_read_only(resource.MountTable)
augment_fault_tolerant(resource.MountTable)
return resource;
}

View File

@ -135,3 +135,8 @@
color: #5fa341;
content: "\e033";
}
.mount-table-fault-tolerant-true:before {
color: #5fa341;
content: "\e033";
}

View File

@ -266,6 +266,14 @@ To determine which subcluster contains a file:
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -getDestination /user/user1/file.txt
Note that consistency of the data across subclusters is not guaranteed by the Router.
By default, if one subcluster is unavailable, writes may fail if they target that subcluster.
To allow writing in another subcluster, one can make the mount point fault tolerant:
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -add /data ns1,ns2 /data -order HASH_ALL -faulttolerant
Note that this can lead to a file to be written in multiple subclusters or a folder missing in one.
One needs to be aware of the possibility of these inconsistencies and target this `faulttolerant` approach to resilient paths.
An example for this is the `/app-logs` folder which will mostly write once into a subfolder.
### Disabling nameservices

View File

@ -18,19 +18,44 @@
package org.apache.hadoop.hdfs.server.federation;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyShort;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.ConnectException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
@ -40,15 +65,29 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DataChecksum.Type;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.BlockingService;
@ -59,9 +98,15 @@ import com.google.protobuf.BlockingService;
*/
public class MockNamenode {
private static final Logger LOG =
LoggerFactory.getLogger(MockNamenode.class);
/** Mock implementation of the Namenode. */
private final NamenodeProtocols mockNn;
/** Name service identifier (subcluster). */
private String nsId;
/** HA state of the Namenode. */
private HAServiceState haState = HAServiceState.STANDBY;
@ -71,9 +116,13 @@ public class MockNamenode {
private HttpServer2 httpServer;
public MockNamenode() throws Exception {
Configuration conf = new Configuration();
public MockNamenode(final String nsIdentifier) throws IOException {
this(nsIdentifier, new HdfsConfiguration());
}
public MockNamenode(final String nsIdentifier, final Configuration conf)
throws IOException {
this.nsId = nsIdentifier;
this.mockNn = mock(NamenodeProtocols.class);
setupMock();
setupRPCServer(conf);
@ -86,7 +135,7 @@ public class MockNamenode {
* @throws IOException If the mock cannot be setup.
*/
protected void setupMock() throws IOException {
NamespaceInfo nsInfo = new NamespaceInfo(1, "clusterId", "bpId", 1);
NamespaceInfo nsInfo = new NamespaceInfo(1, this.nsId, this.nsId, 1);
when(mockNn.versionRequest()).thenReturn(nsInfo);
when(mockNn.getServiceStatus()).thenAnswer(new Answer<HAServiceStatus>() {
@ -115,11 +164,16 @@ public class MockNamenode {
ClientNamenodeProtocol.newReflectiveBlockingService(
clientNNProtoXlator);
int numHandlers = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY,
DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
rpcServer = new RPC.Builder(conf)
.setProtocol(ClientNamenodeProtocolPB.class)
.setInstance(clientNNPbService)
.setBindAddress("0.0.0.0")
.setPort(0)
.setNumHandlers(numHandlers)
.build();
NamenodeProtocolServerSideTranslatorPB nnProtoXlator =
@ -146,6 +200,18 @@ public class MockNamenode {
DFSUtil.addPBProtocol(
conf, HAServiceProtocolPB.class, haProtoPbService, rpcServer);
this.rpcServer.addTerseExceptions(
RemoteException.class,
SafeModeException.class,
FileNotFoundException.class,
FileAlreadyExistsException.class,
AccessControlException.class,
LeaseExpiredException.class,
NotReplicatedYetException.class,
IOException.class,
ConnectException.class,
StandbyException.class);
rpcServer.start();
}
@ -188,6 +254,14 @@ public class MockNamenode {
return mockNn;
}
/**
* Get the name service id (subcluster) of the Mock Namenode.
* @return Name service identifier.
*/
public String getNameserviceId() {
return nsId;
}
/**
* Get the HA state of the Mock Namenode.
* @return HA state (ACTIVE or STANDBY).
@ -217,9 +291,158 @@ public class MockNamenode {
public void stop() throws Exception {
if (rpcServer != null) {
rpcServer.stop();
rpcServer = null;
}
if (httpServer != null) {
httpServer.stop();
httpServer = null;
}
}
/**
* Add the mock for the FileSystem calls in ClientProtocol.
* @throws IOException If it cannot be setup.
*/
public void addFileSystemMock() throws IOException {
final SortedMap<String, String> fs =
new ConcurrentSkipListMap<String, String>();
DirectoryListing l = mockNn.getListing(anyString(), any(), anyBoolean());
when(l).thenAnswer(invocation -> {
String src = getSrc(invocation);
LOG.info("{} getListing({})", nsId, src);
if (!src.endsWith("/")) {
src += "/";
}
Map<String, String> files =
fs.subMap(src, src + Character.MAX_VALUE);
List<HdfsFileStatus> list = new ArrayList<>();
for (String file : files.keySet()) {
if (file.substring(src.length()).indexOf('/') < 0) {
HdfsFileStatus fileStatus =
getMockHdfsFileStatus(file, fs.get(file));
list.add(fileStatus);
}
}
HdfsFileStatus[] array = list.toArray(
new HdfsFileStatus[list.size()]);
return new DirectoryListing(array, 0);
});
when(mockNn.getFileInfo(anyString())).thenAnswer(invocation -> {
String src = getSrc(invocation);
LOG.info("{} getFileInfo({})", nsId, src);
return getMockHdfsFileStatus(src, fs.get(src));
});
HdfsFileStatus c = mockNn.create(anyString(), any(), anyString(), any(),
anyBoolean(), anyShort(), anyLong(), any(), any(), any());
when(c).thenAnswer(invocation -> {
String src = getSrc(invocation);
LOG.info("{} create({})", nsId, src);
fs.put(src, "FILE");
return getMockHdfsFileStatus(src, "FILE");
});
LocatedBlocks b = mockNn.getBlockLocations(
anyString(), anyLong(), anyLong());
when(b).thenAnswer(invocation -> {
String src = getSrc(invocation);
LOG.info("{} getBlockLocations({})", nsId, src);
if (!fs.containsKey(src)) {
LOG.error("{} cannot find {} for getBlockLocations", nsId, src);
throw new FileNotFoundException("File does not exist " + src);
}
return mock(LocatedBlocks.class);
});
boolean f = mockNn.complete(anyString(), anyString(), any(), anyLong());
when(f).thenAnswer(invocation -> {
String src = getSrc(invocation);
if (!fs.containsKey(src)) {
LOG.error("{} cannot find {} for complete", nsId, src);
throw new FileNotFoundException("File does not exist " + src);
}
return true;
});
LocatedBlock a = mockNn.addBlock(
anyString(), anyString(), any(), any(), anyLong(), any(), any());
when(a).thenAnswer(invocation -> {
String src = getSrc(invocation);
if (!fs.containsKey(src)) {
LOG.error("{} cannot find {} for addBlock", nsId, src);
throw new FileNotFoundException("File does not exist " + src);
}
return getMockLocatedBlock(nsId);
});
boolean m = mockNn.mkdirs(anyString(), any(), anyBoolean());
when(m).thenAnswer(invocation -> {
String src = getSrc(invocation);
LOG.info("{} mkdirs({})", nsId, src);
fs.put(src, "DIRECTORY");
return true;
});
when(mockNn.getServerDefaults()).thenAnswer(invocation -> {
LOG.info("{} getServerDefaults", nsId);
FsServerDefaults defaults = mock(FsServerDefaults.class);
when(defaults.getChecksumType()).thenReturn(
Type.valueOf(DataChecksum.CHECKSUM_CRC32));
when(defaults.getKeyProviderUri()).thenReturn(nsId);
return defaults;
});
}
private static String getSrc(InvocationOnMock invocation) {
return (String) invocation.getArguments()[0];
}
/**
* Get a mock HDFS file status.
* @param filename Name of the file.
* @param type Type of the file (FILE, DIRECTORY, or null).
* @return HDFS file status
*/
private static HdfsFileStatus getMockHdfsFileStatus(
final String filename, final String type) {
if (type == null) {
return null;
}
HdfsFileStatus fileStatus = mock(HdfsFileStatus.class);
when(fileStatus.getLocalNameInBytes()).thenReturn(filename.getBytes());
when(fileStatus.getPermission()).thenReturn(mock(FsPermission.class));
when(fileStatus.getOwner()).thenReturn("owner");
when(fileStatus.getGroup()).thenReturn("group");
if (type.equals("FILE")) {
when(fileStatus.getLen()).thenReturn(100L);
when(fileStatus.getReplication()).thenReturn((short) 1);
when(fileStatus.getBlockSize()).thenReturn(
HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
} else if (type.equals("DIRECTORY")) {
when(fileStatus.isDir()).thenReturn(true);
when(fileStatus.isDirectory()).thenReturn(true);
}
return fileStatus;
}
/**
* Get a mock located block pointing to one of the subclusters. It is
* allocated in a fake Datanode.
* @param nsId Name service identifier (subcluster).
* @return Mock located block.
*/
private static LocatedBlock getMockLocatedBlock(final String nsId) {
LocatedBlock lb = mock(LocatedBlock.class);
when(lb.getCachedLocations()).thenReturn(new DatanodeInfo[0]);
DatanodeID nodeId = new DatanodeID("localhost", "localhost", "dn0",
1111, 1112, 1113, 1114);
DatanodeInfo dnInfo = new DatanodeDescriptor(nodeId);
when(lb.getLocations()).thenReturn(new DatanodeInfo[] {dnInfo});
ExtendedBlock eb = mock(ExtendedBlock.class);
when(eb.getBlockPoolId()).thenReturn(nsId);
when(lb.getBlock()).thenReturn(eb);
@SuppressWarnings("unchecked")
Token<BlockTokenIdentifier> tok = mock(Token.class);
when(tok.getIdentifier()).thenReturn(nsId.getBytes());
when(tok.getPassword()).thenReturn(nsId.getBytes());
when(tok.getKind()).thenReturn(new Text(nsId));
when(tok.getService()).thenReturn(new Text(nsId));
when(lb.getBlockToken()).thenReturn(tok);
return lb;
}
}

View File

@ -152,7 +152,7 @@ public class TestRouterAdminCLI {
@Test
public void testAddMountTable() throws Exception {
String nsId = "ns0";
String nsId = "ns0,ns1";
String src = "/test-addmounttable";
String dest = "/addmounttable";
String[] argv = new String[] {"-add", src, nsId, dest};
@ -166,26 +166,35 @@ public class TestRouterAdminCLI {
MountTable mountTable = getResponse.getEntries().get(0);
List<RemoteLocation> destinations = mountTable.getDestinations();
assertEquals(1, destinations.size());
assertEquals(2, destinations.size());
assertEquals(src, mountTable.getSourcePath());
assertEquals(nsId, destinations.get(0).getNameserviceId());
assertEquals("ns0", destinations.get(0).getNameserviceId());
assertEquals(dest, destinations.get(0).getDest());
assertEquals("ns1", destinations.get(1).getNameserviceId());
assertEquals(dest, destinations.get(1).getDest());
assertFalse(mountTable.isReadOnly());
assertFalse(mountTable.isFaultTolerant());
// test mount table update behavior
dest = dest + "-new";
argv = new String[] {"-add", src, nsId, dest, "-readonly"};
argv = new String[] {"-add", src, nsId, dest, "-readonly",
"-faulttolerant", "-order", "HASH_ALL"};
assertEquals(0, ToolRunner.run(admin, argv));
stateStore.loadCache(MountTableStoreImpl.class, true);
getResponse = client.getMountTableManager()
.getMountTableEntries(getRequest);
mountTable = getResponse.getEntries().get(0);
assertEquals(2, mountTable.getDestinations().size());
assertEquals(nsId, mountTable.getDestinations().get(1).getNameserviceId());
assertEquals(dest, mountTable.getDestinations().get(1).getDest());
assertEquals(4, mountTable.getDestinations().size());
RemoteLocation loc2 = mountTable.getDestinations().get(2);
assertEquals("ns0", loc2.getNameserviceId());
assertEquals(dest, loc2.getDest());
RemoteLocation loc3 = mountTable.getDestinations().get(3);
assertEquals("ns1", loc3.getNameserviceId());
assertEquals(dest, loc3.getDest());
assertTrue(mountTable.isReadOnly());
assertTrue(mountTable.isFaultTolerant());
}
@Test
@ -211,6 +220,7 @@ public class TestRouterAdminCLI {
assertEquals(nsId, destinations.get(0).getNameserviceId());
assertEquals(dest, destinations.get(0).getDest());
assertFalse(mountTable.isReadOnly());
assertFalse(mountTable.isFaultTolerant());
// test mount table update behavior
dest = dest + "-new";
@ -516,17 +526,19 @@ public class TestRouterAdminCLI {
System.setOut(new PrintStream(out));
String[] argv = new String[] {"-add", src, nsId};
assertEquals(-1, ToolRunner.run(admin, argv));
assertTrue(out.toString().contains(
assertTrue("Wrong message: " + out, out.toString().contains(
"\t[-add <source> <nameservice1, nameservice2, ...> <destination> "
+ "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ "[-readonly] [-faulttolerant] "
+ "[-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ "-owner <owner> -group <group> -mode <mode>]"));
out.reset();
argv = new String[] {"-update", src, nsId};
assertEquals(-1, ToolRunner.run(admin, argv));
assertTrue(out.toString().contains(
assertTrue("Wrong message: " + out, out.toString().contains(
"\t[-update <source> <nameservice1, nameservice2, ...> <destination> "
+ "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ "[-readonly] [-faulttolerant] "
+ "[-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ "-owner <owner> -group <group> -mode <mode>]"));
out.reset();
@ -567,10 +579,11 @@ public class TestRouterAdminCLI {
assertEquals(-1, ToolRunner.run(admin, argv));
String expected = "Usage: hdfs dfsrouteradmin :\n"
+ "\t[-add <source> <nameservice1, nameservice2, ...> <destination> "
+ "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ "[-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ "-owner <owner> -group <group> -mode <mode>]\n"
+ "\t[-update <source> <nameservice1, nameservice2, ...> "
+ "<destination> " + "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ "<destination> "
+ "[-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ "-owner <owner> -group <group> -mode <mode>]\n" + "\t[-rm <source>]\n"
+ "\t[-ls <path>]\n"
+ "\t[-getDestination <path>]\n"
@ -579,7 +592,7 @@ public class TestRouterAdminCLI {
+ "\t[-safemode enter | leave | get]\n"
+ "\t[-nameservice enable | disable <nameservice>]\n"
+ "\t[-getDisabledNameservices]";
assertTrue(out.toString(), out.toString().contains(expected));
assertTrue("Wrong message: " + out, out.toString().contains(expected));
out.reset();
}
@ -1159,4 +1172,28 @@ public class TestRouterAdminCLI {
argv = new String[] {"-getDestination /file1.txt /file2.txt"};
assertEquals(-1, ToolRunner.run(admin, argv));
}
@Test
public void testErrorFaultTolerant() throws Exception {
System.setErr(new PrintStream(err));
String[] argv = new String[] {"-add", "/mntft", "ns01", "/tmp",
"-faulttolerant"};
assertEquals(-1, ToolRunner.run(admin, argv));
assertTrue(err.toString(), err.toString().contains(
"Invalid entry, fault tolerance requires multiple destinations"));
err.reset();
System.setErr(new PrintStream(err));
argv = new String[] {"-add", "/mntft", "ns0,ns1", "/tmp",
"-order", "HASH", "-faulttolerant"};
assertEquals(-1, ToolRunner.run(admin, argv));
assertTrue(err.toString(), err.toString().contains(
"Invalid entry, fault tolerance only supported for ALL order"));
err.reset();
argv = new String[] {"-add", "/mntft", "ns0,ns1", "/tmp",
"-order", "HASH_ALL", "-faulttolerant"};
assertEquals(0, ToolRunner.run(admin, argv));
}
}

View File

@ -0,0 +1,654 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static java.util.Arrays.asList;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.MockNamenode;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test the handling of fault tolerant mount points in the Router.
*/
public class TestRouterFaultTolerant {
private static final Logger LOG =
LoggerFactory.getLogger(TestRouterFaultTolerant.class);
/** Number of files to create for testing. */
private static final int NUM_FILES = 10;
/** Number of Routers for test. */
private static final int NUM_ROUTERS = 2;
/** Namenodes for the test per name service id (subcluster). */
private Map<String, MockNamenode> namenodes = new HashMap<>();
/** Routers for the test. */
private List<Router> routers = new ArrayList<>();
/** Run test tasks in parallel. */
private ExecutorService service;
@Before
public void setup() throws Exception {
LOG.info("Start the Namenodes");
Configuration nnConf = new HdfsConfiguration();
nnConf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, 10);
for (final String nsId : asList("ns0", "ns1")) {
MockNamenode nn = new MockNamenode(nsId, nnConf);
nn.transitionToActive();
nn.addFileSystemMock();
namenodes.put(nsId, nn);
}
LOG.info("Start the Routers");
Configuration routerConf = new RouterConfigBuilder()
.stateStore()
.admin()
.rpc()
.build();
routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0");
routerConf.set(RBFConfigKeys.DFS_ROUTER_HTTP_ADDRESS_KEY, "0.0.0.0:0");
routerConf.set(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, "0.0.0.0:0");
// Speedup time outs
routerConf.setTimeDuration(
RBFConfigKeys.DFS_ROUTER_CLIENT_CONNECT_TIMEOUT,
500, TimeUnit.MILLISECONDS);
Configuration stateStoreConf = getStateStoreConfiguration();
stateStoreConf.setClass(
RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
MembershipNamenodeResolver.class, ActiveNamenodeResolver.class);
stateStoreConf.setClass(
RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
MultipleDestinationMountTableResolver.class,
FileSubclusterResolver.class);
routerConf.addResource(stateStoreConf);
for (int i = 0; i < NUM_ROUTERS; i++) {
// router0 doesn't allow partial listing
routerConf.setBoolean(
RBFConfigKeys.DFS_ROUTER_ALLOW_PARTIAL_LIST, i != 0);
final Router router = new Router();
router.init(routerConf);
router.start();
routers.add(router);
}
LOG.info("Registering the subclusters in the Routers");
registerSubclusters(Collections.singleton("ns1"));
LOG.info("Stop ns1 to simulate an unavailable subcluster");
namenodes.get("ns1").stop();
service = Executors.newFixedThreadPool(10);
}
/**
* Register the subclusters in all Routers.
* @param unavailableSubclusters Set of unavailable subclusters.
* @throws IOException If it cannot register a subcluster.
*/
private void registerSubclusters(Set<String> unavailableSubclusters)
throws IOException {
for (final Router router : routers) {
MembershipNamenodeResolver resolver =
(MembershipNamenodeResolver) router.getNamenodeResolver();
for (final MockNamenode nn : namenodes.values()) {
String nsId = nn.getNameserviceId();
String rpcAddress = "localhost:" + nn.getRPCPort();
String httpAddress = "localhost:" + nn.getHTTPPort();
NamenodeStatusReport report = new NamenodeStatusReport(
nsId, null, rpcAddress, rpcAddress, rpcAddress, httpAddress);
if (unavailableSubclusters.contains(nsId)) {
LOG.info("Register {} as UNAVAILABLE", nsId);
report.setRegistrationValid(false);
} else {
LOG.info("Register {} as ACTIVE", nsId);
report.setRegistrationValid(true);
}
report.setNamespaceInfo(new NamespaceInfo(0, nsId, nsId, 0));
resolver.registerNamenode(report);
}
resolver.loadCache(true);
}
}
@After
public void cleanup() throws Exception {
LOG.info("Stopping the cluster");
for (final MockNamenode nn : namenodes.values()) {
nn.stop();
}
namenodes.clear();
routers.forEach(router -> router.stop());
routers.clear();
if (service != null) {
service.shutdown();
service = null;
}
}
/**
* Add a mount table entry in some name services and wait until it is
* available.
* @param mountPoint Name of the mount point.
* @param order Order of the mount table entry.
* @param nsIds Name service identifiers.
* @throws Exception If the entry could not be created.
*/
private void createMountTableEntry(
final String mountPoint, final DestinationOrder order,
Collection<String> nsIds) throws Exception {
Router router = getRandomRouter();
RouterClient admin = getAdminClient(router);
MountTableManager mountTable = admin.getMountTableManager();
Map<String, String> destMap = new HashMap<>();
for (String nsId : nsIds) {
destMap.put(nsId, mountPoint);
}
MountTable newEntry = MountTable.newInstance(mountPoint, destMap);
newEntry.setDestOrder(order);
AddMountTableEntryRequest addRequest =
AddMountTableEntryRequest.newInstance(newEntry);
AddMountTableEntryResponse addResponse =
mountTable.addMountTableEntry(addRequest);
boolean created = addResponse.getStatus();
assertTrue(created);
refreshRoutersCaches();
// Check for the path
GetMountTableEntriesRequest getRequest =
GetMountTableEntriesRequest.newInstance(mountPoint);
GetMountTableEntriesResponse getResponse =
mountTable.getMountTableEntries(getRequest);
List<MountTable> entries = getResponse.getEntries();
assertEquals("Too many entries: " + entries, 1, entries.size());
assertEquals(mountPoint, entries.get(0).getSourcePath());
}
/**
* Update a mount table entry to be fault tolerant.
* @param mountPoint Mount point to update.
* @throws IOException If it cannot update the mount point.
*/
private void updateMountPointFaultTolerant(final String mountPoint)
throws IOException {
Router router = getRandomRouter();
RouterClient admin = getAdminClient(router);
MountTableManager mountTable = admin.getMountTableManager();
GetMountTableEntriesRequest getRequest =
GetMountTableEntriesRequest.newInstance(mountPoint);
GetMountTableEntriesResponse entries =
mountTable.getMountTableEntries(getRequest);
MountTable updateEntry = entries.getEntries().get(0);
updateEntry.setFaultTolerant(true);
UpdateMountTableEntryRequest updateRequest =
UpdateMountTableEntryRequest.newInstance(updateEntry);
UpdateMountTableEntryResponse updateResponse =
mountTable.updateMountTableEntry(updateRequest);
assertTrue(updateResponse.getStatus());
refreshRoutersCaches();
}
/**
* Refresh the caches of all Routers (to get the mount table).
*/
private void refreshRoutersCaches() {
for (final Router router : routers) {
StateStoreService stateStore = router.getStateStore();
stateStore.refreshCaches(true);
}
}
/**
* Test the behavior of the Router when one of the subclusters in a mount
* point fails. In particular, it checks if it can write files or not.
* Related to {@link TestRouterRpcMultiDestination#testSubclusterDown()}.
*/
@Test
public void testWriteWithFailedSubcluster() throws Exception {
// Run the actual tests with each approach
final List<Callable<Boolean>> tasks = new ArrayList<>();
final List<DestinationOrder> orders = asList(
DestinationOrder.HASH_ALL,
DestinationOrder.SPACE,
DestinationOrder.RANDOM,
DestinationOrder.HASH);
for (DestinationOrder order : orders) {
tasks.add(() -> {
testWriteWithFailedSubcluster(order);
return true;
});
}
TaskResults results = collectResults("Full tests", tasks);
assertEquals(orders.size(), results.getSuccess());
}
/**
* Test the behavior of the Router when one of the subclusters in a mount
* point fails. It assumes that ns1 is already down.
* @param order Destination order of the mount point.
* @throws Exception If we cannot run the test.
*/
private void testWriteWithFailedSubcluster(final DestinationOrder order)
throws Exception {
final FileSystem router0Fs = getFileSystem(routers.get(0));
final FileSystem router1Fs = getFileSystem(routers.get(1));
final FileSystem ns0Fs = getFileSystem(namenodes.get("ns0").getRPCPort());
final String mountPoint = "/" + order + "-failsubcluster";
final Path mountPath = new Path(mountPoint);
LOG.info("Setup {} with order {}", mountPoint, order);
createMountTableEntry(mountPoint, order, namenodes.keySet());
LOG.info("Write in {} should succeed writing in ns0 and fail for ns1",
mountPath);
checkDirectoriesFaultTolerant(
mountPath, order, router0Fs, router1Fs, ns0Fs, false);
checkFilesFaultTolerant(
mountPath, order, router0Fs, router1Fs, ns0Fs, false);
LOG.info("Make {} fault tolerant and everything succeeds", mountPath);
IOException ioe = null;
try {
updateMountPointFaultTolerant(mountPoint);
} catch (IOException e) {
ioe = e;
}
if (DestinationOrder.FOLDER_ALL.contains(order)) {
assertNull(ioe);
checkDirectoriesFaultTolerant(
mountPath, order, router0Fs, router1Fs, ns0Fs, true);
checkFilesFaultTolerant(
mountPath, order, router0Fs, router1Fs, ns0Fs, true);
} else {
assertTrue(ioe.getMessage().startsWith(
"Invalid entry, fault tolerance only supported for ALL order"));
}
}
/**
* Check directory creation on a mount point.
* If it is fault tolerant, it should be able to write everything.
* If it is not fault tolerant, it should fail to write some.
*/
private void checkDirectoriesFaultTolerant(
Path mountPoint, DestinationOrder order,
FileSystem router0Fs, FileSystem router1Fs, FileSystem ns0Fs,
boolean faultTolerant) throws Exception {
final FileStatus[] dirs0 = listStatus(router1Fs, mountPoint);
LOG.info("Create directories in {}", mountPoint);
final List<Callable<Boolean>> tasks = new ArrayList<>();
for (int i = 0; i < NUM_FILES; i++) {
final Path dir = new Path(mountPoint,
String.format("dir-%s-%03d", faultTolerant, i));
FileSystem fs = getRandomRouterFileSystem();
tasks.add(getDirCreateTask(fs, dir));
}
TaskResults results = collectResults("Create dir " + mountPoint, tasks);
LOG.info("Check directories results for {}: {}", mountPoint, results);
if (faultTolerant || DestinationOrder.FOLDER_ALL.contains(order)) {
assertEquals(NUM_FILES, results.getSuccess());
assertEquals(0, results.getFailure());
} else {
assertBothResults("check dir " + mountPoint, NUM_FILES, results);
}
LOG.info("Check directories listing for {}", mountPoint);
tasks.add(getListFailTask(router0Fs, mountPoint));
int filesExpected = dirs0.length + results.getSuccess();
tasks.add(getListSuccessTask(router1Fs, mountPoint, filesExpected));
assertEquals(2, collectResults("List " + mountPoint, tasks).getSuccess());
}
/**
* Check file creation on a mount point.
* If it is fault tolerant, it should be able to write everything.
* If it is not fault tolerant, it should fail to write some of the files.
*/
private void checkFilesFaultTolerant(
Path mountPoint, DestinationOrder order,
FileSystem router0Fs, FileSystem router1Fs, FileSystem ns0Fs,
boolean faultTolerant) throws Exception {
// Get one of the existing sub directories
final FileStatus[] dirs0 = listStatus(router1Fs, mountPoint);
final Path dir0 = Path.getPathWithoutSchemeAndAuthority(
dirs0[0].getPath());
LOG.info("Create files in {}", dir0);
final List<Callable<Boolean>> tasks = new ArrayList<>();
for (int i = 0; i < NUM_FILES; i++) {
final String newFile = String.format("%s/file-%03d.txt", dir0, i);
FileSystem fs = getRandomRouterFileSystem();
tasks.add(getFileCreateTask(fs, newFile, ns0Fs));
}
TaskResults results = collectResults("Create file " + dir0, tasks);
LOG.info("Check files results for {}: {}", dir0, results);
if (faultTolerant || !DestinationOrder.FOLDER_ALL.contains(order)) {
assertEquals(NUM_FILES, results.getSuccess());
assertEquals(0, results.getFailure());
} else {
assertBothResults("check files " + dir0, NUM_FILES, results);
}
LOG.info("Check files listing for {}", dir0);
tasks.add(getListFailTask(router0Fs, dir0));
tasks.add(getListSuccessTask(router1Fs, dir0, results.getSuccess()));
assertEquals(2, collectResults("List " + dir0, tasks).getSuccess());
}
/**
* Get the string representation for the files.
* @param files Files to check.
* @return String representation.
*/
private static String toString(final FileStatus[] files) {
final StringBuilder sb = new StringBuilder();
sb.append("[");
for (final FileStatus file : files) {
if (sb.length() > 1) {
sb.append(", ");
}
sb.append(Path.getPathWithoutSchemeAndAuthority(file.getPath()));
}
sb.append("]");
return sb.toString();
}
/**
* List the files in a path.
* @param fs File system to check.
* @param path Path to list.
* @return List of files.
* @throws IOException If we cannot list.
*/
private FileStatus[] listStatus(final FileSystem fs, final Path path)
throws IOException {
FileStatus[] files = new FileStatus[] {};
try {
files = fs.listStatus(path);
} catch (FileNotFoundException fnfe) {
LOG.debug("File not found: {}", fnfe.getMessage());
}
return files;
}
/**
* Task that creates a file and checks if it is available.
* @param file File to create.
* @param checkFs File system for checking if the file is properly created.
* @return Result of creating the file.
*/
private static Callable<Boolean> getFileCreateTask(
final FileSystem fs, final String file, FileSystem checkFs) {
return () -> {
try {
Path path = new Path(file);
FSDataOutputStream os = fs.create(path);
// We don't write because we have no mock Datanodes
os.close();
FileStatus fileStatus = checkFs.getFileStatus(path);
assertTrue("File not created properly: " + fileStatus,
fileStatus.getLen() > 0);
return true;
} catch (RemoteException re) {
return false;
}
};
}
/**
* Task that creates a directory.
* @param dir Directory to create.
* @return Result of creating the directory..
*/
private static Callable<Boolean> getDirCreateTask(
final FileSystem fs, final Path dir) {
return () -> {
try {
fs.mkdirs(dir);
return true;
} catch (RemoteException re) {
return false;
}
};
}
/**
* Task that lists a directory and expects to fail.
* @param fs File system to check.
* @param path Path to try to list.
* @return If the listing failed as expected.
*/
private static Callable<Boolean> getListFailTask(FileSystem fs, Path path) {
return () -> {
try {
fs.listStatus(path);
return false;
} catch (RemoteException re) {
return true;
}
};
}
/**
* Task that lists a directory and succeeds.
* @param fs File system to check.
* @param path Path to list.
* @param expected Number of files to expect to find.
* @return If the listing succeeds.
*/
private static Callable<Boolean> getListSuccessTask(
FileSystem fs, Path path, int expected) {
return () -> {
final FileStatus[] dirs = fs.listStatus(path);
assertEquals(toString(dirs), expected, dirs.length);
return true;
};
}
/**
* Invoke a set of tasks and collect their outputs.
* The tasks should do assertions.
*
* @param service Execution Service to run the tasks.
* @param tasks Tasks to run.
* @throws Exception If it cannot collect the results.
*/
private TaskResults collectResults(final String tag,
final Collection<Callable<Boolean>> tasks) throws Exception {
final TaskResults results = new TaskResults();
service.invokeAll(tasks).forEach(task -> {
try {
boolean succeeded = task.get();
if (succeeded) {
LOG.info("Got success for {}", tag);
results.incrSuccess();
} else {
LOG.info("Got failure for {}", tag);
results.incrFailure();
}
} catch (Exception e) {
fail(e.getMessage());
}
});
tasks.clear();
return results;
}
/**
* Class to summarize the results of running a task.
*/
static class TaskResults {
private final AtomicInteger success = new AtomicInteger(0);
private final AtomicInteger failure = new AtomicInteger(0);
public void incrSuccess() {
success.incrementAndGet();
}
public void incrFailure() {
failure.incrementAndGet();
}
public int getSuccess() {
return success.get();
}
public int getFailure() {
return failure.get();
}
public int getTotal() {
return success.get() + failure.get();
}
@Override
public String toString() {
return new StringBuilder()
.append("Success=").append(getSuccess())
.append(" Failure=").append(getFailure())
.toString();
}
}
/**
* Asserts that the results are the expected amount and it has both success
* and failure.
* @param msg Message to show when the assertion fails.
* @param expected Expected number of results.
* @param actual Actual results.
*/
private static void assertBothResults(String msg,
int expected, TaskResults actual) {
assertEquals(msg, expected, actual.getTotal());
assertTrue("Expected some success for " + msg, actual.getSuccess() > 0);
assertTrue("Expected some failure for " + msg, actual.getFailure() > 0);
}
/**
* Get a random Router from the cluster.
* @return Random Router.
*/
private Router getRandomRouter() {
Random rnd = new Random();
int index = rnd.nextInt(routers.size());
return routers.get(index);
}
/**
* Get a file system from one of the Routers as a random user to allow better
* concurrency in the Router.
* @return File system from a random user.
* @throws Exception If we cannot create the file system.
*/
private FileSystem getRandomRouterFileSystem() throws Exception {
final UserGroupInformation userUgi =
UserGroupInformation.createUserForTesting(
"user-" + UUID.randomUUID(), new String[]{"group"});
Router router = getRandomRouter();
return userUgi.doAs(
(PrivilegedExceptionAction<FileSystem>) () -> getFileSystem(router));
}
private static FileSystem getFileSystem(int rpcPort) throws IOException {
Configuration conf = new HdfsConfiguration();
URI uri = URI.create("hdfs://localhost:" + rpcPort);
return DistributedFileSystem.get(uri, conf);
}
private static FileSystem getFileSystem(final Router router)
throws IOException {
InetSocketAddress rpcAddress = router.getRpcServerAddress();
int rpcPort = rpcAddress.getPort();
return getFileSystem(rpcPort);
}
private static RouterClient getAdminClient(
final Router router) throws IOException {
Configuration conf = new HdfsConfiguration();
InetSocketAddress routerSocket = router.getAdminServerAddress();
return new RouterClient(routerSocket, conf);
}
}

View File

@ -74,7 +74,7 @@ public class TestRouterNamenodeMonitoring {
for (String nsId : nsIds) {
nns.put(nsId, new HashMap<>());
for (String nnId : asList("nn0", "nn1")) {
nns.get(nsId).put(nnId, new MockNamenode());
nns.get(nsId).put(nnId, new MockNamenode(nsId));
}
}

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
/**
@ -93,7 +94,7 @@ public final class FederationStateStoreTestUtils {
conf.setClass(FEDERATION_STORE_DRIVER_CLASS, clazz, StateStoreDriver.class);
if (clazz.isAssignableFrom(StateStoreFileBaseImpl.class)) {
if (StateStoreFileBaseImpl.class.isAssignableFrom(clazz)) {
setFileConfiguration(conf);
}
return conf;
@ -178,8 +179,7 @@ public final class FederationStateStoreTestUtils {
* @param conf Configuration to extend.
*/
public static void setFileConfiguration(Configuration conf) {
String workingPath = System.getProperty("user.dir");
String stateStorePath = workingPath + "/statestore";
String stateStorePath = GenericTestUtils.getRandomizedTempPath();
conf.set(FEDERATION_STORE_FILE_DIRECTORY, stateStorePath);
}

View File

@ -78,15 +78,17 @@ public class TestStateStoreMountTable extends TestStateStoreBase {
assertFalse(getStateStore().isDriverReady());
// Test APIs that access the store to check they throw the correct exception
MountTable entry = MountTable.newInstance(
"/mnt", Collections.singletonMap("ns0", "/tmp"));
AddMountTableEntryRequest addRequest =
AddMountTableEntryRequest.newInstance();
AddMountTableEntryRequest.newInstance(entry);
verifyException(mountStore, "addMountTableEntry",
StateStoreUnavailableException.class,
new Class[] {AddMountTableEntryRequest.class},
new Object[] {addRequest});
UpdateMountTableEntryRequest updateRequest =
UpdateMountTableEntryRequest.newInstance();
UpdateMountTableEntryRequest.newInstance(entry);
verifyException(mountStore, "updateMountTableEntry",
StateStoreUnavailableException.class,
new Class[] {UpdateMountTableEntryRequest.class},

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.federation.store.records;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -164,6 +165,24 @@ public class TestMountTable {
assertTrue(record2.isReadOnly());
}
@Test
public void testFaultTolerant() throws IOException {
Map<String, String> dest = new LinkedHashMap<>();
dest.put(DST_NS_0, DST_PATH_0);
dest.put(DST_NS_1, DST_PATH_1);
MountTable record0 = MountTable.newInstance(SRC, dest);
assertFalse(record0.isFaultTolerant());
MountTable record1 = MountTable.newInstance(SRC, dest);
assertFalse(record1.isFaultTolerant());
assertEquals(record0, record1);
record1.setFaultTolerant(true);
assertTrue(record1.isFaultTolerant());
assertNotEquals(record0, record1);
}
@Test
public void testOrder() throws IOException {
testOrder(DestinationOrder.HASH);

View File

@ -428,8 +428,8 @@ Runs the DFS router. See [Router](../hadoop-hdfs-rbf/HDFSRouterFederation.html#R
Usage:
hdfs dfsrouteradmin
[-add <source> <nameservice1, nameservice2, ...> <destination> [-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] -owner <owner> -group <group> -mode <mode>]
[-update <source> <nameservice1, nameservice2, ...> <destination> [-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] -owner <owner> -group <group> -mode <mode>]
[-add <source> <nameservice1, nameservice2, ...> <destination> [-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL] -owner <owner> -group <group> -mode <mode>]
[-update <source> <nameservice1, nameservice2, ...> <destination> [-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL] -owner <owner> -group <group> -mode <mode>]
[-rm <source>]
[-ls <path>]
[-getDestination <path>]