HDFS-15640. Add diff threshold to FedBalance. Contributed by Jinglun.

This commit is contained in:
Yiqun Lin 2020-10-27 10:41:10 +08:00
parent 872440610f
commit 15a5f53673
6 changed files with 119 additions and 13 deletions

View File

@ -89,6 +89,8 @@ public class DistCpProcedure extends BalanceProcedure {
private boolean forceCloseOpenFiles;
/* Disable write by setting the mount point readonly. */
private boolean useMountReadOnly;
/* The threshold of diff entries. */
private int diffThreshold;
private FsPermission fPerm; // the permission of the src.
private AclStatus acl; // the acl of the src.
@ -134,6 +136,7 @@ public class DistCpProcedure extends BalanceProcedure {
this.bandWidth = context.getBandwidthLimit();
this.forceCloseOpenFiles = context.getForceCloseOpenFiles();
this.useMountReadOnly = context.getUseMountReadOnly();
this.diffThreshold = context.getDiffThreshold();
srcFs = (DistributedFileSystem) context.getSrc().getFileSystem(conf);
dstFs = (DistributedFileSystem) context.getDst().getFileSystem(conf);
}
@ -227,12 +230,8 @@ public class DistCpProcedure extends BalanceProcedure {
} else {
throw new RetryException(); // wait job complete.
}
} else if (!verifyDiff()) {
if (!verifyOpenFiles() || forceCloseOpenFiles) {
updateStage(Stage.DISABLE_WRITE);
} else {
throw new RetryException();
}
} else if (diffDistCpStageDone()) {
updateStage(Stage.DISABLE_WRITE);
} else {
submitDiffDistCp();
}
@ -372,14 +371,38 @@ public class DistCpProcedure extends BalanceProcedure {
}
/**
* Verify whether the src has changed since CURRENT_SNAPSHOT_NAME snapshot.
* Check whether the conditions are satisfied for moving to the next stage.
* If the diff entries size is no greater than the threshold and the open
* files could be force closed or there is no open file, then moving to the
* next stage.
*
* @return true if the src has changed.
* @return true if moving to the next stage. false if the conditions are not
* satisfied.
* @throws RetryException if the conditions are not satisfied and the diff
* size is under the given threshold scope.
*/
private boolean verifyDiff() throws IOException {
@VisibleForTesting
boolean diffDistCpStageDone() throws IOException, RetryException {
int diffSize = getDiffSize();
if (diffSize <= diffThreshold) {
if (forceCloseOpenFiles || !verifyOpenFiles()) {
return true;
} else {
throw new RetryException();
}
}
return false;
}
/**
* Get number of the diff entries.
*
* @return number of the diff entries.
*/
private int getDiffSize() throws IOException {
SnapshotDiffReport diffReport =
srcFs.getSnapshotDiffReport(src, CURRENT_SNAPSHOT_NAME, "");
return diffReport.getDiffList().size() > 0;
return diffReport.getDiffList().size();
}
/**

View File

@ -51,6 +51,7 @@ import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.BANDWIDTH;
import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.TRASH;
import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.DELAY_DURATION;
import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.CLI_OPTIONS;
import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.DIFF_THRESHOLD;
import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;
/**
@ -91,6 +92,8 @@ public class FedBalance extends Configured implements Tool {
private TrashOption trashOpt = TrashOption.TRASH;
/* Specify the duration(millie seconds) when the procedure needs retry. */
private long delayDuration = TimeUnit.SECONDS.toMillis(1);
/* Specify the threshold of diff entries. */
private int diffThreshold = 0;
/* The source input. This specifies the source path. */
private final String inputSrc;
/* The dst input. This specifies the dst path. */
@ -155,6 +158,15 @@ public class FedBalance extends Configured implements Tool {
return this;
}
/**
* Specify the threshold of diff entries.
* @param value the threshold of a fast distcp.
*/
public Builder setDiffThreshold(int value) {
this.diffThreshold = value;
return this;
}
/**
* Build the balance job.
*/
@ -172,7 +184,8 @@ public class FedBalance extends Configured implements Tool {
.setForceCloseOpenFiles(forceCloseOpen)
.setUseMountReadOnly(routerCluster).setMapNum(map)
.setBandwidthLimit(bandwidth).setTrash(trashOpt)
.setDelayDuration(delayDuration).build();
.setDelayDuration(delayDuration)
.setDiffThreshold(diffThreshold).build();
} else { // normal federation cluster.
Path src = new Path(inputSrc);
if (src.toUri().getAuthority() == null) {
@ -181,7 +194,8 @@ public class FedBalance extends Configured implements Tool {
context = new FedBalanceContext.Builder(src, dst, NO_MOUNT, getConf())
.setForceCloseOpenFiles(forceCloseOpen)
.setUseMountReadOnly(routerCluster).setMapNum(map)
.setBandwidthLimit(bandwidth).setTrash(trashOpt).build();
.setBandwidthLimit(bandwidth).setTrash(trashOpt)
.setDiffThreshold(diffThreshold).build();
}
LOG.info(context.toString());
@ -290,6 +304,10 @@ public class FedBalance extends Configured implements Tool {
builder.setDelayDuration(
Long.parseLong(command.getOptionValue(DELAY_DURATION.getOpt())));
}
if (command.hasOption(DIFF_THRESHOLD.getOpt())) {
builder.setDiffThreshold(Integer.parseInt(
command.getOptionValue(DIFF_THRESHOLD.getOpt())));
}
if (command.hasOption(TRASH.getOpt())) {
String val = command.getOptionValue(TRASH.getOpt());
if (val.equalsIgnoreCase("skip")) {

View File

@ -54,6 +54,8 @@ public class FedBalanceContext implements Writable {
private TrashOption trashOpt;
/* How long will the procedures be delayed. */
private long delayDuration;
/* The threshold of diff entries. */
private int diffThreshold;
private Configuration conf;
@ -91,6 +93,10 @@ public class FedBalanceContext implements Writable {
return bandwidthLimit;
}
public int getDiffThreshold() {
return diffThreshold;
}
public TrashOption getTrashOpt() {
return trashOpt;
}
@ -107,6 +113,7 @@ public class FedBalanceContext implements Writable {
out.writeInt(bandwidthLimit);
out.writeInt(trashOpt.ordinal());
out.writeLong(delayDuration);
out.writeInt(diffThreshold);
}
@Override
@ -122,6 +129,7 @@ public class FedBalanceContext implements Writable {
bandwidthLimit = in.readInt();
trashOpt = TrashOption.values()[in.readInt()];
delayDuration = in.readLong();
diffThreshold = in.readInt();
}
@Override
@ -146,6 +154,7 @@ public class FedBalanceContext implements Writable {
.append(bandwidthLimit, bc.bandwidthLimit)
.append(trashOpt, bc.trashOpt)
.append(delayDuration, bc.delayDuration)
.append(diffThreshold, bc.diffThreshold)
.isEquals();
}
@ -161,6 +170,7 @@ public class FedBalanceContext implements Writable {
.append(bandwidthLimit)
.append(trashOpt)
.append(delayDuration)
.append(diffThreshold)
.build();
}
@ -180,6 +190,7 @@ public class FedBalanceContext implements Writable {
builder.append(", map=").append(mapNum);
builder.append(", bandwidth=").append(bandwidthLimit);
builder.append(", delayDuration=").append(delayDuration);
builder.append(", diffThreshold=").append(diffThreshold);
return builder.toString();
}
@ -194,6 +205,7 @@ public class FedBalanceContext implements Writable {
private int bandwidthLimit;
private TrashOption trashOpt;
private long delayDuration;
private int diffThreshold;
/**
* This class helps building the FedBalanceContext.
@ -263,6 +275,14 @@ public class FedBalanceContext implements Writable {
return this;
}
/**
* Specify the threshold of diff entries.
*/
public Builder setDiffThreshold(int value) {
this.diffThreshold = value;
return this;
}
/**
* Build the FedBalanceContext.
*
@ -280,6 +300,7 @@ public class FedBalanceContext implements Writable {
context.bandwidthLimit = this.bandwidthLimit;
context.trashOpt = this.trashOpt;
context.delayDuration = this.delayDuration;
context.diffThreshold = this.diffThreshold;
return context;
}
}

View File

@ -71,6 +71,17 @@ public final class FedBalanceOptions {
+ " needs to retry. A job may retry many times and check the state"
+ " when it waits for the distcp job to finish.");
/**
* Specify the threshold of diff entries.
*/
final static Option DIFF_THRESHOLD = new Option("diffThreshold", true,
"This specifies the threshold of the diff entries that used in"
+ " incremental copy stage. If the diff entries size is no greater"
+ " than this threshold and the open files check is satisfied"
+ "(no open files or force close all open files), the fedBalance will"
+ " go to the final round of distcp. Default value is 0, that means"
+ " waiting until there is no diff.");
/**
* Move the source path to trash after all the data are sync to target, or
* delete the source directly, or skip both trash and deletion.

View File

@ -101,6 +101,7 @@ Command `submit` has 5 options:
| -bandwidth | Specify bandwidth per map in MB. | 10 |
| -delay | Specify the delayed duration(millie seconds) when the job needs to retry. | 1000 |
| -moveToTrash | This options has 3 values: `trash` (move the source path to trash), `delete` (delete the source path directly) and `skip` (skip both trash and deletion). By default the server side trash interval is used. If the trash is disabled in the server side, the default trash interval 60 minutes is used. | trash |
| -diffThreshold | Specify the threshold of the diff entries that used in incremental copy stage. If the diff entries size is no greater than the threshold and the open files check is satisfied(no open files or force close all open files), the fedBalance will go to the final round of distcp. Setting to 0 means waiting until there is no diff.| 0 |
### Configuration Options
--------------------

View File

@ -171,6 +171,33 @@ public class TestDistCpProcedure {
cleanup(fs, new Path(testRoot));
}
@Test
public void testDiffThreshold() throws Exception {
String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
DistributedFileSystem fs =
(DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
createFiles(fs, testRoot, srcfiles);
Path src = new Path(testRoot, SRCDAT);
Path dst = new Path(testRoot, DSTDAT);
FedBalanceContext context = buildContext(src, dst, MOUNT, 10);
DistCpProcedure dcProcedure =
new DistCpProcedure("distcp-procedure", null, 1000, context);
executeProcedure(dcProcedure, Stage.DIFF_DISTCP,
() -> dcProcedure.initDistCp());
// Test distcp with diff entries number no greater than threshold.
Path lastPath = new Path(src, "a");
for (int i = 0; i < 5; i++) {
Path newPath = new Path(src, "a-" + i);
fs.rename(lastPath, newPath);
lastPath = newPath;
assertTrue(dcProcedure.diffDistCpStageDone());
executeProcedure(dcProcedure, Stage.DISABLE_WRITE,
() -> dcProcedure.diffDistCp());
}
cleanup(fs, new Path(testRoot));
}
@Test(timeout = 30000)
public void testDiffDistCp() throws Exception {
String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
@ -351,9 +378,14 @@ public class TestDistCpProcedure {
}
private FedBalanceContext buildContext(Path src, Path dst, String mount) {
return buildContext(src, dst, mount, 0);
}
private FedBalanceContext buildContext(Path src, Path dst, String mount,
int diffThreshold) {
return new FedBalanceContext.Builder(src, dst, mount, conf).setMapNum(10)
.setBandwidthLimit(1).setTrash(TrashOption.TRASH).setDelayDuration(1000)
.build();
.setDiffThreshold(diffThreshold).build();
}
interface Call {