HADOOP-16058. S3A tests to include Terasort.
Contributed by Steve Loughran. This includes - HADOOP-15890. Some S3A committer tests don't match ITest* pattern; don't run in maven - MAPREDUCE-7090. BigMapOutput example doesn't work with paths off cluster fs - MAPREDUCE-7091. Terasort on S3A to switch to new committers - MAPREDUCE-7092. MR examples to work better against cloud stores
This commit is contained in:
parent
52cfbc39cc
commit
60c9042286
|
@ -128,17 +128,20 @@ public class BigMapOutput extends Configured implements Tool {
|
|||
usage();
|
||||
}
|
||||
}
|
||||
|
||||
FileSystem fs = FileSystem.get(getConf());
|
||||
if (bigMapInput == null || outputPath == null) {
|
||||
// report usage and exit
|
||||
usage();
|
||||
// this stops IDES warning about unset local variables.
|
||||
return -1;
|
||||
}
|
||||
|
||||
JobConf jobConf = new JobConf(getConf(), BigMapOutput.class);
|
||||
|
||||
jobConf.setJobName("BigMapOutput");
|
||||
jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
|
||||
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
|
||||
FileInputFormat.setInputPaths(jobConf, bigMapInput);
|
||||
if (fs.exists(outputPath)) {
|
||||
fs.delete(outputPath, true);
|
||||
}
|
||||
outputPath.getFileSystem(jobConf).delete(outputPath, true);
|
||||
FileOutputFormat.setOutputPath(jobConf, outputPath);
|
||||
jobConf.setMapperClass(IdentityMapper.class);
|
||||
jobConf.setReducerClass(IdentityReducer.class);
|
||||
|
@ -146,7 +149,10 @@ public class BigMapOutput extends Configured implements Tool {
|
|||
jobConf.setOutputValueClass(BytesWritable.class);
|
||||
|
||||
if (createInput) {
|
||||
createBigMapInputFile(jobConf, fs, bigMapInput, fileSizeInMB);
|
||||
createBigMapInputFile(jobConf,
|
||||
bigMapInput.getFileSystem(jobConf),
|
||||
bigMapInput,
|
||||
fileSizeInMB);
|
||||
}
|
||||
|
||||
Date startTime = new Date();
|
||||
|
|
|
@ -284,7 +284,7 @@ public class MRBench extends Configured implements Tool{
|
|||
}
|
||||
|
||||
JobConf jobConf = setupJob(numMaps, numReduces, jarFile);
|
||||
FileSystem fs = FileSystem.get(jobConf);
|
||||
FileSystem fs = BASE_DIR.getFileSystem(jobConf);
|
||||
Path inputFile = new Path(INPUT_DIR, "input_" + (new Random()).nextInt() + ".txt");
|
||||
generateTextFile(fs, inputFile, inputLines, inputSortOrder);
|
||||
|
||||
|
|
|
@ -30,10 +30,8 @@ import org.apache.hadoop.io.Text;
|
|||
import org.apache.hadoop.mapred.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.mapred.InvalidJobConfException;
|
||||
import org.apache.hadoop.mapreduce.JobContext;
|
||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||
import org.apache.hadoop.mapreduce.RecordWriter;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||
import org.apache.hadoop.mapreduce.security.TokenCache;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -45,7 +43,6 @@ import org.slf4j.LoggerFactory;
|
|||
public class TeraOutputFormat extends FileOutputFormat<Text,Text> {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TeraOutputFormat.class);
|
||||
private OutputCommitter committer = null;
|
||||
|
||||
/**
|
||||
* Set the requirement for a final sync before the stream is closed.
|
||||
|
@ -145,12 +142,4 @@ public class TeraOutputFormat extends FileOutputFormat<Text,Text> {
|
|||
return new TeraRecordWriter(fileOut, job);
|
||||
}
|
||||
|
||||
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
|
||||
throws IOException {
|
||||
if (committer == null) {
|
||||
Path output = getOutputPath(context);
|
||||
committer = new FileOutputCommitter(output, context);
|
||||
}
|
||||
return committer;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -321,7 +321,7 @@ public class TeraSort extends Configured implements Tool {
|
|||
try {
|
||||
TeraInputFormat.writePartitionFile(job, partitionFile);
|
||||
} catch (Throwable e) {
|
||||
LOG.error(e.getMessage());
|
||||
LOG.error("{}", e.getMessage(), e);
|
||||
return -1;
|
||||
}
|
||||
job.addCacheFile(partitionUri);
|
||||
|
|
|
@ -61,7 +61,7 @@ public class TestTeraSort extends HadoopTestCase {
|
|||
String[] genArgs = {NUM_ROWS, sortInput.toString()};
|
||||
|
||||
// Run TeraGen
|
||||
assertEquals(ToolRunner.run(conf, new TeraGen(), genArgs), 0);
|
||||
assertEquals(0, ToolRunner.run(conf, new TeraGen(), genArgs));
|
||||
}
|
||||
|
||||
private void runTeraSort(Configuration conf,
|
||||
|
@ -71,7 +71,7 @@ public class TestTeraSort extends HadoopTestCase {
|
|||
String[] sortArgs = {sortInput.toString(), sortOutput.toString()};
|
||||
|
||||
// Run Sort
|
||||
assertEquals(ToolRunner.run(conf, new TeraSort(), sortArgs), 0);
|
||||
assertEquals(0, ToolRunner.run(conf, new TeraSort(), sortArgs));
|
||||
}
|
||||
|
||||
private void runTeraValidator(Configuration job,
|
||||
|
@ -80,7 +80,7 @@ public class TestTeraSort extends HadoopTestCase {
|
|||
String[] svArgs = {sortOutput.toString(), valOutput.toString()};
|
||||
|
||||
// Run Tera-Validator
|
||||
assertEquals(ToolRunner.run(job, new TeraValidate(), svArgs), 0);
|
||||
assertEquals(0, ToolRunner.run(job, new TeraValidate(), svArgs));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -186,6 +186,7 @@
|
|||
<exclude>**/ITestS3AHuge*.java</exclude>
|
||||
<!-- this sets out to overlaod DynamoDB, so must be run standalone -->
|
||||
<exclude>**/ITestDynamoDBMetadataStoreScale.java</exclude>
|
||||
<exclude>**/ITestTerasort*.java</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</execution>
|
||||
|
@ -220,6 +221,9 @@
|
|||
<include>**/ITestS3AEncryptionSSEC*.java</include>
|
||||
<!-- this sets out to overlaod DynamoDB, so must be run standalone -->
|
||||
<include>**/ITestDynamoDBMetadataStoreScale.java</include>
|
||||
<!-- the terasort tests both work with a file in the same path in -->
|
||||
<!-- the local FS. Running them sequentially guarantees isolation -->
|
||||
<include>**/ITestTerasort*.java</include>
|
||||
</includes>
|
||||
</configuration>
|
||||
</execution>
|
||||
|
|
|
@ -31,6 +31,9 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
|
||||
|
||||
import org.apache.hadoop.service.Service;
|
||||
import org.apache.hadoop.service.ServiceOperations;
|
||||
|
||||
import org.hamcrest.core.Is;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Assume;
|
||||
|
@ -563,6 +566,65 @@ public final class S3ATestUtils {
|
|||
removeBucketOverrides(bucket, conf, options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Call a function; any exception raised is logged at info.
|
||||
* This is for test teardowns.
|
||||
* @param log log to use.
|
||||
* @param operation operation to invoke
|
||||
* @param <T> type of operation.
|
||||
*/
|
||||
public static <T> void callQuietly(final Logger log,
|
||||
final Invoker.Operation<T> operation) {
|
||||
try {
|
||||
operation.execute();
|
||||
} catch (Exception e) {
|
||||
log.info(e.toString(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Call a void operation; any exception raised is logged at info.
|
||||
* This is for test teardowns.
|
||||
* @param log log to use.
|
||||
* @param operation operation to invoke
|
||||
*/
|
||||
public static void callQuietly(final Logger log,
|
||||
final Invoker.VoidOperation operation) {
|
||||
try {
|
||||
operation.execute();
|
||||
} catch (Exception e) {
|
||||
log.info(e.toString(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deploy a hadoop service: init and start it.
|
||||
* @param conf configuration to use
|
||||
* @param service service to configure
|
||||
* @param <T> type of service
|
||||
* @return the started service
|
||||
*/
|
||||
public static <T extends Service> T deployService(
|
||||
final Configuration conf,
|
||||
final T service) {
|
||||
service.init(conf);
|
||||
service.start();
|
||||
return service;
|
||||
}
|
||||
|
||||
/**
|
||||
* Terminate a service, returning {@code null} cast at compile-time
|
||||
* to the type of the service, for ease of setting fields to null.
|
||||
* @param service service.
|
||||
* @param <T> type of the service
|
||||
* @return null, always
|
||||
*/
|
||||
@SuppressWarnings("ThrowableNotThrown")
|
||||
public static <T extends Service> T terminateService(final T service) {
|
||||
ServiceOperations.stopQuietly(LOG, service);
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper class to do diffs of metrics.
|
||||
*/
|
||||
|
|
|
@ -18,8 +18,10 @@
|
|||
|
||||
package org.apache.hadoop.fs.s3a.commit;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
@ -30,6 +32,7 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||
|
@ -50,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
|||
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||
import static org.apache.hadoop.fs.s3a.MultipartTestUtils.listMultipartUploads;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
|
||||
import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles;
|
||||
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
|
||||
|
||||
/**
|
||||
|
@ -75,6 +79,7 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
|
|||
|
||||
private InconsistentAmazonS3Client inconsistentClient;
|
||||
|
||||
|
||||
/**
|
||||
* Should the inconsistent S3A client be used?
|
||||
* Default value: true.
|
||||
|
@ -436,4 +441,63 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
|
|||
jContext.getConfiguration(),
|
||||
TypeConverter.fromYarn(attemptID));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Load in the success data marker: this guarantees that an S3A
|
||||
* committer was used.
|
||||
* @param fs filesystem
|
||||
* @param outputPath path of job
|
||||
* @param committerName name of committer to match
|
||||
* @return the success data
|
||||
* @throws IOException IO failure
|
||||
*/
|
||||
public static SuccessData validateSuccessFile(final S3AFileSystem fs,
|
||||
final Path outputPath, final String committerName) throws IOException {
|
||||
SuccessData successData = null;
|
||||
try {
|
||||
successData = loadSuccessFile(fs, outputPath);
|
||||
} catch (FileNotFoundException e) {
|
||||
// either the output path is missing or, if its the success file,
|
||||
// somehow the relevant committer wasn't picked up.
|
||||
String dest = outputPath.toString();
|
||||
LOG.error("No _SUCCESS file found under {}", dest);
|
||||
List<String> files = new ArrayList<>();
|
||||
applyLocatedFiles(fs.listFiles(outputPath, true),
|
||||
(status) -> {
|
||||
files.add(status.getPath().toString());
|
||||
LOG.error("{} {}", status.getPath(), status.getLen());
|
||||
});
|
||||
throw new AssertionError("No _SUCCESS file in " + dest
|
||||
+ "; found : " + files.stream().collect(Collectors.joining("\n")),
|
||||
e);
|
||||
}
|
||||
String commitDetails = successData.toString();
|
||||
LOG.info("Committer name " + committerName + "\n{}",
|
||||
commitDetails);
|
||||
LOG.info("Committer statistics: \n{}",
|
||||
successData.dumpMetrics(" ", " = ", "\n"));
|
||||
LOG.info("Diagnostics\n{}",
|
||||
successData.dumpDiagnostics(" ", " = ", "\n"));
|
||||
assertEquals("Wrong committer in " + commitDetails,
|
||||
committerName, successData.getCommitter());
|
||||
return successData;
|
||||
}
|
||||
|
||||
/**
|
||||
* Load a success file; fail if the file is empty/nonexistent.
|
||||
* @param fs filesystem
|
||||
* @param outputPath directory containing the success file.
|
||||
* @return the loaded file.
|
||||
* @throws IOException failure to find/load the file
|
||||
* @throws AssertionError file is 0-bytes long
|
||||
*/
|
||||
public static SuccessData loadSuccessFile(final S3AFileSystem fs,
|
||||
final Path outputPath) throws IOException {
|
||||
Path success = new Path(outputPath, _SUCCESS);
|
||||
FileStatus status = fs.getFileStatus(success);
|
||||
assertTrue("0 byte success file - not a s3guard committer " + success,
|
||||
status.getLen() > 0);
|
||||
return SuccessData.load(fs, success);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,22 +30,17 @@ import java.util.Set;
|
|||
import java.util.UUID;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
||||
import org.apache.hadoop.fs.s3a.S3AUtils;
|
||||
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.io.LongWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
|
@ -54,102 +49,35 @@ import org.apache.hadoop.mapreduce.Mapper;
|
|||
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||
import org.apache.hadoop.service.ServiceOperations;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
|
||||
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
|
||||
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.*;
|
||||
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID;
|
||||
|
||||
/**
|
||||
* Test for an MR Job with all the different committers.
|
||||
*/
|
||||
public abstract class AbstractITCommitMRJob extends AbstractYarnClusterITest {
|
||||
|
||||
/** Full integration test of an MR job. */
|
||||
public abstract class AbstractITCommitMRJob extends AbstractCommitITest {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(AbstractITCommitMRJob.class);
|
||||
|
||||
private static final int TEST_FILE_COUNT = 2;
|
||||
private static final int SCALE_TEST_FILE_COUNT = 20;
|
||||
|
||||
private static MiniDFSClusterService hdfs;
|
||||
private static MiniMRYarnCluster yarn = null;
|
||||
private static JobConf conf = null;
|
||||
private boolean uniqueFilenames = false;
|
||||
private boolean scaleTest;
|
||||
|
||||
protected static FileSystem getDFS() {
|
||||
return hdfs.getClusterFS();
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setupClusters() throws IOException {
|
||||
// the HDFS and YARN clusters share the same configuration, so
|
||||
// the HDFS cluster binding is implicitly propagated to YARN
|
||||
conf = new JobConf();
|
||||
conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false);
|
||||
conf.setLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, Long.MAX_VALUE);
|
||||
|
||||
hdfs = new MiniDFSClusterService();
|
||||
hdfs.init(conf);
|
||||
hdfs.start();
|
||||
yarn = new MiniMRYarnCluster("ITCommitMRJob", 2);
|
||||
yarn.init(conf);
|
||||
yarn.start();
|
||||
}
|
||||
|
||||
@SuppressWarnings("ThrowableNotThrown")
|
||||
@AfterClass
|
||||
public static void teardownClusters() throws IOException {
|
||||
conf = null;
|
||||
ServiceOperations.stopQuietly(yarn);
|
||||
ServiceOperations.stopQuietly(hdfs);
|
||||
hdfs = null;
|
||||
yarn = null;
|
||||
}
|
||||
|
||||
public static MiniDFSCluster getHdfs() {
|
||||
return hdfs.getCluster();
|
||||
}
|
||||
|
||||
public static FileSystem getLocalFS() {
|
||||
return hdfs.getLocalFS();
|
||||
}
|
||||
|
||||
@Rule
|
||||
public final TemporaryFolder temp = new TemporaryFolder();
|
||||
|
||||
/**
|
||||
* The name of the committer as returned by
|
||||
* {@link AbstractS3ACommitter#getName()} and used for committer construction.
|
||||
*/
|
||||
protected abstract String committerName();
|
||||
|
||||
@Override
|
||||
public void setup() throws Exception {
|
||||
super.setup();
|
||||
scaleTest = getTestPropertyBool(
|
||||
getConfiguration(),
|
||||
KEY_SCALE_TESTS_ENABLED,
|
||||
DEFAULT_SCALE_TESTS_ENABLED);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getTestTimeoutMillis() {
|
||||
return SCALE_TEST_TIMEOUT_SECONDS * 1000;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMRJob() throws Exception {
|
||||
describe("Run a simple MR Job");
|
||||
|
||||
S3AFileSystem fs = getFileSystem();
|
||||
// final dest is in S3A
|
||||
Path outputPath = path("testMRJob");
|
||||
Path outputPath = path(getMethodName());
|
||||
|
||||
String commitUUID = UUID.randomUUID().toString();
|
||||
String suffix = uniqueFilenames ? ("-" + commitUUID) : "";
|
||||
String suffix = isUniqueFilenames() ? ("-" + commitUUID) : "";
|
||||
int numFiles = getTestFileCount();
|
||||
List<String> expectedFiles = new ArrayList<>(numFiles);
|
||||
Set<String> expectedKeys = Sets.newHashSet();
|
||||
for (int i = 0; i < numFiles; i += 1) {
|
||||
File file = temp.newFile(String.valueOf(i) + ".text");
|
||||
File file = temp.newFile(i + ".text");
|
||||
try (FileOutputStream out = new FileOutputStream(file)) {
|
||||
out.write(("file " + i).getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
@ -160,17 +88,8 @@ public abstract class AbstractITCommitMRJob extends AbstractCommitITest {
|
|||
}
|
||||
Collections.sort(expectedFiles);
|
||||
|
||||
Job mrJob = Job.getInstance(yarn.getConfig(), "test-committer-job");
|
||||
Job mrJob = createJob();
|
||||
JobConf jobConf = (JobConf) mrJob.getConfiguration();
|
||||
jobConf.setBoolean(FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
|
||||
uniqueFilenames);
|
||||
|
||||
|
||||
bindCommitter(jobConf,
|
||||
CommitConstants.S3A_COMMITTER_FACTORY,
|
||||
committerName());
|
||||
// pass down the scale test flag
|
||||
jobConf.setBoolean(KEY_SCALE_TESTS_ENABLED, scaleTest);
|
||||
|
||||
mrJob.setOutputFormatClass(LoggingTextOutputFormat.class);
|
||||
FileOutputFormat.setOutputPath(mrJob, outputPath);
|
||||
|
@ -204,7 +123,7 @@ public abstract class AbstractITCommitMRJob extends AbstractCommitITest {
|
|||
mrJob.setMaxMapAttempts(1);
|
||||
|
||||
mrJob.submit();
|
||||
try (DurationInfo d = new DurationInfo(LOG, "Job Execution")) {
|
||||
try (DurationInfo ignore = new DurationInfo(LOG, "Job Execution")) {
|
||||
boolean succeeded = mrJob.waitForCompletion(true);
|
||||
assertTrue("MR job failed", succeeded);
|
||||
}
|
||||
|
@ -223,24 +142,11 @@ public abstract class AbstractITCommitMRJob extends AbstractCommitITest {
|
|||
}
|
||||
Collections.sort(actualFiles);
|
||||
|
||||
// load in the success data marker: this guarantees that a s3guard
|
||||
// committer was used
|
||||
Path success = new Path(outputPath, _SUCCESS);
|
||||
FileStatus status = fs.getFileStatus(success);
|
||||
assertTrue("0 byte success file - not a s3guard committer " + success,
|
||||
status.getLen() > 0);
|
||||
SuccessData successData = SuccessData.load(fs, success);
|
||||
String commitDetails = successData.toString();
|
||||
LOG.info("Committer name " + committerName() + "\n{}",
|
||||
commitDetails);
|
||||
LOG.info("Committer statistics: \n{}",
|
||||
successData.dumpMetrics(" ", " = ", "\n"));
|
||||
LOG.info("Diagnostics\n{}",
|
||||
successData.dumpDiagnostics(" ", " = ", "\n"));
|
||||
assertEquals("Wrong committer in " + commitDetails,
|
||||
committerName(), successData.getCommitter());
|
||||
SuccessData successData = validateSuccessFile(fs, outputPath,
|
||||
committerName());
|
||||
List<String> successFiles = successData.getFilenames();
|
||||
assertTrue("No filenames in " + commitDetails,
|
||||
String commitData = successData.toString();
|
||||
assertTrue("No filenames in " + commitData,
|
||||
!successFiles.isEmpty());
|
||||
|
||||
assertEquals("Should commit the expected files",
|
||||
|
@ -249,41 +155,12 @@ public abstract class AbstractITCommitMRJob extends AbstractCommitITest {
|
|||
Set<String> summaryKeys = Sets.newHashSet();
|
||||
summaryKeys.addAll(successFiles);
|
||||
assertEquals("Summary keyset doesn't list the the expected paths "
|
||||
+ commitDetails, expectedKeys, summaryKeys);
|
||||
+ commitData, expectedKeys, summaryKeys);
|
||||
assertPathDoesNotExist("temporary dir",
|
||||
new Path(outputPath, CommitConstants.TEMPORARY));
|
||||
customPostExecutionValidation(outputPath, successData);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the file count for the test.
|
||||
* @return the number of mappers to create.
|
||||
*/
|
||||
public int getTestFileCount() {
|
||||
return scaleTest ? SCALE_TEST_FILE_COUNT : TEST_FILE_COUNT;
|
||||
}
|
||||
|
||||
/**
|
||||
* Override point to let implementations tune the MR Job conf.
|
||||
* @param jobConf configuration
|
||||
*/
|
||||
protected void applyCustomConfigOptions(JobConf jobConf) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Override point for any committer specific validation operations;
|
||||
* called after the base assertions have all passed.
|
||||
* @param destPath destination of work
|
||||
* @param successData loaded success data
|
||||
* @throws Exception failure
|
||||
*/
|
||||
protected void customPostExecutionValidation(Path destPath,
|
||||
SuccessData successData)
|
||||
throws Exception {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Mapper.
|
||||
* This is executed in separate process, and must not make any assumptions
|
||||
|
@ -305,7 +182,7 @@ public abstract class AbstractITCommitMRJob extends AbstractCommitITest {
|
|||
org.apache.log4j.BasicConfigurator.configure();
|
||||
boolean scaleMap = context.getConfiguration()
|
||||
.getBoolean(KEY_SCALE_TESTS_ENABLED, false);
|
||||
operations = scaleMap ? 1000 : 10;
|
||||
operations = scaleMap ? SCALE_TEST_KEYS : BASE_TEST_KEYS;
|
||||
id = context.getTaskAttemptID().toString();
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,256 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a.commit;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.deployService;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.terminateService;
|
||||
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES;
|
||||
|
||||
/**
|
||||
* Full integration test MR jobs.
|
||||
*
|
||||
* This is all done on shared static mini YARN and HDFS clusters, set up before
|
||||
* any of the tests methods run.
|
||||
*
|
||||
* To isolate tests properly for parallel test runs, that static state
|
||||
* needs to be stored in the final classes implementing the tests, and
|
||||
* exposed to the base class, with the setup clusters in the
|
||||
* specific test suites creating the clusters with unique names.
|
||||
*
|
||||
* This is "hard" to do in Java, unlike, say, Scala.
|
||||
*
|
||||
* Note: this turns out not to be the root cause of ordering problems
|
||||
* with the Terasort tests (that is hard coded use of a file in the local FS),
|
||||
* but this design here does make it clear that the before and after class
|
||||
* operations are explicitly called in the subclasses.
|
||||
* If two subclasses of this class are instantiated in the same JVM, in order,
|
||||
* they are guaranteed to be isolated.
|
||||
*
|
||||
* History: this is a superclass extracted from
|
||||
* {@link AbstractITCommitMRJob} while adding support for testing terasorting.
|
||||
*
|
||||
*/
|
||||
public abstract class AbstractYarnClusterITest extends AbstractCommitITest {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(AbstractYarnClusterITest.class);
|
||||
|
||||
private static final int TEST_FILE_COUNT = 2;
|
||||
private static final int SCALE_TEST_FILE_COUNT = 20;
|
||||
|
||||
public static final int SCALE_TEST_KEYS = 1000;
|
||||
public static final int BASE_TEST_KEYS = 10;
|
||||
|
||||
private boolean scaleTest;
|
||||
|
||||
private boolean uniqueFilenames = false;
|
||||
|
||||
/**
|
||||
* This is the cluster binding which every subclass must create.
|
||||
*/
|
||||
protected static final class ClusterBinding {
|
||||
|
||||
private final MiniDFSClusterService hdfs;
|
||||
|
||||
private final MiniMRYarnCluster yarn;
|
||||
|
||||
public ClusterBinding(
|
||||
final MiniDFSClusterService hdfs,
|
||||
final MiniMRYarnCluster yarn) {
|
||||
this.hdfs = checkNotNull(hdfs);
|
||||
this.yarn = checkNotNull(yarn);
|
||||
}
|
||||
|
||||
public MiniDFSClusterService getHdfs() {
|
||||
return hdfs;
|
||||
}
|
||||
|
||||
public MiniMRYarnCluster getYarn() {
|
||||
return yarn;
|
||||
}
|
||||
|
||||
public Configuration getConf() {
|
||||
return getYarn().getConfig();
|
||||
}
|
||||
|
||||
public void terminate() {
|
||||
terminateService(getYarn());
|
||||
terminateService(getHdfs());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the cluster binding. This must be done in
|
||||
* class setup of the (final) subclass.
|
||||
* The HDFS and YARN clusters share the same configuration, so
|
||||
* the HDFS cluster binding is implicitly propagated to YARN.
|
||||
* @param conf configuration to start with.
|
||||
* @return the cluster binding.
|
||||
* @throws IOException failure.
|
||||
*/
|
||||
protected static ClusterBinding createCluster(JobConf conf)
|
||||
throws IOException {
|
||||
|
||||
conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false);
|
||||
conf.setLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, Long.MAX_VALUE);
|
||||
|
||||
// create a unique cluster name.
|
||||
String clusterName = "yarn-" + UUID.randomUUID();
|
||||
MiniDFSClusterService miniDFSClusterService = deployService(conf,
|
||||
new MiniDFSClusterService());
|
||||
MiniMRYarnCluster yarnCluster = deployService(conf,
|
||||
new MiniMRYarnCluster(clusterName, 2));
|
||||
return new ClusterBinding(miniDFSClusterService, yarnCluster);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the cluster binding for this subclass
|
||||
* @return
|
||||
*/
|
||||
protected abstract ClusterBinding getClusterBinding();
|
||||
|
||||
protected MiniDFSClusterService getHdfs() {
|
||||
return getClusterBinding().getHdfs();
|
||||
}
|
||||
|
||||
|
||||
protected MiniMRYarnCluster getYarn() {
|
||||
return getClusterBinding().getYarn();
|
||||
}
|
||||
|
||||
public FileSystem getLocalFS() {
|
||||
return getHdfs().getLocalFS();
|
||||
}
|
||||
|
||||
protected FileSystem getDFS() {
|
||||
return getHdfs().getClusterFS();
|
||||
}
|
||||
|
||||
/**
|
||||
* The name of the committer as returned by
|
||||
* {@link AbstractS3ACommitter#getName()} and used for committer construction.
|
||||
*/
|
||||
protected abstract String committerName();
|
||||
|
||||
@Override
|
||||
public void setup() throws Exception {
|
||||
super.setup();
|
||||
assertNotNull("cluster is not bound",
|
||||
getClusterBinding());
|
||||
|
||||
scaleTest = getTestPropertyBool(
|
||||
getConfiguration(),
|
||||
KEY_SCALE_TESTS_ENABLED,
|
||||
DEFAULT_SCALE_TESTS_ENABLED);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getTestTimeoutMillis() {
|
||||
return SCALE_TEST_TIMEOUT_SECONDS * 1000;
|
||||
}
|
||||
|
||||
protected JobConf newJobConf() {
|
||||
return new JobConf(getYarn().getConfig());
|
||||
}
|
||||
|
||||
|
||||
protected Job createJob() throws IOException {
|
||||
Job mrJob = Job.getInstance(getClusterBinding().getConf(),
|
||||
getMethodName());
|
||||
patchConfigurationForCommitter(mrJob.getConfiguration());
|
||||
return mrJob;
|
||||
}
|
||||
|
||||
protected Configuration patchConfigurationForCommitter(
|
||||
final Configuration jobConf) {
|
||||
jobConf.setBoolean(FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
|
||||
uniqueFilenames);
|
||||
bindCommitter(jobConf,
|
||||
CommitConstants.S3A_COMMITTER_FACTORY,
|
||||
committerName());
|
||||
// pass down the scale test flag
|
||||
jobConf.setBoolean(KEY_SCALE_TESTS_ENABLED, scaleTest);
|
||||
return jobConf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the file count for the test.
|
||||
* @return the number of mappers to create.
|
||||
*/
|
||||
public int getTestFileCount() {
|
||||
return scaleTest ? SCALE_TEST_FILE_COUNT : TEST_FILE_COUNT;
|
||||
}
|
||||
|
||||
/**
|
||||
* Override point to let implementations tune the MR Job conf.
|
||||
* @param jobConf configuration
|
||||
*/
|
||||
protected void applyCustomConfigOptions(JobConf jobConf) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Override point for any committer specific validation operations;
|
||||
* called after the base assertions have all passed.
|
||||
* @param destPath destination of work
|
||||
* @param successData loaded success data
|
||||
* @throws Exception failure
|
||||
*/
|
||||
protected void customPostExecutionValidation(Path destPath,
|
||||
SuccessData successData)
|
||||
throws Exception {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Assume that scale tests are enabled.
|
||||
*/
|
||||
protected void requireScaleTestsEnabled() {
|
||||
assume("Scale test disabled: to enable set property " +
|
||||
KEY_SCALE_TESTS_ENABLED,
|
||||
isScaleTest());
|
||||
}
|
||||
|
||||
public boolean isScaleTest() {
|
||||
return scaleTest;
|
||||
}
|
||||
|
||||
public boolean isUniqueFilenames() {
|
||||
return uniqueFilenames;
|
||||
}
|
||||
}
|
|
@ -18,6 +18,11 @@
|
|||
|
||||
package org.apache.hadoop.fs.s3a.commit.magic;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
|
||||
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
|
||||
|
@ -33,7 +38,30 @@ import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
|
|||
* the settings in {@link AbstractITCommitMRJob#applyCustomConfigOptions(JobConf)} are
|
||||
* passed down to these processes.
|
||||
*/
|
||||
public class ITMagicCommitMRJob extends AbstractITCommitMRJob {
|
||||
public final class ITestMagicCommitMRJob extends AbstractITCommitMRJob {
|
||||
|
||||
/**
|
||||
* The static cluster binding with the lifecycle of this test; served
|
||||
* through instance-level methods for sharing across methods in the
|
||||
* suite.
|
||||
*/
|
||||
@SuppressWarnings("StaticNonFinalField")
|
||||
private static ClusterBinding clusterBinding;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupClusters() throws IOException {
|
||||
clusterBinding = createCluster(new JobConf());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownClusters() throws IOException {
|
||||
clusterBinding.terminate();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterBinding getClusterBinding() {
|
||||
return clusterBinding;
|
||||
}
|
||||
|
||||
/**
|
||||
* Need consistency here.
|
|
@ -18,13 +18,41 @@
|
|||
|
||||
package org.apache.hadoop.fs.s3a.commit.staging.integration;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
|
||||
import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
|
||||
/**
|
||||
* Full integration test for the directory committer.
|
||||
*/
|
||||
public class ITDirectoryCommitMRJob extends AbstractITCommitMRJob {
|
||||
public final class ITestDirectoryCommitMRJob extends AbstractITCommitMRJob {
|
||||
|
||||
/**
|
||||
* The static cluster binding with the lifecycle of this test; served
|
||||
* through instance-level methods for sharing across methods in the
|
||||
* suite.
|
||||
*/
|
||||
@SuppressWarnings("StaticNonFinalField")
|
||||
private static ClusterBinding clusterBinding;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupClusters() throws IOException {
|
||||
clusterBinding = createCluster(new JobConf()); }
|
||||
|
||||
@AfterClass
|
||||
public static void teardownClusters() throws IOException {
|
||||
clusterBinding.terminate();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterBinding getClusterBinding() {
|
||||
return clusterBinding;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String committerName() {
|
|
@ -18,13 +18,42 @@
|
|||
|
||||
package org.apache.hadoop.fs.s3a.commit.staging.integration;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
|
||||
import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
|
||||
/**
|
||||
* Full integration test for the partition committer.
|
||||
*/
|
||||
public class ITPartitionCommitMRJob extends AbstractITCommitMRJob {
|
||||
public final class ITestPartitionCommitMRJob extends AbstractITCommitMRJob {
|
||||
|
||||
/**
|
||||
* The static cluster binding with the lifecycle of this test; served
|
||||
* through instance-level methods for sharing across methods in the
|
||||
* suite.
|
||||
*/
|
||||
@SuppressWarnings("StaticNonFinalField")
|
||||
private static ClusterBinding clusterBinding;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupClusters() throws IOException {
|
||||
clusterBinding = createCluster(new JobConf());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownClusters() throws IOException {
|
||||
clusterBinding.terminate();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterBinding getClusterBinding() {
|
||||
return clusterBinding;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String committerName() {
|
|
@ -18,25 +18,53 @@
|
|||
|
||||
package org.apache.hadoop.fs.s3a.commit.staging.integration;
|
||||
|
||||
import org.junit.Test;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.hamcrest.core.StringContains;
|
||||
import org.hamcrest.core.StringEndsWith;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
|
||||
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
|
||||
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
|
||||
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH;
|
||||
import static org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory;
|
||||
import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.STAGING_UPLOADS;
|
||||
|
||||
/**
|
||||
* Full integration test for the staging committer.
|
||||
*/
|
||||
public class ITStagingCommitMRJob extends AbstractITCommitMRJob {
|
||||
public final class ITestStagingCommitMRJob extends AbstractITCommitMRJob {
|
||||
|
||||
/**
|
||||
* The static cluster binding with the lifecycle of this test; served
|
||||
* through instance-level methods for sharing across methods in the
|
||||
* suite.
|
||||
*/
|
||||
@SuppressWarnings("StaticNonFinalField")
|
||||
private static ClusterBinding clusterBinding;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupClusters() throws IOException {
|
||||
clusterBinding = createCluster(new JobConf());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownClusters() throws IOException {
|
||||
clusterBinding.terminate();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterBinding getClusterBinding() {
|
||||
return clusterBinding;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String committerName() {
|
||||
|
@ -51,12 +79,12 @@ public class ITStagingCommitMRJob extends AbstractITCommitMRJob {
|
|||
public void testStagingDirectory() throws Throwable {
|
||||
FileSystem hdfs = getDFS();
|
||||
Configuration conf = hdfs.getConf();
|
||||
conf.set(CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH,
|
||||
"private");
|
||||
conf.set(FS_S3A_COMMITTER_STAGING_TMP_PATH, "private");
|
||||
Path dir = getMultipartUploadCommitsDirectory(conf, "UUID");
|
||||
assertThat(dir.toString(), StringEndsWith.endsWith(
|
||||
"UUID/"
|
||||
+ StagingCommitterConstants.STAGING_UPLOADS));
|
||||
assertThat("Directory " + dir + " path is wrong",
|
||||
dir.toString(),
|
||||
StringEndsWith.endsWith("UUID/"
|
||||
+ STAGING_UPLOADS));
|
||||
assertTrue("path unqualified", dir.isAbsolute());
|
||||
String self = UserGroupInformation.getCurrentUser().getShortUserName();
|
||||
assertThat(dir.toString(),
|
|
@ -20,6 +20,9 @@ package org.apache.hadoop.fs.s3a.commit.staging.integration;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
||||
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
|
||||
|
@ -33,7 +36,30 @@ import org.apache.hadoop.test.LambdaTestUtils;
|
|||
* This is a test to verify that the committer will fail if the destination
|
||||
* directory exists, and that this happens in job setup.
|
||||
*/
|
||||
public class ITStagingCommitMRJobBadDest extends AbstractITCommitMRJob {
|
||||
public final class ITestStagingCommitMRJobBadDest extends AbstractITCommitMRJob {
|
||||
|
||||
/**
|
||||
* The static cluster binding with the lifecycle of this test; served
|
||||
* through instance-level methods for sharing across methods in the
|
||||
* suite.
|
||||
*/
|
||||
@SuppressWarnings("StaticNonFinalField")
|
||||
private static ClusterBinding clusterBinding;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupClusters() throws IOException {
|
||||
clusterBinding = createCluster(new JobConf());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownClusters() throws IOException {
|
||||
clusterBinding.terminate();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterBinding getClusterBinding() {
|
||||
return clusterBinding;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String committerName() {
|
||||
|
@ -59,4 +85,5 @@ public class ITStagingCommitMRJobBadDest extends AbstractITCommitMRJob {
|
|||
"Output directory",
|
||||
super::testMRJob);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,241 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a.commit.terasort;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
import org.junit.FixMethodOrder;
|
||||
import org.junit.Test;
|
||||
import org.junit.runners.MethodSorters;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.examples.terasort.TeraGen;
|
||||
import org.apache.hadoop.examples.terasort.TeraSort;
|
||||
import org.apache.hadoop.examples.terasort.TeraSortConfigKeys;
|
||||
import org.apache.hadoop.examples.terasort.TeraValidate;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||
import org.apache.hadoop.fs.s3a.commit.AbstractYarnClusterITest;
|
||||
import org.apache.hadoop.fs.s3a.commit.DurationInfo;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
import static java.util.Optional.empty;
|
||||
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.CONFLICT_MODE_APPEND;
|
||||
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE;
|
||||
|
||||
/**
|
||||
* Runs Terasort against S3A.
|
||||
*
|
||||
* This is all done on a shared mini YARN and HDFS clusters, set up before
|
||||
* any of the tests methods run.
|
||||
*
|
||||
* The tests run in sequence, so each operation is isolated.
|
||||
* This also means that the test paths deleted in test
|
||||
* teardown; shared variables must all be static.
|
||||
*/
|
||||
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
|
||||
@SuppressWarnings("StaticNonFinalField")
|
||||
public abstract class AbstractCommitTerasortIT extends
|
||||
AbstractYarnClusterITest {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(AbstractCommitTerasortIT.class);
|
||||
|
||||
// all the durations are optional as they only get filled in when
|
||||
// a test run successfully completes. Failed tests don't have numbers.
|
||||
private static Optional<DurationInfo> terasortDuration = empty();
|
||||
|
||||
private static Optional<DurationInfo> teragenStageDuration = empty();
|
||||
|
||||
private static Optional<DurationInfo> terasortStageDuration = empty();
|
||||
|
||||
private static Optional<DurationInfo> teravalidateStageDuration = empty();
|
||||
|
||||
private Path terasortPath;
|
||||
|
||||
private Path sortInput;
|
||||
|
||||
private Path sortOutput;
|
||||
|
||||
private Path sortValidate;
|
||||
|
||||
/**
|
||||
* Not using special paths here.
|
||||
* @return false
|
||||
*/
|
||||
@Override
|
||||
public boolean useInconsistentClient() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setup() throws Exception {
|
||||
super.setup();
|
||||
requireScaleTestsEnabled();
|
||||
prepareToTerasort();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up for terasorting by initializing paths.
|
||||
* The paths used must be unique across parallel runs.
|
||||
*/
|
||||
private void prepareToTerasort() {
|
||||
// small sample size for faster runs
|
||||
Configuration yarnConfig = getYarn().getConfig();
|
||||
yarnConfig.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(), 1000);
|
||||
yarnConfig.setBoolean(
|
||||
TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(),
|
||||
true);
|
||||
terasortPath = new Path("/terasort-" + getClass().getSimpleName())
|
||||
.makeQualified(getFileSystem());
|
||||
sortInput = new Path(terasortPath, "sortin");
|
||||
sortOutput = new Path(terasortPath, "sortout");
|
||||
sortValidate = new Path(terasortPath, "validate");
|
||||
if (!terasortDuration.isPresent()) {
|
||||
terasortDuration = Optional.of(new DurationInfo(LOG, "Terasort"));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a single stage in the terasort,
|
||||
* @param stage Stage name for messages/assertions.
|
||||
* @param jobConf job conf
|
||||
* @param dest destination directory -the _SUCCESS File will be expected here.
|
||||
* @param tool tool to run.
|
||||
* @param args args for the tool.
|
||||
* @throws Exception any failure
|
||||
*/
|
||||
private Optional<DurationInfo> executeStage(
|
||||
final String stage,
|
||||
final JobConf jobConf,
|
||||
final Path dest,
|
||||
final Tool tool,
|
||||
final String[] args) throws Exception {
|
||||
int result;
|
||||
DurationInfo d = new DurationInfo(LOG, stage);
|
||||
try {
|
||||
result = ToolRunner.run(jobConf, tool, args);
|
||||
} finally {
|
||||
d.close();
|
||||
}
|
||||
assertEquals(stage
|
||||
+ "(" + StringUtils.join(", ", args) + ")"
|
||||
+ " failed", 0, result);
|
||||
validateSuccessFile(getFileSystem(), dest, committerName());
|
||||
return Optional.of(d);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up terasort by cleaning out the destination, and note the initial
|
||||
* time before any of the jobs are executed.
|
||||
*/
|
||||
@Test
|
||||
public void test_100_terasort_setup() throws Throwable {
|
||||
describe("Setting up for a terasort");
|
||||
|
||||
getFileSystem().delete(terasortPath, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_110_teragen() throws Throwable {
|
||||
describe("Teragen to %s", sortInput);
|
||||
|
||||
JobConf jobConf = newJobConf();
|
||||
patchConfigurationForCommitter(jobConf);
|
||||
teragenStageDuration = executeStage("Teragen",
|
||||
jobConf,
|
||||
sortInput,
|
||||
new TeraGen(),
|
||||
new String[]{Integer.toString(SCALE_TEST_KEYS), sortInput.toString()});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_120_terasort() throws Throwable {
|
||||
describe("Terasort from %s to %s", sortInput, sortOutput);
|
||||
loadSuccessFile(getFileSystem(), sortInput);
|
||||
JobConf jobConf = newJobConf();
|
||||
patchConfigurationForCommitter(jobConf);
|
||||
// this job adds some data, so skip it.
|
||||
jobConf.set(FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND);
|
||||
terasortStageDuration = executeStage("TeraSort",
|
||||
jobConf,
|
||||
sortOutput,
|
||||
new TeraSort(),
|
||||
new String[]{sortInput.toString(), sortOutput.toString()});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_130_teravalidate() throws Throwable {
|
||||
describe("TeraValidate from %s to %s", sortOutput, sortValidate);
|
||||
loadSuccessFile(getFileSystem(), sortOutput);
|
||||
JobConf jobConf = newJobConf();
|
||||
patchConfigurationForCommitter(jobConf);
|
||||
teravalidateStageDuration = executeStage("TeraValidate",
|
||||
jobConf,
|
||||
sortValidate,
|
||||
new TeraValidate(),
|
||||
new String[]{sortOutput.toString(), sortValidate.toString()});
|
||||
}
|
||||
|
||||
/**
|
||||
* Print the results, and save to the base dir as a CSV file.
|
||||
* Why there? Makes it easy to list and compare.
|
||||
*/
|
||||
@Test
|
||||
public void test_140_teracomplete() throws Throwable {
|
||||
terasortDuration.get().close();
|
||||
|
||||
final StringBuilder results = new StringBuilder();
|
||||
results.append("\"Operation\"\t\"Duration\"\n");
|
||||
|
||||
// this is how you dynamically create a function in a method
|
||||
// for use afterwards.
|
||||
// Works because there's no IOEs being raised in this sequence.
|
||||
BiConsumer<String, Optional<DurationInfo>> stage =
|
||||
(s, od) ->
|
||||
results.append(String.format("\"%s\"\t\"%s\"\n",
|
||||
s,
|
||||
od.map(DurationInfo::getDurationString).orElse("")));
|
||||
|
||||
stage.accept("Generate", teragenStageDuration);
|
||||
stage.accept("Terasort", terasortStageDuration);
|
||||
stage.accept("Validate", teravalidateStageDuration);
|
||||
stage.accept("Completed", terasortDuration);
|
||||
String text = results.toString();
|
||||
Path path = new Path(terasortPath, "results.csv");
|
||||
LOG.info("Results are in {}\n{}", path, text);
|
||||
ContractTestUtils.writeTextFile(getFileSystem(), path, text, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the duration so if two committer tests are run sequentially.
|
||||
* Without this the total execution time is reported as from the start of
|
||||
* the first test suite to the end of the second.
|
||||
*/
|
||||
@Test
|
||||
public void test_150_teracleanup() throws Throwable {
|
||||
terasortDuration = Optional.empty();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,62 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a.commit.terasort;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
|
||||
/**
|
||||
* Terasort with the directory committer.
|
||||
*/
|
||||
public final class ITestTerasortDirectoryCommitter extends AbstractCommitTerasortIT {
|
||||
|
||||
/**
|
||||
* The static cluster binding with the lifecycle of this test; served
|
||||
* through instance-level methods for sharing across methods in the
|
||||
* suite.
|
||||
*/
|
||||
@SuppressWarnings("StaticNonFinalField")
|
||||
private static ClusterBinding clusterBinding;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupClusters() throws IOException {
|
||||
clusterBinding = createCluster(new JobConf());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownClusters() throws IOException {
|
||||
clusterBinding.terminate();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterBinding getClusterBinding() {
|
||||
return clusterBinding;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String committerName() {
|
||||
return DirectoryStagingCommitter.NAME;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a.commit.terasort;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
|
||||
|
||||
/**
|
||||
* Terasort with the magic committer.
|
||||
*/
|
||||
public final class ITestTerasortMagicCommitter
|
||||
extends AbstractCommitTerasortIT {
|
||||
|
||||
/**
|
||||
* The static cluster binding with the lifecycle of this test; served
|
||||
* through instance-level methods for sharing across methods in the
|
||||
* suite.
|
||||
*/
|
||||
@SuppressWarnings("StaticNonFinalField")
|
||||
private static ClusterBinding clusterBinding;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupClusters() throws IOException {
|
||||
clusterBinding = createCluster(new JobConf());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownClusters() throws IOException {
|
||||
clusterBinding.terminate();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterBinding getClusterBinding() {
|
||||
return clusterBinding;
|
||||
}
|
||||
@Override
|
||||
protected String committerName() {
|
||||
return MagicS3GuardCommitter.NAME;
|
||||
}
|
||||
|
||||
/**
|
||||
* Turn on the magic commit support for the FS, else nothing will work.
|
||||
* @param conf configuration
|
||||
*/
|
||||
@Override
|
||||
protected void applyCustomConfigOptions(JobConf conf) {
|
||||
conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue