HDFS-13250. RBF: Router to manage requests across multiple subclusters. Contributed by Inigo Goiri.
(cherry picked from commit2caba999bb
) (cherry picked from commit0db4b587c6
)
This commit is contained in:
parent
94b826d2c7
commit
d7d35fd4f0
|
@ -768,6 +768,66 @@ public class RouterRpcClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invoke method in all locations and return success if any succeeds.
|
||||||
|
*
|
||||||
|
* @param locations List of remote locations to call concurrently.
|
||||||
|
* @param method The remote method and parameters to invoke.
|
||||||
|
* @return If the call succeeds in any location.
|
||||||
|
* @throws IOException If any of the calls return an exception.
|
||||||
|
*/
|
||||||
|
public <T extends RemoteLocationContext, R> boolean invokeAll(
|
||||||
|
final Collection<T> locations, final RemoteMethod method)
|
||||||
|
throws IOException {
|
||||||
|
boolean anyResult = false;
|
||||||
|
Map<T, Boolean> results =
|
||||||
|
invokeConcurrent(locations, method, false, false, Boolean.class);
|
||||||
|
for (Boolean value : results.values()) {
|
||||||
|
boolean result = value.booleanValue();
|
||||||
|
if (result) {
|
||||||
|
anyResult = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return anyResult;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invoke multiple concurrent proxy calls to different clients. Returns an
|
||||||
|
* array of results.
|
||||||
|
*
|
||||||
|
* Re-throws exceptions generated by the remote RPC call as either
|
||||||
|
* RemoteException or IOException.
|
||||||
|
*
|
||||||
|
* @param <T> The type of the remote location.
|
||||||
|
* @param locations List of remote locations to call concurrently.
|
||||||
|
* @param method The remote method and parameters to invoke.
|
||||||
|
* @throws IOException If all the calls throw an exception.
|
||||||
|
*/
|
||||||
|
public <T extends RemoteLocationContext, R> void invokeConcurrent(
|
||||||
|
final Collection<T> locations, final RemoteMethod method)
|
||||||
|
throws IOException {
|
||||||
|
invokeConcurrent(locations, method, void.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invoke multiple concurrent proxy calls to different clients. Returns an
|
||||||
|
* array of results.
|
||||||
|
*
|
||||||
|
* Re-throws exceptions generated by the remote RPC call as either
|
||||||
|
* RemoteException or IOException.
|
||||||
|
*
|
||||||
|
* @param <T> The type of the remote location.
|
||||||
|
* @param locations List of remote locations to call concurrently.
|
||||||
|
* @param method The remote method and parameters to invoke.
|
||||||
|
* @return Result of invoking the method per subcluster: nsId -> result.
|
||||||
|
* @throws IOException If all the calls throw an exception.
|
||||||
|
*/
|
||||||
|
public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
|
||||||
|
final Collection<T> locations, final RemoteMethod method, Class<R> clazz)
|
||||||
|
throws IOException {
|
||||||
|
return invokeConcurrent(locations, method, false, false, clazz);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Invoke multiple concurrent proxy calls to different clients. Returns an
|
* Invoke multiple concurrent proxy calls to different clients. Returns an
|
||||||
* array of results.
|
* array of results.
|
||||||
|
|
|
@ -510,6 +510,18 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
checkOperation(OperationCategory.WRITE);
|
checkOperation(OperationCategory.WRITE);
|
||||||
|
|
||||||
|
if (createParent && isPathAll(src)) {
|
||||||
|
int index = src.lastIndexOf(Path.SEPARATOR);
|
||||||
|
String parent = src.substring(0, index);
|
||||||
|
LOG.debug("Creating {} requires creating parent {}", src, parent);
|
||||||
|
FsPermission parentPermissions = getParentPermission(masked);
|
||||||
|
boolean success = mkdirs(parent, parentPermissions, createParent);
|
||||||
|
if (!success) {
|
||||||
|
// This shouldn't happen as mkdirs returns true or exception
|
||||||
|
LOG.error("Couldn't create parents for {}", src);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
RemoteLocation createLocation = getCreateLocation(src);
|
RemoteLocation createLocation = getCreateLocation(src);
|
||||||
RemoteMethod method = new RemoteMethod("create",
|
RemoteMethod method = new RemoteMethod("create",
|
||||||
new Class<?>[] {String.class, FsPermission.class, String.class,
|
new Class<?>[] {String.class, FsPermission.class, String.class,
|
||||||
|
@ -521,6 +533,32 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
||||||
return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method);
|
return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the permissions for the parent of a child with given permissions. If
|
||||||
|
* the child has r--, we will set it to r-x.
|
||||||
|
* @param mask The permission mask of the child.
|
||||||
|
* @return The permission mask of the parent.
|
||||||
|
*/
|
||||||
|
private static FsPermission getParentPermission(final FsPermission mask) {
|
||||||
|
FsPermission ret = new FsPermission(
|
||||||
|
applyExecute(mask.getUserAction()),
|
||||||
|
applyExecute(mask.getGroupAction()),
|
||||||
|
applyExecute(mask.getOtherAction()));
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Apply the execute permissions if it can be read.
|
||||||
|
* @param action Input permission.
|
||||||
|
* @return Output permission.
|
||||||
|
*/
|
||||||
|
private static FsAction applyExecute(final FsAction action) {
|
||||||
|
if (action.and(FsAction.READ) == FsAction.READ) {
|
||||||
|
return action.or(FsAction.EXECUTE);
|
||||||
|
}
|
||||||
|
return action;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the location to create a file. It checks if the file already existed
|
* Get the location to create a file. It checks if the file already existed
|
||||||
* in one of the locations.
|
* in one of the locations.
|
||||||
|
@ -580,7 +618,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
||||||
RemoteMethod method = new RemoteMethod("append",
|
RemoteMethod method = new RemoteMethod("append",
|
||||||
new Class<?>[] {String.class, String.class, EnumSetWritable.class},
|
new Class<?>[] {String.class, String.class, EnumSetWritable.class},
|
||||||
new RemoteParam(), clientName, flag);
|
new RemoteParam(), clientName, flag);
|
||||||
return (LastBlockWithStatus) rpcClient.invokeSequential(
|
return rpcClient.invokeSequential(
|
||||||
locations, method, LastBlockWithStatus.class, null);
|
locations, method, LastBlockWithStatus.class, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -643,7 +681,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
||||||
RemoteMethod method = new RemoteMethod("setPermission",
|
RemoteMethod method = new RemoteMethod("setPermission",
|
||||||
new Class<?>[] {String.class, FsPermission.class},
|
new Class<?>[] {String.class, FsPermission.class},
|
||||||
new RemoteParam(), permissions);
|
new RemoteParam(), permissions);
|
||||||
rpcClient.invokeSequential(locations, method);
|
if (isPathAll(src)) {
|
||||||
|
rpcClient.invokeConcurrent(locations, method);
|
||||||
|
} else {
|
||||||
|
rpcClient.invokeSequential(locations, method);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // ClientProtocol
|
@Override // ClientProtocol
|
||||||
|
@ -655,7 +697,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
||||||
RemoteMethod method = new RemoteMethod("setOwner",
|
RemoteMethod method = new RemoteMethod("setOwner",
|
||||||
new Class<?>[] {String.class, String.class, String.class},
|
new Class<?>[] {String.class, String.class, String.class},
|
||||||
new RemoteParam(), username, groupname);
|
new RemoteParam(), username, groupname);
|
||||||
rpcClient.invokeSequential(locations, method);
|
if (isPathAll(src)) {
|
||||||
|
rpcClient.invokeConcurrent(locations, method);
|
||||||
|
} else {
|
||||||
|
rpcClient.invokeSequential(locations, method);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -933,8 +979,12 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
||||||
RemoteMethod method = new RemoteMethod("delete",
|
RemoteMethod method = new RemoteMethod("delete",
|
||||||
new Class<?>[] {String.class, boolean.class}, new RemoteParam(),
|
new Class<?>[] {String.class, boolean.class}, new RemoteParam(),
|
||||||
recursive);
|
recursive);
|
||||||
return ((Boolean) rpcClient.invokeSequential(locations, method,
|
if (isPathAll(src)) {
|
||||||
Boolean.class, Boolean.TRUE)).booleanValue();
|
return rpcClient.invokeAll(locations, method);
|
||||||
|
} else {
|
||||||
|
return rpcClient.invokeSequential(locations, method,
|
||||||
|
Boolean.class, Boolean.TRUE).booleanValue();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // ClientProtocol
|
@Override // ClientProtocol
|
||||||
|
@ -943,6 +993,15 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
||||||
checkOperation(OperationCategory.WRITE);
|
checkOperation(OperationCategory.WRITE);
|
||||||
|
|
||||||
final List<RemoteLocation> locations = getLocationsForPath(src, true);
|
final List<RemoteLocation> locations = getLocationsForPath(src, true);
|
||||||
|
RemoteMethod method = new RemoteMethod("mkdirs",
|
||||||
|
new Class<?>[] {String.class, FsPermission.class, boolean.class},
|
||||||
|
new RemoteParam(), masked, createParent);
|
||||||
|
|
||||||
|
// Create in all locations
|
||||||
|
if (isPathAll(src)) {
|
||||||
|
return rpcClient.invokeAll(locations, method);
|
||||||
|
}
|
||||||
|
|
||||||
if (locations.size() > 1) {
|
if (locations.size() > 1) {
|
||||||
// Check if this directory already exists
|
// Check if this directory already exists
|
||||||
try {
|
try {
|
||||||
|
@ -959,9 +1018,6 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
||||||
}
|
}
|
||||||
|
|
||||||
RemoteLocation firstLocation = locations.get(0);
|
RemoteLocation firstLocation = locations.get(0);
|
||||||
RemoteMethod method = new RemoteMethod("mkdirs",
|
|
||||||
new Class<?>[] {String.class, FsPermission.class, boolean.class},
|
|
||||||
new RemoteParam(), masked, createParent);
|
|
||||||
return ((Boolean) rpcClient.invokeSingle(firstLocation, method))
|
return ((Boolean) rpcClient.invokeSingle(firstLocation, method))
|
||||||
.booleanValue();
|
.booleanValue();
|
||||||
}
|
}
|
||||||
|
@ -1077,8 +1133,16 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
||||||
final List<RemoteLocation> locations = getLocationsForPath(src, false);
|
final List<RemoteLocation> locations = getLocationsForPath(src, false);
|
||||||
RemoteMethod method = new RemoteMethod("getFileInfo",
|
RemoteMethod method = new RemoteMethod("getFileInfo",
|
||||||
new Class<?>[] {String.class}, new RemoteParam());
|
new Class<?>[] {String.class}, new RemoteParam());
|
||||||
HdfsFileStatus ret = (HdfsFileStatus) rpcClient.invokeSequential(
|
|
||||||
locations, method, HdfsFileStatus.class, null);
|
HdfsFileStatus ret = null;
|
||||||
|
// If it's a directory, we check in all locations
|
||||||
|
if (isPathAll(src)) {
|
||||||
|
ret = getFileInfoAll(locations, method);
|
||||||
|
} else {
|
||||||
|
// Check for file information sequentially
|
||||||
|
ret = (HdfsFileStatus) rpcClient.invokeSequential(
|
||||||
|
locations, method, HdfsFileStatus.class, null);
|
||||||
|
}
|
||||||
|
|
||||||
// If there is no real path, check mount points
|
// If there is no real path, check mount points
|
||||||
if (ret == null) {
|
if (ret == null) {
|
||||||
|
@ -1096,6 +1160,37 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the file info from all the locations.
|
||||||
|
*
|
||||||
|
* @param locations Locations to check.
|
||||||
|
* @param method The file information method to run.
|
||||||
|
* @return The first file info if it's a file, the directory if it's
|
||||||
|
* everywhere.
|
||||||
|
* @throws IOException If all the locations throw an exception.
|
||||||
|
*/
|
||||||
|
private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
|
||||||
|
final RemoteMethod method) throws IOException {
|
||||||
|
|
||||||
|
// Get the file info from everybody
|
||||||
|
Map<RemoteLocation, HdfsFileStatus> results =
|
||||||
|
rpcClient.invokeConcurrent(locations, method, HdfsFileStatus.class);
|
||||||
|
|
||||||
|
// We return the first file
|
||||||
|
HdfsFileStatus dirStatus = null;
|
||||||
|
for (RemoteLocation loc : locations) {
|
||||||
|
HdfsFileStatus fileStatus = results.get(loc);
|
||||||
|
if (fileStatus != null) {
|
||||||
|
if (!fileStatus.isDirectory()) {
|
||||||
|
return fileStatus;
|
||||||
|
} else if (dirStatus == null) {
|
||||||
|
dirStatus = fileStatus;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return dirStatus;
|
||||||
|
}
|
||||||
|
|
||||||
@Override // ClientProtocol
|
@Override // ClientProtocol
|
||||||
public boolean isFileClosed(String src) throws IOException {
|
public boolean isFileClosed(String src) throws IOException {
|
||||||
checkOperation(OperationCategory.READ);
|
checkOperation(OperationCategory.READ);
|
||||||
|
@ -2071,6 +2166,27 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if a path should be in all subclusters.
|
||||||
|
*
|
||||||
|
* @param path Path to check.
|
||||||
|
* @return If a path should be in all subclusters.
|
||||||
|
*/
|
||||||
|
private boolean isPathAll(final String path) {
|
||||||
|
if (subclusterResolver instanceof MountTableResolver) {
|
||||||
|
try {
|
||||||
|
MountTableResolver mountTable = (MountTableResolver)subclusterResolver;
|
||||||
|
MountTable entry = mountTable.getMountPoint(path);
|
||||||
|
if (entry != null) {
|
||||||
|
return entry.isAll();
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Cannot get mount point: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if a path is in a read only mount point.
|
* Check if a path is in a read only mount point.
|
||||||
*
|
*
|
||||||
|
|
|
@ -37,8 +37,6 @@ import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
|
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Data schema for
|
* Data schema for
|
||||||
|
@ -50,7 +48,6 @@ import org.slf4j.LoggerFactory;
|
||||||
*/
|
*/
|
||||||
public abstract class MountTable extends BaseRecord {
|
public abstract class MountTable extends BaseRecord {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(MountTable.class);
|
|
||||||
public static final String ERROR_MSG_NO_SOURCE_PATH =
|
public static final String ERROR_MSG_NO_SOURCE_PATH =
|
||||||
"Invalid entry, no source path specified ";
|
"Invalid entry, no source path specified ";
|
||||||
public static final String ERROR_MSG_MUST_START_WITH_BACK_SLASH =
|
public static final String ERROR_MSG_MUST_START_WITH_BACK_SLASH =
|
||||||
|
@ -417,6 +414,16 @@ public abstract class MountTable extends BaseRecord {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if a mount table spans all locations.
|
||||||
|
* @return If the mount table spreads across all locations.
|
||||||
|
*/
|
||||||
|
public boolean isAll() {
|
||||||
|
DestinationOrder order = getDestOrder();
|
||||||
|
return order == DestinationOrder.HASH_ALL ||
|
||||||
|
order == DestinationOrder.RANDOM;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Normalize a path for that filesystem.
|
* Normalize a path for that filesystem.
|
||||||
*
|
*
|
||||||
|
|
|
@ -0,0 +1,402 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.federation.router;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests the use of the resolvers that write in all subclusters from the
|
||||||
|
* Router. It supports:
|
||||||
|
* <li>HashResolver
|
||||||
|
* <li>RandomResolver.
|
||||||
|
*/
|
||||||
|
public class TestRouterAllResolver {
|
||||||
|
|
||||||
|
/** Directory that will be in a HASH_ALL mount point. */
|
||||||
|
private static final String TEST_DIR_HASH_ALL = "/hashall";
|
||||||
|
/** Directory that will be in a HASH_ALL mount point. */
|
||||||
|
private static final String TEST_DIR_RANDOM = "/random";
|
||||||
|
|
||||||
|
/** Number of namespaces. */
|
||||||
|
private static final int NUM_NAMESPACES = 2;
|
||||||
|
|
||||||
|
|
||||||
|
/** Mini HDFS clusters with Routers and State Store. */
|
||||||
|
private static StateStoreDFSCluster cluster;
|
||||||
|
/** Router for testing. */
|
||||||
|
private static RouterContext routerContext;
|
||||||
|
/** Router/federated filesystem. */
|
||||||
|
private static FileSystem routerFs;
|
||||||
|
/** Filesystem for each namespace. */
|
||||||
|
private static List<FileSystem> nsFss = new LinkedList<>();
|
||||||
|
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
// 2 nameservices with 1 namenode each (no HA needed for this test)
|
||||||
|
cluster = new StateStoreDFSCluster(
|
||||||
|
false, NUM_NAMESPACES, MultipleDestinationMountTableResolver.class);
|
||||||
|
|
||||||
|
// Start NNs and DNs and wait until ready
|
||||||
|
cluster.startCluster();
|
||||||
|
|
||||||
|
// Build and start a Router with: State Store + Admin + RPC
|
||||||
|
Configuration routerConf = new RouterConfigBuilder()
|
||||||
|
.stateStore()
|
||||||
|
.admin()
|
||||||
|
.rpc()
|
||||||
|
.build();
|
||||||
|
cluster.addRouterOverrides(routerConf);
|
||||||
|
cluster.startRouters();
|
||||||
|
routerContext = cluster.getRandomRouter();
|
||||||
|
|
||||||
|
// Register and verify all NNs with all routers
|
||||||
|
cluster.registerNamenodes();
|
||||||
|
cluster.waitNamenodeRegistration();
|
||||||
|
|
||||||
|
// Setup the test mount point
|
||||||
|
createMountTableEntry(TEST_DIR_HASH_ALL, DestinationOrder.HASH_ALL);
|
||||||
|
createMountTableEntry(TEST_DIR_RANDOM, DestinationOrder.RANDOM);
|
||||||
|
|
||||||
|
// Get filesystems for federated and each namespace
|
||||||
|
routerFs = routerContext.getFileSystem();
|
||||||
|
for (String nsId : cluster.getNameservices()) {
|
||||||
|
List<NamenodeContext> nns = cluster.getNamenodes(nsId);
|
||||||
|
for (NamenodeContext nn : nns) {
|
||||||
|
FileSystem nnFs = nn.getFileSystem();
|
||||||
|
nsFss.add(nnFs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertEquals(NUM_NAMESPACES, nsFss.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void cleanup() {
|
||||||
|
cluster.shutdown();
|
||||||
|
cluster = null;
|
||||||
|
routerContext = null;
|
||||||
|
routerFs = null;
|
||||||
|
nsFss.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHashAll() throws Exception {
|
||||||
|
testAll(TEST_DIR_HASH_ALL);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRandomAll() throws Exception {
|
||||||
|
testAll(TEST_DIR_RANDOM);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that the resolver spreads files across subclusters in the whole
|
||||||
|
* tree.
|
||||||
|
* @throws Exception If the resolver is not working.
|
||||||
|
*/
|
||||||
|
private void testAll(final String path) throws Exception {
|
||||||
|
|
||||||
|
// Create directories in different levels
|
||||||
|
routerFs.mkdirs(new Path(path + "/dir0"));
|
||||||
|
routerFs.mkdirs(new Path(path + "/dir1"));
|
||||||
|
routerFs.mkdirs(new Path(path + "/dir2/dir20"));
|
||||||
|
routerFs.mkdirs(new Path(path + "/dir2/dir21"));
|
||||||
|
routerFs.mkdirs(new Path(path + "/dir2/dir22"));
|
||||||
|
routerFs.mkdirs(new Path(path + "/dir2/dir22/dir220"));
|
||||||
|
routerFs.mkdirs(new Path(path + "/dir2/dir22/dir221"));
|
||||||
|
routerFs.mkdirs(new Path(path + "/dir2/dir22/dir222"));
|
||||||
|
assertDirsEverywhere(path, 9);
|
||||||
|
|
||||||
|
// Create 14 files at different levels of the tree
|
||||||
|
createTestFile(routerFs, path + "/dir0/file1.txt");
|
||||||
|
createTestFile(routerFs, path + "/dir0/file2.txt");
|
||||||
|
createTestFile(routerFs, path + "/dir1/file2.txt");
|
||||||
|
createTestFile(routerFs, path + "/dir1/file3.txt");
|
||||||
|
createTestFile(routerFs, path + "/dir2/dir20/file4.txt");
|
||||||
|
createTestFile(routerFs, path + "/dir2/dir20/file5.txt");
|
||||||
|
createTestFile(routerFs, path + "/dir2/dir21/file6.txt");
|
||||||
|
createTestFile(routerFs, path + "/dir2/dir21/file7.txt");
|
||||||
|
createTestFile(routerFs, path + "/dir2/dir22/file8.txt");
|
||||||
|
createTestFile(routerFs, path + "/dir2/dir22/file9.txt");
|
||||||
|
createTestFile(routerFs, path + "/dir2/dir22/dir220/file10.txt");
|
||||||
|
createTestFile(routerFs, path + "/dir2/dir22/dir220/file11.txt");
|
||||||
|
createTestFile(routerFs, path + "/dir2/dir22/dir220/file12.txt");
|
||||||
|
createTestFile(routerFs, path + "/dir2/dir22/dir220/file13.txt");
|
||||||
|
assertDirsEverywhere(path, 9);
|
||||||
|
assertFilesDistributed(path, 14);
|
||||||
|
|
||||||
|
// Test append
|
||||||
|
String testFile = path + "/dir2/dir22/dir220/file-append.txt";
|
||||||
|
createTestFile(routerFs, testFile);
|
||||||
|
Path testFilePath = new Path(testFile);
|
||||||
|
assertTrue("Created file is too small",
|
||||||
|
routerFs.getFileStatus(testFilePath).getLen() > 50);
|
||||||
|
appendTestFile(routerFs, testFile);
|
||||||
|
assertTrue("Append file is too small",
|
||||||
|
routerFs.getFileStatus(testFilePath).getLen() > 110);
|
||||||
|
assertDirsEverywhere(path, 9);
|
||||||
|
assertFilesDistributed(path, 15);
|
||||||
|
|
||||||
|
// Removing a directory should remove it from every subcluster
|
||||||
|
routerFs.delete(new Path(path + "/dir2/dir22/dir220"), true);
|
||||||
|
assertDirsEverywhere(path, 8);
|
||||||
|
assertFilesDistributed(path, 10);
|
||||||
|
|
||||||
|
// Removing all sub directories
|
||||||
|
routerFs.delete(new Path(path + "/dir0"), true);
|
||||||
|
routerFs.delete(new Path(path + "/dir1"), true);
|
||||||
|
routerFs.delete(new Path(path + "/dir2"), true);
|
||||||
|
assertDirsEverywhere(path, 0);
|
||||||
|
assertFilesDistributed(path, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Directories in HASH_ALL mount points must be in every namespace.
|
||||||
|
* @param path Path to check under.
|
||||||
|
* @param expectedNumDirs Expected number of directories.
|
||||||
|
* @throws IOException If it cannot check the directories.
|
||||||
|
*/
|
||||||
|
private void assertDirsEverywhere(String path, int expectedNumDirs)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
// Check for the directories in each filesystem
|
||||||
|
List<FileStatus> files = listRecursive(routerFs, path);
|
||||||
|
int numDirs = 0;
|
||||||
|
for (FileStatus file : files) {
|
||||||
|
if (file.isDirectory()) {
|
||||||
|
numDirs++;
|
||||||
|
|
||||||
|
Path dirPath = file.getPath();
|
||||||
|
Path checkPath = getRelativePath(dirPath);
|
||||||
|
for (FileSystem nsFs : nsFss) {
|
||||||
|
FileStatus fileStatus1 = nsFs.getFileStatus(checkPath);
|
||||||
|
assertTrue(file + " should be a directory",
|
||||||
|
fileStatus1.isDirectory());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertEquals(expectedNumDirs, numDirs);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check that the files are somewhat spread across namespaces.
|
||||||
|
* @param path Path to check under.
|
||||||
|
* @param expectedNumFiles Number of files expected.
|
||||||
|
* @throws IOException If the files cannot be checked.
|
||||||
|
*/
|
||||||
|
private void assertFilesDistributed(String path, int expectedNumFiles)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
// Check where the files went
|
||||||
|
List<FileStatus> routerFiles = listRecursive(routerFs, path);
|
||||||
|
List<List<FileStatus>> nssFiles = new LinkedList<>();
|
||||||
|
for (FileSystem nsFs : nsFss) {
|
||||||
|
List<FileStatus> nsFiles = listRecursive(nsFs, path);
|
||||||
|
nssFiles.add(nsFiles);
|
||||||
|
}
|
||||||
|
|
||||||
|
// We should see all the files in the federated view
|
||||||
|
int numRouterFiles = getNumTxtFiles(routerFiles);
|
||||||
|
assertEquals(numRouterFiles, expectedNumFiles);
|
||||||
|
|
||||||
|
// All the files should be spread somewhat evenly across subclusters
|
||||||
|
List<Integer> numNsFiles = new LinkedList<>();
|
||||||
|
int sumNsFiles = 0;
|
||||||
|
for (int i = 0; i < NUM_NAMESPACES; i++) {
|
||||||
|
List<FileStatus> nsFiles = nssFiles.get(i);
|
||||||
|
int numFiles = getNumTxtFiles(nsFiles);
|
||||||
|
numNsFiles.add(numFiles);
|
||||||
|
sumNsFiles += numFiles;
|
||||||
|
}
|
||||||
|
assertEquals(numRouterFiles, sumNsFiles);
|
||||||
|
if (expectedNumFiles > 0) {
|
||||||
|
for (int numFiles : numNsFiles) {
|
||||||
|
assertTrue("Files not distributed: " + numNsFiles, numFiles > 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a test file in the filesystem and check if it was written.
|
||||||
|
* @param fs Filesystem.
|
||||||
|
* @param filename Name of the file to create.
|
||||||
|
* @throws IOException If it cannot create the file.
|
||||||
|
*/
|
||||||
|
private static void createTestFile(
|
||||||
|
final FileSystem fs, final String filename)throws IOException {
|
||||||
|
|
||||||
|
final Path path = new Path(filename);
|
||||||
|
|
||||||
|
// Write the data
|
||||||
|
FSDataOutputStream os = fs.create(path);
|
||||||
|
os.writeUTF("Test data " + filename);
|
||||||
|
os.close();
|
||||||
|
|
||||||
|
// Read the data and check
|
||||||
|
FSDataInputStream is = fs.open(path);
|
||||||
|
String read = is.readUTF();
|
||||||
|
assertEquals("Test data " + filename, read);
|
||||||
|
is.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Append to a test file in the filesystem and check if we appended.
|
||||||
|
* @param fs Filesystem.
|
||||||
|
* @param filename Name of the file to append to.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private static void appendTestFile(
|
||||||
|
final FileSystem fs, final String filename) throws IOException {
|
||||||
|
final Path path = new Path(filename);
|
||||||
|
|
||||||
|
// Write the data
|
||||||
|
FSDataOutputStream os = fs.append(path);
|
||||||
|
os.writeUTF("Test append data " + filename);
|
||||||
|
os.close();
|
||||||
|
|
||||||
|
// Read the data previous data
|
||||||
|
FSDataInputStream is = fs.open(path);
|
||||||
|
String read = is.readUTF();
|
||||||
|
assertEquals(read, "Test data " + filename);
|
||||||
|
// Read the new data and check
|
||||||
|
read = is.readUTF();
|
||||||
|
assertEquals(read, "Test append data " + filename);
|
||||||
|
is.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Count the number of text files in a list.
|
||||||
|
* @param files File list.
|
||||||
|
* @return Number of .txt files.
|
||||||
|
*/
|
||||||
|
private static int getNumTxtFiles(final List<FileStatus> files) {
|
||||||
|
int numFiles = 0;
|
||||||
|
for (FileStatus file : files) {
|
||||||
|
if (file.getPath().getName().endsWith(".txt")) {
|
||||||
|
numFiles++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return numFiles;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the relative path within a filesystem (removes the filesystem prefix).
|
||||||
|
* @param path Path to check.
|
||||||
|
* @return File within the filesystem.
|
||||||
|
*/
|
||||||
|
private static Path getRelativePath(final Path path) {
|
||||||
|
URI uri = path.toUri();
|
||||||
|
String uriPath = uri.getPath();
|
||||||
|
return new Path(uriPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the list the files/dirs under a path.
|
||||||
|
* @param fs Filesystem to check in.
|
||||||
|
* @param path Path to check for.
|
||||||
|
* @return List of files.
|
||||||
|
* @throws IOException If it cannot list the files.
|
||||||
|
*/
|
||||||
|
private List<FileStatus> listRecursive(
|
||||||
|
final FileSystem fs, final String path) throws IOException {
|
||||||
|
List<FileStatus> ret = new LinkedList<>();
|
||||||
|
List<Path> temp = new LinkedList<>();
|
||||||
|
temp.add(new Path(path));
|
||||||
|
while (!temp.isEmpty()) {
|
||||||
|
Path p = temp.remove(0);
|
||||||
|
for (FileStatus fileStatus : fs.listStatus(p)) {
|
||||||
|
ret.add(fileStatus);
|
||||||
|
if (fileStatus.isDirectory()) {
|
||||||
|
temp.add(fileStatus.getPath());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a mount table entry in all nameservices and wait until it is
|
||||||
|
* available in all routers.
|
||||||
|
* @param mountPoint Name of the mount point.
|
||||||
|
* @param order Order of the mount table entry.
|
||||||
|
* @throws Exception If the entry could not be created.
|
||||||
|
*/
|
||||||
|
private void createMountTableEntry(
|
||||||
|
final String mountPoint, final DestinationOrder order) throws Exception {
|
||||||
|
|
||||||
|
RouterClient admin = routerContext.getAdminClient();
|
||||||
|
MountTableManager mountTable = admin.getMountTableManager();
|
||||||
|
Map<String, String> destMap = new HashMap<>();
|
||||||
|
for (String nsId : cluster.getNameservices()) {
|
||||||
|
destMap.put(nsId, mountPoint);
|
||||||
|
}
|
||||||
|
MountTable newEntry = MountTable.newInstance(mountPoint, destMap);
|
||||||
|
newEntry.setDestOrder(order);
|
||||||
|
AddMountTableEntryRequest addRequest =
|
||||||
|
AddMountTableEntryRequest.newInstance(newEntry);
|
||||||
|
AddMountTableEntryResponse addResponse =
|
||||||
|
mountTable.addMountTableEntry(addRequest);
|
||||||
|
boolean created = addResponse.getStatus();
|
||||||
|
assertTrue(created);
|
||||||
|
|
||||||
|
// Refresh the caches to get the mount table
|
||||||
|
Router router = routerContext.getRouter();
|
||||||
|
StateStoreService stateStore = router.getStateStore();
|
||||||
|
stateStore.refreshCaches(true);
|
||||||
|
|
||||||
|
// Check for the path
|
||||||
|
GetMountTableEntriesRequest getRequest =
|
||||||
|
GetMountTableEntriesRequest.newInstance(mountPoint);
|
||||||
|
GetMountTableEntriesResponse getResponse =
|
||||||
|
mountTable.getMountTableEntries(getRequest);
|
||||||
|
List<MountTable> entries = getResponse.getEntries();
|
||||||
|
assertEquals(1, entries.size());
|
||||||
|
assertEquals(mountPoint, entries.get(0).getSourcePath());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue