diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java
index 5ab978d7f2b..a39f17d2a48 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java
@@ -270,6 +270,7 @@ public class FederationMetrics implements FederationMBean {
           innerInfo.put("order", "");
         }
         innerInfo.put("readonly", entry.isReadOnly());
+        innerInfo.put("faulttolerant", entry.isFaultTolerant());
         info.add(Collections.unmodifiableMap(innerInfo));
       }
     } catch (IOException e) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
index 99c5e22d12a..6a637d5e46f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.federation.resolver.order;
 
+import java.util.EnumSet;
+
 /**
  * Order of the destinations when we have multiple of them. When the resolver
  * of files to subclusters (FileSubclusterResolver) has multiple destinations,
  *
@@ -27,5 +29,11 @@ public enum DestinationOrder {
   LOCAL, // Local first
   RANDOM, // Random order
   HASH_ALL, // Follow consistent hashing
-  SPACE // Available space based order
+  SPACE; // Available space based order
+
+  /** Approaches that write folders in all subclusters. */
+  public static final EnumSet<DestinationOrder> FOLDER_ALL = EnumSet.of(
+      HASH_ALL,
+      RANDOM,
+      SPACE);
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index 657b6cfc123..153cd641405 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -135,7 +135,17 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
   public static final String DFS_ROUTER_ALLOW_PARTIAL_LIST =
       FEDERATION_ROUTER_PREFIX + "client.allow-partial-listing";
   public static final boolean DFS_ROUTER_ALLOW_PARTIAL_LIST_DEFAULT = true;
-
+  public static final String DFS_ROUTER_CLIENT_MOUNT_TIME_OUT =
+      FEDERATION_ROUTER_PREFIX + "client.mount-status.time-out";
+  public static final long DFS_ROUTER_CLIENT_MOUNT_TIME_OUT_DEFAULT =
+      TimeUnit.SECONDS.toMillis(1);
+  public static final String DFS_ROUTER_CLIENT_MAX_RETRIES_TIME_OUT =
+      FEDERATION_ROUTER_PREFIX + "connect.max.retries.on.timeouts";
+  public static final int DFS_ROUTER_CLIENT_MAX_RETRIES_TIME_OUT_DEFAULT = 0;
+  public static final String DFS_ROUTER_CLIENT_CONNECT_TIMEOUT =
+      FEDERATION_ROUTER_PREFIX + "connect.timeout";
+  public static final long DFS_ROUTER_CLIENT_CONNECT_TIMEOUT_DEFAULT =
+      TimeUnit.SECONDS.toMillis(2);
 
   // HDFS Router State Store connection
   public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS =
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
index 6ff2b01b0b6..f7ba8123d5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
@@ -210,7 +210,13 @@ public class RemoteMethod {
 
   @Override
   public String toString() {
-    return this.protocol.getSimpleName() + "#" + this.methodName + " " +
-        Arrays.toString(this.params);
+    return new StringBuilder()
+        .append(this.protocol.getSimpleName())
+        .append("#")
+        .append(this.methodName)
+        .append("(")
+        .append(Arrays.deepToString(this.params))
+        .append(")")
+        .toString();
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java
index 8816ff6fb9f..8b216d919ed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java
@@ -68,4 +68,13 @@ public class RemoteParam {
       return context.getDest();
     }
   }
+
+  @Override
+  public String toString() {
+    return new StringBuilder()
+        .append("RemoteParam(")
+        .append(this.paramMap)
+        .append(")")
+        .toString();
+  }
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index da601425ca1..6039083a735 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -84,6 +84,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.slf4j.Logger; @@ -93,6 +94,8 @@ import com.google.common.annotations.VisibleForTesting; import java.io.FileNotFoundException; import java.io.IOException; +import java.net.ConnectException; +import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; @@ -103,6 +106,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.TimeUnit; /** * Module that implements all the RPC calls in {@link ClientProtocol} in the @@ -119,6 +123,8 @@ public class RouterClientProtocol implements ClientProtocol { /** If it requires response from all subclusters. */ private final boolean allowPartialList; + /** Time out when getting the mount statistics. */ + private long mountStatusTimeOut; /** Identifier for the super user. */ private String superUser; @@ -140,6 +146,10 @@ public class RouterClientProtocol implements ClientProtocol { this.allowPartialList = conf.getBoolean( RBFConfigKeys.DFS_ROUTER_ALLOW_PARTIAL_LIST, RBFConfigKeys.DFS_ROUTER_ALLOW_PARTIAL_LIST_DEFAULT); + this.mountStatusTimeOut = conf.getTimeDuration( + RBFConfigKeys.DFS_ROUTER_CLIENT_MOUNT_TIME_OUT, + RBFConfigKeys.DFS_ROUTER_CLIENT_MOUNT_TIME_OUT_DEFAULT, + TimeUnit.SECONDS); // User and group for reporting try { @@ -234,15 +244,92 @@ public class RouterClientProtocol implements ClientProtocol { } } - RemoteLocation createLocation = rpcServer.getCreateLocation(src); RemoteMethod method = new RemoteMethod("create", new Class[] {String.class, FsPermission.class, String.class, EnumSetWritable.class, boolean.class, short.class, long.class, CryptoProtocolVersion[].class, String.class, String.class}, - createLocation.getDest(), masked, clientName, flag, createParent, + new RemoteParam(), masked, clientName, flag, createParent, replication, blockSize, supportedVersions, ecPolicyName, storagePolicy); - return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method); + final List locations = + rpcServer.getLocationsForPath(src, true); + RemoteLocation createLocation = null; + try { + createLocation = rpcServer.getCreateLocation(src); + return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method); + } catch (IOException ioe) { + final List newLocations = checkFaultTolerantRetry( + method, src, ioe, createLocation, locations); + return rpcClient.invokeSequential( + newLocations, method, HdfsFileStatus.class, null); + } + } + + /** + * Check if an exception is caused by an unavailable subcluster or not. It + * also checks the causes. + * @param ioe IOException to check. + * @return If caused by an unavailable subcluster. False if the should not be + * retried (e.g., NSQuotaExceededException). 
+ */ + private static boolean isUnavailableSubclusterException( + final IOException ioe) { + if (ioe instanceof ConnectException || + ioe instanceof ConnectTimeoutException || + ioe instanceof NoNamenodesAvailableException) { + return true; + } + if (ioe.getCause() instanceof IOException) { + IOException cause = (IOException)ioe.getCause(); + return isUnavailableSubclusterException(cause); + } + return false; + } + + /** + * Check if a remote method can be retried in other subclusters when it + * failed in the original destination. This method returns the list of + * locations to retry in. This is used by fault tolerant mount points. + * @param method Method that failed and might be retried. + * @param src Path where the method was invoked. + * @param e Exception that was triggered. + * @param excludeLoc Location that failed and should be excluded. + * @param locations All the locations to retry. + * @return The locations where we should retry (excluding the failed ones). + * @throws IOException If this path is not fault tolerant or the exception + * should not be retried (e.g., NSQuotaExceededException). + */ + private List checkFaultTolerantRetry( + final RemoteMethod method, final String src, final IOException ioe, + final RemoteLocation excludeLoc, final List locations) + throws IOException { + + if (!isUnavailableSubclusterException(ioe)) { + LOG.debug("{} exception cannot be retried", + ioe.getClass().getSimpleName()); + throw ioe; + } + if (!rpcServer.isPathFaultTolerant(src)) { + LOG.debug("{} does not allow retrying a failed subcluster", src); + throw ioe; + } + + final List newLocations; + if (excludeLoc == null) { + LOG.error("Cannot invoke {} for {}: {}", method, src, ioe.getMessage()); + newLocations = locations; + } else { + LOG.error("Cannot invoke {} for {} in {}: {}", + method, src, excludeLoc, ioe.getMessage()); + newLocations = new ArrayList<>(); + for (final RemoteLocation loc : locations) { + if (!loc.equals(excludeLoc)) { + newLocations.add(loc); + } + } + } + LOG.info("{} allows retrying failed subclusters in {}", src, newLocations); + return newLocations; } @Override @@ -604,13 +691,20 @@ public class RouterClientProtocol implements ClientProtocol { } } catch (IOException ioe) { // Can't query if this file exists or not. - LOG.error("Error requesting file info for path {} while proxing mkdirs", - src, ioe); + LOG.error("Error getting file info for {} while proxying mkdirs: {}", + src, ioe.getMessage()); } } - RemoteLocation firstLocation = locations.get(0); - return (boolean) rpcClient.invokeSingle(firstLocation, method); + final RemoteLocation firstLocation = locations.get(0); + try { + return (boolean) rpcClient.invokeSingle(firstLocation, method); + } catch (IOException ioe) { + final List newLocations = checkFaultTolerantRetry( + method, src, ioe, firstLocation, locations); + return rpcClient.invokeSequential( + newLocations, method, Boolean.class, Boolean.TRUE); + } } @Override @@ -1702,10 +1796,26 @@ public class RouterClientProtocol implements ClientProtocol { */ private HdfsFileStatus getFileInfoAll(final List locations, final RemoteMethod method) throws IOException { + return getFileInfoAll(locations, method, -1); + } + + /** + * Get the file info from all the locations. + * + * @param locations Locations to check. + * @param method The file information method to run. + * @param timeOutMs Time out for the operation in milliseconds. + * @return The first file info if it's a file, the directory if it's + * everywhere. 
+ * @throws IOException If all the locations throw an exception. + */ + private HdfsFileStatus getFileInfoAll(final List locations, + final RemoteMethod method, long timeOutMs) throws IOException { // Get the file info from everybody Map results = - rpcClient.invokeConcurrent(locations, method, HdfsFileStatus.class); + rpcClient.invokeConcurrent(locations, method, false, false, timeOutMs, + HdfsFileStatus.class); int children = 0; // We return the first file HdfsFileStatus dirStatus = null; @@ -1762,9 +1872,10 @@ public class RouterClientProtocol implements ClientProtocol { MountTableResolver mountTable = (MountTableResolver) subclusterResolver; MountTable entry = mountTable.getMountPoint(mName); if (entry != null) { - HdfsFileStatus fInfo = getFileInfoAll(entry.getDestinations(), - new RemoteMethod("getFileInfo", new Class[] {String.class}, - new RemoteParam())); + RemoteMethod method = new RemoteMethod("getFileInfo", + new Class[] {String.class}, new RemoteParam()); + HdfsFileStatus fInfo = getFileInfoAll( + entry.getDestinations(), method, mountStatusTimeOut); if (fInfo != null) { permission = fInfo.getPermission(); owner = fInfo.getOwner(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 3d80c4167d1..730952b9db6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -18,11 +18,15 @@ package org.apache.hadoop.hdfs.server.federation.router; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY; + import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.net.ConnectException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; @@ -62,6 +66,7 @@ import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,7 +131,8 @@ public class RouterRpcClient { this.namenodeResolver = resolver; - this.connectionManager = new ConnectionManager(conf); + Configuration clientConf = getClientConfiguration(conf); + this.connectionManager = new ConnectionManager(clientConf); this.connectionManager.start(); int numThreads = conf.getInt( @@ -165,6 +171,31 @@ public class RouterRpcClient { failoverSleepBaseMillis, failoverSleepMaxMillis); } + /** + * Get the configuration for the RPC client. It takes the Router + * configuration and transforms it into regular RPC Client configuration. + * @param conf Input configuration. + * @return Configuration for the RPC client. 
+ */ + private Configuration getClientConfiguration(final Configuration conf) { + Configuration clientConf = new Configuration(conf); + int maxRetries = conf.getInt( + RBFConfigKeys.DFS_ROUTER_CLIENT_MAX_RETRIES_TIME_OUT, + RBFConfigKeys.DFS_ROUTER_CLIENT_MAX_RETRIES_TIME_OUT_DEFAULT); + if (maxRetries >= 0) { + clientConf.setInt( + IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, maxRetries); + } + long connectTimeOut = conf.getTimeDuration( + RBFConfigKeys.DFS_ROUTER_CLIENT_CONNECT_TIMEOUT, + RBFConfigKeys.DFS_ROUTER_CLIENT_CONNECT_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + if (connectTimeOut >= 0) { + clientConf.setLong(IPC_CLIENT_CONNECT_TIMEOUT_KEY, connectTimeOut); + } + return clientConf; + } + /** * Get the active namenode resolver used by this client. * @return Active namenode resolver. @@ -341,17 +372,19 @@ public class RouterRpcClient { * @param method Remote ClientProtcol method to invoke. * @param params Variable list of parameters matching the method. * @return The result of invoking the method. - * @throws IOException + * @throws ConnectException If it cannot connect to any Namenode. + * @throws StandbyException If all Namenodes are in Standby. + * @throws IOException If it cannot invoke the method. */ private Object invokeMethod( final UserGroupInformation ugi, final List namenodes, final Class protocol, final Method method, final Object... params) - throws IOException { + throws ConnectException, StandbyException, IOException { if (namenodes == null || namenodes.isEmpty()) { throw new IOException("No namenodes to invoke " + method.getName() + - " with params " + Arrays.toString(params) + " from " + " with params " + Arrays.deepToString(params) + " from " + router.getRouterId()); } @@ -388,6 +421,12 @@ public class RouterRpcClient { this.rpcMonitor.proxyOpFailureStandby(); } failover = true; + } else if (ioe instanceof ConnectException || + ioe instanceof ConnectTimeoutException) { + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpFailureCommunicate(); + } + failover = true; } else if (ioe instanceof RemoteException) { if (this.rpcMonitor != null) { this.rpcMonitor.proxyOpComplete(true); @@ -408,7 +447,7 @@ public class RouterRpcClient { if (this.rpcMonitor != null) { this.rpcMonitor.proxyOpNoNamenodes(); } - LOG.error("Can not get available namenode for {} {} error: {}", + LOG.error("Cannot get available namenode for {} {} error: {}", nsId, rpcAddress, ioe.getMessage()); // Throw RetriableException so that client can retry throw new RetriableException(ioe); @@ -433,24 +472,33 @@ public class RouterRpcClient { // All namenodes were unavailable or in standby String msg = "No namenode available to invoke " + method.getName() + " " + - Arrays.toString(params); + Arrays.deepToString(params) + " in " + namenodes + " from " + + router.getRouterId(); LOG.error(msg); + int exConnect = 0; for (Entry entry : ioes.entrySet()) { FederationNamenodeContext namenode = entry.getKey(); - String nsId = namenode.getNameserviceId(); - String nnId = namenode.getNamenodeId(); + String nnKey = namenode.getNamenodeKey(); String addr = namenode.getRpcAddress(); IOException ioe = entry.getValue(); if (ioe instanceof StandbyException) { - LOG.error("{} {} at {} is in Standby: {}", nsId, nnId, addr, - ioe.getMessage()); + LOG.error("{} at {} is in Standby: {}", + nnKey, addr, ioe.getMessage()); + } else if (ioe instanceof ConnectException || + ioe instanceof ConnectTimeoutException) { + exConnect++; + LOG.error("{} at {} cannot be reached: {}", + nnKey, addr, ioe.getMessage()); } else 
{ - LOG.error("{} {} at {} error: \"{}\"", - nsId, nnId, addr, ioe.getMessage()); + LOG.error("{} at {} error: \"{}\"", nnKey, addr, ioe.getMessage()); } } - throw new StandbyException(msg); + if (exConnect == ioes.size()) { + throw new ConnectException(msg); + } else { + throw new StandbyException(msg); + } } /** @@ -497,6 +545,9 @@ public class RouterRpcClient { // failover, invoker looks for standby exceptions for failover. if (ioe instanceof StandbyException) { throw ioe; + } else if (ioe instanceof ConnectException || + ioe instanceof ConnectTimeoutException) { + throw ioe; } else { throw new StandbyException(ioe.getMessage()); } @@ -1043,7 +1094,7 @@ public class RouterRpcClient { if (locations.isEmpty()) { throw new IOException("No remote locations available"); - } else if (locations.size() == 1) { + } else if (locations.size() == 1 && timeOutMs <= 0) { // Shortcut, just one call T location = locations.iterator().next(); String ns = location.getNameserviceId(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 739a2ffeb08..b934355dc9e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -30,6 +30,7 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.Array; +import java.net.ConnectException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; @@ -133,6 +134,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC.Server; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.security.AccessControlException; @@ -294,7 +296,9 @@ public class RouterRpcServer extends AbstractService AccessControlException.class, LeaseExpiredException.class, NotReplicatedYetException.class, - IOException.class); + IOException.class, + ConnectException.class, + RetriableException.class); this.rpcServer.addSuppressedLoggingExceptions( StandbyException.class); @@ -520,7 +524,7 @@ public class RouterRpcServer extends AbstractService // If default Ns is not present return result from first namespace. Set nss = namenodeResolver.getNamespaces(); if (nss.isEmpty()) { - throw new IOException("No namespace availaible."); + throw new IOException("No namespace available."); } nsId = nss.iterator().next().getNameserviceId(); return rpcClient.invokeSingle(nsId, method, clazz); @@ -566,6 +570,7 @@ public class RouterRpcServer extends AbstractService replication, blockSize, supportedVersions, ecPolicyName, storagePolicy); } + /** * Get the location to create a file. It checks if the file already existed * in one of the locations. @@ -574,10 +579,24 @@ public class RouterRpcServer extends AbstractService * @return The remote location for this file. * @throws IOException If the file has no creation location. 
*/ - RemoteLocation getCreateLocation(final String src) + RemoteLocation getCreateLocation(final String src) throws IOException { + final List locations = getLocationsForPath(src, true); + return getCreateLocation(src, locations); + } + + /** + * Get the location to create a file. It checks if the file already existed + * in one of the locations. + * + * @param src Path of the file to check. + * @param locations Prefetched locations for the file. + * @return The remote location for this file. + * @throws IOException If the file has no creation location. + */ + RemoteLocation getCreateLocation( + final String src, final List locations) throws IOException { - final List locations = getLocationsForPath(src, true); if (locations == null || locations.isEmpty()) { throw new IOException("Cannot get locations to create " + src); } @@ -1568,6 +1587,27 @@ public class RouterRpcServer extends AbstractService return false; } + /** + * Check if a path supports failed subclusters. + * + * @param path Path to check. + * @return If a path should support failed subclusters. + */ + boolean isPathFaultTolerant(final String path) { + if (subclusterResolver instanceof MountTableResolver) { + try { + MountTableResolver mountTable = (MountTableResolver) subclusterResolver; + MountTable entry = mountTable.getMountPoint(path); + if (entry != null) { + return entry.isFaultTolerant(); + } + } catch (IOException e) { + LOG.error("Cannot get mount point", e); + } + } + return false; + } + /** * Check if call needs to be invoked to all the locations. The call is * supposed to be invoked in all the locations in case the order of the mount diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java index d5e1857a8c1..87610385d87 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java @@ -66,6 +66,7 @@ public class MountTableStoreImpl extends MountTableStore { if (pc != null) { pc.checkPermission(mountTable, FsAction.WRITE); } + mountTable.validate(); } boolean status = getDriver().put(mountTable, false, true); @@ -85,6 +86,7 @@ public class MountTableStoreImpl extends MountTableStore { if (pc != null) { pc.checkPermission(mountTable, FsAction.WRITE); } + mountTable.validate(); } boolean status = getDriver().put(mountTable, true, true); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java index c1585b06df5..d1351a340c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java @@ -26,6 +26,7 @@ import java.util.Map.Entry; import java.util.SortedMap; import java.util.TreeMap; +import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; @@ -59,6 +60,10 @@ public abstract 
class MountTable extends BaseRecord { "Invalid entry, invalid destination path "; public static final String ERROR_MSG_ALL_DEST_MUST_START_WITH_BACK_SLASH = "Invalid entry, all destination must start with / "; + private static final String ERROR_MSG_FAULT_TOLERANT_MULTI_DEST = + "Invalid entry, fault tolerance requires multiple destinations "; + private static final String ERROR_MSG_FAULT_TOLERANT_ALL = + "Invalid entry, fault tolerance only supported for ALL order "; /** Comparator for paths which considers the /. */ public static final Comparator PATH_COMPARATOR = @@ -228,6 +233,20 @@ public abstract class MountTable extends BaseRecord { */ public abstract void setDestOrder(DestinationOrder order); + /** + * Check if the mount point supports a failed destination. + * + * @return If it supports failures. + */ + public abstract boolean isFaultTolerant(); + + /** + * Set if the mount point supports failed destinations. + * + * @param faultTolerant If it supports failures. + */ + public abstract void setFaultTolerant(boolean faultTolerant); + /** * Get owner name of this mount table entry. * @@ -321,11 +340,14 @@ public abstract class MountTable extends BaseRecord { List destinations = this.getDestinations(); sb.append(destinations); if (destinations != null && destinations.size() > 1) { - sb.append("[" + this.getDestOrder() + "]"); + sb.append("[").append(this.getDestOrder()).append("]"); } if (this.isReadOnly()) { sb.append("[RO]"); } + if (this.isFaultTolerant()) { + sb.append("[FT]"); + } if (this.getOwnerName() != null) { sb.append("[owner:").append(this.getOwnerName()).append("]"); @@ -383,6 +405,16 @@ public abstract class MountTable extends BaseRecord { ERROR_MSG_ALL_DEST_MUST_START_WITH_BACK_SLASH + this); } } + if (isFaultTolerant()) { + if (getDestinations().size() < 2) { + throw new IllegalArgumentException( + ERROR_MSG_FAULT_TOLERANT_MULTI_DEST + this); + } + if (!isAll()) { + throw new IllegalArgumentException( + ERROR_MSG_FAULT_TOLERANT_ALL + this); + } + } } @Override @@ -397,6 +429,7 @@ public abstract class MountTable extends BaseRecord { .append(this.getDestinations()) .append(this.isReadOnly()) .append(this.getDestOrder()) + .append(this.isFaultTolerant()) .toHashCode(); } @@ -404,16 +437,13 @@ public abstract class MountTable extends BaseRecord { public boolean equals(Object obj) { if (obj instanceof MountTable) { MountTable other = (MountTable)obj; - if (!this.getSourcePath().equals(other.getSourcePath())) { - return false; - } else if (!this.getDestinations().equals(other.getDestinations())) { - return false; - } else if (this.isReadOnly() != other.isReadOnly()) { - return false; - } else if (!this.getDestOrder().equals(other.getDestOrder())) { - return false; - } - return true; + return new EqualsBuilder() + .append(this.getSourcePath(), other.getSourcePath()) + .append(this.getDestinations(), other.getDestinations()) + .append(this.isReadOnly(), other.isReadOnly()) + .append(this.getDestOrder(), other.getDestOrder()) + .append(this.isFaultTolerant(), other.isFaultTolerant()) + .isEquals(); } return false; } @@ -424,9 +454,7 @@ public abstract class MountTable extends BaseRecord { */ public boolean isAll() { DestinationOrder order = getDestOrder(); - return order == DestinationOrder.HASH_ALL || - order == DestinationOrder.RANDOM || - order == DestinationOrder.SPACE; + return DestinationOrder.FOLDER_ALL.contains(order); } /** diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java index 4c7622c0990..62cdc7272fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java @@ -195,6 +195,20 @@ public class MountTablePBImpl extends MountTable implements PBRecord { } } + @Override + public boolean isFaultTolerant() { + MountTableRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder(); + if (!proto.hasFaultTolerant()) { + return false; + } + return proto.getFaultTolerant(); + } + + @Override + public void setFaultTolerant(boolean faultTolerant) { + this.translator.getBuilder().setFaultTolerant(faultTolerant); + } + @Override public String getOwnerName() { MountTableRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java index b04b0692b0a..61da7e926d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java @@ -135,12 +135,12 @@ public class RouterAdmin extends Configured implements Tool { } if (cmd.equals("-add")) { return "\t[-add " - + "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] " + + "[-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL] " + "-owner -group -mode ]"; } else if (cmd.equals("-update")) { return "\t[-update " + " " - + "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] " + + "[-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL] " + "-owner -group -mode ]"; } else if (cmd.equals("-rm")) { return "\t[-rm ]"; @@ -415,6 +415,7 @@ public class RouterAdmin extends Configured implements Tool { // Optional parameters boolean readOnly = false; + boolean faultTolerant = false; String owner = null; String group = null; FsPermission mode = null; @@ -422,6 +423,8 @@ public class RouterAdmin extends Configured implements Tool { while (i < parameters.length) { if (parameters[i].equals("-readonly")) { readOnly = true; + } else if (parameters[i].equals("-faulttolerant")) { + faultTolerant = true; } else if (parameters[i].equals("-order")) { i++; try { @@ -447,7 +450,7 @@ public class RouterAdmin extends Configured implements Tool { i++; } - return addMount(mount, nss, dest, readOnly, order, + return addMount(mount, nss, dest, readOnly, faultTolerant, order, new ACLEntity(owner, group, mode)); } @@ -464,7 +467,8 @@ public class RouterAdmin extends Configured implements Tool { * @throws IOException Error adding the mount point. 
*/ public boolean addMount(String mount, String[] nss, String dest, - boolean readonly, DestinationOrder order, ACLEntity aclInfo) + boolean readonly, boolean faultTolerant, DestinationOrder order, + ACLEntity aclInfo) throws IOException { mount = normalizeFileSystemPath(mount); // Get the existing entry @@ -491,6 +495,9 @@ public class RouterAdmin extends Configured implements Tool { if (readonly) { newEntry.setReadOnly(true); } + if (faultTolerant) { + newEntry.setFaultTolerant(true); + } if (order != null) { newEntry.setDestOrder(order); } @@ -508,6 +515,8 @@ public class RouterAdmin extends Configured implements Tool { newEntry.setMode(aclInfo.getMode()); } + newEntry.validate(); + AddMountTableEntryRequest request = AddMountTableEntryRequest.newInstance(newEntry); AddMountTableEntryResponse addResponse = @@ -527,6 +536,9 @@ public class RouterAdmin extends Configured implements Tool { if (readonly) { existingEntry.setReadOnly(true); } + if (faultTolerant) { + existingEntry.setFaultTolerant(true); + } if (order != null) { existingEntry.setDestOrder(order); } @@ -544,6 +556,8 @@ public class RouterAdmin extends Configured implements Tool { existingEntry.setMode(aclInfo.getMode()); } + existingEntry.validate(); + UpdateMountTableEntryRequest updateRequest = UpdateMountTableEntryRequest.newInstance(existingEntry); UpdateMountTableEntryResponse updateResponse = @@ -572,6 +586,7 @@ public class RouterAdmin extends Configured implements Tool { // Optional parameters boolean readOnly = false; + boolean faultTolerant = false; String owner = null; String group = null; FsPermission mode = null; @@ -579,6 +594,8 @@ public class RouterAdmin extends Configured implements Tool { while (i < parameters.length) { if (parameters[i].equals("-readonly")) { readOnly = true; + } else if (parameters[i].equals("-faulttolerant")) { + faultTolerant = true; } else if (parameters[i].equals("-order")) { i++; try { @@ -604,7 +621,7 @@ public class RouterAdmin extends Configured implements Tool { i++; } - return updateMount(mount, nss, dest, readOnly, order, + return updateMount(mount, nss, dest, readOnly, faultTolerant, order, new ACLEntity(owner, group, mode)); } @@ -621,7 +638,8 @@ public class RouterAdmin extends Configured implements Tool { * @throws IOException Error updating the mount point. 
*/ public boolean updateMount(String mount, String[] nss, String dest, - boolean readonly, DestinationOrder order, ACLEntity aclInfo) + boolean readonly, boolean faultTolerant, + DestinationOrder order, ACLEntity aclInfo) throws IOException { mount = normalizeFileSystemPath(mount); MountTableManager mountTable = client.getMountTableManager(); @@ -634,6 +652,7 @@ public class RouterAdmin extends Configured implements Tool { MountTable newEntry = MountTable.newInstance(mount, destMap); newEntry.setReadOnly(readonly); + newEntry.setFaultTolerant(faultTolerant); if (order != null) { newEntry.setDestOrder(order); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto index a55be731a74..6a60e4ad0d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto @@ -143,6 +143,8 @@ message MountTableRecordProto { optional int32 mode = 12; optional QuotaUsageProto quota = 13; + + optional bool faultTolerant = 14 [default = false]; } message AddMountTableEntryRequestProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index 1034c87ff8f..e23f863e8a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -503,6 +503,36 @@ + + dfs.federation.router.client.mount-status.time-out + 1s + + Set a timeout for the Router when listing folders containing mount + points. In this process, the Router checks the mount table and then it + checks permissions in the subcluster. After the time out, we return the + default values. + + + + + dfs.federation.router.connect.max.retries.on.timeouts + 0 + + Maximum number of retries for the IPC Client when connecting to the + subclusters. By default, it doesn't let the IPC retry and the Router + handles it. + + + + + dfs.federation.router.connect.timeout + 2s + + Time out for the IPC client connecting to the subclusters. This should be + short as the Router has knowledge of the state of the Routers. 
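[Editor's note] The three new settings described above are ordinary Hadoop configuration keys read by the Router. A minimal sketch of tuning them programmatically is shown below; it only assumes the standard Configuration API, the key names come from hdfs-rbf-default.xml in this patch, and the class name and values are illustrative, not part of this change.

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class RouterTimeoutSettings {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // Fail fast when a subcluster Namenode cannot be reached.
        conf.setTimeDuration(
            "dfs.federation.router.connect.timeout", 2, TimeUnit.SECONDS);
        // Let the Router, not the IPC client, handle retries on timeouts.
        conf.setInt(
            "dfs.federation.router.connect.max.retries.on.timeouts", 0);
        // Bound how long listing a folder with mount points may block.
        conf.setTimeDuration(
            "dfs.federation.router.client.mount-status.time-out",
            1, TimeUnit.SECONDS);

        long timeoutMs = conf.getTimeDuration(
            "dfs.federation.router.connect.timeout",
            2000, TimeUnit.MILLISECONDS);
        System.out.println("Subcluster connect timeout: " + timeoutMs + " ms");
      }
    }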
+ + + dfs.federation.router.keytab.file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/router/federationhealth.html b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/router/federationhealth.html index c591698e4b6..cf8653bc8f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/router/federationhealth.html +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/router/federationhealth.html @@ -393,6 +393,7 @@ Target path Order Read only + Fault tolerant Owner Group Permission @@ -409,6 +410,7 @@ {path} {order} + {ownerName} {groupName} {mode} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/router/federationhealth.js b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/router/federationhealth.js index 5da7b079ffe..e655e604d8e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/router/federationhealth.js +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/router/federationhealth.js @@ -324,8 +324,20 @@ } } + function augment_fault_tolerant(mountTable) { + for (var i = 0, e = mountTable.length; i < e; ++i) { + if (mountTable[i].faulttolerant == true) { + mountTable[i].faulttolerant = "true" + mountTable[i].ftStatus = "Fault tolerant" + } else { + mountTable[i].faulttolerant = "false" + } + } + } + resource.MountTable = JSON.parse(resource.MountTable) augment_read_only(resource.MountTable) + augment_fault_tolerant(resource.MountTable) return resource; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/static/rbf.css b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/static/rbf.css index 5cdd8269ca1..b2eef6ad7e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/static/rbf.css +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/static/rbf.css @@ -135,3 +135,8 @@ color: #5fa341; content: "\e033"; } + +.mount-table-fault-tolerant-true:before { + color: #5fa341; + content: "\e033"; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md index f24ff12993f..83cecda53d7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md @@ -266,6 +266,14 @@ To determine which subcluster contains a file: [hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -getDestination /user/user1/file.txt Note that consistency of the data across subclusters is not guaranteed by the Router. +By default, if one subcluster is unavailable, writes may fail if they target that subcluster. +To allow writing in another subcluster, one can make the mount point fault tolerant: + + [hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -add /data ns1,ns2 /data -order HASH_ALL -faulttolerant + +Note that this can lead to a file to be written in multiple subclusters or a folder missing in one. +One needs to be aware of the possibility of these inconsistencies and target this `faulttolerant` approach to resilient paths. +An example for this is the `/app-logs` folder which will mostly write once into a subfolder. 
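[Editor's note] The same fault-tolerance rules are enforced by the MountTable record added in this patch: an entry must have more than one destination and an ALL-type order (HASH_ALL, RANDOM or SPACE) before it may be marked fault tolerant. The sketch below is illustrative only; the class name and destination values are hypothetical, while the MountTable/DestinationOrder calls are the ones introduced or used by this change.

    import java.io.IOException;
    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
    import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;

    public class FaultTolerantMountSketch {
      public static void main(String[] args) throws IOException {
        // A fault tolerant entry needs at least two destinations.
        Map<String, String> destinations = new LinkedHashMap<>();
        destinations.put("ns1", "/data");
        destinations.put("ns2", "/data");

        MountTable entry = MountTable.newInstance("/data", destinations);
        entry.setDestOrder(DestinationOrder.HASH_ALL); // must be an "ALL" order
        entry.setFaultTolerant(true);

        // Throws IllegalArgumentException if there is a single destination
        // or the order is not HASH_ALL, RANDOM or SPACE.
        entry.validate();
      }
    }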
### Disabling nameservices diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java index 9b58fff085c..d8dffeedd18 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java @@ -18,19 +18,44 @@ package org.apache.hadoop.hdfs.server.federation; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyShort; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.FileNotFoundException; import java.io.IOException; +import java.net.ConnectException; import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.concurrent.ConcurrentSkipListMap; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FsServerDefaults; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HAServiceStatus; import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService; import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB; import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService; @@ -40,15 +65,29 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; +import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; +import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.http.HttpServer2; +import org.apache.hadoop.io.Text; 
import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC.Server; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.DataChecksum.Type; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.protobuf.BlockingService; @@ -59,9 +98,15 @@ import com.google.protobuf.BlockingService; */ public class MockNamenode { + private static final Logger LOG = + LoggerFactory.getLogger(MockNamenode.class); + + /** Mock implementation of the Namenode. */ private final NamenodeProtocols mockNn; + /** Name service identifier (subcluster). */ + private String nsId; /** HA state of the Namenode. */ private HAServiceState haState = HAServiceState.STANDBY; @@ -71,9 +116,13 @@ public class MockNamenode { private HttpServer2 httpServer; - public MockNamenode() throws Exception { - Configuration conf = new Configuration(); + public MockNamenode(final String nsIdentifier) throws IOException { + this(nsIdentifier, new HdfsConfiguration()); + } + public MockNamenode(final String nsIdentifier, final Configuration conf) + throws IOException { + this.nsId = nsIdentifier; this.mockNn = mock(NamenodeProtocols.class); setupMock(); setupRPCServer(conf); @@ -86,7 +135,7 @@ public class MockNamenode { * @throws IOException If the mock cannot be setup. */ protected void setupMock() throws IOException { - NamespaceInfo nsInfo = new NamespaceInfo(1, "clusterId", "bpId", 1); + NamespaceInfo nsInfo = new NamespaceInfo(1, this.nsId, this.nsId, 1); when(mockNn.versionRequest()).thenReturn(nsInfo); when(mockNn.getServiceStatus()).thenAnswer(new Answer() { @@ -115,11 +164,16 @@ public class MockNamenode { ClientNamenodeProtocol.newReflectiveBlockingService( clientNNProtoXlator); + int numHandlers = conf.getInt( + DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, + DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT); + rpcServer = new RPC.Builder(conf) .setProtocol(ClientNamenodeProtocolPB.class) .setInstance(clientNNPbService) .setBindAddress("0.0.0.0") .setPort(0) + .setNumHandlers(numHandlers) .build(); NamenodeProtocolServerSideTranslatorPB nnProtoXlator = @@ -146,6 +200,18 @@ public class MockNamenode { DFSUtil.addPBProtocol( conf, HAServiceProtocolPB.class, haProtoPbService, rpcServer); + this.rpcServer.addTerseExceptions( + RemoteException.class, + SafeModeException.class, + FileNotFoundException.class, + FileAlreadyExistsException.class, + AccessControlException.class, + LeaseExpiredException.class, + NotReplicatedYetException.class, + IOException.class, + ConnectException.class, + StandbyException.class); + rpcServer.start(); } @@ -188,6 +254,14 @@ public class MockNamenode { return mockNn; } + /** + * Get the name service id (subcluster) of the Mock Namenode. + * @return Name service identifier. + */ + public String getNameserviceId() { + return nsId; + } + /** * Get the HA state of the Mock Namenode. * @return HA state (ACTIVE or STANDBY). 
@@ -217,9 +291,158 @@ public class MockNamenode { public void stop() throws Exception { if (rpcServer != null) { rpcServer.stop(); + rpcServer = null; } if (httpServer != null) { httpServer.stop(); + httpServer = null; } } + + /** + * Add the mock for the FileSystem calls in ClientProtocol. + * @throws IOException If it cannot be setup. + */ + public void addFileSystemMock() throws IOException { + final SortedMap fs = + new ConcurrentSkipListMap(); + + DirectoryListing l = mockNn.getListing(anyString(), any(), anyBoolean()); + when(l).thenAnswer(invocation -> { + String src = getSrc(invocation); + LOG.info("{} getListing({})", nsId, src); + if (!src.endsWith("/")) { + src += "/"; + } + Map files = + fs.subMap(src, src + Character.MAX_VALUE); + List list = new ArrayList<>(); + for (String file : files.keySet()) { + if (file.substring(src.length()).indexOf('/') < 0) { + HdfsFileStatus fileStatus = + getMockHdfsFileStatus(file, fs.get(file)); + list.add(fileStatus); + } + } + HdfsFileStatus[] array = list.toArray( + new HdfsFileStatus[list.size()]); + return new DirectoryListing(array, 0); + }); + when(mockNn.getFileInfo(anyString())).thenAnswer(invocation -> { + String src = getSrc(invocation); + LOG.info("{} getFileInfo({})", nsId, src); + return getMockHdfsFileStatus(src, fs.get(src)); + }); + HdfsFileStatus c = mockNn.create(anyString(), any(), anyString(), any(), + anyBoolean(), anyShort(), anyLong(), any(), any(), any()); + when(c).thenAnswer(invocation -> { + String src = getSrc(invocation); + LOG.info("{} create({})", nsId, src); + fs.put(src, "FILE"); + return getMockHdfsFileStatus(src, "FILE"); + }); + LocatedBlocks b = mockNn.getBlockLocations( + anyString(), anyLong(), anyLong()); + when(b).thenAnswer(invocation -> { + String src = getSrc(invocation); + LOG.info("{} getBlockLocations({})", nsId, src); + if (!fs.containsKey(src)) { + LOG.error("{} cannot find {} for getBlockLocations", nsId, src); + throw new FileNotFoundException("File does not exist " + src); + } + return mock(LocatedBlocks.class); + }); + boolean f = mockNn.complete(anyString(), anyString(), any(), anyLong()); + when(f).thenAnswer(invocation -> { + String src = getSrc(invocation); + if (!fs.containsKey(src)) { + LOG.error("{} cannot find {} for complete", nsId, src); + throw new FileNotFoundException("File does not exist " + src); + } + return true; + }); + LocatedBlock a = mockNn.addBlock( + anyString(), anyString(), any(), any(), anyLong(), any(), any()); + when(a).thenAnswer(invocation -> { + String src = getSrc(invocation); + if (!fs.containsKey(src)) { + LOG.error("{} cannot find {} for addBlock", nsId, src); + throw new FileNotFoundException("File does not exist " + src); + } + return getMockLocatedBlock(nsId); + }); + boolean m = mockNn.mkdirs(anyString(), any(), anyBoolean()); + when(m).thenAnswer(invocation -> { + String src = getSrc(invocation); + LOG.info("{} mkdirs({})", nsId, src); + fs.put(src, "DIRECTORY"); + return true; + }); + when(mockNn.getServerDefaults()).thenAnswer(invocation -> { + LOG.info("{} getServerDefaults", nsId); + FsServerDefaults defaults = mock(FsServerDefaults.class); + when(defaults.getChecksumType()).thenReturn( + Type.valueOf(DataChecksum.CHECKSUM_CRC32)); + when(defaults.getKeyProviderUri()).thenReturn(nsId); + return defaults; + }); + } + + private static String getSrc(InvocationOnMock invocation) { + return (String) invocation.getArguments()[0]; + } + + /** + * Get a mock HDFS file status. + * @param filename Name of the file. 
+ * @param type Type of the file (FILE, DIRECTORY, or null). + * @return HDFS file status + */ + private static HdfsFileStatus getMockHdfsFileStatus( + final String filename, final String type) { + if (type == null) { + return null; + } + HdfsFileStatus fileStatus = mock(HdfsFileStatus.class); + when(fileStatus.getLocalNameInBytes()).thenReturn(filename.getBytes()); + when(fileStatus.getPermission()).thenReturn(mock(FsPermission.class)); + when(fileStatus.getOwner()).thenReturn("owner"); + when(fileStatus.getGroup()).thenReturn("group"); + if (type.equals("FILE")) { + when(fileStatus.getLen()).thenReturn(100L); + when(fileStatus.getReplication()).thenReturn((short) 1); + when(fileStatus.getBlockSize()).thenReturn( + HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT); + } else if (type.equals("DIRECTORY")) { + when(fileStatus.isDir()).thenReturn(true); + when(fileStatus.isDirectory()).thenReturn(true); + } + return fileStatus; + } + + /** + * Get a mock located block pointing to one of the subclusters. It is + * allocated in a fake Datanode. + * @param nsId Name service identifier (subcluster). + * @return Mock located block. + */ + private static LocatedBlock getMockLocatedBlock(final String nsId) { + LocatedBlock lb = mock(LocatedBlock.class); + when(lb.getCachedLocations()).thenReturn(new DatanodeInfo[0]); + DatanodeID nodeId = new DatanodeID("localhost", "localhost", "dn0", + 1111, 1112, 1113, 1114); + DatanodeInfo dnInfo = new DatanodeDescriptor(nodeId); + when(lb.getLocations()).thenReturn(new DatanodeInfo[] {dnInfo}); + ExtendedBlock eb = mock(ExtendedBlock.class); + when(eb.getBlockPoolId()).thenReturn(nsId); + when(lb.getBlock()).thenReturn(eb); + @SuppressWarnings("unchecked") + Token tok = mock(Token.class); + when(tok.getIdentifier()).thenReturn(nsId.getBytes()); + when(tok.getPassword()).thenReturn(nsId.getBytes()); + when(tok.getKind()).thenReturn(new Text(nsId)); + when(tok.getService()).thenReturn(new Text(nsId)); + when(lb.getBlockToken()).thenReturn(tok); + return lb; + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java index 486d4a09b70..381203b2a91 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java @@ -152,7 +152,7 @@ public class TestRouterAdminCLI { @Test public void testAddMountTable() throws Exception { - String nsId = "ns0"; + String nsId = "ns0,ns1"; String src = "/test-addmounttable"; String dest = "/addmounttable"; String[] argv = new String[] {"-add", src, nsId, dest}; @@ -166,26 +166,35 @@ public class TestRouterAdminCLI { MountTable mountTable = getResponse.getEntries().get(0); List destinations = mountTable.getDestinations(); - assertEquals(1, destinations.size()); + assertEquals(2, destinations.size()); assertEquals(src, mountTable.getSourcePath()); - assertEquals(nsId, destinations.get(0).getNameserviceId()); + assertEquals("ns0", destinations.get(0).getNameserviceId()); assertEquals(dest, destinations.get(0).getDest()); + assertEquals("ns1", destinations.get(1).getNameserviceId()); + assertEquals(dest, destinations.get(1).getDest()); assertFalse(mountTable.isReadOnly()); + assertFalse(mountTable.isFaultTolerant()); // 
test mount table update behavior dest = dest + "-new"; - argv = new String[] {"-add", src, nsId, dest, "-readonly"}; + argv = new String[] {"-add", src, nsId, dest, "-readonly", + "-faulttolerant", "-order", "HASH_ALL"}; assertEquals(0, ToolRunner.run(admin, argv)); stateStore.loadCache(MountTableStoreImpl.class, true); getResponse = client.getMountTableManager() .getMountTableEntries(getRequest); mountTable = getResponse.getEntries().get(0); - assertEquals(2, mountTable.getDestinations().size()); - assertEquals(nsId, mountTable.getDestinations().get(1).getNameserviceId()); - assertEquals(dest, mountTable.getDestinations().get(1).getDest()); + assertEquals(4, mountTable.getDestinations().size()); + RemoteLocation loc2 = mountTable.getDestinations().get(2); + assertEquals("ns0", loc2.getNameserviceId()); + assertEquals(dest, loc2.getDest()); + RemoteLocation loc3 = mountTable.getDestinations().get(3); + assertEquals("ns1", loc3.getNameserviceId()); + assertEquals(dest, loc3.getDest()); assertTrue(mountTable.isReadOnly()); + assertTrue(mountTable.isFaultTolerant()); } @Test @@ -211,6 +220,7 @@ public class TestRouterAdminCLI { assertEquals(nsId, destinations.get(0).getNameserviceId()); assertEquals(dest, destinations.get(0).getDest()); assertFalse(mountTable.isReadOnly()); + assertFalse(mountTable.isFaultTolerant()); // test mount table update behavior dest = dest + "-new"; @@ -516,17 +526,19 @@ public class TestRouterAdminCLI { System.setOut(new PrintStream(out)); String[] argv = new String[] {"-add", src, nsId}; assertEquals(-1, ToolRunner.run(admin, argv)); - assertTrue(out.toString().contains( + assertTrue("Wrong message: " + out, out.toString().contains( "\t[-add " - + "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] " + + "[-readonly] [-faulttolerant] " + + "[-order HASH|LOCAL|RANDOM|HASH_ALL] " + "-owner -group -mode ]")); out.reset(); argv = new String[] {"-update", src, nsId}; assertEquals(-1, ToolRunner.run(admin, argv)); - assertTrue(out.toString().contains( + assertTrue("Wrong message: " + out, out.toString().contains( "\t[-update " - + "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] " + + "[-readonly] [-faulttolerant] " + + "[-order HASH|LOCAL|RANDOM|HASH_ALL] " + "-owner -group -mode ]")); out.reset(); @@ -567,10 +579,11 @@ public class TestRouterAdminCLI { assertEquals(-1, ToolRunner.run(admin, argv)); String expected = "Usage: hdfs dfsrouteradmin :\n" + "\t[-add " - + "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] " + + "[-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL] " + "-owner -group -mode ]\n" + "\t[-update " - + " " + "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] " + + " " + + "[-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL] " + "-owner -group -mode ]\n" + "\t[-rm ]\n" + "\t[-ls ]\n" + "\t[-getDestination ]\n" @@ -579,7 +592,7 @@ public class TestRouterAdminCLI { + "\t[-safemode enter | leave | get]\n" + "\t[-nameservice enable | disable ]\n" + "\t[-getDisabledNameservices]"; - assertTrue(out.toString(), out.toString().contains(expected)); + assertTrue("Wrong message: " + out, out.toString().contains(expected)); out.reset(); } @@ -1159,4 +1172,28 @@ public class TestRouterAdminCLI { argv = new String[] {"-getDestination /file1.txt /file2.txt"}; assertEquals(-1, ToolRunner.run(admin, argv)); } + + @Test + public void testErrorFaultTolerant() throws Exception { + + System.setErr(new PrintStream(err)); + String[] argv = new String[] {"-add", "/mntft", "ns01", "/tmp", + "-faulttolerant"}; + assertEquals(-1, ToolRunner.run(admin, 
argv)); + assertTrue(err.toString(), err.toString().contains( + "Invalid entry, fault tolerance requires multiple destinations")); + err.reset(); + + System.setErr(new PrintStream(err)); + argv = new String[] {"-add", "/mntft", "ns0,ns1", "/tmp", + "-order", "HASH", "-faulttolerant"}; + assertEquals(-1, ToolRunner.run(admin, argv)); + assertTrue(err.toString(), err.toString().contains( + "Invalid entry, fault tolerance only supported for ALL order")); + err.reset(); + + argv = new String[] {"-add", "/mntft", "ns0,ns1", "/tmp", + "-order", "HASH_ALL", "-faulttolerant"}; + assertEquals(0, ToolRunner.run(admin, argv)); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java new file mode 100644 index 00000000000..c8f96c659cd --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java @@ -0,0 +1,654 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static java.util.Arrays.asList; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.federation.MockNamenode; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; +import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Test the handling of fault tolerant mount points in the Router. + */ +public class TestRouterFaultTolerant { + + private static final Logger LOG = + LoggerFactory.getLogger(TestRouterFaultTolerant.class); + + /** Number of files to create for testing. */ + private static final int NUM_FILES = 10; + /** Number of Routers for test. 
*/ + private static final int NUM_ROUTERS = 2; + + + /** Namenodes for the test per name service id (subcluster). */ + private Map namenodes = new HashMap<>(); + /** Routers for the test. */ + private List routers = new ArrayList<>(); + + /** Run test tasks in parallel. */ + private ExecutorService service; + + + @Before + public void setup() throws Exception { + LOG.info("Start the Namenodes"); + Configuration nnConf = new HdfsConfiguration(); + nnConf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, 10); + for (final String nsId : asList("ns0", "ns1")) { + MockNamenode nn = new MockNamenode(nsId, nnConf); + nn.transitionToActive(); + nn.addFileSystemMock(); + namenodes.put(nsId, nn); + } + + LOG.info("Start the Routers"); + Configuration routerConf = new RouterConfigBuilder() + .stateStore() + .admin() + .rpc() + .build(); + routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0"); + routerConf.set(RBFConfigKeys.DFS_ROUTER_HTTP_ADDRESS_KEY, "0.0.0.0:0"); + routerConf.set(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, "0.0.0.0:0"); + // Speedup time outs + routerConf.setTimeDuration( + RBFConfigKeys.DFS_ROUTER_CLIENT_CONNECT_TIMEOUT, + 500, TimeUnit.MILLISECONDS); + + Configuration stateStoreConf = getStateStoreConfiguration(); + stateStoreConf.setClass( + RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS, + MembershipNamenodeResolver.class, ActiveNamenodeResolver.class); + stateStoreConf.setClass( + RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS, + MultipleDestinationMountTableResolver.class, + FileSubclusterResolver.class); + routerConf.addResource(stateStoreConf); + + for (int i = 0; i < NUM_ROUTERS; i++) { + // router0 doesn't allow partial listing + routerConf.setBoolean( + RBFConfigKeys.DFS_ROUTER_ALLOW_PARTIAL_LIST, i != 0); + + final Router router = new Router(); + router.init(routerConf); + router.start(); + routers.add(router); + } + + LOG.info("Registering the subclusters in the Routers"); + registerSubclusters(Collections.singleton("ns1")); + + LOG.info("Stop ns1 to simulate an unavailable subcluster"); + namenodes.get("ns1").stop(); + + service = Executors.newFixedThreadPool(10); + } + + /** + * Register the subclusters in all Routers. + * @param unavailableSubclusters Set of unavailable subclusters. + * @throws IOException If it cannot register a subcluster. 
+ */ + private void registerSubclusters(Set unavailableSubclusters) + throws IOException { + for (final Router router : routers) { + MembershipNamenodeResolver resolver = + (MembershipNamenodeResolver) router.getNamenodeResolver(); + for (final MockNamenode nn : namenodes.values()) { + String nsId = nn.getNameserviceId(); + String rpcAddress = "localhost:" + nn.getRPCPort(); + String httpAddress = "localhost:" + nn.getHTTPPort(); + NamenodeStatusReport report = new NamenodeStatusReport( + nsId, null, rpcAddress, rpcAddress, rpcAddress, httpAddress); + if (unavailableSubclusters.contains(nsId)) { + LOG.info("Register {} as UNAVAILABLE", nsId); + report.setRegistrationValid(false); + } else { + LOG.info("Register {} as ACTIVE", nsId); + report.setRegistrationValid(true); + } + report.setNamespaceInfo(new NamespaceInfo(0, nsId, nsId, 0)); + resolver.registerNamenode(report); + } + resolver.loadCache(true); + } + } + + @After + public void cleanup() throws Exception { + LOG.info("Stopping the cluster"); + for (final MockNamenode nn : namenodes.values()) { + nn.stop(); + } + namenodes.clear(); + + routers.forEach(router -> router.stop()); + routers.clear(); + + if (service != null) { + service.shutdown(); + service = null; + } + } + + /** + * Add a mount table entry in some name services and wait until it is + * available. + * @param mountPoint Name of the mount point. + * @param order Order of the mount table entry. + * @param nsIds Name service identifiers. + * @throws Exception If the entry could not be created. + */ + private void createMountTableEntry( + final String mountPoint, final DestinationOrder order, + Collection nsIds) throws Exception { + Router router = getRandomRouter(); + RouterClient admin = getAdminClient(router); + MountTableManager mountTable = admin.getMountTableManager(); + Map destMap = new HashMap<>(); + for (String nsId : nsIds) { + destMap.put(nsId, mountPoint); + } + MountTable newEntry = MountTable.newInstance(mountPoint, destMap); + newEntry.setDestOrder(order); + AddMountTableEntryRequest addRequest = + AddMountTableEntryRequest.newInstance(newEntry); + AddMountTableEntryResponse addResponse = + mountTable.addMountTableEntry(addRequest); + boolean created = addResponse.getStatus(); + assertTrue(created); + + refreshRoutersCaches(); + + // Check for the path + GetMountTableEntriesRequest getRequest = + GetMountTableEntriesRequest.newInstance(mountPoint); + GetMountTableEntriesResponse getResponse = + mountTable.getMountTableEntries(getRequest); + List entries = getResponse.getEntries(); + assertEquals("Too many entries: " + entries, 1, entries.size()); + assertEquals(mountPoint, entries.get(0).getSourcePath()); + } + + /** + * Update a mount table entry to be fault tolerant. + * @param mountPoint Mount point to update. + * @throws IOException If it cannot update the mount point. 
+ */ + private void updateMountPointFaultTolerant(final String mountPoint) + throws IOException { + Router router = getRandomRouter(); + RouterClient admin = getAdminClient(router); + MountTableManager mountTable = admin.getMountTableManager(); + GetMountTableEntriesRequest getRequest = + GetMountTableEntriesRequest.newInstance(mountPoint); + GetMountTableEntriesResponse entries = + mountTable.getMountTableEntries(getRequest); + MountTable updateEntry = entries.getEntries().get(0); + updateEntry.setFaultTolerant(true); + UpdateMountTableEntryRequest updateRequest = + UpdateMountTableEntryRequest.newInstance(updateEntry); + UpdateMountTableEntryResponse updateResponse = + mountTable.updateMountTableEntry(updateRequest); + assertTrue(updateResponse.getStatus()); + + refreshRoutersCaches(); + } + + /** + * Refresh the caches of all Routers (to get the mount table). + */ + private void refreshRoutersCaches() { + for (final Router router : routers) { + StateStoreService stateStore = router.getStateStore(); + stateStore.refreshCaches(true); + } + } + + /** + * Test the behavior of the Router when one of the subclusters in a mount + * point fails. In particular, it checks if it can write files or not. + * Related to {@link TestRouterRpcMultiDestination#testSubclusterDown()}. + */ + @Test + public void testWriteWithFailedSubcluster() throws Exception { + + // Run the actual tests with each approach + final List> tasks = new ArrayList<>(); + final List orders = asList( + DestinationOrder.HASH_ALL, + DestinationOrder.SPACE, + DestinationOrder.RANDOM, + DestinationOrder.HASH); + for (DestinationOrder order : orders) { + tasks.add(() -> { + testWriteWithFailedSubcluster(order); + return true; + }); + } + TaskResults results = collectResults("Full tests", tasks); + assertEquals(orders.size(), results.getSuccess()); + } + + /** + * Test the behavior of the Router when one of the subclusters in a mount + * point fails. It assumes that ns1 is already down. + * @param order Destination order of the mount point. + * @throws Exception If we cannot run the test. + */ + private void testWriteWithFailedSubcluster(final DestinationOrder order) + throws Exception { + + final FileSystem router0Fs = getFileSystem(routers.get(0)); + final FileSystem router1Fs = getFileSystem(routers.get(1)); + final FileSystem ns0Fs = getFileSystem(namenodes.get("ns0").getRPCPort()); + + final String mountPoint = "/" + order + "-failsubcluster"; + final Path mountPath = new Path(mountPoint); + LOG.info("Setup {} with order {}", mountPoint, order); + createMountTableEntry(mountPoint, order, namenodes.keySet()); + + + LOG.info("Write in {} should succeed writing in ns0 and fail for ns1", + mountPath); + checkDirectoriesFaultTolerant( + mountPath, order, router0Fs, router1Fs, ns0Fs, false); + checkFilesFaultTolerant( + mountPath, order, router0Fs, router1Fs, ns0Fs, false); + + LOG.info("Make {} fault tolerant and everything succeeds", mountPath); + IOException ioe = null; + try { + updateMountPointFaultTolerant(mountPoint); + } catch (IOException e) { + ioe = e; + } + if (DestinationOrder.FOLDER_ALL.contains(order)) { + assertNull(ioe); + checkDirectoriesFaultTolerant( + mountPath, order, router0Fs, router1Fs, ns0Fs, true); + checkFilesFaultTolerant( + mountPath, order, router0Fs, router1Fs, ns0Fs, true); + } else { + assertTrue(ioe.getMessage().startsWith( + "Invalid entry, fault tolerance only supported for ALL order")); + } + } + + /** + * Check directory creation on a mount point. 
+ * If it is fault tolerant, it should be able to write everything. + * If it is not fault tolerant, it should fail to write some. + */ + private void checkDirectoriesFaultTolerant( + Path mountPoint, DestinationOrder order, + FileSystem router0Fs, FileSystem router1Fs, FileSystem ns0Fs, + boolean faultTolerant) throws Exception { + + final FileStatus[] dirs0 = listStatus(router1Fs, mountPoint); + + LOG.info("Create directories in {}", mountPoint); + final List> tasks = new ArrayList<>(); + for (int i = 0; i < NUM_FILES; i++) { + final Path dir = new Path(mountPoint, + String.format("dir-%s-%03d", faultTolerant, i)); + FileSystem fs = getRandomRouterFileSystem(); + tasks.add(getDirCreateTask(fs, dir)); + } + TaskResults results = collectResults("Create dir " + mountPoint, tasks); + + LOG.info("Check directories results for {}: {}", mountPoint, results); + if (faultTolerant || DestinationOrder.FOLDER_ALL.contains(order)) { + assertEquals(NUM_FILES, results.getSuccess()); + assertEquals(0, results.getFailure()); + } else { + assertBothResults("check dir " + mountPoint, NUM_FILES, results); + } + + LOG.info("Check directories listing for {}", mountPoint); + tasks.add(getListFailTask(router0Fs, mountPoint)); + int filesExpected = dirs0.length + results.getSuccess(); + tasks.add(getListSuccessTask(router1Fs, mountPoint, filesExpected)); + assertEquals(2, collectResults("List " + mountPoint, tasks).getSuccess()); + } + + /** + * Check file creation on a mount point. + * If it is fault tolerant, it should be able to write everything. + * If it is not fault tolerant, it should fail to write some of the files. + */ + private void checkFilesFaultTolerant( + Path mountPoint, DestinationOrder order, + FileSystem router0Fs, FileSystem router1Fs, FileSystem ns0Fs, + boolean faultTolerant) throws Exception { + + // Get one of the existing sub directories + final FileStatus[] dirs0 = listStatus(router1Fs, mountPoint); + final Path dir0 = Path.getPathWithoutSchemeAndAuthority( + dirs0[0].getPath()); + + LOG.info("Create files in {}", dir0); + final List> tasks = new ArrayList<>(); + for (int i = 0; i < NUM_FILES; i++) { + final String newFile = String.format("%s/file-%03d.txt", dir0, i); + FileSystem fs = getRandomRouterFileSystem(); + tasks.add(getFileCreateTask(fs, newFile, ns0Fs)); + } + TaskResults results = collectResults("Create file " + dir0, tasks); + + LOG.info("Check files results for {}: {}", dir0, results); + if (faultTolerant || !DestinationOrder.FOLDER_ALL.contains(order)) { + assertEquals(NUM_FILES, results.getSuccess()); + assertEquals(0, results.getFailure()); + } else { + assertBothResults("check files " + dir0, NUM_FILES, results); + } + + LOG.info("Check files listing for {}", dir0); + tasks.add(getListFailTask(router0Fs, dir0)); + tasks.add(getListSuccessTask(router1Fs, dir0, results.getSuccess())); + assertEquals(2, collectResults("List " + dir0, tasks).getSuccess()); + } + + /** + * Get the string representation for the files. + * @param files Files to check. + * @return String representation. + */ + private static String toString(final FileStatus[] files) { + final StringBuilder sb = new StringBuilder(); + sb.append("["); + for (final FileStatus file : files) { + if (sb.length() > 1) { + sb.append(", "); + } + sb.append(Path.getPathWithoutSchemeAndAuthority(file.getPath())); + } + sb.append("]"); + return sb.toString(); + } + + /** + * List the files in a path. + * @param fs File system to check. + * @param path Path to list. + * @return List of files. 
+ * @throws IOException If we cannot list. + */ + private FileStatus[] listStatus(final FileSystem fs, final Path path) + throws IOException { + FileStatus[] files = new FileStatus[] {}; + try { + files = fs.listStatus(path); + } catch (FileNotFoundException fnfe) { + LOG.debug("File not found: {}", fnfe.getMessage()); + } + return files; + } + + /** + * Task that creates a file and checks if it is available. + * @param file File to create. + * @param checkFs File system for checking if the file is properly created. + * @return Result of creating the file. + */ + private static Callable getFileCreateTask( + final FileSystem fs, final String file, FileSystem checkFs) { + return () -> { + try { + Path path = new Path(file); + FSDataOutputStream os = fs.create(path); + // We don't write because we have no mock Datanodes + os.close(); + FileStatus fileStatus = checkFs.getFileStatus(path); + assertTrue("File not created properly: " + fileStatus, + fileStatus.getLen() > 0); + return true; + } catch (RemoteException re) { + return false; + } + }; + } + + /** + * Task that creates a directory. + * @param dir Directory to create. + * @return Result of creating the directory.. + */ + private static Callable getDirCreateTask( + final FileSystem fs, final Path dir) { + return () -> { + try { + fs.mkdirs(dir); + return true; + } catch (RemoteException re) { + return false; + } + }; + } + + /** + * Task that lists a directory and expects to fail. + * @param fs File system to check. + * @param path Path to try to list. + * @return If the listing failed as expected. + */ + private static Callable getListFailTask(FileSystem fs, Path path) { + return () -> { + try { + fs.listStatus(path); + return false; + } catch (RemoteException re) { + return true; + } + }; + } + + /** + * Task that lists a directory and succeeds. + * @param fs File system to check. + * @param path Path to list. + * @param expected Number of files to expect to find. + * @return If the listing succeeds. + */ + private static Callable getListSuccessTask( + FileSystem fs, Path path, int expected) { + return () -> { + final FileStatus[] dirs = fs.listStatus(path); + assertEquals(toString(dirs), expected, dirs.length); + return true; + }; + } + + /** + * Invoke a set of tasks and collect their outputs. + * The tasks should do assertions. + * + * @param service Execution Service to run the tasks. + * @param tasks Tasks to run. + * @throws Exception If it cannot collect the results. + */ + private TaskResults collectResults(final String tag, + final Collection> tasks) throws Exception { + final TaskResults results = new TaskResults(); + service.invokeAll(tasks).forEach(task -> { + try { + boolean succeeded = task.get(); + if (succeeded) { + LOG.info("Got success for {}", tag); + results.incrSuccess(); + } else { + LOG.info("Got failure for {}", tag); + results.incrFailure(); + } + } catch (Exception e) { + fail(e.getMessage()); + } + }); + tasks.clear(); + return results; + } + + /** + * Class to summarize the results of running a task. 
+ */ + static class TaskResults { + private final AtomicInteger success = new AtomicInteger(0); + private final AtomicInteger failure = new AtomicInteger(0); + public void incrSuccess() { + success.incrementAndGet(); + } + public void incrFailure() { + failure.incrementAndGet(); + } + public int getSuccess() { + return success.get(); + } + public int getFailure() { + return failure.get(); + } + public int getTotal() { + return success.get() + failure.get(); + } + @Override + public String toString() { + return new StringBuilder() + .append("Success=").append(getSuccess()) + .append(" Failure=").append(getFailure()) + .toString(); + } + } + + /** + * Asserts that the results are the expected amount and it has both success + * and failure. + * @param msg Message to show when the assertion fails. + * @param expected Expected number of results. + * @param actual Actual results. + */ + private static void assertBothResults(String msg, + int expected, TaskResults actual) { + assertEquals(msg, expected, actual.getTotal()); + assertTrue("Expected some success for " + msg, actual.getSuccess() > 0); + assertTrue("Expected some failure for " + msg, actual.getFailure() > 0); + } + + /** + * Get a random Router from the cluster. + * @return Random Router. + */ + private Router getRandomRouter() { + Random rnd = new Random(); + int index = rnd.nextInt(routers.size()); + return routers.get(index); + } + + /** + * Get a file system from one of the Routers as a random user to allow better + * concurrency in the Router. + * @return File system from a random user. + * @throws Exception If we cannot create the file system. + */ + private FileSystem getRandomRouterFileSystem() throws Exception { + final UserGroupInformation userUgi = + UserGroupInformation.createUserForTesting( + "user-" + UUID.randomUUID(), new String[]{"group"}); + Router router = getRandomRouter(); + return userUgi.doAs( + (PrivilegedExceptionAction) () -> getFileSystem(router)); + } + + private static FileSystem getFileSystem(int rpcPort) throws IOException { + Configuration conf = new HdfsConfiguration(); + URI uri = URI.create("hdfs://localhost:" + rpcPort); + return DistributedFileSystem.get(uri, conf); + } + + private static FileSystem getFileSystem(final Router router) + throws IOException { + InetSocketAddress rpcAddress = router.getRpcServerAddress(); + int rpcPort = rpcAddress.getPort(); + return getFileSystem(rpcPort); + } + + private static RouterClient getAdminClient( + final Router router) throws IOException { + Configuration conf = new HdfsConfiguration(); + InetSocketAddress routerSocket = router.getAdminServerAddress(); + return new RouterClient(routerSocket, conf); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java index 1224fa2ddcd..8fa3506f73c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java @@ -74,7 +74,7 @@ public class TestRouterNamenodeMonitoring { for (String nsId : nsIds) { nns.put(nsId, new HashMap<>()); for (String nnId : asList("nn0", "nn1")) { - nns.get(nsId).put(nnId, new MockNamenode()); + nns.get(nsId).put(nnId, new MockNamenode(nsId)); } } diff 
--git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java index 2ec5d62fd5e..98f9ebcf71a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats; import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; /** @@ -93,7 +94,7 @@ public final class FederationStateStoreTestUtils { conf.setClass(FEDERATION_STORE_DRIVER_CLASS, clazz, StateStoreDriver.class); - if (clazz.isAssignableFrom(StateStoreFileBaseImpl.class)) { + if (StateStoreFileBaseImpl.class.isAssignableFrom(clazz)) { setFileConfiguration(conf); } return conf; @@ -178,8 +179,7 @@ public final class FederationStateStoreTestUtils { * @param conf Configuration to extend. */ public static void setFileConfiguration(Configuration conf) { - String workingPath = System.getProperty("user.dir"); - String stateStorePath = workingPath + "/statestore"; + String stateStorePath = GenericTestUtils.getRandomizedTempPath(); conf.set(FEDERATION_STORE_FILE_DIRECTORY, stateStorePath); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMountTable.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMountTable.java index d30d6baea44..6e5bd9ca85f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMountTable.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMountTable.java @@ -78,15 +78,17 @@ public class TestStateStoreMountTable extends TestStateStoreBase { assertFalse(getStateStore().isDriverReady()); // Test APIs that access the store to check they throw the correct exception + MountTable entry = MountTable.newInstance( + "/mnt", Collections.singletonMap("ns0", "/tmp")); AddMountTableEntryRequest addRequest = - AddMountTableEntryRequest.newInstance(); + AddMountTableEntryRequest.newInstance(entry); verifyException(mountStore, "addMountTableEntry", StateStoreUnavailableException.class, new Class[] {AddMountTableEntryRequest.class}, new Object[] {addRequest}); UpdateMountTableEntryRequest updateRequest = - UpdateMountTableEntryRequest.newInstance(); + UpdateMountTableEntryRequest.newInstance(entry); verifyException(mountStore, "updateMountTableEntry", StateStoreUnavailableException.class, new Class[] {UpdateMountTableEntryRequest.class}, diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java index 055527384eb..339a9776ea4 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.federation.store.records;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -164,6 +165,24 @@ public class TestMountTable {
     assertTrue(record2.isReadOnly());
   }
 
+  @Test
+  public void testFaultTolerant() throws IOException {
+
+    Map<String, String> dest = new LinkedHashMap<>();
+    dest.put(DST_NS_0, DST_PATH_0);
+    dest.put(DST_NS_1, DST_PATH_1);
+    MountTable record0 = MountTable.newInstance(SRC, dest);
+    assertFalse(record0.isFaultTolerant());
+
+    MountTable record1 = MountTable.newInstance(SRC, dest);
+    assertFalse(record1.isFaultTolerant());
+    assertEquals(record0, record1);
+
+    record1.setFaultTolerant(true);
+    assertTrue(record1.isFaultTolerant());
+    assertNotEquals(record0, record1);
+  }
+
   @Test
   public void testOrder() throws IOException {
     testOrder(DestinationOrder.HASH);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
index 7ae31c83985..452b2773698 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
@@ -428,8 +428,8 @@ Runs the DFS router. See [Router](../hadoop-hdfs-rbf/HDFSRouterFederation.html#R
 Usage:
 
       hdfs dfsrouteradmin
-          [-add <source> <nameservice1, nameservice2, ...> <destination> [-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] -owner <owner> -group <group> -mode <mode>]
-          [-update <source> <nameservice1, nameservice2, ...> <destination> [-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] -owner <owner> -group <group> -mode <mode>]
+          [-add <source> <nameservice1, nameservice2, ...> <destination> [-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL] -owner <owner> -group <group> -mode <mode>]
+          [-update <source> <nameservice1, nameservice2, ...> <destination> [-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL] -owner <owner> -group <group> -mode <mode>]
           [-rm <source>]
           [-ls <path>]
           [-getDestination <path>]
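
For readers who want to try the new flag outside of the unit tests, the admin API that TestRouterFaultTolerant exercises can be driven directly. The sketch below is illustrative only, not part of the patch: the Router admin address (the value of RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY in a real deployment), the mount point /data, and the class name AddFaultTolerantMount are made-up examples, while the calls themselves (RouterClient, MountTableManager, MountTable.newInstance, setDestOrder, setFaultTolerant, AddMountTableEntryRequest) are the ones used by createMountTableEntry() and updateMountPointFaultTolerant() above. As testErrorFaultTolerant checks, the entry must have more than one destination and an ALL-style order such as HASH_ALL before fault tolerance is accepted.

```java
import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;

/**
 * Sketch: create a fault tolerant mount point through the Router admin API,
 * mirroring TestRouterFaultTolerant#createMountTableEntry(). Address, mount
 * point and nameservice names are hypothetical.
 */
public class AddFaultTolerantMount {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    // Hypothetical admin endpoint; use the Router's configured admin address.
    InetSocketAddress routerAdmin = new InetSocketAddress("localhost", 8111);
    try (RouterClient admin = new RouterClient(routerAdmin, conf)) {
      MountTableManager mountTable = admin.getMountTableManager();

      // One destination per subcluster; fault tolerance needs more than one.
      Map<String, String> destMap = new LinkedHashMap<>();
      destMap.put("ns0", "/data");
      destMap.put("ns1", "/data");

      MountTable entry = MountTable.newInstance("/data", destMap);
      // Fault tolerance is only accepted for ALL-style orders (e.g. HASH_ALL).
      entry.setDestOrder(DestinationOrder.HASH_ALL);
      entry.setFaultTolerant(true);

      AddMountTableEntryRequest request =
          AddMountTableEntryRequest.newInstance(entry);
      AddMountTableEntryResponse response =
          mountTable.addMountTableEntry(request);
      System.out.println("Mount entry added: " + response.getStatus());
    }
  }
}
```

The command-line equivalent is the invocation that testErrorFaultTolerant accepts with exit code 0: `hdfs dfsrouteradmin -add /mntft ns0,ns1 /tmp -order HASH_ALL -faulttolerant`; the other two invocations in that test show the validation errors raised when only one destination or a non-ALL order is given.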