HDFS-15346. FedBalance tool implementation. Contributed by Jinglun.

This commit is contained in:
Yiqun Lin 2020-06-18 13:33:25 +08:00
parent caf3995ac2
commit 9cbd76cc77
30 changed files with 2907 additions and 26 deletions

View File

@ -47,6 +47,14 @@
<outputDirectory>/libexec/shellprofile.d</outputDirectory>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>../hadoop-federation-balance/src/main/shellprofile.d</directory>
<includes>
<include>*</include>
</includes>
<outputDirectory>/libexec/shellprofile.d</outputDirectory>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>../hadoop-extras/src/main/shellprofile.d</directory>
<includes>
@ -111,6 +119,13 @@
<include>*-sources.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>../hadoop-federation-balance/target</directory>
<outputDirectory>/share/hadoop/${hadoop.component}/sources</outputDirectory>
<includes>
<include>*-sources.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>../hadoop-extras/target</directory>
<outputDirectory>/share/hadoop/${hadoop.component}/sources</outputDirectory>

View File

@ -105,6 +105,8 @@ public final class HdfsConstants {
public static final String DOT_SNAPSHOT_DIR = ".snapshot";
public static final String SEPARATOR_DOT_SNAPSHOT_DIR
= Path.SEPARATOR + DOT_SNAPSHOT_DIR;
public static final String DOT_SNAPSHOT_DIR_SEPARATOR =
DOT_SNAPSHOT_DIR + Path.SEPARATOR;
public static final String SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR
= Path.SEPARATOR + DOT_SNAPSHOT_DIR + Path.SEPARATOR;
public final static String DOT_RESERVED_STRING = ".reserved";

View File

@ -327,6 +327,12 @@
<version>${hadoop.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-rbf</artifactId>
<version>${hadoop.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-app</artifactId>
@ -578,6 +584,17 @@
<version>${hadoop.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-federation-balance</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-federation-balance</artifactId>
<version>${hadoop.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-datajoin</artifactId>

View File

@ -0,0 +1,249 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-project</artifactId>
<version>3.4.0-SNAPSHOT</version>
<relativePath>../../hadoop-project</relativePath>
</parent>
<artifactId>hadoop-federation-balance</artifactId>
<version>3.4.0-SNAPSHOT</version>
<description>Apache Hadoop Federation Balance</description>
<name>Apache Hadoop Federation Balance</name>
<packaging>jar</packaging>
<properties>
<file.encoding>UTF-8</file.encoding>
<downloadSources>true</downloadSources>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-app</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-hs</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-rbf</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-rbf</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
<testResources>
<testResource>
<directory>src/test/resources</directory>
<filtering>true</filtering>
</testResource>
</testResources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<testFailureIgnore>${ignoreTestFailure}</testFailureIgnore>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
<forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds>
<argLine>-Xmx1024m</argLine>
<includes>
<include>**/Test*.java</include>
</includes>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
<systemProperties>
<property>
<name>test.build.data</name>
<value>${basedir}/target/test/data</value>
</property>
<property>
<name>hadoop.log.dir</name>
<value>target/test/logs</value>
</property>
<property>
<name>org.apache.commons.logging.Log</name>
<value>org.apache.commons.logging.impl.SimpleLog</value>
</property>
<property>
<name>org.apache.commons.logging.simplelog.defaultlog</name>
<value>warn</value>
</property>
</systemProperties>
</configuration>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
<execution>
<id>deplist</id>
<phase>compile</phase>
<goals>
<goal>list</goal>
</goals>
<configuration>
<!-- referenced by a built-in command -->
<outputFile>${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-builtin.txt</outputFile>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>org.apache.hadoop.tools.fedbalance.FedBalance</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>prepare-jar</id>
<phase>prepare-package</phase>
<goals>
<goal>jar</goal>
</goals>
</execution>
<execution>
<id>prepare-test-jar</id>
<phase>prepare-package</phase>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<configuration>
<attach>true</attach>
</configuration>
<executions>
<execution>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.fedbalance;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
/**
* Command line options of FedBalance.
*/
public final class DistCpBalanceOptions {
/**
* The private constructor prevents this class from being instantiated.
*/
private DistCpBalanceOptions() {}
/**
* Run in router-based federation mode.
*/
final static Option ROUTER = new Option("router", false,
"If `true` the command runs in router mode. The source path is "
+ "taken as a mount point. It will disable write by setting the mount"
+ " point readonly. Otherwise the command works in normal federation"
+ " mode. The source path is taken as the full path. It will disable"
+ " write by cancelling all permissions of the source path. The"
+ " default value is `true`.");
/**
* If true, in DIFF_DISTCP stage it will force close all open files when
* there is no diff between the source path and the dst path. Otherwise
* the DIFF_DISTCP stage will wait until there are no open files. The
* default value is `false`.
*/
final static Option FORCE_CLOSE_OPEN = new Option("forceCloseOpen", false,
"Force close all open files if the src and dst are synced.");
/**
* Max number of maps to use during copy. DistCp will split work as equally
* as possible among these maps.
*/
final static Option MAP =
new Option("map", true, "Max number of concurrent maps to use for copy");
/**
* Specify bandwidth per map in MB, accepts bandwidth as a fraction.
*/
final static Option BANDWIDTH =
new Option("bandwidth", true, "Specify bandwidth per map in MB.");
/**
* Specify the delay duration (in milliseconds) before retrying the job.
*/
final static Option DELAY_DURATION = new Option("delay", true,
"This specifies the delayed duration(millie seconds) when the job"
+ " needs to retry. A job may retry many times and check the state"
+ " when it waits for the distcp job to finish.");
/**
* Move the source path to trash after all the data is synced to the target, or
* delete the source directly, or skip both trash and deletion.
*/
final static Option TRASH = new Option("moveToTrash", true,
"Move the source path to trash, or delete the source path directly,"
+ " or skip both trash and deletion. This accepts 3 values: trash,"
+ " delete and skip. By default the server side trash interval is"
+ " used. If the trash is disabled in the server side, the default"
+ " trash interval 60 minutes is used.");
final static Options CLI_OPTIONS = new Options();
static {
CLI_OPTIONS.addOption(ROUTER);
CLI_OPTIONS.addOption(FORCE_CLOSE_OPEN);
CLI_OPTIONS.addOption(MAP);
CLI_OPTIONS.addOption(BANDWIDTH);
CLI_OPTIONS.addOption(DELAY_DURATION);
CLI_OPTIONS.addOption(TRASH);
}
}
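
For reference, a minimal sketch (not part of this commit) of how CLI_OPTIONS is consumed. FedBalance.run() below parses the arguments with GnuParser, taking everything after the options as the sub-command and paths; the class name and argument values here are hypothetical and only illustrate that contract.

package org.apache.hadoop.tools.fedbalance;

import java.util.Arrays;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;

public class OptionsParseSketch {
  public static void main(String[] args) throws Exception {
    // Options come before the sub-command, matching the usage string
    // "fedbalance OPTIONS [submit|continue] <src> <target>".
    String[] argv = {"-forceCloseOpen", "-map", "20", "-bandwidth", "10",
        "-moveToTrash", "trash", "submit", "/foo/src", "hdfs://ns-dst/foo/dst"};
    CommandLine command =
        new GnuParser().parse(DistCpBalanceOptions.CLI_OPTIONS, argv, true);
    boolean forceClose = command.hasOption("forceCloseOpen"); // flag option, no argument
    int map = Integer.parseInt(command.getOptionValue("map", "10"));
    // Everything after the options: [submit, /foo/src, hdfs://ns-dst/foo/dst]
    System.out.println(Arrays.toString(command.getArgs())
        + " forceCloseOpen=" + forceClose + " map=" + map);
  }
}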

View File

@ -0,0 +1,635 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.fedbalance;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.OptionsParser;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ByteArrayOutputStream;
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.CURRENT_SNAPSHOT_NAME;
import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.LAST_SNAPSHOT_NAME;
/**
* Copy data through distcp. Super user privilege needed.
*
* PRE_CHECK :pre-check of src and dst.
* INIT_DISTCP :the first round of distcp.
* DIFF_DISTCP :copy snapshot diff round by round until there is
* no diff.
* DISABLE_WRITE :disable write operations.
* FINAL_DISTCP :close all open files and do the final round distcp.
* FINISH :procedure finish.
*/
public class DistCpProcedure extends BalanceProcedure {
public static final Logger LOG =
LoggerFactory.getLogger(DistCpProcedure.class);
/* Stages of this procedure. */
enum Stage {
PRE_CHECK, INIT_DISTCP, DIFF_DISTCP, DISABLE_WRITE, FINAL_DISTCP, FINISH
}
private FedBalanceContext context; // the balance context.
private Path src; // the source path including the source cluster.
private Path dst; // the dst path including the dst cluster.
private Configuration conf;
private int mapNum; // the number of map tasks.
private int bandWidth; // the bandwidth limit of each distcp task.
private String jobId; // the id of the current distcp.
private Stage stage; // current stage of this procedure.
/* Force close all open files when there is no diff between src and dst */
private boolean forceCloseOpenFiles;
/* Disable write by setting the mount point readonly. */
private boolean useMountReadOnly;
private FsPermission fPerm; // the permission of the src.
private AclStatus acl; // the acl of the src.
private JobClient client;
private DistributedFileSystem srcFs; // fs of the src cluster.
private DistributedFileSystem dstFs; // fs of the dst cluster.
/**
* Test only. In unit tests we use the LocalJobRunner to run the distcp jobs.
* Here we save the job to look up the job status. The localJob won't be
* serialized and thus won't be recovered.
*/
@VisibleForTesting
private Job localJob;
/**
* Enable test mode. Use LocalJobRunner to run the distcp jobs.
*/
@VisibleForTesting
static boolean enabledForTest = false;
public DistCpProcedure() {
}
/**
* The constructor of DistCpProcedure.
*
* @param name the name of the procedure.
* @param nextProcedure the name of the next procedure.
* @param delayDuration the delay duration when this procedure is delayed.
* @param context the federation balance context.
*/
public DistCpProcedure(String name, String nextProcedure, long delayDuration,
FedBalanceContext context) throws IOException {
super(name, nextProcedure, delayDuration);
this.context = context;
this.src = context.getSrc();
this.dst = context.getDst();
this.conf = context.getConf();
this.client = new JobClient(conf);
this.stage = Stage.PRE_CHECK;
this.mapNum = context.getMapNum();
this.bandWidth = context.getBandwidthLimit();
this.forceCloseOpenFiles = context.getForceCloseOpenFiles();
this.useMountReadOnly = context.getUseMountReadOnly();
srcFs = (DistributedFileSystem) context.getSrc().getFileSystem(conf);
dstFs = (DistributedFileSystem) context.getDst().getFileSystem(conf);
}
@Override
public boolean execute() throws RetryException, IOException {
LOG.info("Stage={}", stage.name());
switch (stage) {
case PRE_CHECK:
preCheck();
return false;
case INIT_DISTCP:
initDistCp();
return false;
case DIFF_DISTCP:
diffDistCp();
return false;
case DISABLE_WRITE:
disableWrite();
return false;
case FINAL_DISTCP:
finalDistCp();
return false;
case FINISH:
finish();
return true;
default:
throw new IOException("Unexpected stage=" + stage);
}
}
/**
* Pre check of src and dst.
*/
void preCheck() throws IOException {
FileStatus status = srcFs.getFileStatus(src);
if (!status.isDirectory()) {
throw new IOException(src + " should be a directory.");
}
if (dstFs.exists(dst)) {
throw new IOException(dst + " already exists.");
}
if (srcFs.exists(new Path(src, HdfsConstants.DOT_SNAPSHOT_DIR))) {
throw new IOException(src + " shouldn't enable snapshot.");
}
updateStage(Stage.INIT_DISTCP);
}
/**
* The initial distcp. Copying src to dst.
*/
void initDistCp() throws IOException, RetryException {
RunningJobStatus job = getCurrentJob();
if (job != null) {
// the distcp has been submitted.
if (job.isComplete()) {
jobId = null; // unset jobId because the job is done.
if (job.isSuccessful()) {
updateStage(Stage.DIFF_DISTCP);
return;
} else {
LOG.warn("DistCp failed. Failure={}", job.getFailureInfo());
}
} else {
throw new RetryException();
}
} else {
pathCheckBeforeInitDistcp();
srcFs.createSnapshot(src, CURRENT_SNAPSHOT_NAME);
jobId = submitDistCpJob(
src.toString() + HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR
+ CURRENT_SNAPSHOT_NAME, dst.toString(), false);
}
}
/**
* The distcp copying diffs between LAST_SNAPSHOT_NAME and
* CURRENT_SNAPSHOT_NAME.
*/
void diffDistCp() throws IOException, RetryException {
RunningJobStatus job = getCurrentJob();
if (job != null) {
if (job.isComplete()) {
jobId = null;
if (job.isSuccessful()) {
LOG.info("DistCp succeeded. jobId={}", job.getJobID());
} else {
throw new IOException("DistCp failed. jobId=" + job.getJobID()
+ " failure=" + job.getFailureInfo());
}
} else {
throw new RetryException(); // wait job complete.
}
} else if (!verifyDiff()) {
if (!verifyOpenFiles() || forceCloseOpenFiles) {
updateStage(Stage.DISABLE_WRITE);
} else {
throw new RetryException();
}
} else {
submitDiffDistCp();
}
}
/**
* Disable write either by making the mount entry readonly or cancelling all
* permissions of the source path.
*/
void disableWrite() throws IOException {
if (useMountReadOnly) {
String mount = context.getMount();
MountTableProcedure.disableWrite(mount, conf);
} else {
// Save and cancel permission.
FileStatus status = srcFs.getFileStatus(src);
fPerm = status.getPermission();
acl = srcFs.getAclStatus(src);
srcFs.setPermission(src, FsPermission.createImmutable((short) 0));
}
updateStage(Stage.FINAL_DISTCP);
}
/**
* Enable write by restoring the saved permission and ACL entries.
*/
void restorePermission() throws IOException {
// restore permission.
dstFs.removeAcl(dst);
if (acl != null) {
dstFs.modifyAclEntries(dst, acl.getEntries());
}
if (fPerm != null) {
dstFs.setPermission(dst, fPerm);
}
}
/**
* Close all open files then submit the distcp with -diff.
*/
void finalDistCp() throws IOException, RetryException {
// Close all open files then do the final distcp.
closeAllOpenFiles(srcFs, src);
// Final distcp.
RunningJobStatus job = getCurrentJob();
if (job != null) {
// the distcp has been submitted.
if (job.isComplete()) {
jobId = null; // unset jobId because the job is done.
if (job.isSuccessful()) {
updateStage(Stage.FINISH);
return;
} else {
throw new IOException(
"Final DistCp failed. Failure: " + job.getFailureInfo());
}
} else {
throw new RetryException();
}
} else {
submitDiffDistCp();
}
}
void finish() throws IOException {
if (!useMountReadOnly) {
restorePermission();
}
if (srcFs.exists(src)) {
cleanupSnapshot(srcFs, src);
}
if (dstFs.exists(dst)) {
cleanupSnapshot(dstFs, dst);
}
}
@VisibleForTesting
Stage getStage() {
return stage;
}
@VisibleForTesting
void updateStage(Stage value) {
String oldStage = stage == null ? "null" : stage.name();
String newStage = value == null ? "null" : value.name();
LOG.info("Stage updated from {} to {}.", oldStage, newStage);
stage = value;
}
/**
* Submit distcp with -diff option to do the incremental copy.
*
* | the source path | the dst path |
* | LAST_SNAPSHOT_NAME | LAST_SNAPSHOT_NAME |
* | CURRENT_SNAPSHOT_NAME |
*
* 1. Cleanup all the last snapshots. If there are no last snapshots then do
* nothing.
* 2. Create the dst path snapshot named the last snapshot.
* 3. Rename the source path current snapshot as the last snapshot. The dst
* path last snapshot and the source path last snapshot are the same now.
* 4. Create the current snapshot of the source path.
* 5. Submit the distcp job. The incremental part is from the source path last
* snapshot to the source path current snapshot.
*/
private void submitDiffDistCp() throws IOException {
enableSnapshot(dstFs, dst);
deleteSnapshot(srcFs, src, LAST_SNAPSHOT_NAME);
deleteSnapshot(dstFs, dst, LAST_SNAPSHOT_NAME);
dstFs.createSnapshot(dst, LAST_SNAPSHOT_NAME);
srcFs.renameSnapshot(src, CURRENT_SNAPSHOT_NAME, LAST_SNAPSHOT_NAME);
srcFs.createSnapshot(src, CURRENT_SNAPSHOT_NAME);
jobId = submitDistCpJob(src.toString(), dst.toString(), true);
}
/**
* Close all open files. Block until all the files are closed.
*/
private void closeAllOpenFiles(DistributedFileSystem dfs, Path path)
throws IOException {
String pathStr = path.toUri().getPath();
while (true) {
RemoteIterator<OpenFileEntry> iterator =
dfs.listOpenFiles(EnumSet.of(OpenFilesType.ALL_OPEN_FILES), pathStr);
if (!iterator.hasNext()) { // all files have been closed.
break;
}
while (iterator.hasNext()) {
OpenFileEntry e = iterator.next();
try {
srcFs.recoverLease(new Path(e.getFilePath()));
} catch (IOException re) {
// ignore recoverLease error.
}
}
}
}
/**
* Verify whether the src has changed since CURRENT_SNAPSHOT_NAME snapshot.
*
* @return true if the src has changed.
*/
private boolean verifyDiff() throws IOException {
SnapshotDiffReport diffReport =
srcFs.getSnapshotDiffReport(src, CURRENT_SNAPSHOT_NAME, "");
return diffReport.getDiffList().size() > 0;
}
/**
* Verify whether there are any open files under src.
*
* @return true if there are open files.
*/
private boolean verifyOpenFiles() throws IOException {
RemoteIterator<OpenFileEntry> iterator = srcFs
.listOpenFiles(EnumSet.of(OpenFilesType.ALL_OPEN_FILES),
src.toString());
return iterator.hasNext();
}
private RunningJobStatus getCurrentJob() throws IOException {
if (jobId != null) {
if (enabledForTest) {
return getCurrentLocalJob();
} else {
RunningJob latestJob = client.getJob(JobID.forName(jobId));
return latestJob == null ? null : new YarnRunningJobStatus(latestJob);
}
}
return null;
}
private LocalJobStatus getCurrentLocalJob() throws IOException {
if (localJob != null) {
Job latestJob;
try {
latestJob = localJob.getCluster().getJob(JobID.forName(jobId));
} catch (InterruptedException e) {
throw new IOException(e);
}
return latestJob == null ? null : new LocalJobStatus(latestJob);
} else {
return null;
}
}
private void pathCheckBeforeInitDistcp() throws IOException {
if (dstFs.exists(dst)) { // clean up.
throw new IOException("The dst path=" + dst + " already exists. The admin"
+ " should delete it before submitting the initial distcp job.");
}
Path snapshotPath = new Path(src,
HdfsConstants.DOT_SNAPSHOT_DIR_SEPARATOR + CURRENT_SNAPSHOT_NAME);
if (srcFs.exists(snapshotPath)) {
throw new IOException("The src snapshot=" + snapshotPath +
" already exists. The admin should delete the snapshot before"
+ " submitting the initial distcp.");
}
srcFs.allowSnapshot(src);
}
/**
* Submit distcp job and return jobId.
*/
private String submitDistCpJob(String srcParam, String dstParam,
boolean useSnapshotDiff) throws IOException {
List<String> command = new ArrayList<>();
command.addAll(Arrays
.asList(new String[] {"-async", "-update", "-append", "-pruxgpcab"}));
if (useSnapshotDiff) {
command.add("-diff");
command.add(LAST_SNAPSHOT_NAME);
command.add(CURRENT_SNAPSHOT_NAME);
}
command.add("-m");
command.add(mapNum + "");
command.add("-bandwidth");
command.add(bandWidth + "");
command.add(srcParam);
command.add(dstParam);
Configuration config = new Configuration(conf);
DistCp distCp;
try {
distCp = new DistCp(config,
OptionsParser.parse(command.toArray(new String[]{})));
Job job = distCp.createAndSubmitJob();
LOG.info("Submit distcp job={}", job);
if (enabledForTest) {
localJob = job;
}
return job.getJobID().toString();
} catch (Exception e) {
throw new IOException("Submit job failed.", e);
}
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
context.write(out);
if (jobId == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
Text.writeString(out, jobId);
}
out.writeInt(stage.ordinal());
if (fPerm == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeShort(fPerm.toShort());
}
if (acl == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
ByteArrayOutputStream bout = new ByteArrayOutputStream();
PBHelperClient.convert(acl).writeDelimitedTo(bout);
byte[] data = bout.toByteArray();
out.writeInt(data.length);
out.write(data);
}
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
context = new FedBalanceContext();
context.readFields(in);
src = context.getSrc();
dst = context.getDst();
conf = context.getConf();
if (in.readBoolean()) {
jobId = Text.readString(in);
}
stage = Stage.values()[in.readInt()];
if (in.readBoolean()) {
fPerm = FsPermission.read(in);
}
if (in.readBoolean()) {
int len = in.readInt();
byte[] data = new byte[len];
in.readFully(data);
ByteArrayInputStream bin = new ByteArrayInputStream(data);
AclProtos.GetAclStatusResponseProto proto =
AclProtos.GetAclStatusResponseProto.parseDelimitedFrom(bin);
acl = PBHelperClient.convert(proto);
}
srcFs = (DistributedFileSystem) context.getSrc().getFileSystem(conf);
dstFs = (DistributedFileSystem) context.getDst().getFileSystem(conf);
mapNum = context.getMapNum();
bandWidth = context.getBandwidthLimit();
forceCloseOpenFiles = context.getForceCloseOpenFiles();
useMountReadOnly = context.getUseMountReadOnly();
this.client = new JobClient(conf);
}
private static void enableSnapshot(DistributedFileSystem dfs, Path path)
throws IOException {
if (!dfs.exists(new Path(path, HdfsConstants.DOT_SNAPSHOT_DIR))) {
dfs.allowSnapshot(path);
}
}
static void deleteSnapshot(DistributedFileSystem dfs, Path path,
String snapshotName) throws IOException {
Path snapshot =
new Path(path, HdfsConstants.DOT_SNAPSHOT_DIR_SEPARATOR + snapshotName);
if (dfs.exists(snapshot)) {
dfs.deleteSnapshot(path, snapshotName);
}
}
static void cleanupSnapshot(DistributedFileSystem dfs, Path path)
throws IOException {
if (dfs.exists(new Path(path, HdfsConstants.DOT_SNAPSHOT_DIR))) {
FileStatus[] status =
dfs.listStatus(new Path(path, HdfsConstants.DOT_SNAPSHOT_DIR));
for (FileStatus s : status) {
deleteSnapshot(dfs, path, s.getPath().getName());
}
dfs.disallowSnapshot(path);
}
}
interface RunningJobStatus {
String getJobID();
boolean isComplete() throws IOException;
boolean isSuccessful() throws IOException;
String getFailureInfo() throws IOException;
}
private static class YarnRunningJobStatus implements RunningJobStatus {
private final RunningJob job;
YarnRunningJobStatus(RunningJob job) {
this.job = job;
}
@Override
public String getJobID() {
return job.getID().toString();
}
@Override
public boolean isComplete() throws IOException {
return job.isComplete();
}
@Override
public boolean isSuccessful() throws IOException {
return job.isSuccessful();
}
@Override
public String getFailureInfo() throws IOException {
return job.getFailureInfo();
}
}
private static class LocalJobStatus implements RunningJobStatus {
private final Job testJob;
LocalJobStatus(Job testJob) {
this.testJob = testJob;
}
@Override
public String getJobID() {
return testJob.getJobID().toString();
}
@Override
public boolean isComplete() throws IOException {
return testJob.isComplete();
}
@Override
public boolean isSuccessful() throws IOException {
return testJob.isSuccessful();
}
@Override
public String getFailureInfo() throws IOException {
try {
return testJob.getStatus().getFailureInfo();
} catch (InterruptedException e) {
throw new IOException(e);
}
}
}
}
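
A minimal driver sketch (not part of this commit) of the execute() contract this procedure follows: returning false means the procedure should be run again for its next stage, returning true means it has finished, and RetryException asks for another attempt after delayDuration. In the commit the BalanceProcedureScheduler drives this loop; the class below is hypothetical and assumes RetryException is a nested class of BalanceProcedure, as the unqualified throws clauses above suggest, and a FedBalanceContext built elsewhere.

package org.apache.hadoop.tools.fedbalance;

import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure.RetryException;

public class DistCpProcedureDriverSketch {
  // context is assumed to be built as in FedBalance.Builder#build() below.
  static void runToCompletion(FedBalanceContext context) throws Exception {
    BalanceProcedure procedure =
        new DistCpProcedure("distcp-procedure", null, 1000L, context);
    boolean done = false;
    while (!done) {
      try {
        done = procedure.execute(); // false: run the next stage; true: FINISH reached.
      } catch (RetryException e) {
        Thread.sleep(1000L); // the real scheduler waits delayDuration before retrying.
      }
    }
  }
}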

View File

@ -0,0 +1,377 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.fedbalance;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceJob;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.ROUTER;
import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.FORCE_CLOSE_OPEN;
import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.MAP;
import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.BANDWIDTH;
import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.TRASH;
import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.DELAY_DURATION;
import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.CLI_OPTIONS;
import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.FEDERATION_BALANCE_CLASS;
import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;
/**
* Balance data from src cluster to dst cluster with distcp.
*
* 1. Move data from the source path to the destination path with distcp.
* 2. Update the mount entry.
* 3. Move the source path to trash (or delete it, depending on the trash option).
*/
public class FedBalance extends Configured implements Tool {
public static final Logger LOG =
LoggerFactory.getLogger(FedBalance.class);
private static final String SUBMIT_COMMAND = "submit";
private static final String CONTINUE_COMMAND = "continue";
private static final String NO_MOUNT = "no-mount";
private static final String DISTCP_PROCEDURE = "distcp-procedure";
private static final String MOUNT_TABLE_PROCEDURE = "mount-table-procedure";
private static final String TRASH_PROCEDURE = "trash-procedure";
/**
* This class helps build the balance job.
*/
private class Builder {
/* Balancing in an rbf cluster. */
private boolean routerCluster = false;
/* Force close all open files while there is no diff. */
private boolean forceCloseOpen = false;
/* Max number of concurrent maps to use for copy. */
private int map = 10;
/* Specify bandwidth per map in MB. */
private int bandwidth = 10;
/* Specify the trash behaviour of the source path. */
private TrashOption trashOpt = TrashOption.TRASH;
/* Specify the delay duration (in milliseconds) when the procedure needs to retry. */
private long delayDuration = TimeUnit.SECONDS.toMillis(1);
/* The source input. This specifies the source path. */
private final String inputSrc;
/* The dst input. This specifies the dst path. */
private final String inputDst;
Builder(String inputSrc, String inputDst) {
this.inputSrc = inputSrc;
this.inputDst = inputDst;
}
/**
* Whether balancing in an rbf cluster.
* @param value true if it's running in a router-based federation cluster.
*/
public Builder setRouterCluster(boolean value) {
this.routerCluster = value;
return this;
}
/**
* Whether to force close all open files when there is no diff.
* @param value true if force close all the open files.
*/
public Builder setForceCloseOpen(boolean value) {
this.forceCloseOpen = value;
return this;
}
/**
* Max number of concurrent maps to use for copy.
* @param value the map number of the distcp.
*/
public Builder setMap(int value) {
this.map = value;
return this;
}
/**
* Specify bandwidth per map in MB.
* @param value the bandwidth.
*/
public Builder setBandWidth(int value) {
this.bandwidth = value;
return this;
}
/**
* Specify the trash behaviour of the source path.
* @param value the trash option.
*/
public Builder setTrashOpt(TrashOption value) {
this.trashOpt = value;
return this;
}
/**
* Specify the delay duration (in milliseconds) when the procedure needs to retry.
* @param value the delay duration of the job.
*/
public Builder setDelayDuration(long value) {
this.delayDuration = value;
return this;
}
/**
* Build the balance job.
*/
public BalanceJob build() throws IOException {
// Construct job context.
FedBalanceContext context;
Path dst = new Path(inputDst);
if (dst.toUri().getAuthority() == null) {
throw new IOException("The destination cluster must be specified.");
}
if (routerCluster) { // router-based federation.
Path src = getSrcPath(inputSrc);
String mount = inputSrc;
context = new FedBalanceContext.Builder(src, dst, mount, getConf())
.setForceCloseOpenFiles(forceCloseOpen)
.setUseMountReadOnly(routerCluster).setMapNum(map)
.setBandwidthLimit(bandwidth).setTrash(trashOpt)
.setDelayDuration(delayDuration).build();
} else { // normal federation cluster.
Path src = new Path(inputSrc);
if (src.toUri().getAuthority() == null) {
throw new IOException("The source cluster must be specified.");
}
context = new FedBalanceContext.Builder(src, dst, NO_MOUNT, getConf())
.setForceCloseOpenFiles(forceCloseOpen)
.setUseMountReadOnly(routerCluster).setMapNum(map)
.setBandwidthLimit(bandwidth).setTrash(trashOpt).build();
}
LOG.info(context.toString());
// Construct the balance job.
BalanceJob.Builder<BalanceProcedure> builder = new BalanceJob.Builder<>();
DistCpProcedure dcp =
new DistCpProcedure(DISTCP_PROCEDURE, null, delayDuration, context);
builder.nextProcedure(dcp);
if (routerCluster) {
MountTableProcedure mtp =
new MountTableProcedure(MOUNT_TABLE_PROCEDURE, null, delayDuration,
inputSrc, dst.toUri().getPath(), dst.toUri().getAuthority(),
getConf());
builder.nextProcedure(mtp);
}
TrashProcedure tp =
new TrashProcedure(TRASH_PROCEDURE, null, delayDuration, context);
builder.nextProcedure(tp);
return builder.build();
}
}
public FedBalance() {
super();
}
@Override
public int run(String[] args) throws Exception {
CommandLineParser parser = new GnuParser();
CommandLine command =
parser.parse(DistCpBalanceOptions.CLI_OPTIONS, args, true);
String[] leftOverArgs = command.getArgs();
if (leftOverArgs == null || leftOverArgs.length < 1) {
printUsage();
return -1;
}
String cmd = leftOverArgs[0];
if (cmd.equals(SUBMIT_COMMAND)) {
if (leftOverArgs.length < 3) {
printUsage();
return -1;
}
String inputSrc = leftOverArgs[1];
String inputDst = leftOverArgs[2];
return submit(command, inputSrc, inputDst);
} else if (cmd.equals(CONTINUE_COMMAND)) {
return continueJob();
} else {
printUsage();
return -1;
}
}
/**
* Recover and continue the unfinished jobs.
*/
private int continueJob() throws InterruptedException {
BalanceProcedureScheduler scheduler =
new BalanceProcedureScheduler(getConf());
try {
scheduler.init(true);
while (true) {
Collection<BalanceJob> jobs = scheduler.getAllJobs();
int unfinished = 0;
for (BalanceJob job : jobs) {
if (!job.isJobDone()) {
unfinished++;
}
LOG.info(job.toString());
}
if (unfinished == 0) {
break;
}
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
}
} catch (IOException e) {
LOG.error("Continue balance job failed.", e);
return -1;
} finally {
scheduler.shutDown();
}
return 0;
}
/**
* Start a ProcedureScheduler and submit the job.
*
* @param command the command options.
* @param inputSrc the source input. This specifies the source path.
* @param inputDst the dst input. This specifies the dst path.
*/
private int submit(CommandLine command, String inputSrc, String inputDst)
throws IOException {
Builder builder = new Builder(inputSrc, inputDst);
// parse options.
builder.setRouterCluster(command.hasOption(ROUTER.getOpt()));
builder.setForceCloseOpen(command.hasOption(FORCE_CLOSE_OPEN.getOpt()));
if (command.hasOption(MAP.getOpt())) {
builder.setMap(Integer.parseInt(command.getOptionValue(MAP.getOpt())));
}
if (command.hasOption(BANDWIDTH.getOpt())) {
builder.setBandWidth(
Integer.parseInt(command.getOptionValue(BANDWIDTH.getOpt())));
}
if (command.hasOption(DELAY_DURATION.getOpt())) {
builder.setDelayDuration(
Long.parseLong(command.getOptionValue(DELAY_DURATION.getOpt())));
}
if (command.hasOption(TRASH.getOpt())) {
String val = command.getOptionValue(TRASH.getOpt());
if (val.equalsIgnoreCase("skip")) {
builder.setTrashOpt(TrashOption.SKIP);
} else if (val.equalsIgnoreCase("trash")) {
builder.setTrashOpt(TrashOption.TRASH);
} else if (val.equalsIgnoreCase("delete")) {
builder.setTrashOpt(TrashOption.DELETE);
} else {
printUsage();
return -1;
}
}
// Submit the job.
BalanceProcedureScheduler scheduler =
new BalanceProcedureScheduler(getConf());
scheduler.init(false);
try {
BalanceJob balanceJob = builder.build();
// Submit and wait until the job is done.
scheduler.submit(balanceJob);
scheduler.waitUntilDone(balanceJob);
} catch (IOException e) {
LOG.error("Submit balance job failed.", e);
return -1;
} finally {
scheduler.shutDown();
}
return 0;
}
/**
* Get src uri from Router.
*/
private Path getSrcPath(String fedPath) throws IOException {
String address = getConf().getTrimmed(
RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT);
InetSocketAddress routerSocket = NetUtils.createSocketAddr(address);
RouterClient rClient = new RouterClient(routerSocket, getConf());
try {
MountTableManager mountTable = rClient.getMountTableManager();
MountTable entry = MountTableProcedure.getMountEntry(fedPath, mountTable);
if (entry == null) {
throw new IllegalArgumentException(
"The mount point doesn't exist. path=" + fedPath);
} else if (entry.getDestinations().size() > 1) {
throw new IllegalArgumentException(
"The mount point has more than one destination. path=" + fedPath);
} else {
String ns = entry.getDestinations().get(0).getNameserviceId();
String path = entry.getDestinations().get(0).getDest();
return new Path("hdfs://" + ns + path);
}
} finally {
rClient.close();
}
}
private void printUsage() {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp(
"fedbalance OPTIONS [submit|continue] <src> <target>\n\nOPTIONS",
CLI_OPTIONS);
}
/**
* Main function of the FedBalance program. Parses the input arguments and
* invokes the FedBalance::run() method, via the ToolRunner.
* @param argv Command-line arguments sent to FedBalance.
*/
public static void main(String[] argv) {
Configuration conf = new HdfsConfiguration();
Class<Tool> balanceClazz = (Class<Tool>) conf
.getClass(FEDERATION_BALANCE_CLASS, FedBalance.class);
Tool balancer = ReflectionUtils.newInstance(balanceClazz, conf);
int exitCode;
try {
exitCode = ToolRunner.run(balancer, argv);
} catch (Exception e) {
LOG.warn("Couldn't complete FedBalance operation.", e);
exitCode = -1;
}
System.exit(exitCode);
}
}
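
As a usage illustration (not part of this commit), the tool can also be driven programmatically through ToolRunner, with the options placed before the sub-command exactly as in the usage string printed by printUsage(). The class name, paths, and option values below are hypothetical.

package org.apache.hadoop.tools.fedbalance;

import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.util.ToolRunner;

public class FedBalanceSubmitSketch {
  public static void main(String[] args) throws Exception {
    // Without -router, both paths must carry a cluster authority (see Builder#build()).
    String[] argv = {"-forceCloseOpen", "-map", "20", "-bandwidth", "10",
        "submit", "hdfs://ns-src/foo/src", "hdfs://ns-dst/foo/dst"};
    int exitCode = ToolRunner.run(new HdfsConfiguration(), new FedBalance(), argv);
    System.exit(exitCode);
  }
}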

View File

@ -15,16 +15,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance;
import org.apache.hadoop.classification.InterfaceAudience;
/**
- * This class contains constants for configuration keys and default values
- * used in hdfs procedure.
+ * Federation balance configuration properties.
*/
@InterfaceAudience.Private
-public final class BalanceProcedureConfigKeys {
+public final class FedBalanceConfigs {
/* The class used for federation balance */
public static final String FEDERATION_BALANCE_CLASS =
"federation.balance.class";
public static final String LAST_SNAPSHOT_NAME = "DISTCP-BALANCE-CURRENT";
public static final String CURRENT_SNAPSHOT_NAME = "DISTCP-BALANCE-NEXT";
/* Specify the behaviour of trash. */
public enum TrashOption {
TRASH, DELETE, SKIP
}
/* The worker threads number of the BalanceProcedureScheduler */
public static final String WORK_THREAD_NUM =
"hadoop.hdfs.procedure.work.thread.num";
@ -37,5 +46,5 @@ public final class BalanceProcedureConfigKeys {
public static final String JOURNAL_CLASS =
"hadoop.hdfs.procedure.journal.class";
-private BalanceProcedureConfigKeys() {}
+private FedBalanceConfigs(){}
}

View File

@ -0,0 +1,286 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.fedbalance;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;
/**
* This class contains the basic information needed by federation balance.
*/
public class FedBalanceContext implements Writable {
/* the source path in the source sub-cluster */
private Path src;
/* the target path in the target sub-cluster */
private Path dst;
/* the mount point to be balanced */
private String mount;
/* Force close all open files when there is no diff between src and dst */
private boolean forceCloseOpenFiles;
/* Disable write by setting the mount point readonly. */
private boolean useMountReadOnly;
/* The map number of the distcp job. */
private int mapNum;
/* The bandwidth limit of the distcp job(MB). */
private int bandwidthLimit;
/* Move source path to trash after all the data is synced to the target. Otherwise
delete the source directly. */
private TrashOption trashOpt;
/* How long will the procedures be delayed. */
private long delayDuration;
private Configuration conf;
public FedBalanceContext() {}
public Configuration getConf() {
return conf;
}
public Path getSrc() {
return src;
}
public Path getDst() {
return dst;
}
public String getMount() {
return mount;
}
public boolean getForceCloseOpenFiles() {
return forceCloseOpenFiles;
}
public boolean getUseMountReadOnly() {
return useMountReadOnly;
}
public int getMapNum() {
return mapNum;
}
public int getBandwidthLimit() {
return bandwidthLimit;
}
public TrashOption getTrashOpt() {
return trashOpt;
}
@Override
public void write(DataOutput out) throws IOException {
conf.write(out);
Text.writeString(out, src.toString());
Text.writeString(out, dst.toString());
Text.writeString(out, mount);
out.writeBoolean(forceCloseOpenFiles);
out.writeBoolean(useMountReadOnly);
out.writeInt(mapNum);
out.writeInt(bandwidthLimit);
out.writeInt(trashOpt.ordinal());
out.writeLong(delayDuration);
}
@Override
public void readFields(DataInput in) throws IOException {
conf = new Configuration(false);
conf.readFields(in);
src = new Path(Text.readString(in));
dst = new Path(Text.readString(in));
mount = Text.readString(in);
forceCloseOpenFiles = in.readBoolean();
useMountReadOnly = in.readBoolean();
mapNum = in.readInt();
bandwidthLimit = in.readInt();
trashOpt = TrashOption.values()[in.readInt()];
delayDuration = in.readLong();
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj == this) {
return true;
}
if (obj.getClass() != getClass()) {
return false;
}
FedBalanceContext bc = (FedBalanceContext) obj;
return new EqualsBuilder()
.append(src, bc.src)
.append(dst, bc.dst)
.append(mount, bc.mount)
.append(forceCloseOpenFiles, bc.forceCloseOpenFiles)
.append(useMountReadOnly, bc.useMountReadOnly)
.append(mapNum, bc.mapNum)
.append(bandwidthLimit, bc.bandwidthLimit)
.append(trashOpt, bc.trashOpt)
.append(delayDuration, bc.delayDuration)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(src)
.append(dst)
.append(mount)
.append(forceCloseOpenFiles)
.append(useMountReadOnly)
.append(mapNum)
.append(bandwidthLimit)
.append(trashOpt)
.append(delayDuration)
.build();
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder("FedBalance context:");
builder.append(" src=").append(src);
builder.append(", dst=").append(dst);
if (useMountReadOnly) {
builder.append(", router-mode=true");
builder.append(", mount-point=").append(mount);
} else {
builder.append(", router-mode=false");
}
builder.append(", forceCloseOpenFiles=").append(forceCloseOpenFiles);
builder.append(", trash=").append(trashOpt.name());
builder.append(", map=").append(mapNum);
builder.append(", bandwidth=").append(bandwidthLimit);
builder.append(", delayDuration=").append(delayDuration);
return builder.toString();
}
static class Builder {
private final Path src;
private final Path dst;
private final String mount;
private final Configuration conf;
private boolean forceCloseOpenFiles = false;
private boolean useMountReadOnly = false;
private int mapNum;
private int bandwidthLimit;
private TrashOption trashOpt;
private long delayDuration;
/**
* This class helps build the FedBalanceContext.
*
* @param src the source path in the source sub-cluster.
* @param dst the target path in the target sub-cluster.
* @param mount the mount point to be balanced.
* @param conf the configuration.
*/
Builder(Path src, Path dst, String mount, Configuration conf) {
this.src = src;
this.dst = dst;
this.mount = mount;
this.conf = conf;
}
/**
* Force close open files.
* @param value true if force close all the open files.
*/
public Builder setForceCloseOpenFiles(boolean value) {
this.forceCloseOpenFiles = value;
return this;
}
/**
* Use mount point readonly to disable write.
* @param value true if disabling write by setting mount point readonly.
*/
public Builder setUseMountReadOnly(boolean value) {
this.useMountReadOnly = value;
return this;
}
/**
* The map number of the distcp job.
* @param value the map number of the distcp.
*/
public Builder setMapNum(int value) {
this.mapNum = value;
return this;
}
/**
* The bandwidth limit of the distcp job(MB).
* @param value the bandwidth.
*/
public Builder setBandwidthLimit(int value) {
this.bandwidthLimit = value;
return this;
}
/**
* Specify the trash behaviour after all the data is synced to the target.
* @param value the trash option.
*/
public Builder setTrash(TrashOption value) {
this.trashOpt = value;
return this;
}
/**
* Specify the delayed duration when the procedures need to retry.
*/
public Builder setDelayDuration(long value) {
this.delayDuration = value;
return this;
}
/**
* Build the FedBalanceContext.
*
* @return the FedBalanceContext obj.
*/
public FedBalanceContext build() {
FedBalanceContext context = new FedBalanceContext();
context.src = this.src;
context.dst = this.dst;
context.mount = this.mount;
context.conf = this.conf;
context.forceCloseOpenFiles = this.forceCloseOpenFiles;
context.useMountReadOnly = this.useMountReadOnly;
context.mapNum = this.mapNum;
context.bandwidthLimit = this.bandwidthLimit;
context.trashOpt = this.trashOpt;
context.delayDuration = this.delayDuration;
return context;
}
}
}
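
A minimal round-trip sketch (not part of this commit) of the Writable contract above, which is how DistCpProcedure journals and later recovers the context. The builder and the no-arg constructor are package-private, so such a sketch would sit in the same package; the class name and values are illustrative.

package org.apache.hadoop.tools.fedbalance;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class FedBalanceContextRoundTripSketch {
  public static void main(String[] args) throws Exception {
    FedBalanceContext context = new FedBalanceContext.Builder(
        new Path("hdfs://ns-src/foo/src"), new Path("hdfs://ns-dst/foo/dst"),
        "/foo/src", new Configuration())
        .setMapNum(10)
        .setBandwidthLimit(10)
        .setTrash(FedBalanceConfigs.TrashOption.TRASH)
        .setDelayDuration(1000L)
        .build();

    // Serialize the context the same way DistCpProcedure.write() does.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    context.write(new DataOutputStream(bytes));

    // Recover it the same way DistCpProcedure.readFields() does.
    FedBalanceContext restored = new FedBalanceContext();
    restored.readFields(new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray())));

    System.out.println(restored.equals(context)); // true: all serialized fields round-trip.
  }
}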

View File

@ -0,0 +1,244 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.fedbalance;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
/**
* Update mount table.
* Old mount table:
* /a/b/c -> {ns:src path:/a/b/c}
* New mount table:
* /a/b/c -> {ns:dst path:/a/b/c}
*/
public class MountTableProcedure extends BalanceProcedure {
private String mount;
private String dstPath;
private String dstNs;
private Configuration conf;
public MountTableProcedure() {}
/**
* Update mount entry to specified dst uri.
*
* @param mount the mount entry to be updated.
* @param dstPath the sub-cluster uri of the dst path.
* @param conf the configuration.
*/
public MountTableProcedure(String name, String nextProcedure,
long delayDuration, String mount, String dstPath, String dstNs,
Configuration conf) throws IOException {
super(name, nextProcedure, delayDuration);
this.mount = mount;
this.dstPath = dstPath;
this.dstNs = dstNs;
this.conf = conf;
}
@Override
public boolean execute() throws RetryException, IOException {
updateMountTable();
return true;
}
private void updateMountTable() throws IOException {
updateMountTableDestination(mount, dstNs, dstPath, conf);
enableWrite(mount, conf);
}
/**
* Update the destination of the mount point to target namespace and target
* path.
*
* @param mount the mount point.
* @param dstNs the target namespace.
* @param dstPath the target path
* @param conf the configuration of the router.
*/
private static void updateMountTableDestination(String mount, String dstNs,
String dstPath, Configuration conf) throws IOException {
String address = conf.getTrimmed(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT);
InetSocketAddress routerSocket = NetUtils.createSocketAddr(address);
RouterClient rClient = new RouterClient(routerSocket, conf);
try {
MountTableManager mountTable = rClient.getMountTableManager();
MountTable originalEntry = getMountEntry(mount, mountTable);
if (originalEntry == null) {
throw new IOException("Mount table " + mount + " doesn't exist");
} else {
RemoteLocation remoteLocation =
new RemoteLocation(dstNs, dstPath, mount);
originalEntry.setDestinations(Arrays.asList(remoteLocation));
UpdateMountTableEntryRequest updateRequest =
UpdateMountTableEntryRequest.newInstance(originalEntry);
UpdateMountTableEntryResponse response =
mountTable.updateMountTableEntry(updateRequest);
if (!response.getStatus()) {
throw new IOException("Failed update mount table " + mount);
}
rClient.getMountTableManager().refreshMountTableEntries(
RefreshMountTableEntriesRequest.newInstance());
}
} finally {
rClient.close();
}
}
/**
* Gets the mount table entry.
* @param mount name of the mount entry.
* @param mountTable the mount table.
* @return corresponding mount entry.
* @throws IOException in case of failure to retrieve mount entry.
*/
public static MountTable getMountEntry(String mount,
MountTableManager mountTable)
throws IOException {
GetMountTableEntriesRequest getRequest =
GetMountTableEntriesRequest.newInstance(mount);
GetMountTableEntriesResponse getResponse =
mountTable.getMountTableEntries(getRequest);
List<MountTable> results = getResponse.getEntries();
MountTable existingEntry = null;
for (MountTable result : results) {
if (mount.equals(result.getSourcePath())) {
existingEntry = result;
break;
}
}
return existingEntry;
}
/**
* Disable write by making the mount point readonly.
*
* @param mount the mount point to set readonly.
* @param conf the configuration of the router.
*/
static void disableWrite(String mount, Configuration conf)
throws IOException {
setMountReadOnly(mount, true, conf);
}
/**
* Enable write by clearing the readonly flag of the mount point.
*
* @param mount the mount point to clear readonly on.
* @param conf the configuration of the router.
*/
static void enableWrite(String mount, Configuration conf) throws IOException {
setMountReadOnly(mount, false, conf);
}
/**
* Enable or disable readonly of the mount point.
*
* @param mount the mount point.
* @param readOnly enable or disable readonly.
* @param conf the configuration of the router.
*/
private static void setMountReadOnly(String mount, boolean readOnly,
Configuration conf) throws IOException {
String address = conf.getTrimmed(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT);
InetSocketAddress routerSocket = NetUtils.createSocketAddr(address);
RouterClient rClient = new RouterClient(routerSocket, conf);
try {
MountTableManager mountTable = rClient.getMountTableManager();
MountTable originalEntry = getMountEntry(mount, mountTable);
if (originalEntry == null) {
throw new IOException("Mount table " + mount + " doesn't exist");
} else {
originalEntry.setReadOnly(readOnly);
UpdateMountTableEntryRequest updateRequest =
UpdateMountTableEntryRequest.newInstance(originalEntry);
UpdateMountTableEntryResponse response =
mountTable.updateMountTableEntry(updateRequest);
if (!response.getStatus()) {
throw new IOException(
"Failed update mount table " + mount + " with readonly="
+ readOnly);
}
rClient.getMountTableManager().refreshMountTableEntries(
RefreshMountTableEntriesRequest.newInstance());
}
} finally {
rClient.close();
}
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
Text.writeString(out, mount);
Text.writeString(out, dstPath);
Text.writeString(out, dstNs);
conf.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
mount = Text.readString(in);
dstPath = Text.readString(in);
dstNs = Text.readString(in);
conf = new Configuration(false);
conf.readFields(in);
}
@VisibleForTesting
String getMount() {
return mount;
}
@VisibleForTesting
String getDstPath() {
return dstPath;
}
@VisibleForTesting
String getDstNs() {
return dstNs;
}
}
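
For context, a minimal sketch of the journal round trip that the write/readFields pair above supports. It mirrors TestMountTableProcedure#testSeDeserialize later in this patch; the procedure name, mount point and namespace values are placeholders, not values from the change itself.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.fedbalance.MountTableProcedure;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class MountTableProcedureJournalSketch {
  public static void main(String[] args) throws Exception {
    MountTableProcedure before = new MountTableProcedure("mount-table-procedure",
        null, 1000, "/a/b/c", "/a/b/c", "dst", new Configuration());
    // Serialize the procedure as the journal would.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    before.write(new DataOutputStream(bytes));
    // Recover a fresh instance from the serialized form.
    MountTableProcedure after = new MountTableProcedure();
    after.readFields(
        new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    // "after" now carries the same mount, dstPath, dstNs and conf as "before".
  }
}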


@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.fedbalance;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
import org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;
/**
* This procedure moves the source path to the corresponding trash.
*/
public class TrashProcedure extends BalanceProcedure {
private DistributedFileSystem srcFs;
private FedBalanceContext context;
private Configuration conf;
public TrashProcedure() {}
/**
* The constructor of TrashProcedure.
*
* @param name the name of the procedure.
* @param nextProcedure the name of the next procedure.
* @param delayDuration the delay duration when this procedure is delayed.
* @param context the federation balance context.
*/
public TrashProcedure(String name, String nextProcedure, long delayDuration,
FedBalanceContext context) throws IOException {
super(name, nextProcedure, delayDuration);
this.context = context;
this.conf = context.getConf();
this.srcFs = (DistributedFileSystem) context.getSrc().getFileSystem(conf);
}
@Override
public boolean execute() throws IOException {
moveToTrash();
return true;
}
/**
* Move the source path to trash, delete it, or skip it, depending on the trash option.
*/
void moveToTrash() throws IOException {
Path src = context.getSrc();
if (srcFs.exists(src)) {
TrashOption trashOption = context.getTrashOpt();
switch (trashOption) {
case TRASH:
conf.setFloat(FS_TRASH_INTERVAL_KEY, 60);
if (!Trash.moveToAppropriateTrash(srcFs, src, conf)) {
throw new IOException("Failed move " + src + " to trash.");
}
break;
case DELETE:
if (!srcFs.delete(src, true)) {
throw new IOException("Failed delete " + src);
}
LOG.info("{} is deleted.", src);
break;
case SKIP:
break;
default:
throw new IOException("Unexpected trash option=" + trashOption);
}
}
}
public FedBalanceContext getContext() {
return context;
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
context.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
context = new FedBalanceContext();
context.readFields(in);
conf = context.getConf();
srcFs = (DistributedFileSystem) context.getSrc().getFileSystem(conf);
}
}
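
A small usage sketch following the builder calls in TestTrashProcedure below. It assumes FedBalanceContext and its Builder are public as used in the tests; the src/dst paths and the /foo mount point are placeholders, and fs.defaultFS is assumed to point at an HDFS cluster since the procedure expects a DistributedFileSystem source.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.fedbalance.FedBalanceContext;
import org.apache.hadoop.tools.fedbalance.TrashProcedure;
import org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;

public class TrashProcedureSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder paths; the src must live on HDFS.
    FedBalanceContext context = new FedBalanceContext.Builder(
        new Path("/foo/src"), new Path("/foo/dst"), "/foo", conf)
        .setMapNum(10)
        .setBandwidthLimit(1)
        .setTrash(TrashOption.TRASH) // TRASH, DELETE or SKIP
        .build();
    TrashProcedure trash = new TrashProcedure("trash-procedure", null, 1000, context);
    // Moves /foo/src to trash, deletes it, or skips it, per the trash option.
    trash.execute();
  }
}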


@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* FedBalance is a tool for balancing data across federation clusters.
*/
@InterfaceAudience.Public
package org.apache.hadoop.tools.fedbalance;
import org.apache.hadoop.classification.InterfaceAudience;
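
To connect the pieces, a hedged sketch of the pipeline the tool is built around: copy the data with DistCpProcedure, switch the mount entry with MountTableProcedure, then clean the source with TrashProcedure. It assumes BalanceJob.Builder#nextProcedure can be called once per step to append procedures in order (the tests in this patch only chain a single procedure), and all paths, namespace ids and job names below are placeholders.

package org.apache.hadoop.tools.fedbalance;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceJob;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler;

public class FedBalancePipelineSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // The scheduler journal (FedBalanceConfigs.SCHEDULER_JOURNAL_URI) and the router
    // admin address (RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY) must be set in conf.
    Path src = new Path("hdfs://ns0/data"); // placeholder source sub-cluster path
    Path dst = new Path("hdfs://ns1/data"); // placeholder target sub-cluster path
    FedBalanceContext context = new FedBalanceContext.Builder(src, dst, "/data", conf)
        .setMapNum(10).setBandwidthLimit(10).setTrash(TrashOption.TRASH).build();

    DistCpProcedure copy =
        new DistCpProcedure("distcp-procedure", null, 1000, context);
    MountTableProcedure move = new MountTableProcedure("mount-table-procedure", null,
        1000, "/data", "/data", "ns1", conf);
    TrashProcedure clean = new TrashProcedure("trash-procedure", null, 1000, context);

    BalanceJob job = new BalanceJob.Builder<>()
        .nextProcedure(copy).nextProcedure(move).nextProcedure(clean).build();
    BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(conf);
    scheduler.init(true);
    scheduler.submit(job);
    scheduler.waitUntilDone(job);
    scheduler.shutDown();
  }
}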


@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hdfs.procedure; package org.apache.hadoop.tools.fedbalance.procedure;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.EqualsBuilder;


@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hdfs.procedure; package org.apache.hadoop.tools.fedbalance.procedure;
import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configurable;


@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hdfs.procedure; package org.apache.hadoop.tools.fedbalance.procedure;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -37,9 +37,9 @@ import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.SCHEDULER_JOURNAL_URI; import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;
import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.TMP_TAIL; import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TMP_TAIL;
import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.JOB_PREFIX; import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.JOB_PREFIX;
/** /**
* BalanceJournal based on HDFS. This class stores all the journals in the HDFS. * BalanceJournal based on HDFS. This class stores all the journals in the HDFS.


@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hdfs.procedure; package org.apache.hadoop.tools.fedbalance.procedure;
import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder;
@ -29,7 +29,7 @@ import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import static org.apache.hadoop.hdfs.procedure.BalanceJob.NEXT_PROCEDURE_NONE; import static org.apache.hadoop.tools.fedbalance.procedure.BalanceJob.NEXT_PROCEDURE_NONE;
/** /**
* The basic components of the Job. Extend this class to implement different * The basic components of the Job. Extend this class to implement different


@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hdfs.procedure; package org.apache.hadoop.tools.fedbalance.procedure;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -40,9 +40,9 @@ import org.apache.hadoop.util.Time;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.WORK_THREAD_NUM; import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.WORK_THREAD_NUM;
import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.WORK_THREAD_NUM_DEFAULT; import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.WORK_THREAD_NUM_DEFAULT;
import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.JOURNAL_CLASS; import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.JOURNAL_CLASS;
/** /**
* The state machine framework consist of: * The state machine framework consist of:
* Job: The state machine. It implements the basic logic of the * Job: The state machine. It implements the basic logic of the


@ -23,7 +23,7 @@
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
package org.apache.hadoop.hdfs.procedure; package org.apache.hadoop.tools.fedbalance.procedure;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;


@ -0,0 +1,38 @@
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
if ! declare -f hadoop_subcommand_fedbalance >/dev/null 2>/dev/null; then
if [[ "${HADOOP_SHELL_EXECNAME}" = hadoop ]]; then
hadoop_add_subcommand "fedbalance" client "balance data between sub-clusters"
fi
# this can't be indented otherwise shelldocs won't get it
## @description fedbalance command for hadoop
## @audience public
## @stability stable
## @replaceable yes
function hadoop_subcommand_fedbalance
{
# shellcheck disable=SC2034
HADOOP_CLASSNAME=org.apache.hadoop.tools.fedbalance.FedBalance
hadoop_add_to_classpath_tools hadoop-distcp
hadoop_add_to_classpath_tools hadoop-federation-balance
}
fi


@ -0,0 +1,446 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.fedbalance;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.tools.fedbalance.DistCpProcedure.Stage;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceJob;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure.RetryException;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.DataOutput;
import java.io.DataInputStream;
import java.io.ByteArrayInputStream;
import java.net.URI;
import java.util.Random;
import static junit.framework.TestCase.assertTrue;
import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;
import static org.apache.hadoop.test.GenericTestUtils.getMethodName;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.CURRENT_SNAPSHOT_NAME;
import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.LAST_SNAPSHOT_NAME;
import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
/**
* Test DistCpProcedure.
*/
public class TestDistCpProcedure {
private static MiniDFSCluster cluster;
private static Configuration conf;
static final String MOUNT = "mock_mount_point";
private static final String SRCDAT = "srcdat";
private static final String DSTDAT = "dstdat";
private static final long BLOCK_SIZE = 1024;
private static final long FILE_SIZE = BLOCK_SIZE * 100;
private FileEntry[] srcfiles =
{new FileEntry(SRCDAT, true), new FileEntry(SRCDAT + "/a", false),
new FileEntry(SRCDAT + "/b", true),
new FileEntry(SRCDAT + "/b/c", false)};
private static String nnUri;
@BeforeClass
public static void beforeClass() throws IOException {
DistCpProcedure.enabledForTest = true;
conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
cluster.waitActive();
String workPath =
"hdfs://" + cluster.getNameNode().getHostAndPort() + "/procedure";
conf.set(SCHEDULER_JOURNAL_URI, workPath);
nnUri = FileSystem.getDefaultUri(conf).toString();
}
@AfterClass
public static void afterClass() {
DistCpProcedure.enabledForTest = false;
if (cluster != null) {
cluster.shutdown();
}
}
@Test(timeout = 30000)
public void testSuccessfulDistCpProcedure() throws Exception {
String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
DistributedFileSystem fs =
(DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
createFiles(fs, testRoot, srcfiles);
Path src = new Path(testRoot, SRCDAT);
Path dst = new Path(testRoot, DSTDAT);
FsPermission originalPerm = new FsPermission(777);
fs.setPermission(src, originalPerm);
FedBalanceContext context = buildContext(src, dst, MOUNT);
DistCpProcedure dcProcedure =
new DistCpProcedure("distcp-procedure", null, 1000, context);
BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(conf);
scheduler.init(true);
BalanceJob balanceJob =
new BalanceJob.Builder<>().nextProcedure(dcProcedure).build();
scheduler.submit(balanceJob);
scheduler.waitUntilDone(balanceJob);
assertTrue(balanceJob.isJobDone());
if (balanceJob.getError() != null) {
throw balanceJob.getError();
}
assertNull(balanceJob.getError());
assertTrue(fs.exists(dst));
assertFalse(
fs.exists(new Path(context.getSrc(), HdfsConstants.DOT_SNAPSHOT_DIR)));
assertFalse(
fs.exists(new Path(context.getDst(), HdfsConstants.DOT_SNAPSHOT_DIR)));
assertEquals(originalPerm, fs.getFileStatus(dst).getPermission());
assertEquals(0, fs.getFileStatus(src).getPermission().toShort());
for (FileEntry e : srcfiles) { // verify file len.
if (!e.isDir) {
Path targetFile = new Path(testRoot, e.path.replace(SRCDAT, DSTDAT));
assertEquals(FILE_SIZE, fs.getFileStatus(targetFile).getLen());
}
}
cleanup(fs, new Path(testRoot));
}
@Test(timeout = 30000)
public void testInitDistCp() throws Exception {
String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
DistributedFileSystem fs =
(DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
createFiles(fs, testRoot, srcfiles);
Path src = new Path(testRoot, SRCDAT);
Path dst = new Path(testRoot, DSTDAT);
// set permission.
fs.setPermission(src, FsPermission.createImmutable((short) 020));
FedBalanceContext context = buildContext(src, dst, MOUNT);
DistCpProcedure dcProcedure =
new DistCpProcedure("distcp-procedure", null, 1000, context);
// submit distcp.
try {
dcProcedure.initDistCp();
} catch (RetryException e) {
// Expected: the distcp job has just been submitted and is not finished yet.
}
fs.delete(new Path(src, "a"), true);
// wait until job done.
executeProcedure(dcProcedure, Stage.DIFF_DISTCP,
() -> dcProcedure.initDistCp());
assertTrue(fs.exists(dst));
// The initial distcp copies from the snapshot taken before the delete, so the
// deleted file should still be copied to the dst.
assertTrue(fs.exists(new Path(dst, "a")));
cleanup(fs, new Path(testRoot));
}
@Test(timeout = 30000)
public void testDiffDistCp() throws Exception {
String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
DistributedFileSystem fs =
(DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
createFiles(fs, testRoot, srcfiles);
Path src = new Path(testRoot, SRCDAT);
Path dst = new Path(testRoot, DSTDAT);
FedBalanceContext context = buildContext(src, dst, MOUNT);
DistCpProcedure dcProcedure =
new DistCpProcedure("distcp-procedure", null, 1000, context);
executeProcedure(dcProcedure, Stage.DIFF_DISTCP,
() -> dcProcedure.initDistCp());
assertTrue(fs.exists(dst));
// move file out of src and test distcp.
fs.rename(new Path(src, "a"), new Path("/a"));
executeProcedure(dcProcedure, Stage.FINISH,
() -> dcProcedure.finalDistCp());
assertFalse(fs.exists(new Path(dst, "a")));
// move back file src/a and test distcp.
fs.rename(new Path("/a"), new Path(src, "a"));
executeProcedure(dcProcedure, Stage.FINISH,
() -> dcProcedure.finalDistCp());
assertTrue(fs.exists(new Path(dst, "a")));
// append file src/a and test.
OutputStream out = fs.append(new Path(src, "a"));
out.write("hello".getBytes());
out.close();
long len = fs.getFileStatus(new Path(src, "a")).getLen();
executeProcedure(dcProcedure, Stage.FINISH,
() -> dcProcedure.finalDistCp());
assertEquals(len, fs.getFileStatus(new Path(dst, "a")).getLen());
cleanup(fs, new Path(testRoot));
}
@Test(timeout = 30000)
public void testStageFinalDistCp() throws Exception {
String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
DistributedFileSystem fs =
(DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
createFiles(fs, testRoot, srcfiles);
Path src = new Path(testRoot, SRCDAT);
Path dst = new Path(testRoot, DSTDAT);
// open files.
OutputStream out = fs.append(new Path(src, "a"));
FedBalanceContext context = buildContext(src, dst, MOUNT);
DistCpProcedure dcProcedure =
new DistCpProcedure("distcp-procedure", null, 1000, context);
executeProcedure(dcProcedure, Stage.DIFF_DISTCP,
() -> dcProcedure.initDistCp());
executeProcedure(dcProcedure, Stage.FINISH,
() -> dcProcedure.finalDistCp());
// Verify all the open files have been closed.
intercept(RemoteException.class, "LeaseExpiredException",
"Expect RemoteException(LeaseExpiredException).", () -> out.close());
cleanup(fs, new Path(testRoot));
}
@Test(timeout = 30000)
public void testStageFinish() throws Exception {
String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
DistributedFileSystem fs =
(DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
Path src = new Path(testRoot, SRCDAT);
Path dst = new Path(testRoot, DSTDAT);
fs.mkdirs(src);
fs.mkdirs(dst);
fs.allowSnapshot(src);
fs.allowSnapshot(dst);
fs.createSnapshot(src, LAST_SNAPSHOT_NAME);
fs.createSnapshot(src, CURRENT_SNAPSHOT_NAME);
fs.createSnapshot(dst, LAST_SNAPSHOT_NAME);
FsPermission originalPerm = new FsPermission(777);
fs.setPermission(src, originalPerm);
// Test the finish stage.
FedBalanceContext context = buildContext(src, dst, MOUNT);
DistCpProcedure dcProcedure =
new DistCpProcedure("distcp-procedure", null, 1000, context);
dcProcedure.disableWrite();
dcProcedure.finish();
// Verify path and permission.
assertTrue(fs.exists(dst));
assertFalse(fs.exists(new Path(src, HdfsConstants.DOT_SNAPSHOT_DIR)));
assertFalse(fs.exists(new Path(dst, HdfsConstants.DOT_SNAPSHOT_DIR)));
assertEquals(originalPerm, fs.getFileStatus(dst).getPermission());
assertEquals(0, fs.getFileStatus(src).getPermission().toShort());
cleanup(fs, new Path(testRoot));
}
@Test(timeout = 30000)
public void testRecoveryByStage() throws Exception {
String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
DistributedFileSystem fs =
(DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
createFiles(fs, testRoot, srcfiles);
Path src = new Path(testRoot, SRCDAT);
Path dst = new Path(testRoot, DSTDAT);
FedBalanceContext context = buildContext(src, dst, MOUNT);
final DistCpProcedure[] dcp = new DistCpProcedure[1];
dcp[0] = new DistCpProcedure("distcp-procedure", null, 1000, context);
// Serialize and deserialize the procedure before each stage to verify it can
// recover from any stage.
dcp[0] = serializeProcedure(dcp[0]);
executeProcedure(dcp[0], Stage.INIT_DISTCP, () -> dcp[0].preCheck());
dcp[0] = serializeProcedure(dcp[0]);
executeProcedure(dcp[0], Stage.DIFF_DISTCP, () -> dcp[0].initDistCp());
fs.delete(new Path(src, "a"), true); // make some difference.
dcp[0] = serializeProcedure(dcp[0]);
executeProcedure(dcp[0], Stage.DISABLE_WRITE, () -> dcp[0].diffDistCp());
dcp[0] = serializeProcedure(dcp[0]);
executeProcedure(dcp[0], Stage.FINAL_DISTCP, () -> dcp[0].disableWrite());
dcp[0] = serializeProcedure(dcp[0]);
OutputStream out = fs.append(new Path(src, "b/c"));
executeProcedure(dcp[0], Stage.FINISH, () -> dcp[0].finalDistCp());
intercept(RemoteException.class, "LeaseExpiredException",
"Expect RemoteException(LeaseExpiredException).", () -> out.close());
dcp[0] = serializeProcedure(dcp[0]);
assertTrue(dcp[0].execute());
assertTrue(fs.exists(dst));
assertFalse(
fs.exists(new Path(context.getSrc(), HdfsConstants.DOT_SNAPSHOT_DIR)));
assertFalse(
fs.exists(new Path(context.getDst(), HdfsConstants.DOT_SNAPSHOT_DIR)));
cleanup(fs, new Path(testRoot));
}
@Test(timeout = 30000)
public void testShutdown() throws Exception {
String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
DistributedFileSystem fs =
(DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
createFiles(fs, testRoot, srcfiles);
Path src = new Path(testRoot, SRCDAT);
Path dst = new Path(testRoot, DSTDAT);
FedBalanceContext context = buildContext(src, dst, MOUNT);
DistCpProcedure dcProcedure =
new DistCpProcedure("distcp-procedure", null, 1000, context);
BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(conf);
scheduler.init(true);
BalanceJob balanceJob =
new BalanceJob.Builder<>().nextProcedure(dcProcedure).build();
scheduler.submit(balanceJob);
long sleep = Math.abs(new Random().nextLong()) % 10000;
Thread.sleep(sleep);
scheduler.shutDown();
cleanup(fs, new Path(testRoot));
}
@Test(timeout = 30000)
public void testDisableWrite() throws Exception {
String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
DistributedFileSystem fs =
(DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
createFiles(fs, testRoot, srcfiles);
Path src = new Path(testRoot, SRCDAT);
Path dst = new Path(testRoot, DSTDAT);
FedBalanceContext context = buildContext(src, dst, MOUNT);
DistCpProcedure dcProcedure =
new DistCpProcedure("distcp-procedure", null, 1000, context);
assertNotEquals(0, fs.getFileStatus(src).getPermission().toShort());
executeProcedure(dcProcedure, Stage.FINAL_DISTCP,
() -> dcProcedure.disableWrite());
assertEquals(0, fs.getFileStatus(src).getPermission().toShort());
cleanup(fs, new Path(testRoot));
}
private FedBalanceContext buildContext(Path src, Path dst, String mount) {
return new FedBalanceContext.Builder(src, dst, mount, conf).setMapNum(10)
.setBandwidthLimit(1).setTrash(TrashOption.TRASH).setDelayDuration(1000)
.build();
}
interface Call {
void execute() throws IOException, RetryException;
}
/**
* Execute the procedure until its stage is updated to the target stage.
*
* @param procedure the procedure to be executed and verified.
* @param target the target stage.
* @param call the function executing the procedure.
*/
private static void executeProcedure(DistCpProcedure procedure, Stage target,
Call call) throws IOException {
Stage stage = Stage.PRE_CHECK;
procedure.updateStage(stage);
while (stage != target) {
try {
call.execute();
} catch (RetryException e) {
// Ignore and retry until the procedure reaches the target stage.
} finally {
stage = procedure.getStage();
}
}
}
static class FileEntry {
private String path;
private boolean isDir;
FileEntry(String path, boolean isDir) {
this.path = path;
this.isDir = isDir;
}
String getPath() {
return path;
}
boolean isDirectory() {
return isDir;
}
}
/**
* Create directories and files with random data.
*
* @param fs the file system object.
* @param topdir the base dir of the directories and files.
* @param entries the directory and file entries to be created.
*/
private void createFiles(DistributedFileSystem fs, String topdir,
FileEntry[] entries) throws IOException {
long seed = System.currentTimeMillis();
Random rand = new Random(seed);
short replicationFactor = 2;
for (FileEntry entry : entries) {
Path newPath = new Path(topdir + "/" + entry.getPath());
if (entry.isDirectory()) {
fs.mkdirs(newPath);
} else {
int bufSize = 128;
DFSTestUtil.createFile(fs, newPath, bufSize, FILE_SIZE, BLOCK_SIZE,
replicationFactor, seed);
}
seed = System.currentTimeMillis() + rand.nextLong();
}
}
private DistCpProcedure serializeProcedure(DistCpProcedure dcp)
throws IOException {
ByteArrayOutputStream bao = new ByteArrayOutputStream();
DataOutput dataOut = new DataOutputStream(bao);
dcp.write(dataOut);
dcp = new DistCpProcedure();
dcp.readFields(
new DataInputStream(new ByteArrayInputStream(bao.toByteArray())));
return dcp;
}
private void cleanup(DistributedFileSystem dfs, Path root)
throws IOException {
Path src = new Path(root, SRCDAT);
Path dst = new Path(root, DSTDAT);
DistCpProcedure.cleanupSnapshot(dfs, src);
DistCpProcedure.cleanupSnapshot(dfs, dst);
dfs.delete(root, true);
}
}


@ -0,0 +1,222 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.fedbalance;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Time;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataOutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createNamenodeReport;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
/**
* Basic tests of MountTableProcedure.
*/
public class TestMountTableProcedure {
private static StateStoreDFSCluster cluster;
private static RouterContext routerContext;
private static Configuration routerConf;
private static List<MountTable> mockMountTable;
private static StateStoreService stateStore;
@BeforeClass
public static void globalSetUp() throws Exception {
cluster = new StateStoreDFSCluster(false, 1);
// Build and start a router with State Store + admin + RPC
Configuration conf = new RouterConfigBuilder()
.stateStore()
.admin()
.rpc()
.build();
cluster.addRouterOverrides(conf);
cluster.startRouters();
routerContext = cluster.getRandomRouter();
mockMountTable = cluster.generateMockMountTable();
Router router = routerContext.getRouter();
stateStore = router.getStateStore();
// Add two name services for testing
ActiveNamenodeResolver membership = router.getNamenodeResolver();
membership.registerNamenode(createNamenodeReport("ns0", "nn1",
HAServiceProtocol.HAServiceState.ACTIVE));
membership.registerNamenode(createNamenodeReport("ns1", "nn1",
HAServiceProtocol.HAServiceState.ACTIVE));
stateStore.refreshCaches(true);
routerConf = new Configuration();
InetSocketAddress routerSocket = router.getAdminServerAddress();
routerConf.setSocketAddr(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
routerSocket);
}
@AfterClass
public static void tearDown() {
cluster.stopRouter(routerContext);
}
@Before
public void testSetup() throws Exception {
assertTrue(
synchronizeRecords(stateStore, mockMountTable, MountTable.class));
// Avoid running with random users
routerContext.resetAdminClient();
}
@Test
public void testUpdateMountpoint() throws Exception {
// Firstly add mount entry: /test-path->{ns0,/test-path}.
String mount = "/test-path";
String dst = "/test-dst";
MountTable newEntry = MountTable
.newInstance(mount, Collections.singletonMap("ns0", mount),
Time.now(), Time.now());
MountTableManager mountTable =
routerContext.getAdminClient().getMountTableManager();
AddMountTableEntryRequest addRequest =
AddMountTableEntryRequest.newInstance(newEntry);
AddMountTableEntryResponse addResponse =
mountTable.addMountTableEntry(addRequest);
assertTrue(addResponse.getStatus());
// verify the mount entry is added successfully.
GetMountTableEntriesRequest request =
GetMountTableEntriesRequest.newInstance("/");
stateStore.loadCache(MountTableStoreImpl.class, true); // load cache.
GetMountTableEntriesResponse response =
mountTable.getMountTableEntries(request);
assertEquals(3, response.getEntries().size());
// set the mount table to readonly.
MountTableProcedure.disableWrite(mount, routerConf);
// test MountTableProcedure updates the mount point.
String dstNs = "ns1";
MountTableProcedure smtp =
new MountTableProcedure("single-mount-table-procedure", null,
1000, mount, dst, dstNs, routerConf);
assertTrue(smtp.execute());
stateStore.loadCache(MountTableStoreImpl.class, true); // load cache.
// Verify the mount entry is updated to the new destination.
MountTable entry =
MountTableProcedure.getMountEntry(mount, mountTable);
assertNotNull(entry);
assertEquals(1, entry.getDestinations().size());
String nsId = entry.getDestinations().get(0).getNameserviceId();
String dstPath = entry.getDestinations().get(0).getDest();
assertEquals(dstNs, nsId);
assertEquals(dst, dstPath);
// Verify the mount table is not readonly.
URI address = routerContext.getFileSystemURI();
DFSClient routerClient = new DFSClient(address, routerConf);
MountTableProcedure.enableWrite(mount, routerConf);
intercept(RemoteException.class, "No namenode available to invoke mkdirs",
"Expect no namenode exception.", () -> routerClient
.mkdirs(mount + "/file", new FsPermission(020), false));
}
@Test
public void testDisableAndEnableWrite() throws Exception {
// Firstly add mount entry: /test-write->{ns0,/test-write}.
String mount = "/test-write";
MountTable newEntry = MountTable
.newInstance(mount, Collections.singletonMap("ns0", mount),
Time.now(), Time.now());
MountTableManager mountTable =
routerContext.getAdminClient().getMountTableManager();
AddMountTableEntryRequest addRequest =
AddMountTableEntryRequest.newInstance(newEntry);
AddMountTableEntryResponse addResponse =
mountTable.addMountTableEntry(addRequest);
assertTrue(addResponse.getStatus());
stateStore.loadCache(MountTableStoreImpl.class, true); // load cache.
// Construct client.
URI address = routerContext.getFileSystemURI();
DFSClient routerClient = new DFSClient(address, routerConf);
// Verify the mount point is not readonly.
intercept(RemoteException.class, "No namenode available to invoke mkdirs",
"Expect no namenode exception.", () -> routerClient
.mkdirs(mount + "/file", new FsPermission(020), false));
// Verify disable write.
MountTableProcedure.disableWrite(mount, routerConf);
intercept(RemoteException.class, "is in a read only mount point",
"Expect readonly exception.", () -> routerClient
.mkdirs(mount + "/dir", new FsPermission(020), false));
// Verify enable write.
MountTableProcedure.enableWrite(mount, routerConf);
intercept(RemoteException.class, "No namenode available to invoke mkdirs",
"Expect no namenode exception.", () -> routerClient
.mkdirs(mount + "/file", new FsPermission(020), false));
}
@Test
public void testSeDeserialize() throws Exception {
String fedPath = "/test-path";
String dst = "/test-dst";
String dstNs = "ns1";
MountTableProcedure smtp =
new MountTableProcedure("single-mount-table-procedure", null,
1000, fedPath, dst, dstNs, routerConf);
ByteArrayOutputStream bao = new ByteArrayOutputStream();
DataOutput dataOut = new DataOutputStream(bao);
smtp.write(dataOut);
smtp = new MountTableProcedure();
smtp.readFields(
new DataInputStream(new ByteArrayInputStream(bao.toByteArray())));
assertEquals(fedPath, smtp.getMount());
assertEquals(dst, smtp.getDstPath());
assertEquals(dstNs, smtp.getDstNs());
}
}


@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.fedbalance;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;
import static org.apache.hadoop.test.GenericTestUtils.getMethodName;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
/**
* Test TrashProcedure.
*/
public class TestTrashProcedure {
private static Configuration conf;
private static MiniDFSCluster cluster;
private static String nnUri;
@BeforeClass
public static void beforeClass() throws IOException {
conf = new Configuration();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
cluster.waitActive();
nnUri = FileSystem.getDefaultUri(conf).toString();
}
@AfterClass
public static void afterClass() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testTrashProcedure() throws Exception {
Path src = new Path("/" + getMethodName() + "-src");
Path dst = new Path("/" + getMethodName() + "-dst");
FileSystem fs = cluster.getFileSystem();
fs.mkdirs(src);
fs.mkdirs(new Path(src, "dir"));
assertTrue(fs.exists(src));
FedBalanceContext context =
new FedBalanceContext.Builder(src, dst, TestDistCpProcedure.MOUNT, conf)
.setMapNum(10).setBandwidthLimit(1).setTrash(TrashOption.TRASH)
.build();
TrashProcedure trashProcedure =
new TrashProcedure("trash-procedure", null, 1000, context);
trashProcedure.moveToTrash();
assertFalse(fs.exists(src));
}
@Test
public void testSeDeserialize() throws Exception {
Path src = new Path("/" + getMethodName() + "-src");
Path dst = new Path("/" + getMethodName() + "-dst");
FedBalanceContext context =
new FedBalanceContext.Builder(src, dst, TestDistCpProcedure.MOUNT, conf)
.setMapNum(10).setBandwidthLimit(1).setTrash(TrashOption.TRASH)
.build();
TrashProcedure trashProcedure =
new TrashProcedure("trash-procedure", null, 1000, context);
ByteArrayOutputStream bao = new ByteArrayOutputStream();
DataOutput dataOut = new DataOutputStream(bao);
trashProcedure.write(dataOut);
trashProcedure = new TrashProcedure();
trashProcedure.readFields(
new DataInputStream(new ByteArrayInputStream(bao.toByteArray())));
assertEquals(context, trashProcedure.getContext());
}
}


@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hdfs.procedure; package org.apache.hadoop.tools.fedbalance.procedure;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;


@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hdfs.procedure; package org.apache.hadoop.tools.fedbalance.procedure;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;


@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hdfs.procedure; package org.apache.hadoop.tools.fedbalance.procedure;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;


@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hdfs.procedure; package org.apache.hadoop.tools.fedbalance.procedure;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@ -43,8 +43,8 @@ import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.SCHEDULER_JOURNAL_URI; import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;
import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.WORK_THREAD_NUM; import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.WORK_THREAD_NUM;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -70,6 +70,7 @@ public class TestBalanceProcedureScheduler {
CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "hdfs:///"); CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "hdfs:///");
CONF.setBoolean(DFS_NAMENODE_ACLS_ENABLED_KEY, true); CONF.setBoolean(DFS_NAMENODE_ACLS_ENABLED_KEY, true);
CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
CONF.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
CONF.setInt(WORK_THREAD_NUM, 1); CONF.setInt(WORK_THREAD_NUM, 1);
cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build(); cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();


@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hdfs.procedure; package org.apache.hadoop.tools.fedbalance.procedure;
import java.io.IOException; import java.io.IOException;


@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hdfs.procedure; package org.apache.hadoop.tools.fedbalance.procedure;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;


@ -44,6 +44,11 @@
<artifactId>hadoop-distcp</artifactId> <artifactId>hadoop-distcp</artifactId>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-federation-balance</artifactId>
<scope>compile</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-archives</artifactId> <artifactId>hadoop-archives</artifactId>


@ -32,6 +32,7 @@
<modules> <modules>
<module>hadoop-streaming</module> <module>hadoop-streaming</module>
<module>hadoop-distcp</module> <module>hadoop-distcp</module>
<module>hadoop-federation-balance</module>
<module>hadoop-dynamometer</module> <module>hadoop-dynamometer</module>
<module>hadoop-archives</module> <module>hadoop-archives</module>
<module>hadoop-archive-logs</module> <module>hadoop-archive-logs</module>