HDFS-14226. RBF: Setting attributes should set on all subclusters' directories. Contributed by Ayush Saxena.
This commit is contained in:
parent
75f8b6ccfa
commit
e2a3c4494b
|
@ -157,7 +157,11 @@ public class ErasureCoding {
|
||||||
RemoteMethod remoteMethod = new RemoteMethod("setErasureCodingPolicy",
|
RemoteMethod remoteMethod = new RemoteMethod("setErasureCodingPolicy",
|
||||||
new Class<?>[] {String.class, String.class},
|
new Class<?>[] {String.class, String.class},
|
||||||
new RemoteParam(), ecPolicyName);
|
new RemoteParam(), ecPolicyName);
|
||||||
rpcClient.invokeSequential(locations, remoteMethod, null, null);
|
if (rpcServer.isInvokeConcurrent(src)) {
|
||||||
|
rpcClient.invokeConcurrent(locations, remoteMethod);
|
||||||
|
} else {
|
||||||
|
rpcClient.invokeSequential(locations, remoteMethod);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void unsetErasureCodingPolicy(String src) throws IOException {
|
public void unsetErasureCodingPolicy(String src) throws IOException {
|
||||||
|
@ -167,7 +171,11 @@ public class ErasureCoding {
|
||||||
rpcServer.getLocationsForPath(src, true);
|
rpcServer.getLocationsForPath(src, true);
|
||||||
RemoteMethod remoteMethod = new RemoteMethod("unsetErasureCodingPolicy",
|
RemoteMethod remoteMethod = new RemoteMethod("unsetErasureCodingPolicy",
|
||||||
new Class<?>[] {String.class}, new RemoteParam());
|
new Class<?>[] {String.class}, new RemoteParam());
|
||||||
rpcClient.invokeSequential(locations, remoteMethod, null, null);
|
if (rpcServer.isInvokeConcurrent(src)) {
|
||||||
|
rpcClient.invokeConcurrent(locations, remoteMethod);
|
||||||
|
} else {
|
||||||
|
rpcClient.invokeSequential(locations, remoteMethod);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public ECBlockGroupStats getECBlockGroupStats() throws IOException {
|
public ECBlockGroupStats getECBlockGroupStats() throws IOException {
|
||||||
|
|
|
@ -219,7 +219,7 @@ public class RouterClientProtocol implements ClientProtocol {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
|
rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
|
||||||
|
|
||||||
if (createParent && isPathAll(src)) {
|
if (createParent && rpcServer.isPathAll(src)) {
|
||||||
int index = src.lastIndexOf(Path.SEPARATOR);
|
int index = src.lastIndexOf(Path.SEPARATOR);
|
||||||
String parent = src.substring(0, index);
|
String parent = src.substring(0, index);
|
||||||
LOG.debug("Creating {} requires creating parent {}", src, parent);
|
LOG.debug("Creating {} requires creating parent {}", src, parent);
|
||||||
|
@ -279,9 +279,13 @@ public class RouterClientProtocol implements ClientProtocol {
|
||||||
RemoteMethod method = new RemoteMethod("setReplication",
|
RemoteMethod method = new RemoteMethod("setReplication",
|
||||||
new Class<?>[] {String.class, short.class}, new RemoteParam(),
|
new Class<?>[] {String.class, short.class}, new RemoteParam(),
|
||||||
replication);
|
replication);
|
||||||
Object result = rpcClient.invokeSequential(
|
if (rpcServer.isInvokeConcurrent(src)) {
|
||||||
locations, method, Boolean.class, Boolean.TRUE);
|
return !rpcClient.invokeConcurrent(locations, method, Boolean.class)
|
||||||
return (boolean) result;
|
.containsValue(false);
|
||||||
|
} else {
|
||||||
|
return rpcClient.invokeSequential(locations, method, Boolean.class,
|
||||||
|
Boolean.TRUE);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -305,7 +309,7 @@ public class RouterClientProtocol implements ClientProtocol {
|
||||||
RemoteMethod method = new RemoteMethod("setPermission",
|
RemoteMethod method = new RemoteMethod("setPermission",
|
||||||
new Class<?>[] {String.class, FsPermission.class},
|
new Class<?>[] {String.class, FsPermission.class},
|
||||||
new RemoteParam(), permissions);
|
new RemoteParam(), permissions);
|
||||||
if (isPathAll(src)) {
|
if (rpcServer.isInvokeConcurrent(src)) {
|
||||||
rpcClient.invokeConcurrent(locations, method);
|
rpcClient.invokeConcurrent(locations, method);
|
||||||
} else {
|
} else {
|
||||||
rpcClient.invokeSequential(locations, method);
|
rpcClient.invokeSequential(locations, method);
|
||||||
|
@ -322,7 +326,7 @@ public class RouterClientProtocol implements ClientProtocol {
|
||||||
RemoteMethod method = new RemoteMethod("setOwner",
|
RemoteMethod method = new RemoteMethod("setOwner",
|
||||||
new Class<?>[] {String.class, String.class, String.class},
|
new Class<?>[] {String.class, String.class, String.class},
|
||||||
new RemoteParam(), username, groupname);
|
new RemoteParam(), username, groupname);
|
||||||
if (isPathAll(src)) {
|
if (rpcServer.isInvokeConcurrent(src)) {
|
||||||
rpcClient.invokeConcurrent(locations, method);
|
rpcClient.invokeConcurrent(locations, method);
|
||||||
} else {
|
} else {
|
||||||
rpcClient.invokeSequential(locations, method);
|
rpcClient.invokeSequential(locations, method);
|
||||||
|
@ -555,7 +559,7 @@ public class RouterClientProtocol implements ClientProtocol {
|
||||||
RemoteMethod method = new RemoteMethod("delete",
|
RemoteMethod method = new RemoteMethod("delete",
|
||||||
new Class<?>[] {String.class, boolean.class}, new RemoteParam(),
|
new Class<?>[] {String.class, boolean.class}, new RemoteParam(),
|
||||||
recursive);
|
recursive);
|
||||||
if (isPathAll(src)) {
|
if (rpcServer.isPathAll(src)) {
|
||||||
return rpcClient.invokeAll(locations, method);
|
return rpcClient.invokeAll(locations, method);
|
||||||
} else {
|
} else {
|
||||||
return rpcClient.invokeSequential(locations, method,
|
return rpcClient.invokeSequential(locations, method,
|
||||||
|
@ -575,7 +579,7 @@ public class RouterClientProtocol implements ClientProtocol {
|
||||||
new RemoteParam(), masked, createParent);
|
new RemoteParam(), masked, createParent);
|
||||||
|
|
||||||
// Create in all locations
|
// Create in all locations
|
||||||
if (isPathAll(src)) {
|
if (rpcServer.isPathAll(src)) {
|
||||||
return rpcClient.invokeAll(locations, method);
|
return rpcClient.invokeAll(locations, method);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -713,7 +717,7 @@ public class RouterClientProtocol implements ClientProtocol {
|
||||||
|
|
||||||
HdfsFileStatus ret = null;
|
HdfsFileStatus ret = null;
|
||||||
// If it's a directory, we check in all locations
|
// If it's a directory, we check in all locations
|
||||||
if (isPathAll(src)) {
|
if (rpcServer.isPathAll(src)) {
|
||||||
ret = getFileInfoAll(locations, method);
|
ret = getFileInfoAll(locations, method);
|
||||||
} else {
|
} else {
|
||||||
// Check for file information sequentially
|
// Check for file information sequentially
|
||||||
|
@ -1315,7 +1319,11 @@ public class RouterClientProtocol implements ClientProtocol {
|
||||||
RemoteMethod method = new RemoteMethod("setXAttr",
|
RemoteMethod method = new RemoteMethod("setXAttr",
|
||||||
new Class<?>[] {String.class, XAttr.class, EnumSet.class},
|
new Class<?>[] {String.class, XAttr.class, EnumSet.class},
|
||||||
new RemoteParam(), xAttr, flag);
|
new RemoteParam(), xAttr, flag);
|
||||||
rpcClient.invokeSequential(locations, method);
|
if (rpcServer.isInvokeConcurrent(src)) {
|
||||||
|
rpcClient.invokeConcurrent(locations, method);
|
||||||
|
} else {
|
||||||
|
rpcClient.invokeSequential(locations, method);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
@ -1356,7 +1364,11 @@ public class RouterClientProtocol implements ClientProtocol {
|
||||||
rpcServer.getLocationsForPath(src, true);
|
rpcServer.getLocationsForPath(src, true);
|
||||||
RemoteMethod method = new RemoteMethod("removeXAttr",
|
RemoteMethod method = new RemoteMethod("removeXAttr",
|
||||||
new Class<?>[] {String.class, XAttr.class}, new RemoteParam(), xAttr);
|
new Class<?>[] {String.class, XAttr.class}, new RemoteParam(), xAttr);
|
||||||
rpcClient.invokeSequential(locations, method);
|
if (rpcServer.isInvokeConcurrent(src)) {
|
||||||
|
rpcClient.invokeConcurrent(locations, method);
|
||||||
|
} else {
|
||||||
|
rpcClient.invokeSequential(locations, method);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1718,27 +1730,6 @@ public class RouterClientProtocol implements ClientProtocol {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Check if a path should be in all subclusters.
|
|
||||||
*
|
|
||||||
* @param path Path to check.
|
|
||||||
* @return If a path should be in all subclusters.
|
|
||||||
*/
|
|
||||||
private boolean isPathAll(final String path) {
|
|
||||||
if (subclusterResolver instanceof MountTableResolver) {
|
|
||||||
try {
|
|
||||||
MountTableResolver mountTable = (MountTableResolver)subclusterResolver;
|
|
||||||
MountTable entry = mountTable.getMountPoint(path);
|
|
||||||
if (entry != null) {
|
|
||||||
return entry.isAll();
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.error("Cannot get mount point", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new file status for a mount point.
|
* Create a new file status for a mount point.
|
||||||
*
|
*
|
||||||
|
|
|
@ -1541,4 +1541,48 @@ public class RouterRpcServer extends AbstractService
|
||||||
public FederationRPCMetrics getRPCMetrics() {
|
public FederationRPCMetrics getRPCMetrics() {
|
||||||
return this.rpcMonitor.getRPCMetrics();
|
return this.rpcMonitor.getRPCMetrics();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if a path should be in all subclusters.
|
||||||
|
*
|
||||||
|
* @param path Path to check.
|
||||||
|
* @return If a path should be in all subclusters.
|
||||||
|
*/
|
||||||
|
boolean isPathAll(final String path) {
|
||||||
|
if (subclusterResolver instanceof MountTableResolver) {
|
||||||
|
try {
|
||||||
|
MountTableResolver mountTable = (MountTableResolver) subclusterResolver;
|
||||||
|
MountTable entry = mountTable.getMountPoint(path);
|
||||||
|
if (entry != null) {
|
||||||
|
return entry.isAll();
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Cannot get mount point", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if call needs to be invoked to all the locations. The call is
|
||||||
|
* supposed to be invoked in all the locations in case the order of the mount
|
||||||
|
* entry is amongst HASH_ALL, RANDOM or SPACE or if the source is itself a
|
||||||
|
* mount entry.
|
||||||
|
* @param path The path on which the operation need to be invoked.
|
||||||
|
* @return true if the call is supposed to invoked on all locations.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
boolean isInvokeConcurrent(final String path) throws IOException {
|
||||||
|
if (subclusterResolver instanceof MountTableResolver) {
|
||||||
|
MountTableResolver mountTableResolver =
|
||||||
|
(MountTableResolver) subclusterResolver;
|
||||||
|
List<String> mountPoints = mountTableResolver.getMountPoints(path);
|
||||||
|
// If this is a mount point, we need to invoke everywhere.
|
||||||
|
if (mountPoints != null) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return isPathAll(path);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -50,7 +50,11 @@ public class RouterStoragePolicy {
|
||||||
new Class<?>[] {String.class, String.class},
|
new Class<?>[] {String.class, String.class},
|
||||||
new RemoteParam(),
|
new RemoteParam(),
|
||||||
policyName);
|
policyName);
|
||||||
rpcClient.invokeSequential(locations, method, null, null);
|
if (rpcServer.isInvokeConcurrent(src)) {
|
||||||
|
rpcClient.invokeConcurrent(locations, method);
|
||||||
|
} else {
|
||||||
|
rpcClient.invokeSequential(locations, method);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public BlockStoragePolicy[] getStoragePolicies() throws IOException {
|
public BlockStoragePolicy[] getStoragePolicies() throws IOException {
|
||||||
|
@ -67,7 +71,11 @@ public class RouterStoragePolicy {
|
||||||
RemoteMethod method = new RemoteMethod("unsetStoragePolicy",
|
RemoteMethod method = new RemoteMethod("unsetStoragePolicy",
|
||||||
new Class<?>[] {String.class},
|
new Class<?>[] {String.class},
|
||||||
new RemoteParam());
|
new RemoteParam());
|
||||||
rpcClient.invokeSequential(locations, method);
|
if (rpcServer.isInvokeConcurrent(src)) {
|
||||||
|
rpcClient.invokeConcurrent(locations, method);
|
||||||
|
} else {
|
||||||
|
rpcClient.invokeSequential(locations, method);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public BlockStoragePolicy getStoragePolicy(String path)
|
public BlockStoragePolicy getStoragePolicy(String path)
|
||||||
|
|
|
@ -0,0 +1,394 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.federation.router;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests router rpc with multiple destination mount table resolver.
|
||||||
|
*/
|
||||||
|
public class TestRouterRPCMultipleDestinationMountTableResolver {
|
||||||
|
private static StateStoreDFSCluster cluster;
|
||||||
|
private static RouterContext routerContext;
|
||||||
|
private static MountTableResolver resolver;
|
||||||
|
private static DistributedFileSystem nnFs0;
|
||||||
|
private static DistributedFileSystem nnFs1;
|
||||||
|
private static DistributedFileSystem routerFs;
|
||||||
|
private static RouterRpcServer rpcServer;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() throws Exception {
|
||||||
|
|
||||||
|
// Build and start a federated cluster
|
||||||
|
cluster = new StateStoreDFSCluster(false, 2,
|
||||||
|
MultipleDestinationMountTableResolver.class);
|
||||||
|
Configuration routerConf =
|
||||||
|
new RouterConfigBuilder().stateStore().admin().quota().rpc().build();
|
||||||
|
|
||||||
|
Configuration hdfsConf = new Configuration(false);
|
||||||
|
|
||||||
|
cluster.addRouterOverrides(routerConf);
|
||||||
|
cluster.addNamenodeOverrides(hdfsConf);
|
||||||
|
cluster.startCluster();
|
||||||
|
cluster.startRouters();
|
||||||
|
cluster.waitClusterUp();
|
||||||
|
|
||||||
|
routerContext = cluster.getRandomRouter();
|
||||||
|
resolver =
|
||||||
|
(MountTableResolver) routerContext.getRouter().getSubclusterResolver();
|
||||||
|
nnFs0 = (DistributedFileSystem) cluster
|
||||||
|
.getNamenode(cluster.getNameservices().get(0), null).getFileSystem();
|
||||||
|
nnFs1 = (DistributedFileSystem) cluster
|
||||||
|
.getNamenode(cluster.getNameservices().get(1), null).getFileSystem();
|
||||||
|
routerFs = (DistributedFileSystem) routerContext.getFileSystem();
|
||||||
|
rpcServer =routerContext.getRouter().getRpcServer();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.stopRouter(routerContext);
|
||||||
|
cluster.shutdown();
|
||||||
|
cluster = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* SetUp the mount entry , directories and file to verify invocation.
|
||||||
|
* @param order The order that the mount entry needs to follow.
|
||||||
|
* @throws Exception On account of any exception encountered during setting up
|
||||||
|
* the environment.
|
||||||
|
*/
|
||||||
|
public void setupOrderMountPath(DestinationOrder order) throws Exception {
|
||||||
|
Map<String, String> destMap = new HashMap<>();
|
||||||
|
destMap.put("ns0", "/tmp");
|
||||||
|
destMap.put("ns1", "/tmp");
|
||||||
|
nnFs0.mkdirs(new Path("/tmp"));
|
||||||
|
nnFs1.mkdirs(new Path("/tmp"));
|
||||||
|
MountTable addEntry = MountTable.newInstance("/mount", destMap);
|
||||||
|
addEntry.setDestOrder(order);
|
||||||
|
assertTrue(addMountTable(addEntry));
|
||||||
|
routerFs.mkdirs(new Path("/mount/dir/dir"));
|
||||||
|
DFSTestUtil.createFile(routerFs, new Path("/mount/dir/file"), 100L, (short) 1,
|
||||||
|
1024L);
|
||||||
|
DFSTestUtil.createFile(routerFs, new Path("/mount/file"), 100L, (short) 1,
|
||||||
|
1024L);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void resetTestEnvironment() throws IOException {
|
||||||
|
RouterClient client = routerContext.getAdminClient();
|
||||||
|
MountTableManager mountTableManager = client.getMountTableManager();
|
||||||
|
RemoveMountTableEntryRequest req2 =
|
||||||
|
RemoveMountTableEntryRequest.newInstance("/mount");
|
||||||
|
mountTableManager.removeMountTableEntry(req2);
|
||||||
|
nnFs0.delete(new Path("/tmp"), true);
|
||||||
|
nnFs1.delete(new Path("/tmp"), true);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvocationSpaceOrder() throws Exception {
|
||||||
|
setupOrderMountPath(DestinationOrder.SPACE);
|
||||||
|
boolean isDirAll = rpcServer.isPathAll("/mount/dir");
|
||||||
|
assertTrue(isDirAll);
|
||||||
|
testInvocation(isDirAll);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvocationHashAllOrder() throws Exception {
|
||||||
|
setupOrderMountPath(DestinationOrder.HASH_ALL);
|
||||||
|
boolean isDirAll = rpcServer.isPathAll("/mount/dir");
|
||||||
|
assertTrue(isDirAll);
|
||||||
|
testInvocation(isDirAll);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvocationRandomOrder() throws Exception {
|
||||||
|
setupOrderMountPath(DestinationOrder.RANDOM);
|
||||||
|
boolean isDirAll = rpcServer.isPathAll("/mount/dir");
|
||||||
|
assertTrue(isDirAll);
|
||||||
|
testInvocation(isDirAll);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvocationHashOrder() throws Exception {
|
||||||
|
setupOrderMountPath(DestinationOrder.HASH);
|
||||||
|
boolean isDirAll = rpcServer.isPathAll("/mount/dir");
|
||||||
|
assertFalse(isDirAll);
|
||||||
|
testInvocation(isDirAll);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvocationLocalOrder() throws Exception {
|
||||||
|
setupOrderMountPath(DestinationOrder.LOCAL);
|
||||||
|
boolean isDirAll = rpcServer.isPathAll("/mount/dir");
|
||||||
|
assertFalse(isDirAll);
|
||||||
|
testInvocation(isDirAll);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verifies the invocation of API's at directory level , file level and at
|
||||||
|
* mount level.
|
||||||
|
* @param dirAll if true assumes that the mount entry creates directory on all
|
||||||
|
* locations.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private void testInvocation(boolean dirAll) throws IOException {
|
||||||
|
// Verify invocation on nested directory and file.
|
||||||
|
Path mountDir = new Path("/mount/dir/dir");
|
||||||
|
Path nameSpaceFile = new Path("/tmp/dir/file");
|
||||||
|
Path mountFile = new Path("/mount/dir/file");
|
||||||
|
Path mountEntry = new Path("/mount");
|
||||||
|
Path mountDest = new Path("/tmp");
|
||||||
|
Path nameSpaceDir = new Path("/tmp/dir/dir");
|
||||||
|
final String name = "user.a1";
|
||||||
|
final byte[] value = {0x31, 0x32, 0x33};
|
||||||
|
testDirectoryAndFileLevelInvocation(dirAll, mountDir, nameSpaceFile,
|
||||||
|
mountFile, nameSpaceDir, name, value);
|
||||||
|
|
||||||
|
// Verify invocation on non nested directory and file.
|
||||||
|
mountDir = new Path("/mount/dir");
|
||||||
|
nameSpaceFile = new Path("/tmp/file");
|
||||||
|
mountFile = new Path("/mount/file");
|
||||||
|
nameSpaceDir = new Path("/tmp/dir");
|
||||||
|
testDirectoryAndFileLevelInvocation(dirAll, mountDir, nameSpaceFile,
|
||||||
|
mountFile, nameSpaceDir, name, value);
|
||||||
|
|
||||||
|
// Check invocation directly for a mount point.
|
||||||
|
// Verify owner and permissions.
|
||||||
|
routerFs.setOwner(mountEntry, "testuser", "testgroup");
|
||||||
|
routerFs.setPermission(mountEntry,
|
||||||
|
FsPermission.createImmutable((short) 777));
|
||||||
|
assertEquals("testuser", routerFs.getFileStatus(mountEntry).getOwner());
|
||||||
|
assertEquals("testuser", nnFs0.getFileStatus(mountDest).getOwner());
|
||||||
|
assertEquals("testuser", nnFs1.getFileStatus(mountDest).getOwner());
|
||||||
|
assertEquals((short) 777,
|
||||||
|
routerFs.getFileStatus(mountEntry).getPermission().toShort());
|
||||||
|
assertEquals((short) 777,
|
||||||
|
nnFs0.getFileStatus(mountDest).getPermission().toShort());
|
||||||
|
assertEquals((short) 777,
|
||||||
|
nnFs1.getFileStatus(mountDest).getPermission().toShort());
|
||||||
|
|
||||||
|
//Verify storage policy.
|
||||||
|
routerFs.setStoragePolicy(mountEntry, "COLD");
|
||||||
|
assertEquals("COLD", routerFs.getStoragePolicy(mountEntry).getName());
|
||||||
|
assertEquals("COLD", nnFs0.getStoragePolicy(mountDest).getName());
|
||||||
|
assertEquals("COLD", nnFs1.getStoragePolicy(mountDest).getName());
|
||||||
|
routerFs.unsetStoragePolicy(mountEntry);
|
||||||
|
assertEquals("HOT", routerFs.getStoragePolicy(mountDest).getName());
|
||||||
|
assertEquals("HOT", nnFs0.getStoragePolicy(mountDest).getName());
|
||||||
|
assertEquals("HOT", nnFs1.getStoragePolicy(mountDest).getName());
|
||||||
|
|
||||||
|
//Verify erasure coding policy.
|
||||||
|
routerFs.setErasureCodingPolicy(mountEntry, "RS-6-3-1024k");
|
||||||
|
assertEquals("RS-6-3-1024k",
|
||||||
|
routerFs.getErasureCodingPolicy(mountEntry).getName());
|
||||||
|
assertEquals("RS-6-3-1024k",
|
||||||
|
nnFs0.getErasureCodingPolicy(mountDest).getName());
|
||||||
|
assertEquals("RS-6-3-1024k",
|
||||||
|
nnFs1.getErasureCodingPolicy(mountDest).getName());
|
||||||
|
routerFs.unsetErasureCodingPolicy(mountEntry);
|
||||||
|
assertNull(routerFs.getErasureCodingPolicy(mountDest));
|
||||||
|
assertNull(nnFs0.getErasureCodingPolicy(mountDest));
|
||||||
|
assertNull(nnFs1.getErasureCodingPolicy(mountDest));
|
||||||
|
|
||||||
|
//Verify xAttr.
|
||||||
|
routerFs.setXAttr(mountEntry, name, value);
|
||||||
|
assertArrayEquals(value, routerFs.getXAttr(mountEntry, name));
|
||||||
|
assertArrayEquals(value, nnFs0.getXAttr(mountDest, name));
|
||||||
|
assertArrayEquals(value, nnFs1.getXAttr(mountDest, name));
|
||||||
|
routerFs.removeXAttr(mountEntry, name);
|
||||||
|
assertEquals(0, routerFs.getXAttrs(mountEntry).size());
|
||||||
|
assertEquals(0, nnFs0.getXAttrs(mountDest).size());
|
||||||
|
assertEquals(0, nnFs1.getXAttrs(mountDest).size());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* SetUp to verify invocations on directories and file.
|
||||||
|
*/
|
||||||
|
private void testDirectoryAndFileLevelInvocation(boolean dirAll,
|
||||||
|
Path mountDir, Path nameSpaceFile, Path mountFile, Path nameSpaceDir,
|
||||||
|
final String name, final byte[] value) throws IOException {
|
||||||
|
// Check invocation for a directory.
|
||||||
|
routerFs.setOwner(mountDir, "testuser", "testgroup");
|
||||||
|
routerFs.setPermission(mountDir, FsPermission.createImmutable((short) 777));
|
||||||
|
routerFs.setStoragePolicy(mountDir, "COLD");
|
||||||
|
routerFs.setErasureCodingPolicy(mountDir, "RS-6-3-1024k");
|
||||||
|
routerFs.setXAttr(mountDir, name, value);
|
||||||
|
|
||||||
|
// Verify the directory level invocations were checked in case of mounts not
|
||||||
|
// creating directories in all subclusters.
|
||||||
|
boolean checkedDir1 = verifyDirectoryLevelInvocations(dirAll, nameSpaceDir,
|
||||||
|
nnFs0, name, value);
|
||||||
|
boolean checkedDir2 = verifyDirectoryLevelInvocations(dirAll, nameSpaceDir,
|
||||||
|
nnFs1, name, value);
|
||||||
|
assertTrue("The file didn't existed in either of the subclusters.",
|
||||||
|
checkedDir1 || checkedDir2);
|
||||||
|
routerFs.unsetStoragePolicy(mountDir);
|
||||||
|
routerFs.removeXAttr(mountDir, name);
|
||||||
|
routerFs.unsetErasureCodingPolicy(mountDir);
|
||||||
|
|
||||||
|
checkedDir1 =
|
||||||
|
verifyDirectoryLevelUnsetInvocations(dirAll, nnFs0, nameSpaceDir);
|
||||||
|
checkedDir2 =
|
||||||
|
verifyDirectoryLevelUnsetInvocations(dirAll, nnFs1, nameSpaceDir);
|
||||||
|
assertTrue("The file didn't existed in either of the subclusters.",
|
||||||
|
checkedDir1 || checkedDir2);
|
||||||
|
|
||||||
|
// Check invocation for a file.
|
||||||
|
routerFs.setOwner(mountFile, "testuser", "testgroup");
|
||||||
|
routerFs.setPermission(mountFile,
|
||||||
|
FsPermission.createImmutable((short) 777));
|
||||||
|
routerFs.setStoragePolicy(mountFile, "COLD");
|
||||||
|
routerFs.setReplication(mountFile, (short) 2);
|
||||||
|
routerFs.setXAttr(mountFile, name, value);
|
||||||
|
verifyFileLevelInvocations(nameSpaceFile, nnFs0, mountFile, name, value);
|
||||||
|
verifyFileLevelInvocations(nameSpaceFile, nnFs1, mountFile, name, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify invocations of API's unseting values at the directory level.
|
||||||
|
* @param dirAll true if the mount entry order creates directory in all
|
||||||
|
* locations.
|
||||||
|
* @param nameSpaceDir path of the directory in the namespace.
|
||||||
|
* @param nnFs file system where the directory level invocation needs to be
|
||||||
|
* tested.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private boolean verifyDirectoryLevelUnsetInvocations(boolean dirAll,
|
||||||
|
DistributedFileSystem nnFs, Path nameSpaceDir) throws IOException {
|
||||||
|
boolean checked = false;
|
||||||
|
if (dirAll || nnFs.exists(nameSpaceDir)) {
|
||||||
|
checked = true;
|
||||||
|
assertEquals("HOT", nnFs.getStoragePolicy(nameSpaceDir).getName());
|
||||||
|
assertNull(nnFs.getErasureCodingPolicy(nameSpaceDir));
|
||||||
|
assertEquals(0, nnFs.getXAttrs(nameSpaceDir).size());
|
||||||
|
}
|
||||||
|
return checked;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify file level invocations.
|
||||||
|
* @param nameSpaceFile path of the file in the namespace.
|
||||||
|
* @param nnFs the file system where the file invocation needs to checked.
|
||||||
|
* @param mountFile path of the file w.r.t. mount table.
|
||||||
|
* @param name name of Xattr.
|
||||||
|
* @param value value of Xattr.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private void verifyFileLevelInvocations(Path nameSpaceFile,
|
||||||
|
DistributedFileSystem nnFs, Path mountFile, final String name,
|
||||||
|
final byte[] value) throws IOException {
|
||||||
|
if (nnFs.exists(nameSpaceFile)) {
|
||||||
|
assertEquals("testuser", nnFs.getFileStatus(nameSpaceFile).getOwner());
|
||||||
|
assertEquals((short) 777,
|
||||||
|
nnFs.getFileStatus(nameSpaceFile).getPermission().toShort());
|
||||||
|
assertEquals("COLD", nnFs.getStoragePolicy(nameSpaceFile).getName());
|
||||||
|
assertEquals((short) 2,
|
||||||
|
nnFs.getFileStatus(nameSpaceFile).getReplication());
|
||||||
|
assertArrayEquals(value, nnFs.getXAttr(nameSpaceFile, name));
|
||||||
|
|
||||||
|
routerFs.unsetStoragePolicy(mountFile);
|
||||||
|
routerFs.removeXAttr(mountFile, name);
|
||||||
|
assertEquals(0, nnFs.getXAttrs(nameSpaceFile).size());
|
||||||
|
|
||||||
|
assertEquals("HOT", nnFs.getStoragePolicy(nameSpaceFile).getName());
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify invocations at the directory level.
|
||||||
|
* @param dirAll true if the mount entry order creates directory in all
|
||||||
|
* locations.
|
||||||
|
* @param nameSpaceDir path of the directory in the namespace.
|
||||||
|
* @param nnFs file system where the directory level invocation needs to be
|
||||||
|
* tested.
|
||||||
|
* @param name name for the Xattr.
|
||||||
|
* @param value value for the Xattr.
|
||||||
|
* @return true, if directory existed and successful verification of
|
||||||
|
* invocations.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private boolean verifyDirectoryLevelInvocations(boolean dirAll,
|
||||||
|
Path nameSpaceDir, DistributedFileSystem nnFs, final String name,
|
||||||
|
final byte[] value) throws IOException {
|
||||||
|
boolean checked = false;
|
||||||
|
if (dirAll || nnFs.exists(nameSpaceDir)) {
|
||||||
|
checked = true;
|
||||||
|
assertEquals("testuser", nnFs.getFileStatus(nameSpaceDir).getOwner());
|
||||||
|
assertEquals("COLD", nnFs.getStoragePolicy(nameSpaceDir).getName());
|
||||||
|
assertEquals("RS-6-3-1024k",
|
||||||
|
nnFs.getErasureCodingPolicy(nameSpaceDir).getName());
|
||||||
|
assertArrayEquals(value, nnFs.getXAttr(nameSpaceDir, name));
|
||||||
|
assertEquals((short) 777,
|
||||||
|
nnFs.getFileStatus(nameSpaceDir).getPermission().toShort());
|
||||||
|
}
|
||||||
|
return checked;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a mount table entry to the mount table through the admin API.
|
||||||
|
* @param entry Mount table entry to add.
|
||||||
|
* @return If it was successfully added.
|
||||||
|
* @throws IOException + * Problems adding entries.
|
||||||
|
*/
|
||||||
|
private boolean addMountTable(final MountTable entry) throws IOException {
|
||||||
|
RouterClient client = routerContext.getAdminClient();
|
||||||
|
MountTableManager mountTableManager = client.getMountTableManager();
|
||||||
|
AddMountTableEntryRequest addRequest =
|
||||||
|
AddMountTableEntryRequest.newInstance(entry);
|
||||||
|
AddMountTableEntryResponse addResponse =
|
||||||
|
mountTableManager.addMountTableEntry(addRequest);
|
||||||
|
|
||||||
|
// Reload the Router cache
|
||||||
|
resolver.loadCache(true);
|
||||||
|
|
||||||
|
return addResponse.getStatus();
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue