diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/RouterDistCpProcedure.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/RouterDistCpProcedure.java index b07f3b219f0..2f7ec06f7f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/RouterDistCpProcedure.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/RouterDistCpProcedure.java @@ -44,6 +44,7 @@ protected void disableWrite(FedBalanceContext context) throws IOException { Configuration conf = context.getConf(); String mount = context.getMount(); MountTableProcedure.disableWrite(mount, conf); + updateStage(Stage.FINAL_DISTCP); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/rbfbalance/TestRouterDistCpProcedure.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/rbfbalance/TestRouterDistCpProcedure.java new file mode 100644 index 00000000000..60b32f64146 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/rbfbalance/TestRouterDistCpProcedure.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.rbfbalance; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.tools.fedbalance.DistCpProcedure.Stage; +import org.apache.hadoop.tools.fedbalance.FedBalanceContext; +import org.apache.hadoop.tools.fedbalance.TestDistCpProcedure; +import org.apache.hadoop.util.Time; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.Collections; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createNamenodeReport; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.junit.Assert.assertTrue; + + +public class TestRouterDistCpProcedure extends TestDistCpProcedure { + private static StateStoreDFSCluster cluster; + private static MiniRouterDFSCluster.RouterContext routerContext; + private static Configuration routerConf; + private static StateStoreService stateStore; + + @BeforeClass + public static void globalSetUp() throws Exception { + cluster = new StateStoreDFSCluster(false, 1); + // Build and start a router with State Store + admin + RPC + Configuration conf = new RouterConfigBuilder() + .stateStore() + .admin() + .rpc() + .build(); + cluster.addRouterOverrides(conf); + cluster.startRouters(); + routerContext = cluster.getRandomRouter(); + Router router = routerContext.getRouter(); + stateStore = router.getStateStore(); + + // Add one name services for testing + ActiveNamenodeResolver membership = router.getNamenodeResolver(); + membership.registerNamenode(createNamenodeReport("ns0", "nn1", + HAServiceProtocol.HAServiceState.ACTIVE)); + stateStore.refreshCaches(true); + + routerConf = new Configuration(); + InetSocketAddress routerSocket = router.getAdminServerAddress(); + routerConf.setSocketAddr(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, + routerSocket); + } + + @Override + public void testDisableWrite() throws Exception { + // Firstly add mount entry: /test-write->{ns0,/test-write}. + String mount = "/test-write"; + MountTable newEntry = MountTable + .newInstance(mount, Collections.singletonMap("ns0", mount), + Time.now(), Time.now()); + MountTableManager mountTable = + routerContext.getAdminClient().getMountTableManager(); + AddMountTableEntryRequest addRequest = + AddMountTableEntryRequest.newInstance(newEntry); + AddMountTableEntryResponse addResponse = + mountTable.addMountTableEntry(addRequest); + assertTrue(addResponse.getStatus()); + stateStore.loadCache(MountTableStoreImpl.class, true); // load cache. + + // Construct client. + URI address = routerContext.getFileSystemURI(); + DFSClient routerClient = new DFSClient(address, routerConf); + + FedBalanceContext context = new FedBalanceContext + .Builder(null, null, mount, routerConf).build(); + RouterDistCpProcedure dcProcedure = new RouterDistCpProcedure(); + executeProcedure(dcProcedure, Stage.FINAL_DISTCP, + () -> dcProcedure.disableWrite(context)); + intercept(RemoteException.class, "is in a read only mount point", + "Expect readonly exception.", () -> routerClient + .mkdirs(mount + "/dir", new FsPermission(020), false)); + } + + @AfterClass + public static void tearDown() { + cluster.stopRouter(routerContext); + } +} diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java index 1a892991c84..303945b5483 100644 --- a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java +++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java @@ -72,7 +72,7 @@ public class DistCpProcedure extends BalanceProcedure { LoggerFactory.getLogger(DistCpProcedure.class); /* Stages of this procedure. */ - enum Stage { + public enum Stage { PRE_CHECK, INIT_DISTCP, DIFF_DISTCP, DISABLE_WRITE, FINAL_DISTCP, FINISH } @@ -325,7 +325,7 @@ Stage getStage() { } @VisibleForTesting - void updateStage(Stage value) { + protected void updateStage(Stage value) { String oldStage = stage == null ? "null" : stage.name(); String newStage = value == null ? "null" : value.name(); LOG.info("Stage updated from {} to {}.", oldStage, newStage); diff --git a/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java index 34b2d7b1d52..0bed3599036 100644 --- a/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java +++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java @@ -389,7 +389,7 @@ private FedBalanceContext buildContext(Path src, Path dst, String mount, .setDiffThreshold(diffThreshold).build(); } - interface Call { + protected interface Call { void execute() throws IOException, RetryException; } @@ -400,8 +400,8 @@ interface Call { * @param target the target stage. * @param call the function executing the procedure. */ - private static void executeProcedure(DistCpProcedure procedure, Stage target, - Call call) throws IOException { + protected static void executeProcedure(DistCpProcedure procedure, + Stage target, Call call) throws IOException { Stage stage = Stage.PRE_CHECK; procedure.updateStage(stage); while (stage != target) {