HDFS-15748. RBF: Move the router related part from hadoop-federation-balance module to hadoop-hdfs-rbf. Contributed by Jinglun.

This commit is contained in:
Ayush Saxena 2021-01-05 00:05:03 +05:30
parent 66ee0a6df0
commit 77299ae992
14 changed files with 545 additions and 144 deletions

View File

@ -47,6 +47,7 @@ function hadoop_usage
hadoop_add_subcommand "trace" client "view and modify Hadoop tracing settings"
hadoop_add_subcommand "version" client "print the version"
hadoop_add_subcommand "kdiag" client "Diagnose Kerberos Problems"
hadoop_add_subcommand "rbfbalance" client "move directories and files across router-based federation namespaces"
hadoop_generate_usage "${HADOOP_SHELL_EXECNAME}" true
}
@ -171,6 +172,11 @@ function hadoopcmd_case
version)
HADOOP_CLASSNAME=org.apache.hadoop.util.VersionInfo
;;
rbfbalance)
HADOOP_CLASSNAME=org.apache.hadoop.hdfs.rbfbalance.RouterFedBalance
hadoop_add_to_classpath_tools hadoop-federation-balance
hadoop_add_to_classpath_tools hadoop-distcp
;;
*)
HADOOP_CLASSNAME="${subcmd}"
if ! hadoop_validate_classname "${HADOOP_CLASSNAME}"; then

View File

@ -69,6 +69,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>hadoop-hdfs-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-federation-balance</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
@ -85,6 +90,12 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-federation-balance</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.fedbalance;
package org.apache.hadoop.hdfs.rbfbalance;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.rbfbalance;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.fedbalance.DistCpProcedure;
import org.apache.hadoop.tools.fedbalance.FedBalanceContext;
import java.io.IOException;
/**
* Copy data through distcp in router-based federation cluster. It disables
* write by setting mount entry readonly.
*/
public class RouterDistCpProcedure extends DistCpProcedure {
public RouterDistCpProcedure() {}
public RouterDistCpProcedure(String name, String nextProcedure,
long delayDuration, FedBalanceContext context) throws IOException {
super(name, nextProcedure, delayDuration, context);
}
/**
* Disable write by making the mount entry readonly.
*/
@Override
protected void disableWrite(FedBalanceContext context) throws IOException {
Configuration conf = context.getConf();
String mount = context.getMount();
MountTableProcedure.disableWrite(mount, conf);
}
/**
* Enable write.
*/
@Override
protected void enableWrite() throws IOException {
// do nothing.
}
}

View File

@ -0,0 +1,383 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.rbfbalance;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.tools.fedbalance.FedBalanceConfigs;
import org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceJob;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler;
import org.apache.hadoop.tools.fedbalance.TrashProcedure;
import org.apache.hadoop.tools.fedbalance.FedBalanceContext;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.tools.fedbalance.FedBalance.FED_BALANCE_DEFAULT_XML;
import static org.apache.hadoop.tools.fedbalance.FedBalance.FED_BALANCE_SITE_XML;
import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.CLI_OPTIONS;
import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.FORCE_CLOSE_OPEN;
import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.MAP;
import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.BANDWIDTH;
import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.DELAY_DURATION;
import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.DIFF_THRESHOLD;
import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.TRASH;
/**
* Balance data in router-based federation cluster. From src sub-namespace to
* dst sub-namespace with distcp.
*
* 1. Move data from the source path to the destination path with distcp.
* 2. Update the the mount entry.
* 3. Delete the source path to trash.
*/
public class RouterFedBalance extends Configured implements Tool {
public static final Logger LOG =
LoggerFactory.getLogger(RouterFedBalance.class);
private static final String SUBMIT_COMMAND = "submit";
private static final String CONTINUE_COMMAND = "continue";
private static final String DISTCP_PROCEDURE = "distcp-procedure";
private static final String MOUNT_TABLE_PROCEDURE = "mount-table-procedure";
private static final String TRASH_PROCEDURE = "trash-procedure";
/**
* This class helps building the balance job.
*/
private class Builder {
/* Force close all open files while there is no diff. */
private boolean forceCloseOpen = false;
/* Max number of concurrent maps to use for copy. */
private int map = 10;
/* Specify bandwidth per map in MB. */
private int bandwidth = 10;
/* Specify the trash behaviour of the source path. */
private FedBalanceConfigs.TrashOption trashOpt = TrashOption.TRASH;
/* Specify the duration(millie seconds) when the procedure needs retry. */
private long delayDuration = TimeUnit.SECONDS.toMillis(1);
/* Specify the threshold of diff entries. */
private int diffThreshold = 0;
/* The source input. This specifies the source path. */
private final String inputSrc;
/* The dst input. This specifies the dst path. */
private final String inputDst;
Builder(String inputSrc, String inputDst) {
this.inputSrc = inputSrc;
this.inputDst = inputDst;
}
/**
* Whether force close all open files while there is no diff.
* @param value true if force close all the open files.
*/
public Builder setForceCloseOpen(boolean value) {
this.forceCloseOpen = value;
return this;
}
/**
* Max number of concurrent maps to use for copy.
* @param value the map number of the distcp.
*/
public Builder setMap(int value) {
this.map = value;
return this;
}
/**
* Specify bandwidth per map in MB.
* @param value the bandwidth.
*/
public Builder setBandWidth(int value) {
this.bandwidth = value;
return this;
}
/**
* Specify the trash behaviour of the source path.
* @param value the trash option.
*/
public Builder setTrashOpt(TrashOption value) {
this.trashOpt = value;
return this;
}
/**
* Specify the duration(millie seconds) when the procedure needs retry.
* @param value the delay duration of the job.
*/
public Builder setDelayDuration(long value) {
this.delayDuration = value;
return this;
}
/**
* Specify the threshold of diff entries.
* @param value the threshold of a fast distcp.
*/
public Builder setDiffThreshold(int value) {
this.diffThreshold = value;
return this;
}
/**
* Build the balance job.
*/
public BalanceJob build() throws IOException {
// Construct job context.
FedBalanceContext context;
Path dst = new Path(inputDst);
if (dst.toUri().getAuthority() == null) {
throw new IOException("The destination cluster must be specified.");
}
Path src = getSrcPath(inputSrc);
String mount = inputSrc;
context = new FedBalanceContext.Builder(src, dst, mount, getConf())
.setForceCloseOpenFiles(forceCloseOpen).setUseMountReadOnly(true)
.setMapNum(map).setBandwidthLimit(bandwidth).setTrash(trashOpt)
.setDelayDuration(delayDuration).setDiffThreshold(diffThreshold)
.build();
LOG.info(context.toString());
// Construct the balance job.
BalanceJob.Builder<BalanceProcedure> builder = new BalanceJob.Builder<>();
RouterDistCpProcedure dcp =
new RouterDistCpProcedure(DISTCP_PROCEDURE, null, delayDuration,
context);
builder.nextProcedure(dcp);
MountTableProcedure mtp =
new MountTableProcedure(MOUNT_TABLE_PROCEDURE, null, delayDuration,
inputSrc, dst.toUri().getPath(), dst.toUri().getAuthority(),
getConf());
builder.nextProcedure(mtp);
TrashProcedure tp =
new TrashProcedure(TRASH_PROCEDURE, null, delayDuration, context);
builder.nextProcedure(tp);
return builder.build();
}
}
public RouterFedBalance() {
super();
}
@Override
public int run(String[] args) throws Exception {
CommandLineParser parser = new GnuParser();
CommandLine command = parser.parse(CLI_OPTIONS, args, true);
String[] leftOverArgs = command.getArgs();
if (leftOverArgs == null || leftOverArgs.length < 1) {
printUsage();
return -1;
}
String cmd = leftOverArgs[0];
if (cmd.equals(SUBMIT_COMMAND)) {
if (leftOverArgs.length < 3) {
printUsage();
return -1;
}
String inputSrc = leftOverArgs[1];
String inputDst = leftOverArgs[2];
return submit(command, inputSrc, inputDst);
} else if (cmd.equals(CONTINUE_COMMAND)) {
return continueJob();
} else {
printUsage();
return -1;
}
}
/**
* Recover and continue the unfinished jobs.
*/
private int continueJob() throws InterruptedException {
BalanceProcedureScheduler scheduler =
new BalanceProcedureScheduler(getConf());
try {
scheduler.init(true);
while (true) {
Collection<BalanceJob> jobs = scheduler.getAllJobs();
int unfinished = 0;
for (BalanceJob job : jobs) {
if (!job.isJobDone()) {
unfinished++;
}
LOG.info(job.toString());
}
if (unfinished == 0) {
break;
}
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
}
} catch (IOException e) {
LOG.error("Continue balance job failed.", e);
return -1;
} finally {
scheduler.shutDown();
}
return 0;
}
/**
* Start a ProcedureScheduler and submit the job.
*
* @param command the command options.
* @param inputSrc the source input. This specifies the source path.
* @param inputDst the dst input. This specifies the dst path.
*/
private int submit(CommandLine command, String inputSrc, String inputDst)
throws IOException {
Builder builder = new Builder(inputSrc, inputDst);
// parse options.
builder.setForceCloseOpen(command.hasOption(FORCE_CLOSE_OPEN.getOpt()));
if (command.hasOption(MAP.getOpt())) {
builder.setMap(Integer.parseInt(command.getOptionValue(MAP.getOpt())));
}
if (command.hasOption(BANDWIDTH.getOpt())) {
builder.setBandWidth(
Integer.parseInt(command.getOptionValue(BANDWIDTH.getOpt())));
}
if (command.hasOption(DELAY_DURATION.getOpt())) {
builder.setDelayDuration(
Long.parseLong(command.getOptionValue(DELAY_DURATION.getOpt())));
}
if (command.hasOption(DIFF_THRESHOLD.getOpt())) {
builder.setDiffThreshold(Integer.parseInt(
command.getOptionValue(DIFF_THRESHOLD.getOpt())));
}
if (command.hasOption(TRASH.getOpt())) {
String val = command.getOptionValue(TRASH.getOpt());
if (val.equalsIgnoreCase("skip")) {
builder.setTrashOpt(TrashOption.SKIP);
} else if (val.equalsIgnoreCase("trash")) {
builder.setTrashOpt(TrashOption.TRASH);
} else if (val.equalsIgnoreCase("delete")) {
builder.setTrashOpt(TrashOption.DELETE);
} else {
printUsage();
return -1;
}
}
// Submit the job.
BalanceProcedureScheduler scheduler =
new BalanceProcedureScheduler(getConf());
scheduler.init(false);
try {
BalanceJob balanceJob = builder.build();
// Submit and wait until the job is done.
scheduler.submit(balanceJob);
scheduler.waitUntilDone(balanceJob);
} catch (IOException e) {
LOG.error("Submit balance job failed.", e);
return -1;
} finally {
scheduler.shutDown();
}
return 0;
}
/**
* Get src uri from Router.
*/
private Path getSrcPath(String fedPath) throws IOException {
String address = getConf().getTrimmed(
RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT);
InetSocketAddress routerSocket = NetUtils.createSocketAddr(address);
RouterClient rClient = new RouterClient(routerSocket, getConf());
try {
MountTableManager mountTable = rClient.getMountTableManager();
MountTable entry = MountTableProcedure.getMountEntry(fedPath, mountTable);
if (entry == null) {
throw new IllegalArgumentException(
"The mount point doesn't exist. path=" + fedPath);
} else if (entry.getDestinations().size() > 1) {
throw new IllegalArgumentException(
"The mount point has more than one destination. path=" + fedPath);
} else {
String ns = entry.getDestinations().get(0).getNameserviceId();
String path = entry.getDestinations().get(0).getDest();
return new Path("hdfs://" + ns + path);
}
} finally {
rClient.close();
}
}
private void printUsage() {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp(
"rbfbalance OPTIONS [submit|continue] <src> <target>\n\nOPTIONS",
CLI_OPTIONS);
}
/**
* Loads properties from hdfs-fedbalance-default.xml into configuration
* object.
*
* @return Configuration which includes properties from
* hdfs-fedbalance-default.xml and hdfs-fedbalance-site.xml
*/
@VisibleForTesting
static Configuration getDefaultConf() {
Configuration config = new Configuration();
config.addResource(FED_BALANCE_DEFAULT_XML);
config.addResource(FED_BALANCE_SITE_XML);
return config;
}
/**
* Main function of the RouterFedBalance program. Parses the input arguments
* and invokes the RouterFedBalance::run() method, via the ToolRunner.
* @param argv Command-line arguments sent to RouterFedBalance.
*/
public static void main(String[] argv) {
Configuration conf = getDefaultConf();
RouterFedBalance fedBalance = new RouterFedBalance();
fedBalance.setConf(conf);
int exitCode;
try {
exitCode = ToolRunner.run(fedBalance, argv);
} catch (Exception e) {
LOG.warn("Couldn't complete RouterFedBalance operation.", e);
exitCode = -1;
}
System.exit(exitCode);
}
}

View File

@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* FedBalance is a tool for balancing data across federation clusters.
*/
@InterfaceAudience.Public
package org.apache.hadoop.hdfs.rbfbalance;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.fedbalance;
package org.apache.hadoop.hdfs.rbfbalance;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;

View File

@ -103,17 +103,6 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-rbf</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-rbf</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>

View File

@ -155,7 +155,7 @@ public boolean execute() throws RetryException, IOException {
diffDistCp();
return false;
case DISABLE_WRITE:
disableWrite();
disableWrite(context);
return false;
case FINAL_DISTCP:
finalDistCp();
@ -238,23 +238,28 @@ void diffDistCp() throws IOException, RetryException {
}
/**
* Disable write either by making the mount entry readonly or cancelling the
* execute permission of the source path.
* Disable write by cancelling the execute permission of the source path.
* TODO: Disable the super user from writing.
* @param fbcontext the context.
* @throws IOException if can't disable write.
*/
void disableWrite() throws IOException {
if (useMountReadOnly) {
String mount = context.getMount();
MountTableProcedure.disableWrite(mount, conf);
} else {
// Save and cancel permission.
FileStatus status = srcFs.getFileStatus(src);
fPerm = status.getPermission();
acl = srcFs.getAclStatus(src);
srcFs.setPermission(src, FsPermission.createImmutable((short) 0));
}
protected void disableWrite(FedBalanceContext fbcontext) throws IOException {
// Save and cancel permission.
FileStatus status = srcFs.getFileStatus(src);
fPerm = status.getPermission();
acl = srcFs.getAclStatus(src);
srcFs.setPermission(src, FsPermission.createImmutable((short) 0));
updateStage(Stage.FINAL_DISTCP);
}
/**
* Enable write.
* @throws IOException if can't enable write.
*/
protected void enableWrite() throws IOException {
restorePermission();
}
/**
* Enable write by restoring the x permission.
*/
@ -297,9 +302,7 @@ void finalDistCp() throws IOException, RetryException {
}
void finish() throws IOException {
if (!useMountReadOnly) {
restorePermission();
}
enableWrite();
if (srcFs.exists(src)) {
cleanupSnapshot(srcFs, src);
}

View File

@ -27,24 +27,17 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceJob;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.ROUTER;
import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.FORCE_CLOSE_OPEN;
import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.MAP;
import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.BANDWIDTH;
@ -58,8 +51,7 @@
* Balance data from src cluster to dst cluster with distcp.
*
* 1. Move data from the source path to the destination path with distcp.
* 2. Update the the mount entry.
* 3. Delete the source path to trash.
* 2. Delete the source path to trash.
*/
public class FedBalance extends Configured implements Tool {
@ -69,19 +61,16 @@ public class FedBalance extends Configured implements Tool {
private static final String CONTINUE_COMMAND = "continue";
private static final String NO_MOUNT = "no-mount";
private static final String DISTCP_PROCEDURE = "distcp-procedure";
private static final String MOUNT_TABLE_PROCEDURE = "mount-table-procedure";
private static final String TRASH_PROCEDURE = "trash-procedure";
private static final String FED_BALANCE_DEFAULT_XML =
public static final String FED_BALANCE_DEFAULT_XML =
"hdfs-fedbalance-default.xml";
private static final String FED_BALANCE_SITE_XML = "hdfs-fedbalance-site.xml";
public static final String FED_BALANCE_SITE_XML = "hdfs-fedbalance-site.xml";
/**
* This class helps building the balance job.
*/
private class Builder {
/* Balancing in an rbf cluster. */
private boolean routerCluster = false;
/* Force close all open files while there is no diff. */
private boolean forceCloseOpen = false;
/* Max number of concurrent maps to use for copy. */
@ -104,15 +93,6 @@ private class Builder {
this.inputDst = inputDst;
}
/**
* Whether balancing in an rbf cluster.
* @param value true if it's running in a router-based federation cluster.
*/
public Builder setRouterCluster(boolean value) {
this.routerCluster = value;
return this;
}
/**
* Whether force close all open files while there is no diff.
* @param value true if force close all the open files.
@ -177,26 +157,14 @@ public BalanceJob build() throws IOException {
if (dst.toUri().getAuthority() == null) {
throw new IOException("The destination cluster must be specified.");
}
if (routerCluster) { // router-based federation.
Path src = getSrcPath(inputSrc);
String mount = inputSrc;
context = new FedBalanceContext.Builder(src, dst, mount, getConf())
.setForceCloseOpenFiles(forceCloseOpen)
.setUseMountReadOnly(routerCluster).setMapNum(map)
.setBandwidthLimit(bandwidth).setTrash(trashOpt)
.setDelayDuration(delayDuration)
.setDiffThreshold(diffThreshold).build();
} else { // normal federation cluster.
Path src = new Path(inputSrc);
if (src.toUri().getAuthority() == null) {
throw new IOException("The source cluster must be specified.");
}
context = new FedBalanceContext.Builder(src, dst, NO_MOUNT, getConf())
.setForceCloseOpenFiles(forceCloseOpen)
.setUseMountReadOnly(routerCluster).setMapNum(map)
.setBandwidthLimit(bandwidth).setTrash(trashOpt)
.setDiffThreshold(diffThreshold).build();
Path src = new Path(inputSrc);
if (src.toUri().getAuthority() == null) {
throw new IOException("The source cluster must be specified.");
}
context = new FedBalanceContext.Builder(src, dst, NO_MOUNT, getConf())
.setForceCloseOpenFiles(forceCloseOpen).setUseMountReadOnly(false)
.setMapNum(map).setBandwidthLimit(bandwidth).setTrash(trashOpt)
.setDiffThreshold(diffThreshold).build();
LOG.info(context.toString());
// Construct the balance job.
@ -204,13 +172,6 @@ public BalanceJob build() throws IOException {
DistCpProcedure dcp =
new DistCpProcedure(DISTCP_PROCEDURE, null, delayDuration, context);
builder.nextProcedure(dcp);
if (routerCluster) {
MountTableProcedure mtp =
new MountTableProcedure(MOUNT_TABLE_PROCEDURE, null, delayDuration,
inputSrc, dst.toUri().getPath(), dst.toUri().getAuthority(),
getConf());
builder.nextProcedure(mtp);
}
TrashProcedure tp =
new TrashProcedure(TRASH_PROCEDURE, null, delayDuration, context);
builder.nextProcedure(tp);
@ -291,7 +252,6 @@ private int submit(CommandLine command, String inputSrc, String inputDst)
throws IOException {
Builder builder = new Builder(inputSrc, inputDst);
// parse options.
builder.setRouterCluster(command.hasOption(ROUTER.getOpt()));
builder.setForceCloseOpen(command.hasOption(FORCE_CLOSE_OPEN.getOpt()));
if (command.hasOption(MAP.getOpt())) {
builder.setMap(Integer.parseInt(command.getOptionValue(MAP.getOpt())));
@ -340,34 +300,6 @@ private int submit(CommandLine command, String inputSrc, String inputDst)
return 0;
}
/**
* Get src uri from Router.
*/
private Path getSrcPath(String fedPath) throws IOException {
String address = getConf().getTrimmed(
RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT);
InetSocketAddress routerSocket = NetUtils.createSocketAddr(address);
RouterClient rClient = new RouterClient(routerSocket, getConf());
try {
MountTableManager mountTable = rClient.getMountTableManager();
MountTable entry = MountTableProcedure.getMountEntry(fedPath, mountTable);
if (entry == null) {
throw new IllegalArgumentException(
"The mount point doesn't exist. path=" + fedPath);
} else if (entry.getDestinations().size() > 1) {
throw new IllegalArgumentException(
"The mount point has more than one destination. path=" + fedPath);
} else {
String ns = entry.getDestinations().get(0).getNameserviceId();
String path = entry.getDestinations().get(0).getDest();
return new Path("hdfs://" + ns + path);
}
} finally {
rClient.close();
}
}
private void printUsage() {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp(

View File

@ -194,7 +194,7 @@ public String toString() {
return builder.toString();
}
static class Builder {
public static class Builder {
private final Path src;
private final Path dst;
private final String mount;
@ -215,7 +215,7 @@ static class Builder {
* @param mount the mount point to be balanced.
* @param conf the configuration.
*/
Builder(Path src, Path dst, String mount, Configuration conf) {
public Builder(Path src, Path dst, String mount, Configuration conf) {
this.src = src;
this.dst = dst;
this.mount = mount;
@ -225,6 +225,7 @@ static class Builder {
/**
* Force close open files.
* @param value true if force close all the open files.
* @return the builder.
*/
public Builder setForceCloseOpenFiles(boolean value) {
this.forceCloseOpenFiles = value;
@ -234,6 +235,7 @@ public Builder setForceCloseOpenFiles(boolean value) {
/**
* Use mount point readonly to disable write.
* @param value true if disabling write by setting mount point readonly.
* @return the builder.
*/
public Builder setUseMountReadOnly(boolean value) {
this.useMountReadOnly = value;
@ -243,6 +245,7 @@ public Builder setUseMountReadOnly(boolean value) {
/**
* The map number of the distcp job.
* @param value the map number of the distcp.
* @return the builder.
*/
public Builder setMapNum(int value) {
this.mapNum = value;
@ -252,6 +255,7 @@ public Builder setMapNum(int value) {
/**
* The bandwidth limit of the distcp job(MB).
* @param value the bandwidth.
* @return the builder.
*/
public Builder setBandwidthLimit(int value) {
this.bandwidthLimit = value;
@ -261,7 +265,8 @@ public Builder setBandwidthLimit(int value) {
/**
* Specify the trash behaviour after all the data is sync to the target.
* @param value the trash option.
* */
* @return the builder.
*/
public Builder setTrash(TrashOption value) {
this.trashOpt = value;
return this;
@ -269,6 +274,8 @@ public Builder setTrash(TrashOption value) {
/**
* Specify the delayed duration when the procedures need to retry.
* @param value the delay duration.
* @return the builder.
*/
public Builder setDelayDuration(long value) {
this.delayDuration = value;
@ -277,6 +284,8 @@ public Builder setDelayDuration(long value) {
/**
* Specify the threshold of diff entries.
* @param value the diff threshold.
* @return the builder.
*/
public Builder setDiffThreshold(int value) {
this.diffThreshold = value;

View File

@ -30,43 +30,32 @@ public final class FedBalanceOptions {
*/
private FedBalanceOptions() {}
/**
* Run in router-based federation mode.
*/
final static Option ROUTER = new Option("router", false,
"If this option is set then the command runs in router mode."
+ " The source path is taken as a mount point. It will disable write"
+ " by setting the mount point readonly. Otherwise the command works"
+ " in normal federation mode. The source path is taken as the full"
+ " path. It will disable write by cancelling all permissions of the"
+ " source path.");
/**
* If true, in DIFF_DISTCP stage it will force close all open files when
* there is no diff between the source path and the dst path. Otherwise
* the DIFF_DISTCP stage will wait until there is no open files. The
* default value is `false`.
*/
final static Option FORCE_CLOSE_OPEN = new Option("forceCloseOpen", false,
"Force close all open files if the src and dst are synced.");
public final static Option FORCE_CLOSE_OPEN = new Option("forceCloseOpen",
false, "Force close all open files if the src and dst are synced.");
/**
* Max number of maps to use during copy. DistCp will split work as equally
* as possible among these maps.
*/
final static Option MAP =
public final static Option MAP =
new Option("map", true, "Max number of concurrent maps to use for copy");
/**
* Specify bandwidth per map in MB, accepts bandwidth as a fraction.
*/
final static Option BANDWIDTH =
public final static Option BANDWIDTH =
new Option("bandwidth", true, "Specify bandwidth per map in MB.");
/**
* Specify the delayed duration(millie seconds) to retry the Job.
*/
final static Option DELAY_DURATION = new Option("delay", true,
public final static Option DELAY_DURATION = new Option("delay", true,
"This specifies the delayed duration(millie seconds) when the job"
+ " needs to retry. A job may retry many times and check the state"
+ " when it waits for the distcp job to finish.");
@ -74,7 +63,7 @@ private FedBalanceOptions() {}
/**
* Specify the threshold of diff entries.
*/
final static Option DIFF_THRESHOLD = new Option("diffThreshold", true,
public final static Option DIFF_THRESHOLD = new Option("diffThreshold", true,
"This specifies the threshold of the diff entries that used in"
+ " incremental copy stage. If the diff entries size is no greater"
+ " than this threshold and the open files check is satisfied"
@ -86,17 +75,16 @@ private FedBalanceOptions() {}
* Move the source path to trash after all the data are sync to target, or
* delete the source directly, or skip both trash and deletion.
*/
final static Option TRASH = new Option("moveToTrash", true,
public final static Option TRASH = new Option("moveToTrash", true,
"Move the source path to trash, or delete the source path directly,"
+ " or skip both trash and deletion. This accepts 3 values: trash,"
+ " delete and skip. By default the server side trash interval is"
+ " used. If the trash is disabled in the server side, the default"
+ " trash interval 60 minutes is used.");
final static Options CLI_OPTIONS = new Options();
public final static Options CLI_OPTIONS = new Options();
static {
CLI_OPTIONS.addOption(ROUTER);
CLI_OPTIONS.addOption(FORCE_CLOSE_OPEN);
CLI_OPTIONS.addOption(MAP);
CLI_OPTIONS.addOption(BANDWIDTH);

View File

@ -45,10 +45,9 @@ Usage
The command below runs an hdfs federation balance job. The first parameter is
the mount entry. The second one is the target path which must include the
target cluster. The option `-router` indicates this is in router-based
federation mode.
target cluster.
bash$ /bin/hadoop fedbalance -router submit /foo/src hdfs://namespace-1/foo/dst
bash$ /bin/hadoop rbfbalance -router submit /foo/src hdfs://namespace-1/foo/dst
It copies data from hdfs://namespace-0/foo/src to hdfs://namespace-1/foo/dst
incrementally and finally updates the mount entry to:
@ -59,7 +58,7 @@ Usage
If the hadoop shell process exits unexpectedly, we can use the command below
to continue the unfinished job:
bash$ /bin/hadoop fedbalance continue
bash$ /bin/hadoop rbfbalance continue
This will scan the journal to find all the unfinished jobs, recover and
continue to execute them.
@ -77,8 +76,8 @@ Usage
* the router-based federation mode (RBF mode).
* the normal federation mode.
By default the command runs in the normal federation mode. You can specify the
rbf mode by using the option `-router`.
The command `rbfbalance` runs in router-based federation mode. The command
`fedbalance` runs in normal federation mode.
In the rbf mode the first parameter is taken as the mount point. It disables
write by setting the mount point readonly.
@ -91,11 +90,10 @@ Usage
### Command Options
Command `submit` has 5 options:
Command `submit` has 4 options:
| Option key | Description | Default |
| ------------------------------ | ------------------------------------ | ------- |
| -router | Run in router-based federation mode. | Normal federation mode. |
| -forceCloseOpen | Force close all open files when there is no diff in the DIFF_DISTCP stage. | Wait until there is no open files. |
| -map | Max number of concurrent maps to use for copy. | 10 |
| -bandwidth | Specify bandwidth per map in MB. | 10 |
@ -106,7 +104,7 @@ Command `submit` has 5 options:
### Configuration Options
--------------------
Set configuration options at fedbalance-site.xml.
Set configuration options at hdfs-fedbalance-site.xml.
| Configuration key | Description | Default |
| ------------------------------ | ------------------------------------ | ------- |
@ -165,7 +163,7 @@ Architecture of HDFS Federation Balance
* MountTableProcedure: This procedure updates the mount entry in Router. The
readonly is unset and the destination is updated of the mount point. This
procedure is activated only when option `-router`.
procedure is activated only in router based federation mode.
* TrashProcedure: This procedure moves the source path to trash.

View File

@ -281,7 +281,7 @@ public void testStageFinish() throws Exception {
FedBalanceContext context = buildContext(src, dst, MOUNT);
DistCpProcedure dcProcedure =
new DistCpProcedure("distcp-procedure", null, 1000, context);
dcProcedure.disableWrite();
dcProcedure.disableWrite(context);
dcProcedure.finish();
// Verify path and permission.
@ -317,7 +317,8 @@ public void testRecoveryByStage() throws Exception {
dcp[0] = serializeProcedure(dcp[0]);
executeProcedure(dcp[0], Stage.DISABLE_WRITE, () -> dcp[0].diffDistCp());
dcp[0] = serializeProcedure(dcp[0]);
executeProcedure(dcp[0], Stage.FINAL_DISTCP, () -> dcp[0].disableWrite());
executeProcedure(dcp[0], Stage.FINAL_DISTCP,
() -> dcp[0].disableWrite(context));
dcp[0] = serializeProcedure(dcp[0]);
OutputStream out = fs.append(new Path(src, "b/c"));
executeProcedure(dcp[0], Stage.FINISH, () -> dcp[0].finalDistCp());
@ -372,7 +373,7 @@ public void testDisableWrite() throws Exception {
new DistCpProcedure("distcp-procedure", null, 1000, context);
assertNotEquals(0, fs.getFileStatus(src).getPermission().toShort());
executeProcedure(dcProcedure, Stage.FINAL_DISTCP,
() -> dcProcedure.disableWrite());
() -> dcProcedure.disableWrite(context));
assertEquals(0, fs.getFileStatus(src).getPermission().toShort());
cleanup(fs, new Path(testRoot));
}