HDFS-12875. RBF: Complete logic for -readonly option of dfsrouteradmin add command. Contributed by Inigo Goiri.
(cherry picked from commit 5cd1056ad7
)
This commit is contained in:
parent
3dc1ea6050
commit
6a43fbe724
|
@ -102,8 +102,10 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
|
|||
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
||||
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
|
||||
|
@ -1973,6 +1975,17 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|||
this.subclusterResolver);
|
||||
}
|
||||
|
||||
// We may block some write operations
|
||||
if (opCategory.get() == OperationCategory.WRITE) {
|
||||
// Check if the path is in a read only mount point
|
||||
if (isPathReadOnly(path)) {
|
||||
if (this.rpcMonitor != null) {
|
||||
this.rpcMonitor.routerFailureReadOnly();
|
||||
}
|
||||
throw new IOException(path + " is in a read only mount point");
|
||||
}
|
||||
}
|
||||
|
||||
return location.getDestinations();
|
||||
} catch (IOException ioe) {
|
||||
if (this.rpcMonitor != null) {
|
||||
|
@ -1982,6 +1995,27 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a path is in a read only mount point.
|
||||
*
|
||||
* @param path Path to check.
|
||||
* @return If the path is in a read only mount point.
|
||||
*/
|
||||
private boolean isPathReadOnly(final String path) {
|
||||
if (subclusterResolver instanceof MountTableResolver) {
|
||||
try {
|
||||
MountTableResolver mountTable = (MountTableResolver)subclusterResolver;
|
||||
MountTable entry = mountTable.getMountPoint(path);
|
||||
if (entry != null && entry.isReadOnly()) {
|
||||
return true;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Cannot get mount point: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the modification dates for mount points.
|
||||
*
|
||||
|
|
|
@ -77,7 +77,7 @@ public class RouterAdmin extends Configured implements Tool {
|
|||
public void printUsage() {
|
||||
String usage = "Federation Admin Tools:\n"
|
||||
+ "\t[-add <source> <nameservice> <destination> "
|
||||
+ "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL]]\n"
|
||||
+ "[-readonly]\n"
|
||||
+ "\t[-rm <source>]\n"
|
||||
+ "\t[-ls <path>]\n";
|
||||
System.out.println(usage);
|
||||
|
|
|
@ -423,7 +423,7 @@ Runs the DFS router. See [Router](./HDFSRouterFederation.html#Router) for more i
|
|||
Usage:
|
||||
|
||||
hdfs dfsrouteradmin
|
||||
[-add <source> <nameservice> <destination>]
|
||||
[-add <source> <nameservice> <destination> [-readonly]]
|
||||
[-rm <source>]
|
||||
[-ls <path>]
|
||||
|
||||
|
|
|
@ -184,6 +184,10 @@ For example, to create three mount points and list them:
|
|||
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -add /data/app2 ns3 /data/app2
|
||||
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -ls
|
||||
|
||||
It also supports mount points that disallow writes:
|
||||
|
||||
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -add /readonly ns1 / -readonly
|
||||
|
||||
If a mount point is not set, the Router will map it to the default namespace `dfs.federation.router.default.nameserviceId`.
|
||||
|
||||
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.federation;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.FileNotFoundException;
|
||||
|
@ -52,6 +51,9 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServi
|
|||
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
|
||||
/**
|
||||
* Helper utilities for testing HDFS Federation.
|
||||
|
@ -108,33 +110,41 @@ public final class FederationTestUtils {
|
|||
return report;
|
||||
}
|
||||
|
||||
public static void waitNamenodeRegistered(ActiveNamenodeResolver resolver,
|
||||
String nsId, String nnId, FederationNamenodeServiceState finalState)
|
||||
throws InterruptedException, IllegalStateException, IOException {
|
||||
/**
|
||||
* Wait for a namenode to be registered with a particular state.
|
||||
* @param resolver Active namenode resolver.
|
||||
* @param nsId Nameservice identifier.
|
||||
* @param nnId Namenode identifier.
|
||||
* @param finalState State to check for.
|
||||
* @throws Exception Failed to verify State Store registration of namenode
|
||||
* nsId:nnId for state.
|
||||
*/
|
||||
public static void waitNamenodeRegistered(
|
||||
final ActiveNamenodeResolver resolver,
|
||||
final String nsId, final String nnId,
|
||||
final FederationNamenodeServiceState state) throws Exception {
|
||||
|
||||
for (int loopCount = 0; loopCount < 20; loopCount++) {
|
||||
if (loopCount > 0) {
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
List<? extends FederationNamenodeContext> namenodes =
|
||||
resolver.getNamenodesForNameserviceId(nsId);
|
||||
for (FederationNamenodeContext namenode : namenodes) {
|
||||
// Check if this is the Namenode we are checking
|
||||
if (namenode.getNamenodeId() == nnId ||
|
||||
namenode.getNamenodeId().equals(nnId)) {
|
||||
if (finalState != null && !namenode.getState().equals(finalState)) {
|
||||
// Wrong state, wait a bit more
|
||||
break;
|
||||
} else {
|
||||
// Found and verified
|
||||
return;
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
try {
|
||||
List<? extends FederationNamenodeContext> namenodes =
|
||||
resolver.getNamenodesForNameserviceId(nsId);
|
||||
if (namenodes != null) {
|
||||
for (FederationNamenodeContext namenode : namenodes) {
|
||||
// Check if this is the Namenode we are checking
|
||||
if (namenode.getNamenodeId() == nnId ||
|
||||
namenode.getNamenodeId().equals(nnId)) {
|
||||
return state == null || namenode.getState().equals(state);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// Ignore
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
fail("Failed to verify State Store registration of " + nsId + " " + nnId +
|
||||
" for state " + finalState);
|
||||
}, 1000, 20 * 1000);
|
||||
}
|
||||
|
||||
public static boolean verifyDate(Date d1, Date d2, long precision) {
|
||||
|
|
|
@ -747,8 +747,7 @@ public class RouterDFSCluster {
|
|||
}
|
||||
}
|
||||
|
||||
public void waitNamenodeRegistration()
|
||||
throws InterruptedException, IllegalStateException, IOException {
|
||||
public void waitNamenodeRegistration() throws Exception {
|
||||
for (RouterContext r : this.routers) {
|
||||
Router router = r.router;
|
||||
for (NamenodeContext nn : this.namenodes) {
|
||||
|
@ -761,7 +760,7 @@ public class RouterDFSCluster {
|
|||
|
||||
public void waitRouterRegistrationQuorum(RouterContext router,
|
||||
FederationNamenodeServiceState state, String nsId, String nnId)
|
||||
throws InterruptedException, IOException {
|
||||
throws Exception {
|
||||
LOG.info("Waiting for NN {} {} to transition to {}", nsId, nnId, state);
|
||||
ActiveNamenodeResolver nnResolver = router.router.getNamenodeResolver();
|
||||
waitNamenodeRegistered(nnResolver, nsId, nnId, state);
|
||||
|
|
|
@ -69,6 +69,7 @@ public class TestMountTableResolver {
|
|||
* ______file1.txt -> 4:/user/file1.txt
|
||||
* __usr
|
||||
* ____bin -> 2:/bin
|
||||
* __readonly -> 2:/tmp
|
||||
*
|
||||
* @throws IOException If it cannot set the mount table.
|
||||
*/
|
||||
|
@ -107,6 +108,12 @@ public class TestMountTableResolver {
|
|||
// /user/a/demo/test/b
|
||||
map = getMountTableEntry("3", "/user/test");
|
||||
mountTable.addEntry(MountTable.newInstance("/user/a/demo/test/b", map));
|
||||
|
||||
// /readonly
|
||||
map = getMountTableEntry("2", "/tmp");
|
||||
MountTable readOnlyEntry = MountTable.newInstance("/readonly", map);
|
||||
readOnlyEntry.setReadOnly(true);
|
||||
mountTable.addEntry(readOnlyEntry);
|
||||
}
|
||||
|
||||
@Before
|
||||
|
@ -152,6 +159,9 @@ public class TestMountTableResolver {
|
|||
assertEquals("3->/user/test/a",
|
||||
mountTable.getDestinationForPath("/user/test/a").toString());
|
||||
|
||||
assertEquals("2->/tmp/tesfile1.txt",
|
||||
mountTable.getDestinationForPath("/readonly/tesfile1.txt").toString());
|
||||
|
||||
}
|
||||
|
||||
private void compareLists(List<String> list1, String[] list2) {
|
||||
|
@ -166,8 +176,8 @@ public class TestMountTableResolver {
|
|||
|
||||
// Check getting all mount points (virtual and real) beneath a path
|
||||
List<String> mounts = mountTable.getMountPoints("/");
|
||||
assertEquals(3, mounts.size());
|
||||
compareLists(mounts, new String[] {"tmp", "user", "usr"});
|
||||
assertEquals(4, mounts.size());
|
||||
compareLists(mounts, new String[] {"tmp", "user", "usr", "readonly"});
|
||||
|
||||
mounts = mountTable.getMountPoints("/user");
|
||||
assertEquals(2, mounts.size());
|
||||
|
@ -212,9 +222,10 @@ public class TestMountTableResolver {
|
|||
|
||||
// Check listing the mount table records at or beneath a path
|
||||
List<MountTable> records = mountTable.getMounts("/");
|
||||
assertEquals(8, records.size());
|
||||
assertEquals(9, records.size());
|
||||
compareRecords(records, new String[] {"/", "/tmp", "/user", "/usr/bin",
|
||||
"user/a", "/user/a/demo/a", "/user/a/demo/b", "/user/b/file1.txt"});
|
||||
"user/a", "/user/a/demo/a", "/user/a/demo/b", "/user/b/file1.txt",
|
||||
"readonly"});
|
||||
|
||||
records = mountTable.getMounts("/user");
|
||||
assertEquals(5, records.size());
|
||||
|
@ -229,6 +240,11 @@ public class TestMountTableResolver {
|
|||
records = mountTable.getMounts("/tmp");
|
||||
assertEquals(1, records.size());
|
||||
compareRecords(records, new String[] {"/tmp"});
|
||||
|
||||
records = mountTable.getMounts("/readonly");
|
||||
assertEquals(1, records.size());
|
||||
compareRecords(records, new String[] {"/readonly"});
|
||||
assertTrue(records.get(0).isReadOnly());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -237,7 +253,7 @@ public class TestMountTableResolver {
|
|||
|
||||
// 3 mount points are present /tmp, /user, /usr
|
||||
compareLists(mountTable.getMountPoints("/"),
|
||||
new String[] {"user", "usr", "tmp"});
|
||||
new String[] {"user", "usr", "tmp", "readonly"});
|
||||
|
||||
// /tmp currently points to namespace 2
|
||||
assertEquals("2", mountTable.getDestinationForPath("/tmp/testfile.txt")
|
||||
|
@ -248,7 +264,7 @@ public class TestMountTableResolver {
|
|||
|
||||
// Now 2 mount points are present /user, /usr
|
||||
compareLists(mountTable.getMountPoints("/"),
|
||||
new String[] {"user", "usr"});
|
||||
new String[] {"user", "usr", "readonly"});
|
||||
|
||||
// /tmp no longer exists, uses default namespace for mapping /
|
||||
assertEquals("1", mountTable.getDestinationForPath("/tmp/testfile.txt")
|
||||
|
@ -261,7 +277,7 @@ public class TestMountTableResolver {
|
|||
|
||||
// 3 mount points are present /tmp, /user, /usr
|
||||
compareLists(mountTable.getMountPoints("/"),
|
||||
new String[] {"user", "usr", "tmp"});
|
||||
new String[] {"user", "usr", "tmp", "readonly"});
|
||||
|
||||
// /usr is virtual, uses namespace 1->/
|
||||
assertEquals("1", mountTable.getDestinationForPath("/usr/testfile.txt")
|
||||
|
@ -272,7 +288,7 @@ public class TestMountTableResolver {
|
|||
|
||||
// Verify the remove failed
|
||||
compareLists(mountTable.getMountPoints("/"),
|
||||
new String[] {"user", "usr", "tmp"});
|
||||
new String[] {"user", "usr", "tmp", "readonly"});
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -304,7 +320,7 @@ public class TestMountTableResolver {
|
|||
|
||||
// Initial table loaded
|
||||
testDestination();
|
||||
assertEquals(8, mountTable.getMounts("/").size());
|
||||
assertEquals(9, mountTable.getMounts("/").size());
|
||||
|
||||
// Replace table with /1 and /2
|
||||
List<MountTable> records = new ArrayList<>();
|
||||
|
|
|
@ -143,6 +143,37 @@ public class TestRouterAdmin {
|
|||
assertFalse(addResponse2.getStatus());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddReadOnlyMountTable() throws IOException {
|
||||
MountTable newEntry = MountTable.newInstance(
|
||||
"/readonly", Collections.singletonMap("ns0", "/testdir"),
|
||||
Time.now(), Time.now());
|
||||
newEntry.setReadOnly(true);
|
||||
|
||||
RouterClient client = routerContext.getAdminClient();
|
||||
MountTableManager mountTable = client.getMountTableManager();
|
||||
|
||||
// Existing mount table size
|
||||
List<MountTable> records = getMountTableEntries(mountTable);
|
||||
assertEquals(records.size(), mockMountTable.size());
|
||||
|
||||
// Add
|
||||
AddMountTableEntryRequest addRequest =
|
||||
AddMountTableEntryRequest.newInstance(newEntry);
|
||||
AddMountTableEntryResponse addResponse =
|
||||
mountTable.addMountTableEntry(addRequest);
|
||||
assertTrue(addResponse.getStatus());
|
||||
|
||||
// New mount table size
|
||||
List<MountTable> records2 = getMountTableEntries(mountTable);
|
||||
assertEquals(records2.size(), mockMountTable.size() + 1);
|
||||
|
||||
// Check that we have the read only entry
|
||||
MountTable record = getMountTableEntry("/readonly");
|
||||
assertEquals("/readonly", record.getSourcePath());
|
||||
assertTrue(record.isReadOnly());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveMountTable() throws IOException {
|
||||
|
||||
|
|
|
@ -0,0 +1,143 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.router;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
|
||||
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
|
||||
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Test a router end-to-end including the MountTable.
|
||||
*/
|
||||
public class TestRouterMountTable {
|
||||
|
||||
private static StateStoreDFSCluster cluster;
|
||||
private static NamenodeContext nnContext;
|
||||
private static RouterContext routerContext;
|
||||
private static MountTableResolver mountTable;
|
||||
|
||||
@BeforeClass
|
||||
public static void globalSetUp() throws Exception {
|
||||
|
||||
// Build and start a federated cluster
|
||||
cluster = new StateStoreDFSCluster(false, 1);
|
||||
Configuration conf = new RouterConfigBuilder()
|
||||
.stateStore()
|
||||
.admin()
|
||||
.rpc()
|
||||
.build();
|
||||
cluster.addRouterOverrides(conf);
|
||||
cluster.startCluster();
|
||||
cluster.startRouters();
|
||||
cluster.waitClusterUp();
|
||||
|
||||
// Get the end points
|
||||
nnContext = cluster.getRandomNamenode();
|
||||
routerContext = cluster.getRandomRouter();
|
||||
Router router = routerContext.getRouter();
|
||||
mountTable = (MountTableResolver) router.getSubclusterResolver();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
if (cluster != null) {
|
||||
cluster.stopRouter(routerContext);
|
||||
cluster.shutdown();
|
||||
cluster = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadOnly() throws Exception {
|
||||
|
||||
// Add a read only entry
|
||||
MountTable readOnlyEntry = MountTable.newInstance(
|
||||
"/readonly", Collections.singletonMap("ns0", "/testdir"));
|
||||
readOnlyEntry.setReadOnly(true);
|
||||
assertTrue(addMountTable(readOnlyEntry));
|
||||
|
||||
// Add a regular entry
|
||||
MountTable regularEntry = MountTable.newInstance(
|
||||
"/regular", Collections.singletonMap("ns0", "/testdir"));
|
||||
assertTrue(addMountTable(regularEntry));
|
||||
|
||||
// Create a folder which should show in all locations
|
||||
final FileSystem nnFs = nnContext.getFileSystem();
|
||||
final FileSystem routerFs = routerContext.getFileSystem();
|
||||
assertTrue(routerFs.mkdirs(new Path("/regular/newdir")));
|
||||
|
||||
FileStatus dirStatusNn =
|
||||
nnFs.getFileStatus(new Path("/testdir/newdir"));
|
||||
assertTrue(dirStatusNn.isDirectory());
|
||||
FileStatus dirStatusRegular =
|
||||
routerFs.getFileStatus(new Path("/regular/newdir"));
|
||||
assertTrue(dirStatusRegular.isDirectory());
|
||||
FileStatus dirStatusReadOnly =
|
||||
routerFs.getFileStatus(new Path("/readonly/newdir"));
|
||||
assertTrue(dirStatusReadOnly.isDirectory());
|
||||
|
||||
// It should fail writing into a read only path
|
||||
try {
|
||||
routerFs.mkdirs(new Path("/readonly/newdirfail"));
|
||||
fail("We should not be able to write into a read only mount point");
|
||||
} catch (IOException ioe) {
|
||||
String msg = ioe.getMessage();
|
||||
assertTrue(msg.startsWith(
|
||||
"/readonly/newdirfail is in a read only mount point"));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a mount table entry to the mount table through the admin API.
|
||||
* @param entry Mount table entry to add.
|
||||
* @return If it was succesfully added.
|
||||
* @throws IOException Problems adding entries.
|
||||
*/
|
||||
private boolean addMountTable(final MountTable entry) throws IOException {
|
||||
RouterClient client = routerContext.getAdminClient();
|
||||
MountTableManager mountTableManager = client.getMountTableManager();
|
||||
AddMountTableEntryRequest addRequest =
|
||||
AddMountTableEntryRequest.newInstance(entry);
|
||||
AddMountTableEntryResponse addResponse =
|
||||
mountTableManager.addMountTableEntry(addRequest);
|
||||
|
||||
// Reload the Router cache
|
||||
mountTable.loadCache(true);
|
||||
|
||||
return addResponse.getStatus();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue