MAPREDUCE-7018. Apply erasure coding properly to framework tarball and support plain tar (miklos.szegedi@cloudera.com via rkanter)
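
In short, the patch applies the storage settings when the target file is created rather than after it has been written: on HDFS the tarball is opened through DistributedFileSystem's create builder with the system replication (non-striped) erasure-coding policy and the requested replication factor, so no erasure-coded blocks are ever written for the framework archive. A minimal sketch of that pattern follows; the class name, method name and parameters are illustrative stand-ins, not the uploader's actual fields.

import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;

/** Illustrative only: open an upload target with plain replication decided at create time. */
final class TarballTargetSketch {
  static OutputStream open(Path target, short replication) throws IOException {
    FileSystem fs = target.getFileSystem(new Configuration());
    if (fs instanceof DistributedFileSystem) {
      DistributedFileSystem dfs = (DistributedFileSystem) fs;
      DistributedFileSystem.HdfsDataOutputStreamBuilder builder =
          dfs.createFile(target)
              .overwrite(true)
              // the system "replication" policy turns off striping for this file
              .ecPolicyName(
                  SystemErasureCodingPolicies.getReplicationPolicy().getName());
      if (replication > 0) {
        builder.replication(replication);
      }
      return builder.build();
    }
    // Non-HDFS targets (e.g. file://): replication and EC settings do not apply.
    return fs.create(target, true);
  }
}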

Robert Kanter 2017-12-11 14:00:42 -08:00
parent 00129c5314
commit 2316f52690
2 changed files with 81 additions and 61 deletions

FrameworkUploader.java

@@ -81,7 +81,6 @@ public class FrameworkUploader implements Runnable {
@VisibleForTesting
OutputStream targetStream = null;
private Path targetPath = null;
private String alias = null;
private void printHelp(Options options) {
@@ -140,11 +139,12 @@ public class FrameworkUploader implements Runnable {
}
}
private void beginUpload() throws IOException, UploaderException {
@VisibleForTesting
void beginUpload() throws IOException, UploaderException {
if (targetStream == null) {
validateTargetPath();
int lastIndex = target.indexOf('#');
targetPath =
Path targetPath =
new Path(
target.substring(
0, lastIndex == -1 ? target.length() : lastIndex));
@@ -153,7 +153,37 @@ public class FrameworkUploader implements Runnable {
targetPath.getName();
LOG.info("Target " + targetPath);
FileSystem fileSystem = targetPath.getFileSystem(new Configuration());
targetStream = fileSystem.create(targetPath, true);
targetStream = null;
if (fileSystem instanceof DistributedFileSystem) {
LOG.info("Set replication to " +
replication + " for path: " + targetPath);
LOG.info("Disabling Erasure Coding for path: " + targetPath);
DistributedFileSystem dfs = (DistributedFileSystem)fileSystem;
DistributedFileSystem.HdfsDataOutputStreamBuilder builder =
dfs.createFile(targetPath)
.overwrite(true)
.ecPolicyName(
SystemErasureCodingPolicies.getReplicationPolicy().getName());
if (replication > 0) {
builder.replication(replication);
}
targetStream = builder.build();
} else {
LOG.warn("Cannot set replication to " +
replication + " for path: " + targetPath +
" on a non-distributed fileystem " +
fileSystem.getClass().getName());
}
if (targetStream == null) {
targetStream = fileSystem.create(targetPath, true);
}
if (targetPath.getName().endsWith("gz") ||
targetPath.getName().endsWith("tgz")) {
LOG.info("Creating GZip");
targetStream = new GZIPOutputStream(targetStream);
}
}
}
@@ -162,7 +192,7 @@ public class FrameworkUploader implements Runnable {
beginUpload();
LOG.info("Compressing tarball");
try (TarArchiveOutputStream out = new TarArchiveOutputStream(
new GZIPOutputStream(targetStream))) {
targetStream)) {
for (String fullPath : filteredInputFiles) {
LOG.info("Adding " + fullPath);
File file = new File(fullPath);
@@ -178,25 +208,6 @@ public class FrameworkUploader implements Runnable {
targetStream.close();
}
}
if (targetPath == null) {
return;
}
// Set file attributes
FileSystem fileSystem = targetPath.getFileSystem(new Configuration());
if (fileSystem instanceof DistributedFileSystem) {
LOG.info("Disabling Erasure Coding for path: " + targetPath);
DistributedFileSystem dfs = (DistributedFileSystem) fileSystem;
dfs.setErasureCodingPolicy(targetPath,
SystemErasureCodingPolicies.getReplicationPolicy().getName());
}
if (replication > 0) {
LOG.info("Set replication to " +
replication + " for path: " + targetPath);
fileSystem.setReplication(targetPath, replication);
}
}
private void parseLists() throws UploaderException {
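
The plain-tar support is the flip side of the same change: the gzip wrapper is now applied in beginUpload based on the target's extension, so buildPackage writes tar entries to whatever stream it was handed, and a target named upload.tar stays uncompressed while upload.tar.gz or upload.tgz is gzipped. A rough sketch of that flow, assuming a hypothetical target stream, target name and input list standing in for the uploader's fields:

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.io.IOUtils;

/** Illustrative only: tar the inputs, compressing only when the name asks for it. */
final class TarballWriterSketch {
  static void write(OutputStream targetStream, String targetName,
      List<String> inputFiles) throws IOException {
    if (targetName.endsWith("gz") || targetName.endsWith("tgz")) {
      targetStream = new GZIPOutputStream(targetStream);  // .gz / .tgz targets only
    }
    try (TarArchiveOutputStream out = new TarArchiveOutputStream(targetStream)) {
      for (String fullPath : inputFiles) {
        File file = new File(fullPath);
        out.putArchiveEntry(new TarArchiveEntry(file, file.getName()));
        try (InputStream in = new FileInputStream(file)) {
          IOUtils.copy(in, out);  // stream the jar into the archive entry
        }
        out.closeArchiveEntry();
      }
    }
  }
}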

TestFrameworkUploader.java

@@ -30,6 +30,7 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.util.HashMap;
import java.util.HashSet;
@@ -171,46 +172,54 @@ public class TestFrameworkUploader {
*/
@Test
public void testBuildTarBall() throws IOException, UploaderException {
File parent = new File(testDir);
try {
parent.deleteOnExit();
FrameworkUploader uploader = prepareTree(parent);
File gzipFile = new File("upload.tar.gz");
gzipFile.deleteOnExit();
Assert.assertTrue("Creating output", gzipFile.createNewFile());
uploader.targetStream = new FileOutputStream(gzipFile);
uploader.buildPackage();
TarArchiveInputStream result = null;
String[] testFiles = {"upload.tar", "upload.tar.gz"};
for (String testFile: testFiles) {
File parent = new File(testDir);
try {
result =
new TarArchiveInputStream(
new GZIPInputStream(new FileInputStream(gzipFile)));
Set<String> fileNames = new HashSet<>();
Set<Long> sizes = new HashSet<>();
TarArchiveEntry entry1 = result.getNextTarEntry();
fileNames.add(entry1.getName());
sizes.add(entry1.getSize());
TarArchiveEntry entry2 = result.getNextTarEntry();
fileNames.add(entry2.getName());
sizes.add(entry2.getSize());
Assert.assertTrue(
"File name error", fileNames.contains("a.jar"));
Assert.assertTrue(
"File size error", sizes.contains((long) 13));
Assert.assertTrue(
"File name error", fileNames.contains("b.jar"));
Assert.assertTrue(
"File size error", sizes.contains((long) 14));
} finally {
if (result != null) {
result.close();
parent.deleteOnExit();
FrameworkUploader uploader = prepareTree(parent);
File gzipFile =
new File(parent.getAbsolutePath() + "/" + testFile);
gzipFile.deleteOnExit();
uploader.target =
"file:///" + gzipFile.getAbsolutePath();
uploader.beginUpload();
uploader.buildPackage();
InputStream stream = new FileInputStream(gzipFile);
if (gzipFile.getName().endsWith(".gz")) {
stream = new GZIPInputStream(stream);
}
TarArchiveInputStream result = null;
try {
result =
new TarArchiveInputStream(stream);
Set<String> fileNames = new HashSet<>();
Set<Long> sizes = new HashSet<>();
TarArchiveEntry entry1 = result.getNextTarEntry();
fileNames.add(entry1.getName());
sizes.add(entry1.getSize());
TarArchiveEntry entry2 = result.getNextTarEntry();
fileNames.add(entry2.getName());
sizes.add(entry2.getSize());
Assert.assertTrue(
"File name error", fileNames.contains("a.jar"));
Assert.assertTrue(
"File size error", sizes.contains((long) 13));
Assert.assertTrue(
"File name error", fileNames.contains("b.jar"));
Assert.assertTrue(
"File size error", sizes.contains((long) 14));
} finally {
if (result != null) {
result.close();
}
}
} finally {
FileUtils.deleteDirectory(parent);
}
} finally {
FileUtils.deleteDirectory(parent);
}
}