MAPREDUCE-7032. Add the ability to specify a delayed replication count (miklos.szegedi@cloudera.com via rkanter)

This commit is contained in:
Robert Kanter 2018-01-16 10:45:45 -08:00
parent 5ac109909a
commit d716084f45
2 changed files with 128 additions and 17 deletions

View File

@ -25,6 +25,8 @@ import org.apache.commons.cli.Options;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@ -43,6 +45,8 @@ import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.NotLinkException;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@ -73,7 +77,15 @@ public class FrameworkUploader implements Runnable {
@VisibleForTesting
String target = null;
@VisibleForTesting
short replication = 10;
Path targetPath = null;
@VisibleForTesting
short initialReplication = 3;
@VisibleForTesting
short finalReplication = 10;
@VisibleForTesting
short acceptableReplication = 9;
@VisibleForTesting
int timeout = 10;
private boolean ignoreSymlink = false;
@VisibleForTesting
@ -101,9 +113,10 @@ public class FrameworkUploader implements Runnable {
LOG.info(
"Suggested mapreduce.application.classpath $PWD/" + alias + "/*");
System.out.println("Suggested classpath $PWD/" + alias + "/*");
} catch (UploaderException|IOException e) {
} catch (UploaderException|IOException|InterruptedException e) {
LOG.error("Error in execution " + e.getMessage());
e.printStackTrace();
throw new RuntimeException(e);
}
}
@ -147,7 +160,7 @@ public class FrameworkUploader implements Runnable {
if (targetStream == null) {
validateTargetPath();
int lastIndex = target.indexOf('#');
Path targetPath =
targetPath =
new Path(
target.substring(
0, lastIndex == -1 ? target.length() : lastIndex));
@ -160,7 +173,7 @@ public class FrameworkUploader implements Runnable {
targetStream = null;
if (fileSystem instanceof DistributedFileSystem) {
LOG.info("Set replication to " +
replication + " for path: " + targetPath);
initialReplication + " for path: " + targetPath);
LOG.info("Disabling Erasure Coding for path: " + targetPath);
DistributedFileSystem dfs = (DistributedFileSystem)fileSystem;
DistributedFileSystem.HdfsDataOutputStreamBuilder builder =
@ -168,13 +181,13 @@ public class FrameworkUploader implements Runnable {
.overwrite(true)
.ecPolicyName(
SystemErasureCodingPolicies.getReplicationPolicy().getName());
if (replication > 0) {
builder.replication(replication);
if (initialReplication > 0) {
builder.replication(initialReplication);
}
targetStream = builder.build();
} else {
LOG.warn("Cannot set replication to " +
replication + " for path: " + targetPath +
initialReplication + " for path: " + targetPath +
" on a non-distributed fileystem " +
fileSystem.getClass().getName());
}
@ -190,8 +203,70 @@ public class FrameworkUploader implements Runnable {
}
}
private long getSmallestReplicatedBlockCount()
throws IOException {
FileSystem fileSystem = targetPath.getFileSystem(new Configuration());
FileStatus status = fileSystem.getFileStatus(targetPath);
long length = status.getLen();
HashMap<Long, Integer> blockCount = new HashMap<>();
// Start with 0s for each offset
for (long offset = 0; offset < length; offset +=status.getBlockSize()) {
blockCount.put(offset, 0);
}
// Count blocks
BlockLocation[] locations = fileSystem.getFileBlockLocations(
targetPath, 0, length);
for(BlockLocation location: locations) {
final int replicas = location.getHosts().length;
blockCount.compute(
location.getOffset(), (key, value) -> value + replicas);
}
// Print out the results
for (long offset = 0; offset < length; offset +=status.getBlockSize()) {
LOG.info(String.format(
"Replication counts offset:%d blocks:%d",
offset, blockCount.get(offset)));
}
return Collections.min(blockCount.values());
}
private void endUpload()
throws IOException, InterruptedException {
FileSystem fileSystem = targetPath.getFileSystem(new Configuration());
if (fileSystem instanceof DistributedFileSystem) {
fileSystem.setReplication(targetPath, finalReplication);
LOG.info("Set replication to " +
finalReplication + " for path: " + targetPath);
long startTime = System.currentTimeMillis();
long endTime = startTime;
long currentReplication = 0;
while(endTime - startTime < timeout * 1000 &&
currentReplication < acceptableReplication) {
Thread.sleep(1000);
endTime = System.currentTimeMillis();
currentReplication = getSmallestReplicatedBlockCount();
}
if (endTime - startTime >= timeout * 1000) {
LOG.error(String.format(
"Timed out after %d seconds while waiting for acceptable" +
" replication of %d (current replication is %d)",
timeout, acceptableReplication, currentReplication));
}
} else {
LOG.info("Cannot set replication to " +
finalReplication + " for path: " + targetPath +
" on a non-distributed fileystem " +
fileSystem.getClass().getName());
}
}
@VisibleForTesting
void buildPackage() throws IOException, UploaderException {
void buildPackage()
throws IOException, UploaderException, InterruptedException {
beginUpload();
LOG.info("Compressing tarball");
try (TarArchiveOutputStream out = new TarArchiveOutputStream(
@ -206,6 +281,7 @@ public class FrameworkUploader implements Runnable {
out.closeArchiveEntry();
}
}
endUpload();
} finally {
if (targetStream != null) {
targetStream.close();
@ -378,8 +454,21 @@ public class FrameworkUploader implements Runnable {
.hasArg().create("target"));
opts.addOption(OptionBuilder
.withDescription(
"Desired replication count")
.hasArg().create("replication"));
"Desired initial replication count. Default 3.")
.hasArg().create("initialReplication"));
opts.addOption(OptionBuilder
.withDescription(
"Desired final replication count. Default 10.")
.hasArg().create("finalReplication"));
opts.addOption(OptionBuilder
.withDescription(
"Desired acceptable replication count. Default 9.")
.hasArg().create("acceptableReplication"));
opts.addOption(OptionBuilder
.withDescription(
"Desired timeout for the acceptable" +
" replication in seconds. Default 10")
.hasArg().create("timeout"));
opts.addOption(OptionBuilder
.withDescription("Ignore symlinks into the same directory")
.create("nosymlink"));
@ -395,8 +484,19 @@ public class FrameworkUploader implements Runnable {
"whitelist", DefaultJars.DEFAULT_MR_JARS);
blacklist = parser.getCommandLine().getOptionValue(
"blacklist", DefaultJars.DEFAULT_EXCLUDED_MR_JARS);
replication = Short.parseShort(parser.getCommandLine().getOptionValue(
"replication", "10"));
initialReplication =
Short.parseShort(parser.getCommandLine().getOptionValue(
"initialReplication", "3"));
finalReplication =
Short.parseShort(parser.getCommandLine().getOptionValue(
"finalReplication", "10"));
acceptableReplication =
Short.parseShort(
parser.getCommandLine().getOptionValue(
"acceptableReplication", "9"));
timeout =
Integer.parseInt(
parser.getCommandLine().getOptionValue("timeout", "10"));
if (parser.getCommandLine().hasOption("nosymlink")) {
ignoreSymlink = true;
}

View File

@ -104,7 +104,10 @@ public class TestFrameworkUploader {
"-blacklist", "C",
"-fs", "hdfs://C:8020",
"-target", "D",
"-replication", "100"};
"-initialReplication", "100",
"-acceptableReplication", "120",
"-finalReplication", "140",
"-timeout", "10"};
FrameworkUploader uploader = new FrameworkUploader();
boolean success = uploader.parseArguments(args);
Assert.assertTrue("Expected to print help", success);
@ -116,8 +119,14 @@ public class TestFrameworkUploader {
uploader.blacklist);
Assert.assertEquals("Target mismatch", "hdfs://C:8020/D",
uploader.target);
Assert.assertEquals("Replication mismatch", 100,
uploader.replication);
Assert.assertEquals("Initial replication mismatch", 100,
uploader.initialReplication);
Assert.assertEquals("Acceptable replication mismatch", 120,
uploader.acceptableReplication);
Assert.assertEquals("Final replication mismatch", 140,
uploader.finalReplication);
Assert.assertEquals("Timeout mismatch", 10,
uploader.timeout);
}
/**
@ -176,7 +185,8 @@ public class TestFrameworkUploader {
* Test building a tarball from source jars.
*/
@Test
public void testBuildTarBall() throws IOException, UploaderException {
public void testBuildTarBall()
throws IOException, UploaderException, InterruptedException {
String[] testFiles = {"upload.tar", "upload.tar.gz"};
for (String testFile: testFiles) {
File parent = new File(testDir);
@ -232,7 +242,8 @@ public class TestFrameworkUploader {
* Test upload to HDFS.
*/
@Test
public void testUpload() throws IOException, UploaderException {
public void testUpload()
throws IOException, UploaderException, InterruptedException {
final String fileName = "/upload.tar.gz";
File parent = new File(testDir);
try {