From 77299ae992b16066dd61e4fec9ff63b863ae2e21 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Tue, 5 Jan 2021 00:05:03 +0530 Subject: [PATCH] HDFS-15748. RBF: Move the router related part from hadoop-federation-balance module to hadoop-hdfs-rbf. Contributed by Jinglun. --- .../hadoop-common/src/main/bin/hadoop | 6 + hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml | 11 + .../hdfs/rbfbalance}/MountTableProcedure.java | 2 +- .../rbfbalance/RouterDistCpProcedure.java | 56 +++ .../hdfs/rbfbalance/RouterFedBalance.java | 383 ++++++++++++++++++ .../hadoop/hdfs/rbfbalance/package-info.java | 25 ++ .../rbfbalance}/TestMountTableProcedure.java | 2 +- .../hadoop-federation-balance/pom.xml | 11 - .../tools/fedbalance/DistCpProcedure.java | 37 +- .../hadoop/tools/fedbalance/FedBalance.java | 88 +--- .../tools/fedbalance/FedBalanceContext.java | 15 +- .../tools/fedbalance/FedBalanceOptions.java | 28 +- .../site/markdown/HDFSFederationBalance.md | 18 +- .../tools/fedbalance/TestDistCpProcedure.java | 7 +- 14 files changed, 545 insertions(+), 144 deletions(-) rename {hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance => hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance}/MountTableProcedure.java (99%) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/RouterDistCpProcedure.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/RouterFedBalance.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/package-info.java rename {hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance => hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/rbfbalance}/TestMountTableProcedure.java (99%) diff --git a/hadoop-common-project/hadoop-common/src/main/bin/hadoop b/hadoop-common-project/hadoop-common/src/main/bin/hadoop index 7d9ffc69bc5..7f46e7e2ab2 100755 --- a/hadoop-common-project/hadoop-common/src/main/bin/hadoop +++ b/hadoop-common-project/hadoop-common/src/main/bin/hadoop @@ -47,6 +47,7 @@ function hadoop_usage hadoop_add_subcommand "trace" client "view and modify Hadoop tracing settings" hadoop_add_subcommand "version" client "print the version" hadoop_add_subcommand "kdiag" client "Diagnose Kerberos Problems" + hadoop_add_subcommand "rbfbalance" client "move directories and files across router-based federation namespaces" hadoop_generate_usage "${HADOOP_SHELL_EXECNAME}" true } @@ -171,6 +172,11 @@ function hadoopcmd_case version) HADOOP_CLASSNAME=org.apache.hadoop.util.VersionInfo ;; + rbfbalance) + HADOOP_CLASSNAME=org.apache.hadoop.hdfs.rbfbalance.RouterFedBalance + hadoop_add_to_classpath_tools hadoop-federation-balance + hadoop_add_to_classpath_tools hadoop-distcp + ;; *) HADOOP_CLASSNAME="${subcmd}" if ! hadoop_validate_classname "${HADOOP_CLASSNAME}"; then diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml index 23e0b8feb0d..41290cc67fa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml @@ -69,6 +69,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> hadoop-hdfs-client provided + + org.apache.hadoop + hadoop-federation-balance + provided + org.slf4j slf4j-log4j12 @@ -85,6 +90,12 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> test test-jar + + org.apache.hadoop + hadoop-federation-balance + test + test-jar + com.fasterxml.jackson.core jackson-annotations diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/MountTableProcedure.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/MountTableProcedure.java similarity index 99% rename from hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/MountTableProcedure.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/MountTableProcedure.java index 17bc82822d1..8bd39d13a00 100644 --- a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/MountTableProcedure.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/MountTableProcedure.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.tools.fedbalance; +package org.apache.hadoop.hdfs.rbfbalance; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/RouterDistCpProcedure.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/RouterDistCpProcedure.java new file mode 100644 index 00000000000..b07f3b219f0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/RouterDistCpProcedure.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.rbfbalance; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.tools.fedbalance.DistCpProcedure; +import org.apache.hadoop.tools.fedbalance.FedBalanceContext; + +import java.io.IOException; + +/** + * Copy data through distcp in router-based federation cluster. It disables + * write by setting mount entry readonly. + */ +public class RouterDistCpProcedure extends DistCpProcedure { + + public RouterDistCpProcedure() {} + + public RouterDistCpProcedure(String name, String nextProcedure, + long delayDuration, FedBalanceContext context) throws IOException { + super(name, nextProcedure, delayDuration, context); + } + + /** + * Disable write by making the mount entry readonly. + */ + @Override + protected void disableWrite(FedBalanceContext context) throws IOException { + Configuration conf = context.getConf(); + String mount = context.getMount(); + MountTableProcedure.disableWrite(mount, conf); + } + + /** + * Enable write. + */ + @Override + protected void enableWrite() throws IOException { + // do nothing. + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/RouterFedBalance.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/RouterFedBalance.java new file mode 100644 index 00000000000..f99a2f18e25 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/RouterFedBalance.java @@ -0,0 +1,383 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.rbfbalance; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.RouterClient; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.tools.fedbalance.FedBalanceConfigs; +import org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption; +import org.apache.hadoop.tools.fedbalance.procedure.BalanceJob; +import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure; +import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler; +import org.apache.hadoop.tools.fedbalance.TrashProcedure; +import org.apache.hadoop.tools.fedbalance.FedBalanceContext; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.tools.fedbalance.FedBalance.FED_BALANCE_DEFAULT_XML; +import static org.apache.hadoop.tools.fedbalance.FedBalance.FED_BALANCE_SITE_XML; +import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.CLI_OPTIONS; +import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.FORCE_CLOSE_OPEN; +import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.MAP; +import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.BANDWIDTH; +import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.DELAY_DURATION; +import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.DIFF_THRESHOLD; +import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.TRASH; + +/** + * Balance data in router-based federation cluster. From src sub-namespace to + * dst sub-namespace with distcp. + * + * 1. Move data from the source path to the destination path with distcp. + * 2. Update the the mount entry. + * 3. Delete the source path to trash. + */ +public class RouterFedBalance extends Configured implements Tool { + + public static final Logger LOG = + LoggerFactory.getLogger(RouterFedBalance.class); + private static final String SUBMIT_COMMAND = "submit"; + private static final String CONTINUE_COMMAND = "continue"; + private static final String DISTCP_PROCEDURE = "distcp-procedure"; + private static final String MOUNT_TABLE_PROCEDURE = "mount-table-procedure"; + private static final String TRASH_PROCEDURE = "trash-procedure"; + + /** + * This class helps building the balance job. + */ + private class Builder { + /* Force close all open files while there is no diff. */ + private boolean forceCloseOpen = false; + /* Max number of concurrent maps to use for copy. */ + private int map = 10; + /* Specify bandwidth per map in MB. */ + private int bandwidth = 10; + /* Specify the trash behaviour of the source path. */ + private FedBalanceConfigs.TrashOption trashOpt = TrashOption.TRASH; + /* Specify the duration(millie seconds) when the procedure needs retry. */ + private long delayDuration = TimeUnit.SECONDS.toMillis(1); + /* Specify the threshold of diff entries. */ + private int diffThreshold = 0; + /* The source input. This specifies the source path. */ + private final String inputSrc; + /* The dst input. This specifies the dst path. */ + private final String inputDst; + + Builder(String inputSrc, String inputDst) { + this.inputSrc = inputSrc; + this.inputDst = inputDst; + } + + /** + * Whether force close all open files while there is no diff. + * @param value true if force close all the open files. + */ + public Builder setForceCloseOpen(boolean value) { + this.forceCloseOpen = value; + return this; + } + + /** + * Max number of concurrent maps to use for copy. + * @param value the map number of the distcp. + */ + public Builder setMap(int value) { + this.map = value; + return this; + } + + /** + * Specify bandwidth per map in MB. + * @param value the bandwidth. + */ + public Builder setBandWidth(int value) { + this.bandwidth = value; + return this; + } + + /** + * Specify the trash behaviour of the source path. + * @param value the trash option. + */ + public Builder setTrashOpt(TrashOption value) { + this.trashOpt = value; + return this; + } + + /** + * Specify the duration(millie seconds) when the procedure needs retry. + * @param value the delay duration of the job. + */ + public Builder setDelayDuration(long value) { + this.delayDuration = value; + return this; + } + + /** + * Specify the threshold of diff entries. + * @param value the threshold of a fast distcp. + */ + public Builder setDiffThreshold(int value) { + this.diffThreshold = value; + return this; + } + + /** + * Build the balance job. + */ + public BalanceJob build() throws IOException { + // Construct job context. + FedBalanceContext context; + Path dst = new Path(inputDst); + if (dst.toUri().getAuthority() == null) { + throw new IOException("The destination cluster must be specified."); + } + Path src = getSrcPath(inputSrc); + String mount = inputSrc; + context = new FedBalanceContext.Builder(src, dst, mount, getConf()) + .setForceCloseOpenFiles(forceCloseOpen).setUseMountReadOnly(true) + .setMapNum(map).setBandwidthLimit(bandwidth).setTrash(trashOpt) + .setDelayDuration(delayDuration).setDiffThreshold(diffThreshold) + .build(); + + LOG.info(context.toString()); + // Construct the balance job. + BalanceJob.Builder builder = new BalanceJob.Builder<>(); + RouterDistCpProcedure dcp = + new RouterDistCpProcedure(DISTCP_PROCEDURE, null, delayDuration, + context); + builder.nextProcedure(dcp); + MountTableProcedure mtp = + new MountTableProcedure(MOUNT_TABLE_PROCEDURE, null, delayDuration, + inputSrc, dst.toUri().getPath(), dst.toUri().getAuthority(), + getConf()); + builder.nextProcedure(mtp); + TrashProcedure tp = + new TrashProcedure(TRASH_PROCEDURE, null, delayDuration, context); + builder.nextProcedure(tp); + return builder.build(); + } + } + + public RouterFedBalance() { + super(); + } + + @Override + public int run(String[] args) throws Exception { + CommandLineParser parser = new GnuParser(); + CommandLine command = parser.parse(CLI_OPTIONS, args, true); + String[] leftOverArgs = command.getArgs(); + if (leftOverArgs == null || leftOverArgs.length < 1) { + printUsage(); + return -1; + } + String cmd = leftOverArgs[0]; + if (cmd.equals(SUBMIT_COMMAND)) { + if (leftOverArgs.length < 3) { + printUsage(); + return -1; + } + String inputSrc = leftOverArgs[1]; + String inputDst = leftOverArgs[2]; + return submit(command, inputSrc, inputDst); + } else if (cmd.equals(CONTINUE_COMMAND)) { + return continueJob(); + } else { + printUsage(); + return -1; + } + } + + /** + * Recover and continue the unfinished jobs. + */ + private int continueJob() throws InterruptedException { + BalanceProcedureScheduler scheduler = + new BalanceProcedureScheduler(getConf()); + try { + scheduler.init(true); + while (true) { + Collection jobs = scheduler.getAllJobs(); + int unfinished = 0; + for (BalanceJob job : jobs) { + if (!job.isJobDone()) { + unfinished++; + } + LOG.info(job.toString()); + } + if (unfinished == 0) { + break; + } + Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + } + } catch (IOException e) { + LOG.error("Continue balance job failed.", e); + return -1; + } finally { + scheduler.shutDown(); + } + return 0; + } + + /** + * Start a ProcedureScheduler and submit the job. + * + * @param command the command options. + * @param inputSrc the source input. This specifies the source path. + * @param inputDst the dst input. This specifies the dst path. + */ + private int submit(CommandLine command, String inputSrc, String inputDst) + throws IOException { + Builder builder = new Builder(inputSrc, inputDst); + // parse options. + builder.setForceCloseOpen(command.hasOption(FORCE_CLOSE_OPEN.getOpt())); + if (command.hasOption(MAP.getOpt())) { + builder.setMap(Integer.parseInt(command.getOptionValue(MAP.getOpt()))); + } + if (command.hasOption(BANDWIDTH.getOpt())) { + builder.setBandWidth( + Integer.parseInt(command.getOptionValue(BANDWIDTH.getOpt()))); + } + if (command.hasOption(DELAY_DURATION.getOpt())) { + builder.setDelayDuration( + Long.parseLong(command.getOptionValue(DELAY_DURATION.getOpt()))); + } + if (command.hasOption(DIFF_THRESHOLD.getOpt())) { + builder.setDiffThreshold(Integer.parseInt( + command.getOptionValue(DIFF_THRESHOLD.getOpt()))); + } + if (command.hasOption(TRASH.getOpt())) { + String val = command.getOptionValue(TRASH.getOpt()); + if (val.equalsIgnoreCase("skip")) { + builder.setTrashOpt(TrashOption.SKIP); + } else if (val.equalsIgnoreCase("trash")) { + builder.setTrashOpt(TrashOption.TRASH); + } else if (val.equalsIgnoreCase("delete")) { + builder.setTrashOpt(TrashOption.DELETE); + } else { + printUsage(); + return -1; + } + } + + // Submit the job. + BalanceProcedureScheduler scheduler = + new BalanceProcedureScheduler(getConf()); + scheduler.init(false); + try { + BalanceJob balanceJob = builder.build(); + // Submit and wait until the job is done. + scheduler.submit(balanceJob); + scheduler.waitUntilDone(balanceJob); + } catch (IOException e) { + LOG.error("Submit balance job failed.", e); + return -1; + } finally { + scheduler.shutDown(); + } + return 0; + } + + /** + * Get src uri from Router. + */ + private Path getSrcPath(String fedPath) throws IOException { + String address = getConf().getTrimmed( + RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, + RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT); + InetSocketAddress routerSocket = NetUtils.createSocketAddr(address); + RouterClient rClient = new RouterClient(routerSocket, getConf()); + try { + MountTableManager mountTable = rClient.getMountTableManager(); + MountTable entry = MountTableProcedure.getMountEntry(fedPath, mountTable); + if (entry == null) { + throw new IllegalArgumentException( + "The mount point doesn't exist. path=" + fedPath); + } else if (entry.getDestinations().size() > 1) { + throw new IllegalArgumentException( + "The mount point has more than one destination. path=" + fedPath); + } else { + String ns = entry.getDestinations().get(0).getNameserviceId(); + String path = entry.getDestinations().get(0).getDest(); + return new Path("hdfs://" + ns + path); + } + } finally { + rClient.close(); + } + } + + private void printUsage() { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp( + "rbfbalance OPTIONS [submit|continue] \n\nOPTIONS", + CLI_OPTIONS); + } + + /** + * Loads properties from hdfs-fedbalance-default.xml into configuration + * object. + * + * @return Configuration which includes properties from + * hdfs-fedbalance-default.xml and hdfs-fedbalance-site.xml + */ + @VisibleForTesting + static Configuration getDefaultConf() { + Configuration config = new Configuration(); + config.addResource(FED_BALANCE_DEFAULT_XML); + config.addResource(FED_BALANCE_SITE_XML); + return config; + } + + /** + * Main function of the RouterFedBalance program. Parses the input arguments + * and invokes the RouterFedBalance::run() method, via the ToolRunner. + * @param argv Command-line arguments sent to RouterFedBalance. + */ + public static void main(String[] argv) { + Configuration conf = getDefaultConf(); + RouterFedBalance fedBalance = new RouterFedBalance(); + fedBalance.setConf(conf); + int exitCode; + try { + exitCode = ToolRunner.run(fedBalance, argv); + } catch (Exception e) { + LOG.warn("Couldn't complete RouterFedBalance operation.", e); + exitCode = -1; + } + System.exit(exitCode); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/package-info.java new file mode 100644 index 00000000000..ff6a1d244a5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/package-info.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +/** + * FedBalance is a tool for balancing data across federation clusters. + */ +@InterfaceAudience.Public +package org.apache.hadoop.hdfs.rbfbalance; +import org.apache.hadoop.classification.InterfaceAudience; diff --git a/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestMountTableProcedure.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/rbfbalance/TestMountTableProcedure.java similarity index 99% rename from hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestMountTableProcedure.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/rbfbalance/TestMountTableProcedure.java index 9dd4e5da8fe..4f94c0ea6c4 100644 --- a/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestMountTableProcedure.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/rbfbalance/TestMountTableProcedure.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.tools.fedbalance; +package org.apache.hadoop.hdfs.rbfbalance; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.FsPermission; diff --git a/hadoop-tools/hadoop-federation-balance/pom.xml b/hadoop-tools/hadoop-federation-balance/pom.xml index cf79e17c5ad..588bb98f3e7 100644 --- a/hadoop-tools/hadoop-federation-balance/pom.xml +++ b/hadoop-tools/hadoop-federation-balance/pom.xml @@ -103,17 +103,6 @@ test test-jar - - org.apache.hadoop - hadoop-hdfs-rbf - provided - - - org.apache.hadoop - hadoop-hdfs-rbf - test - test-jar - org.mockito mockito-core diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java index 33d37be35bd..fa4a088631a 100644 --- a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java +++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java @@ -155,7 +155,7 @@ public class DistCpProcedure extends BalanceProcedure { diffDistCp(); return false; case DISABLE_WRITE: - disableWrite(); + disableWrite(context); return false; case FINAL_DISTCP: finalDistCp(); @@ -238,23 +238,28 @@ public class DistCpProcedure extends BalanceProcedure { } /** - * Disable write either by making the mount entry readonly or cancelling the - * execute permission of the source path. + * Disable write by cancelling the execute permission of the source path. + * TODO: Disable the super user from writing. + * @param fbcontext the context. + * @throws IOException if can't disable write. */ - void disableWrite() throws IOException { - if (useMountReadOnly) { - String mount = context.getMount(); - MountTableProcedure.disableWrite(mount, conf); - } else { - // Save and cancel permission. - FileStatus status = srcFs.getFileStatus(src); - fPerm = status.getPermission(); - acl = srcFs.getAclStatus(src); - srcFs.setPermission(src, FsPermission.createImmutable((short) 0)); - } + protected void disableWrite(FedBalanceContext fbcontext) throws IOException { + // Save and cancel permission. + FileStatus status = srcFs.getFileStatus(src); + fPerm = status.getPermission(); + acl = srcFs.getAclStatus(src); + srcFs.setPermission(src, FsPermission.createImmutable((short) 0)); updateStage(Stage.FINAL_DISTCP); } + /** + * Enable write. + * @throws IOException if can't enable write. + */ + protected void enableWrite() throws IOException { + restorePermission(); + } + /** * Enable write by restoring the x permission. */ @@ -297,9 +302,7 @@ public class DistCpProcedure extends BalanceProcedure { } void finish() throws IOException { - if (!useMountReadOnly) { - restorePermission(); - } + enableWrite(); if (srcFs.exists(src)) { cleanupSnapshot(srcFs, src); } diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java index c8507980c8e..64805c05186 100644 --- a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java +++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java @@ -27,24 +27,17 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure; -import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; -import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; -import org.apache.hadoop.hdfs.server.federation.router.RouterClient; -import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; import org.apache.hadoop.tools.fedbalance.procedure.BalanceJob; import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.Collection; import java.util.concurrent.TimeUnit; -import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.ROUTER; import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.FORCE_CLOSE_OPEN; import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.MAP; import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.BANDWIDTH; @@ -58,8 +51,7 @@ import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption; * Balance data from src cluster to dst cluster with distcp. * * 1. Move data from the source path to the destination path with distcp. - * 2. Update the the mount entry. - * 3. Delete the source path to trash. + * 2. Delete the source path to trash. */ public class FedBalance extends Configured implements Tool { @@ -69,19 +61,16 @@ public class FedBalance extends Configured implements Tool { private static final String CONTINUE_COMMAND = "continue"; private static final String NO_MOUNT = "no-mount"; private static final String DISTCP_PROCEDURE = "distcp-procedure"; - private static final String MOUNT_TABLE_PROCEDURE = "mount-table-procedure"; private static final String TRASH_PROCEDURE = "trash-procedure"; - private static final String FED_BALANCE_DEFAULT_XML = + public static final String FED_BALANCE_DEFAULT_XML = "hdfs-fedbalance-default.xml"; - private static final String FED_BALANCE_SITE_XML = "hdfs-fedbalance-site.xml"; + public static final String FED_BALANCE_SITE_XML = "hdfs-fedbalance-site.xml"; /** * This class helps building the balance job. */ private class Builder { - /* Balancing in an rbf cluster. */ - private boolean routerCluster = false; /* Force close all open files while there is no diff. */ private boolean forceCloseOpen = false; /* Max number of concurrent maps to use for copy. */ @@ -104,15 +93,6 @@ public class FedBalance extends Configured implements Tool { this.inputDst = inputDst; } - /** - * Whether balancing in an rbf cluster. - * @param value true if it's running in a router-based federation cluster. - */ - public Builder setRouterCluster(boolean value) { - this.routerCluster = value; - return this; - } - /** * Whether force close all open files while there is no diff. * @param value true if force close all the open files. @@ -177,26 +157,14 @@ public class FedBalance extends Configured implements Tool { if (dst.toUri().getAuthority() == null) { throw new IOException("The destination cluster must be specified."); } - if (routerCluster) { // router-based federation. - Path src = getSrcPath(inputSrc); - String mount = inputSrc; - context = new FedBalanceContext.Builder(src, dst, mount, getConf()) - .setForceCloseOpenFiles(forceCloseOpen) - .setUseMountReadOnly(routerCluster).setMapNum(map) - .setBandwidthLimit(bandwidth).setTrash(trashOpt) - .setDelayDuration(delayDuration) - .setDiffThreshold(diffThreshold).build(); - } else { // normal federation cluster. - Path src = new Path(inputSrc); - if (src.toUri().getAuthority() == null) { - throw new IOException("The source cluster must be specified."); - } - context = new FedBalanceContext.Builder(src, dst, NO_MOUNT, getConf()) - .setForceCloseOpenFiles(forceCloseOpen) - .setUseMountReadOnly(routerCluster).setMapNum(map) - .setBandwidthLimit(bandwidth).setTrash(trashOpt) - .setDiffThreshold(diffThreshold).build(); + Path src = new Path(inputSrc); + if (src.toUri().getAuthority() == null) { + throw new IOException("The source cluster must be specified."); } + context = new FedBalanceContext.Builder(src, dst, NO_MOUNT, getConf()) + .setForceCloseOpenFiles(forceCloseOpen).setUseMountReadOnly(false) + .setMapNum(map).setBandwidthLimit(bandwidth).setTrash(trashOpt) + .setDiffThreshold(diffThreshold).build(); LOG.info(context.toString()); // Construct the balance job. @@ -204,13 +172,6 @@ public class FedBalance extends Configured implements Tool { DistCpProcedure dcp = new DistCpProcedure(DISTCP_PROCEDURE, null, delayDuration, context); builder.nextProcedure(dcp); - if (routerCluster) { - MountTableProcedure mtp = - new MountTableProcedure(MOUNT_TABLE_PROCEDURE, null, delayDuration, - inputSrc, dst.toUri().getPath(), dst.toUri().getAuthority(), - getConf()); - builder.nextProcedure(mtp); - } TrashProcedure tp = new TrashProcedure(TRASH_PROCEDURE, null, delayDuration, context); builder.nextProcedure(tp); @@ -291,7 +252,6 @@ public class FedBalance extends Configured implements Tool { throws IOException { Builder builder = new Builder(inputSrc, inputDst); // parse options. - builder.setRouterCluster(command.hasOption(ROUTER.getOpt())); builder.setForceCloseOpen(command.hasOption(FORCE_CLOSE_OPEN.getOpt())); if (command.hasOption(MAP.getOpt())) { builder.setMap(Integer.parseInt(command.getOptionValue(MAP.getOpt()))); @@ -340,34 +300,6 @@ public class FedBalance extends Configured implements Tool { return 0; } - /** - * Get src uri from Router. - */ - private Path getSrcPath(String fedPath) throws IOException { - String address = getConf().getTrimmed( - RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, - RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT); - InetSocketAddress routerSocket = NetUtils.createSocketAddr(address); - RouterClient rClient = new RouterClient(routerSocket, getConf()); - try { - MountTableManager mountTable = rClient.getMountTableManager(); - MountTable entry = MountTableProcedure.getMountEntry(fedPath, mountTable); - if (entry == null) { - throw new IllegalArgumentException( - "The mount point doesn't exist. path=" + fedPath); - } else if (entry.getDestinations().size() > 1) { - throw new IllegalArgumentException( - "The mount point has more than one destination. path=" + fedPath); - } else { - String ns = entry.getDestinations().get(0).getNameserviceId(); - String path = entry.getDestinations().get(0).getDest(); - return new Path("hdfs://" + ns + path); - } - } finally { - rClient.close(); - } - } - private void printUsage() { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp( diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java index f4f570026f9..ec47a942272 100644 --- a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java +++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java @@ -194,7 +194,7 @@ public class FedBalanceContext implements Writable { return builder.toString(); } - static class Builder { + public static class Builder { private final Path src; private final Path dst; private final String mount; @@ -215,7 +215,7 @@ public class FedBalanceContext implements Writable { * @param mount the mount point to be balanced. * @param conf the configuration. */ - Builder(Path src, Path dst, String mount, Configuration conf) { + public Builder(Path src, Path dst, String mount, Configuration conf) { this.src = src; this.dst = dst; this.mount = mount; @@ -225,6 +225,7 @@ public class FedBalanceContext implements Writable { /** * Force close open files. * @param value true if force close all the open files. + * @return the builder. */ public Builder setForceCloseOpenFiles(boolean value) { this.forceCloseOpenFiles = value; @@ -234,6 +235,7 @@ public class FedBalanceContext implements Writable { /** * Use mount point readonly to disable write. * @param value true if disabling write by setting mount point readonly. + * @return the builder. */ public Builder setUseMountReadOnly(boolean value) { this.useMountReadOnly = value; @@ -243,6 +245,7 @@ public class FedBalanceContext implements Writable { /** * The map number of the distcp job. * @param value the map number of the distcp. + * @return the builder. */ public Builder setMapNum(int value) { this.mapNum = value; @@ -252,6 +255,7 @@ public class FedBalanceContext implements Writable { /** * The bandwidth limit of the distcp job(MB). * @param value the bandwidth. + * @return the builder. */ public Builder setBandwidthLimit(int value) { this.bandwidthLimit = value; @@ -261,7 +265,8 @@ public class FedBalanceContext implements Writable { /** * Specify the trash behaviour after all the data is sync to the target. * @param value the trash option. - * */ + * @return the builder. + */ public Builder setTrash(TrashOption value) { this.trashOpt = value; return this; @@ -269,6 +274,8 @@ public class FedBalanceContext implements Writable { /** * Specify the delayed duration when the procedures need to retry. + * @param value the delay duration. + * @return the builder. */ public Builder setDelayDuration(long value) { this.delayDuration = value; @@ -277,6 +284,8 @@ public class FedBalanceContext implements Writable { /** * Specify the threshold of diff entries. + * @param value the diff threshold. + * @return the builder. */ public Builder setDiffThreshold(int value) { this.diffThreshold = value; diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceOptions.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceOptions.java index d7be6a8157c..4df3f50ebff 100644 --- a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceOptions.java +++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceOptions.java @@ -30,43 +30,32 @@ public final class FedBalanceOptions { */ private FedBalanceOptions() {} - /** - * Run in router-based federation mode. - */ - final static Option ROUTER = new Option("router", false, - "If this option is set then the command runs in router mode." - + " The source path is taken as a mount point. It will disable write" - + " by setting the mount point readonly. Otherwise the command works" - + " in normal federation mode. The source path is taken as the full" - + " path. It will disable write by cancelling all permissions of the" - + " source path."); - /** * If true, in DIFF_DISTCP stage it will force close all open files when * there is no diff between the source path and the dst path. Otherwise * the DIFF_DISTCP stage will wait until there is no open files. The * default value is `false`. */ - final static Option FORCE_CLOSE_OPEN = new Option("forceCloseOpen", false, - "Force close all open files if the src and dst are synced."); + public final static Option FORCE_CLOSE_OPEN = new Option("forceCloseOpen", + false, "Force close all open files if the src and dst are synced."); /** * Max number of maps to use during copy. DistCp will split work as equally * as possible among these maps. */ - final static Option MAP = + public final static Option MAP = new Option("map", true, "Max number of concurrent maps to use for copy"); /** * Specify bandwidth per map in MB, accepts bandwidth as a fraction. */ - final static Option BANDWIDTH = + public final static Option BANDWIDTH = new Option("bandwidth", true, "Specify bandwidth per map in MB."); /** * Specify the delayed duration(millie seconds) to retry the Job. */ - final static Option DELAY_DURATION = new Option("delay", true, + public final static Option DELAY_DURATION = new Option("delay", true, "This specifies the delayed duration(millie seconds) when the job" + " needs to retry. A job may retry many times and check the state" + " when it waits for the distcp job to finish."); @@ -74,7 +63,7 @@ public final class FedBalanceOptions { /** * Specify the threshold of diff entries. */ - final static Option DIFF_THRESHOLD = new Option("diffThreshold", true, + public final static Option DIFF_THRESHOLD = new Option("diffThreshold", true, "This specifies the threshold of the diff entries that used in" + " incremental copy stage. If the diff entries size is no greater" + " than this threshold and the open files check is satisfied" @@ -86,17 +75,16 @@ public final class FedBalanceOptions { * Move the source path to trash after all the data are sync to target, or * delete the source directly, or skip both trash and deletion. */ - final static Option TRASH = new Option("moveToTrash", true, + public final static Option TRASH = new Option("moveToTrash", true, "Move the source path to trash, or delete the source path directly," + " or skip both trash and deletion. This accepts 3 values: trash," + " delete and skip. By default the server side trash interval is" + " used. If the trash is disabled in the server side, the default" + " trash interval 60 minutes is used."); - final static Options CLI_OPTIONS = new Options(); + public final static Options CLI_OPTIONS = new Options(); static { - CLI_OPTIONS.addOption(ROUTER); CLI_OPTIONS.addOption(FORCE_CLOSE_OPEN); CLI_OPTIONS.addOption(MAP); CLI_OPTIONS.addOption(BANDWIDTH); diff --git a/hadoop-tools/hadoop-federation-balance/src/site/markdown/HDFSFederationBalance.md b/hadoop-tools/hadoop-federation-balance/src/site/markdown/HDFSFederationBalance.md index 03e6e60e57e..c9d643bf542 100644 --- a/hadoop-tools/hadoop-federation-balance/src/site/markdown/HDFSFederationBalance.md +++ b/hadoop-tools/hadoop-federation-balance/src/site/markdown/HDFSFederationBalance.md @@ -45,10 +45,9 @@ Usage The command below runs an hdfs federation balance job. The first parameter is the mount entry. The second one is the target path which must include the - target cluster. The option `-router` indicates this is in router-based - federation mode. + target cluster. - bash$ /bin/hadoop fedbalance -router submit /foo/src hdfs://namespace-1/foo/dst + bash$ /bin/hadoop rbfbalance -router submit /foo/src hdfs://namespace-1/foo/dst It copies data from hdfs://namespace-0/foo/src to hdfs://namespace-1/foo/dst incrementally and finally updates the mount entry to: @@ -59,7 +58,7 @@ Usage If the hadoop shell process exits unexpectedly, we can use the command below to continue the unfinished job: - bash$ /bin/hadoop fedbalance continue + bash$ /bin/hadoop rbfbalance continue This will scan the journal to find all the unfinished jobs, recover and continue to execute them. @@ -77,8 +76,8 @@ Usage * the router-based federation mode (RBF mode). * the normal federation mode. - By default the command runs in the normal federation mode. You can specify the - rbf mode by using the option `-router`. + The command `rbfbalance` runs in router-based federation mode. The command + `fedbalance` runs in normal federation mode. In the rbf mode the first parameter is taken as the mount point. It disables write by setting the mount point readonly. @@ -91,11 +90,10 @@ Usage ### Command Options -Command `submit` has 5 options: +Command `submit` has 4 options: | Option key | Description | Default | | ------------------------------ | ------------------------------------ | ------- | -| -router | Run in router-based federation mode. | Normal federation mode. | | -forceCloseOpen | Force close all open files when there is no diff in the DIFF_DISTCP stage. | Wait until there is no open files. | | -map | Max number of concurrent maps to use for copy. | 10 | | -bandwidth | Specify bandwidth per map in MB. | 10 | @@ -106,7 +104,7 @@ Command `submit` has 5 options: ### Configuration Options -------------------- -Set configuration options at fedbalance-site.xml. +Set configuration options at hdfs-fedbalance-site.xml. | Configuration key | Description | Default | | ------------------------------ | ------------------------------------ | ------- | @@ -165,7 +163,7 @@ Architecture of HDFS Federation Balance * MountTableProcedure: This procedure updates the mount entry in Router. The readonly is unset and the destination is updated of the mount point. This - procedure is activated only when option `-router`. + procedure is activated only in router based federation mode. * TrashProcedure: This procedure moves the source path to trash. diff --git a/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java index ea5a8a0280a..9f554af2e6b 100644 --- a/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java +++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java @@ -281,7 +281,7 @@ public class TestDistCpProcedure { FedBalanceContext context = buildContext(src, dst, MOUNT); DistCpProcedure dcProcedure = new DistCpProcedure("distcp-procedure", null, 1000, context); - dcProcedure.disableWrite(); + dcProcedure.disableWrite(context); dcProcedure.finish(); // Verify path and permission. @@ -317,7 +317,8 @@ public class TestDistCpProcedure { dcp[0] = serializeProcedure(dcp[0]); executeProcedure(dcp[0], Stage.DISABLE_WRITE, () -> dcp[0].diffDistCp()); dcp[0] = serializeProcedure(dcp[0]); - executeProcedure(dcp[0], Stage.FINAL_DISTCP, () -> dcp[0].disableWrite()); + executeProcedure(dcp[0], Stage.FINAL_DISTCP, + () -> dcp[0].disableWrite(context)); dcp[0] = serializeProcedure(dcp[0]); OutputStream out = fs.append(new Path(src, "b/c")); executeProcedure(dcp[0], Stage.FINISH, () -> dcp[0].finalDistCp()); @@ -372,7 +373,7 @@ public class TestDistCpProcedure { new DistCpProcedure("distcp-procedure", null, 1000, context); assertNotEquals(0, fs.getFileStatus(src).getPermission().toShort()); executeProcedure(dcProcedure, Stage.FINAL_DISTCP, - () -> dcProcedure.disableWrite()); + () -> dcProcedure.disableWrite(context)); assertEquals(0, fs.getFileStatus(src).getPermission().toShort()); cleanup(fs, new Path(testRoot)); }