From 60c9042286609131deb1a06ba80bc3473d880414 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 29 Mar 2019 15:25:45 +0000 Subject: [PATCH] HADOOP-16058. S3A tests to include Terasort. Contributed by Steve Loughran. This includes - HADOOP-15890. Some S3A committer tests don't match ITest* pattern; don't run in maven - MAPREDUCE-7090. BigMapOutput example doesn't work with paths off cluster fs - MAPREDUCE-7091. Terasort on S3A to switch to new committers - MAPREDUCE-7092. MR examples to work better against cloud stores --- .../apache/hadoop/mapred/BigMapOutput.java | 18 +- .../org/apache/hadoop/mapred/MRBench.java | 2 +- .../examples/terasort/TeraOutputFormat.java | 11 - .../hadoop/examples/terasort/TeraSort.java | 2 +- .../examples/terasort/TestTeraSort.java | 6 +- hadoop-tools/hadoop-aws/pom.xml | 4 + .../apache/hadoop/fs/s3a/S3ATestUtils.java | 62 +++++ .../fs/s3a/commit/AbstractCommitITest.java | 64 +++++ .../fs/s3a/commit/AbstractITCommitMRJob.java | 161 ++--------- .../s3a/commit/AbstractYarnClusterITest.java | 256 ++++++++++++++++++ ...tMRJob.java => ITestMagicCommitMRJob.java} | 30 +- ...ob.java => ITestDirectoryCommitMRJob.java} | 30 +- ...ob.java => ITestPartitionCommitMRJob.java} | 31 ++- ...RJob.java => ITestStagingCommitMRJob.java} | 46 +++- ...va => ITestStagingCommitMRJobBadDest.java} | 29 +- .../terasort/AbstractCommitTerasortIT.java | 241 +++++++++++++++++ .../ITestTerasortDirectoryCommitter.java | 62 +++++ .../terasort/ITestTerasortMagicCommitter.java | 73 +++++ 18 files changed, 951 insertions(+), 177 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java rename hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/{ITMagicCommitMRJob.java => ITestMagicCommitMRJob.java} (74%) rename hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/{ITDirectoryCommitMRJob.java => ITestDirectoryCommitMRJob.java} (59%) rename hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/{ITPartitionCommitMRJob.java => ITestPartitionCommitMRJob.java} (59%) rename hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/{ITStagingCommitMRJob.java => ITestStagingCommitMRJob.java} (63%) rename hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/{ITStagingCommitMRJobBadDest.java => ITestStagingCommitMRJobBadDest.java} (74%) create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/AbstractCommitTerasortIT.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortDirectoryCommitter.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortMagicCommitter.java diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/BigMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/BigMapOutput.java index 964673b1ba6..35992f5de00 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/BigMapOutput.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/BigMapOutput.java @@ -128,17 +128,20 @@ public class BigMapOutput extends Configured implements Tool { usage(); } } - - FileSystem fs = FileSystem.get(getConf()); + if (bigMapInput == null || outputPath == null) { + // report usage and exit + usage(); + // this stops IDES warning about unset local variables. + return -1; + } + JobConf jobConf = new JobConf(getConf(), BigMapOutput.class); jobConf.setJobName("BigMapOutput"); jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class); jobConf.setOutputFormat(SequenceFileOutputFormat.class); FileInputFormat.setInputPaths(jobConf, bigMapInput); - if (fs.exists(outputPath)) { - fs.delete(outputPath, true); - } + outputPath.getFileSystem(jobConf).delete(outputPath, true); FileOutputFormat.setOutputPath(jobConf, outputPath); jobConf.setMapperClass(IdentityMapper.class); jobConf.setReducerClass(IdentityReducer.class); @@ -146,7 +149,10 @@ public class BigMapOutput extends Configured implements Tool { jobConf.setOutputValueClass(BytesWritable.class); if (createInput) { - createBigMapInputFile(jobConf, fs, bigMapInput, fileSizeInMB); + createBigMapInputFile(jobConf, + bigMapInput.getFileSystem(jobConf), + bigMapInput, + fileSizeInMB); } Date startTime = new Date(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MRBench.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MRBench.java index 53287563a99..36f469385e0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MRBench.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MRBench.java @@ -284,7 +284,7 @@ public class MRBench extends Configured implements Tool{ } JobConf jobConf = setupJob(numMaps, numReduces, jarFile); - FileSystem fs = FileSystem.get(jobConf); + FileSystem fs = BASE_DIR.getFileSystem(jobConf); Path inputFile = new Path(INPUT_DIR, "input_" + (new Random()).nextInt() + ".txt"); generateTextFile(fs, inputFile, inputLines, inputSortOrder); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraOutputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraOutputFormat.java index e0ce36ce49a..14fea569e64 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraOutputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraOutputFormat.java @@ -30,10 +30,8 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileAlreadyExistsException; import org.apache.hadoop.mapred.InvalidJobConfException; import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.security.TokenCache; import org.slf4j.Logger; @@ -45,7 +43,6 @@ import org.slf4j.LoggerFactory; public class TeraOutputFormat extends FileOutputFormat { private static final Logger LOG = LoggerFactory.getLogger(TeraOutputFormat.class); - private OutputCommitter committer = null; /** * Set the requirement for a final sync before the stream is closed. @@ -145,12 +142,4 @@ public class TeraOutputFormat extends FileOutputFormat { return new TeraRecordWriter(fileOut, job); } - public OutputCommitter getOutputCommitter(TaskAttemptContext context) - throws IOException { - if (committer == null) { - Path output = getOutputPath(context); - committer = new FileOutputCommitter(output, context); - } - return committer; - } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraSort.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraSort.java index 8b698e35dd9..e21653e0982 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraSort.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraSort.java @@ -321,7 +321,7 @@ public class TeraSort extends Configured implements Tool { try { TeraInputFormat.writePartitionFile(job, partitionFile); } catch (Throwable e) { - LOG.error(e.getMessage()); + LOG.error("{}", e.getMessage(), e); return -1; } job.addCacheFile(partitionUri); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/test/java/org/apache/hadoop/examples/terasort/TestTeraSort.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/test/java/org/apache/hadoop/examples/terasort/TestTeraSort.java index b3016599d19..992ac5009dc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/test/java/org/apache/hadoop/examples/terasort/TestTeraSort.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/test/java/org/apache/hadoop/examples/terasort/TestTeraSort.java @@ -61,7 +61,7 @@ public class TestTeraSort extends HadoopTestCase { String[] genArgs = {NUM_ROWS, sortInput.toString()}; // Run TeraGen - assertEquals(ToolRunner.run(conf, new TeraGen(), genArgs), 0); + assertEquals(0, ToolRunner.run(conf, new TeraGen(), genArgs)); } private void runTeraSort(Configuration conf, @@ -71,7 +71,7 @@ public class TestTeraSort extends HadoopTestCase { String[] sortArgs = {sortInput.toString(), sortOutput.toString()}; // Run Sort - assertEquals(ToolRunner.run(conf, new TeraSort(), sortArgs), 0); + assertEquals(0, ToolRunner.run(conf, new TeraSort(), sortArgs)); } private void runTeraValidator(Configuration job, @@ -80,7 +80,7 @@ public class TestTeraSort extends HadoopTestCase { String[] svArgs = {sortOutput.toString(), valOutput.toString()}; // Run Tera-Validator - assertEquals(ToolRunner.run(job, new TeraValidate(), svArgs), 0); + assertEquals(0, ToolRunner.run(job, new TeraValidate(), svArgs)); } @Test diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index 08c53e72a46..48a284acefe 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -186,6 +186,7 @@ **/ITestS3AHuge*.java **/ITestDynamoDBMetadataStoreScale.java + **/ITestTerasort*.java @@ -220,6 +221,9 @@ **/ITestS3AEncryptionSSEC*.java **/ITestDynamoDBMetadataStoreScale.java + + + **/ITestTerasort*.java diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index ecb5d0b7069..a841a667902 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -31,6 +31,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.s3a.commit.CommitConstants; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.service.ServiceOperations; + import org.hamcrest.core.Is; import org.junit.Assert; import org.junit.Assume; @@ -563,6 +566,65 @@ public final class S3ATestUtils { removeBucketOverrides(bucket, conf, options); } + /** + * Call a function; any exception raised is logged at info. + * This is for test teardowns. + * @param log log to use. + * @param operation operation to invoke + * @param type of operation. + */ + public static void callQuietly(final Logger log, + final Invoker.Operation operation) { + try { + operation.execute(); + } catch (Exception e) { + log.info(e.toString(), e); + } + } + + /** + * Call a void operation; any exception raised is logged at info. + * This is for test teardowns. + * @param log log to use. + * @param operation operation to invoke + */ + public static void callQuietly(final Logger log, + final Invoker.VoidOperation operation) { + try { + operation.execute(); + } catch (Exception e) { + log.info(e.toString(), e); + } + } + + /** + * Deploy a hadoop service: init and start it. + * @param conf configuration to use + * @param service service to configure + * @param type of service + * @return the started service + */ + public static T deployService( + final Configuration conf, + final T service) { + service.init(conf); + service.start(); + return service; + } + + /** + * Terminate a service, returning {@code null} cast at compile-time + * to the type of the service, for ease of setting fields to null. + * @param service service. + * @param type of the service + * @return null, always + */ + @SuppressWarnings("ThrowableNotThrown") + public static T terminateService(final T service) { + ServiceOperations.stopQuietly(LOG, service); + return null; + } + /** * Helper class to do diffs of metrics. */ diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java index 246bf9d6134..0a3d07a195b 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java @@ -18,8 +18,10 @@ package org.apache.hadoop.fs.s3a.commit; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; +import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -30,6 +32,7 @@ import org.slf4j.LoggerFactory; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.ContractTestUtils; @@ -50,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.MultipartTestUtils.listMultipartUploads; import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; +import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; /** @@ -75,6 +79,7 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase { private InconsistentAmazonS3Client inconsistentClient; + /** * Should the inconsistent S3A client be used? * Default value: true. @@ -436,4 +441,63 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase { jContext.getConfiguration(), TypeConverter.fromYarn(attemptID)); } + + + /** + * Load in the success data marker: this guarantees that an S3A + * committer was used. + * @param fs filesystem + * @param outputPath path of job + * @param committerName name of committer to match + * @return the success data + * @throws IOException IO failure + */ + public static SuccessData validateSuccessFile(final S3AFileSystem fs, + final Path outputPath, final String committerName) throws IOException { + SuccessData successData = null; + try { + successData = loadSuccessFile(fs, outputPath); + } catch (FileNotFoundException e) { + // either the output path is missing or, if its the success file, + // somehow the relevant committer wasn't picked up. + String dest = outputPath.toString(); + LOG.error("No _SUCCESS file found under {}", dest); + List files = new ArrayList<>(); + applyLocatedFiles(fs.listFiles(outputPath, true), + (status) -> { + files.add(status.getPath().toString()); + LOG.error("{} {}", status.getPath(), status.getLen()); + }); + throw new AssertionError("No _SUCCESS file in " + dest + + "; found : " + files.stream().collect(Collectors.joining("\n")), + e); + } + String commitDetails = successData.toString(); + LOG.info("Committer name " + committerName + "\n{}", + commitDetails); + LOG.info("Committer statistics: \n{}", + successData.dumpMetrics(" ", " = ", "\n")); + LOG.info("Diagnostics\n{}", + successData.dumpDiagnostics(" ", " = ", "\n")); + assertEquals("Wrong committer in " + commitDetails, + committerName, successData.getCommitter()); + return successData; + } + + /** + * Load a success file; fail if the file is empty/nonexistent. + * @param fs filesystem + * @param outputPath directory containing the success file. + * @return the loaded file. + * @throws IOException failure to find/load the file + * @throws AssertionError file is 0-bytes long + */ + public static SuccessData loadSuccessFile(final S3AFileSystem fs, + final Path outputPath) throws IOException { + Path success = new Path(outputPath, _SUCCESS); + FileStatus status = fs.getFileStatus(success); + assertTrue("0 byte success file - not a s3guard committer " + success, + status.getLen() > 0); + return SuccessData.load(fs, success); + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java index 161db8521de..09da720ca8d 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java @@ -30,22 +30,17 @@ import java.util.Set; import java.util.UUID; import com.google.common.collect.Sets; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AUtils; import org.apache.hadoop.fs.s3a.commit.files.SuccessData; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; @@ -54,102 +49,35 @@ import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; -import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; -import org.apache.hadoop.service.ServiceOperations; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; -import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.*; +import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID; + +/** + * Test for an MR Job with all the different committers. + */ +public abstract class AbstractITCommitMRJob extends AbstractYarnClusterITest { -/** Full integration test of an MR job. */ -public abstract class AbstractITCommitMRJob extends AbstractCommitITest { private static final Logger LOG = LoggerFactory.getLogger(AbstractITCommitMRJob.class); - private static final int TEST_FILE_COUNT = 2; - private static final int SCALE_TEST_FILE_COUNT = 20; - - private static MiniDFSClusterService hdfs; - private static MiniMRYarnCluster yarn = null; - private static JobConf conf = null; - private boolean uniqueFilenames = false; - private boolean scaleTest; - - protected static FileSystem getDFS() { - return hdfs.getClusterFS(); - } - - @BeforeClass - public static void setupClusters() throws IOException { - // the HDFS and YARN clusters share the same configuration, so - // the HDFS cluster binding is implicitly propagated to YARN - conf = new JobConf(); - conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false); - conf.setLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, Long.MAX_VALUE); - - hdfs = new MiniDFSClusterService(); - hdfs.init(conf); - hdfs.start(); - yarn = new MiniMRYarnCluster("ITCommitMRJob", 2); - yarn.init(conf); - yarn.start(); - } - - @SuppressWarnings("ThrowableNotThrown") - @AfterClass - public static void teardownClusters() throws IOException { - conf = null; - ServiceOperations.stopQuietly(yarn); - ServiceOperations.stopQuietly(hdfs); - hdfs = null; - yarn = null; - } - - public static MiniDFSCluster getHdfs() { - return hdfs.getCluster(); - } - - public static FileSystem getLocalFS() { - return hdfs.getLocalFS(); - } - @Rule public final TemporaryFolder temp = new TemporaryFolder(); - /** - * The name of the committer as returned by - * {@link AbstractS3ACommitter#getName()} and used for committer construction. - */ - protected abstract String committerName(); - - @Override - public void setup() throws Exception { - super.setup(); - scaleTest = getTestPropertyBool( - getConfiguration(), - KEY_SCALE_TESTS_ENABLED, - DEFAULT_SCALE_TESTS_ENABLED); - } - - @Override - protected int getTestTimeoutMillis() { - return SCALE_TEST_TIMEOUT_SECONDS * 1000; - } - @Test public void testMRJob() throws Exception { + describe("Run a simple MR Job"); + S3AFileSystem fs = getFileSystem(); // final dest is in S3A - Path outputPath = path("testMRJob"); + Path outputPath = path(getMethodName()); String commitUUID = UUID.randomUUID().toString(); - String suffix = uniqueFilenames ? ("-" + commitUUID) : ""; + String suffix = isUniqueFilenames() ? ("-" + commitUUID) : ""; int numFiles = getTestFileCount(); List expectedFiles = new ArrayList<>(numFiles); Set expectedKeys = Sets.newHashSet(); for (int i = 0; i < numFiles; i += 1) { - File file = temp.newFile(String.valueOf(i) + ".text"); + File file = temp.newFile(i + ".text"); try (FileOutputStream out = new FileOutputStream(file)) { out.write(("file " + i).getBytes(StandardCharsets.UTF_8)); } @@ -160,17 +88,8 @@ public abstract class AbstractITCommitMRJob extends AbstractCommitITest { } Collections.sort(expectedFiles); - Job mrJob = Job.getInstance(yarn.getConfig(), "test-committer-job"); + Job mrJob = createJob(); JobConf jobConf = (JobConf) mrJob.getConfiguration(); - jobConf.setBoolean(FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES, - uniqueFilenames); - - - bindCommitter(jobConf, - CommitConstants.S3A_COMMITTER_FACTORY, - committerName()); - // pass down the scale test flag - jobConf.setBoolean(KEY_SCALE_TESTS_ENABLED, scaleTest); mrJob.setOutputFormatClass(LoggingTextOutputFormat.class); FileOutputFormat.setOutputPath(mrJob, outputPath); @@ -204,7 +123,7 @@ public abstract class AbstractITCommitMRJob extends AbstractCommitITest { mrJob.setMaxMapAttempts(1); mrJob.submit(); - try (DurationInfo d = new DurationInfo(LOG, "Job Execution")) { + try (DurationInfo ignore = new DurationInfo(LOG, "Job Execution")) { boolean succeeded = mrJob.waitForCompletion(true); assertTrue("MR job failed", succeeded); } @@ -223,24 +142,11 @@ public abstract class AbstractITCommitMRJob extends AbstractCommitITest { } Collections.sort(actualFiles); - // load in the success data marker: this guarantees that a s3guard - // committer was used - Path success = new Path(outputPath, _SUCCESS); - FileStatus status = fs.getFileStatus(success); - assertTrue("0 byte success file - not a s3guard committer " + success, - status.getLen() > 0); - SuccessData successData = SuccessData.load(fs, success); - String commitDetails = successData.toString(); - LOG.info("Committer name " + committerName() + "\n{}", - commitDetails); - LOG.info("Committer statistics: \n{}", - successData.dumpMetrics(" ", " = ", "\n")); - LOG.info("Diagnostics\n{}", - successData.dumpDiagnostics(" ", " = ", "\n")); - assertEquals("Wrong committer in " + commitDetails, - committerName(), successData.getCommitter()); + SuccessData successData = validateSuccessFile(fs, outputPath, + committerName()); List successFiles = successData.getFilenames(); - assertTrue("No filenames in " + commitDetails, + String commitData = successData.toString(); + assertTrue("No filenames in " + commitData, !successFiles.isEmpty()); assertEquals("Should commit the expected files", @@ -249,41 +155,12 @@ public abstract class AbstractITCommitMRJob extends AbstractCommitITest { Set summaryKeys = Sets.newHashSet(); summaryKeys.addAll(successFiles); assertEquals("Summary keyset doesn't list the the expected paths " - + commitDetails, expectedKeys, summaryKeys); + + commitData, expectedKeys, summaryKeys); assertPathDoesNotExist("temporary dir", new Path(outputPath, CommitConstants.TEMPORARY)); customPostExecutionValidation(outputPath, successData); } - /** - * Get the file count for the test. - * @return the number of mappers to create. - */ - public int getTestFileCount() { - return scaleTest ? SCALE_TEST_FILE_COUNT : TEST_FILE_COUNT; - } - - /** - * Override point to let implementations tune the MR Job conf. - * @param jobConf configuration - */ - protected void applyCustomConfigOptions(JobConf jobConf) throws IOException { - - } - - /** - * Override point for any committer specific validation operations; - * called after the base assertions have all passed. - * @param destPath destination of work - * @param successData loaded success data - * @throws Exception failure - */ - protected void customPostExecutionValidation(Path destPath, - SuccessData successData) - throws Exception { - - } - /** * Test Mapper. * This is executed in separate process, and must not make any assumptions @@ -305,7 +182,7 @@ public abstract class AbstractITCommitMRJob extends AbstractCommitITest { org.apache.log4j.BasicConfigurator.configure(); boolean scaleMap = context.getConfiguration() .getBoolean(KEY_SCALE_TESTS_ENABLED, false); - operations = scaleMap ? 1000 : 10; + operations = scaleMap ? SCALE_TEST_KEYS : BASE_TEST_KEYS; id = context.getTaskAttemptID().toString(); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java new file mode 100644 index 00000000000..45f07389c6a --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit; + +import java.io.IOException; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.commit.files.SuccessData; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; +import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.deployService; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.terminateService; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES; + +/** + * Full integration test MR jobs. + * + * This is all done on shared static mini YARN and HDFS clusters, set up before + * any of the tests methods run. + * + * To isolate tests properly for parallel test runs, that static state + * needs to be stored in the final classes implementing the tests, and + * exposed to the base class, with the setup clusters in the + * specific test suites creating the clusters with unique names. + * + * This is "hard" to do in Java, unlike, say, Scala. + * + * Note: this turns out not to be the root cause of ordering problems + * with the Terasort tests (that is hard coded use of a file in the local FS), + * but this design here does make it clear that the before and after class + * operations are explicitly called in the subclasses. + * If two subclasses of this class are instantiated in the same JVM, in order, + * they are guaranteed to be isolated. + * + * History: this is a superclass extracted from + * {@link AbstractITCommitMRJob} while adding support for testing terasorting. + * + */ +public abstract class AbstractYarnClusterITest extends AbstractCommitITest { + + private static final Logger LOG = + LoggerFactory.getLogger(AbstractYarnClusterITest.class); + + private static final int TEST_FILE_COUNT = 2; + private static final int SCALE_TEST_FILE_COUNT = 20; + + public static final int SCALE_TEST_KEYS = 1000; + public static final int BASE_TEST_KEYS = 10; + + private boolean scaleTest; + + private boolean uniqueFilenames = false; + + /** + * This is the cluster binding which every subclass must create. + */ + protected static final class ClusterBinding { + + private final MiniDFSClusterService hdfs; + + private final MiniMRYarnCluster yarn; + + public ClusterBinding( + final MiniDFSClusterService hdfs, + final MiniMRYarnCluster yarn) { + this.hdfs = checkNotNull(hdfs); + this.yarn = checkNotNull(yarn); + } + + public MiniDFSClusterService getHdfs() { + return hdfs; + } + + public MiniMRYarnCluster getYarn() { + return yarn; + } + + public Configuration getConf() { + return getYarn().getConfig(); + } + + public void terminate() { + terminateService(getYarn()); + terminateService(getHdfs()); + } + } + + /** + * Create the cluster binding. This must be done in + * class setup of the (final) subclass. + * The HDFS and YARN clusters share the same configuration, so + * the HDFS cluster binding is implicitly propagated to YARN. + * @param conf configuration to start with. + * @return the cluster binding. + * @throws IOException failure. + */ + protected static ClusterBinding createCluster(JobConf conf) + throws IOException { + + conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false); + conf.setLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, Long.MAX_VALUE); + + // create a unique cluster name. + String clusterName = "yarn-" + UUID.randomUUID(); + MiniDFSClusterService miniDFSClusterService = deployService(conf, + new MiniDFSClusterService()); + MiniMRYarnCluster yarnCluster = deployService(conf, + new MiniMRYarnCluster(clusterName, 2)); + return new ClusterBinding(miniDFSClusterService, yarnCluster); + } + + /** + * Get the cluster binding for this subclass + * @return + */ + protected abstract ClusterBinding getClusterBinding(); + + protected MiniDFSClusterService getHdfs() { + return getClusterBinding().getHdfs(); + } + + + protected MiniMRYarnCluster getYarn() { + return getClusterBinding().getYarn(); + } + + public FileSystem getLocalFS() { + return getHdfs().getLocalFS(); + } + + protected FileSystem getDFS() { + return getHdfs().getClusterFS(); + } + + /** + * The name of the committer as returned by + * {@link AbstractS3ACommitter#getName()} and used for committer construction. + */ + protected abstract String committerName(); + + @Override + public void setup() throws Exception { + super.setup(); + assertNotNull("cluster is not bound", + getClusterBinding()); + + scaleTest = getTestPropertyBool( + getConfiguration(), + KEY_SCALE_TESTS_ENABLED, + DEFAULT_SCALE_TESTS_ENABLED); + } + + @Override + protected int getTestTimeoutMillis() { + return SCALE_TEST_TIMEOUT_SECONDS * 1000; + } + + protected JobConf newJobConf() { + return new JobConf(getYarn().getConfig()); + } + + + protected Job createJob() throws IOException { + Job mrJob = Job.getInstance(getClusterBinding().getConf(), + getMethodName()); + patchConfigurationForCommitter(mrJob.getConfiguration()); + return mrJob; + } + + protected Configuration patchConfigurationForCommitter( + final Configuration jobConf) { + jobConf.setBoolean(FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES, + uniqueFilenames); + bindCommitter(jobConf, + CommitConstants.S3A_COMMITTER_FACTORY, + committerName()); + // pass down the scale test flag + jobConf.setBoolean(KEY_SCALE_TESTS_ENABLED, scaleTest); + return jobConf; + } + + /** + * Get the file count for the test. + * @return the number of mappers to create. + */ + public int getTestFileCount() { + return scaleTest ? SCALE_TEST_FILE_COUNT : TEST_FILE_COUNT; + } + + /** + * Override point to let implementations tune the MR Job conf. + * @param jobConf configuration + */ + protected void applyCustomConfigOptions(JobConf jobConf) throws IOException { + + } + + /** + * Override point for any committer specific validation operations; + * called after the base assertions have all passed. + * @param destPath destination of work + * @param successData loaded success data + * @throws Exception failure + */ + protected void customPostExecutionValidation(Path destPath, + SuccessData successData) + throws Exception { + + } + + /** + * Assume that scale tests are enabled. + */ + protected void requireScaleTestsEnabled() { + assume("Scale test disabled: to enable set property " + + KEY_SCALE_TESTS_ENABLED, + isScaleTest()); + } + + public boolean isScaleTest() { + return scaleTest; + } + + public boolean isUniqueFilenames() { + return uniqueFilenames; + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitMRJob.java similarity index 74% rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java rename to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitMRJob.java index b7be17ad5ea..a9b9c2cbe1e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitMRJob.java @@ -18,6 +18,11 @@ package org.apache.hadoop.fs.s3a.commit.magic; +import java.io.IOException; + +import org.junit.AfterClass; +import org.junit.BeforeClass; + import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob; import org.apache.hadoop.fs.s3a.commit.files.SuccessData; @@ -33,7 +38,30 @@ import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; * the settings in {@link AbstractITCommitMRJob#applyCustomConfigOptions(JobConf)} are * passed down to these processes. */ -public class ITMagicCommitMRJob extends AbstractITCommitMRJob { +public final class ITestMagicCommitMRJob extends AbstractITCommitMRJob { + + /** + * The static cluster binding with the lifecycle of this test; served + * through instance-level methods for sharing across methods in the + * suite. + */ + @SuppressWarnings("StaticNonFinalField") + private static ClusterBinding clusterBinding; + + @BeforeClass + public static void setupClusters() throws IOException { + clusterBinding = createCluster(new JobConf()); + } + + @AfterClass + public static void teardownClusters() throws IOException { + clusterBinding.terminate(); + } + + @Override + public ClusterBinding getClusterBinding() { + return clusterBinding; + } /** * Need consistency here. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITDirectoryCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitMRJob.java similarity index 59% rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITDirectoryCommitMRJob.java rename to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitMRJob.java index c10ebed98d0..8d44ddba564 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITDirectoryCommitMRJob.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitMRJob.java @@ -18,13 +18,41 @@ package org.apache.hadoop.fs.s3a.commit.staging.integration; +import java.io.IOException; + +import org.junit.AfterClass; +import org.junit.BeforeClass; + import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob; import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter; +import org.apache.hadoop.mapred.JobConf; /** * Full integration test for the directory committer. */ -public class ITDirectoryCommitMRJob extends AbstractITCommitMRJob { +public final class ITestDirectoryCommitMRJob extends AbstractITCommitMRJob { + + /** + * The static cluster binding with the lifecycle of this test; served + * through instance-level methods for sharing across methods in the + * suite. + */ + @SuppressWarnings("StaticNonFinalField") + private static ClusterBinding clusterBinding; + + @BeforeClass + public static void setupClusters() throws IOException { + clusterBinding = createCluster(new JobConf()); } + + @AfterClass + public static void teardownClusters() throws IOException { + clusterBinding.terminate(); + } + + @Override + public ClusterBinding getClusterBinding() { + return clusterBinding; + } @Override protected String committerName() { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITPartitionCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionCommitMRJob.java similarity index 59% rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITPartitionCommitMRJob.java rename to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionCommitMRJob.java index 1c19a952081..f71479c203b 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITPartitionCommitMRJob.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionCommitMRJob.java @@ -18,13 +18,42 @@ package org.apache.hadoop.fs.s3a.commit.staging.integration; +import java.io.IOException; + +import org.junit.AfterClass; +import org.junit.BeforeClass; + import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob; import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter; +import org.apache.hadoop.mapred.JobConf; /** * Full integration test for the partition committer. */ -public class ITPartitionCommitMRJob extends AbstractITCommitMRJob { +public final class ITestPartitionCommitMRJob extends AbstractITCommitMRJob { + + /** + * The static cluster binding with the lifecycle of this test; served + * through instance-level methods for sharing across methods in the + * suite. + */ + @SuppressWarnings("StaticNonFinalField") + private static ClusterBinding clusterBinding; + + @BeforeClass + public static void setupClusters() throws IOException { + clusterBinding = createCluster(new JobConf()); + } + + @AfterClass + public static void teardownClusters() throws IOException { + clusterBinding.terminate(); + } + + @Override + public ClusterBinding getClusterBinding() { + return clusterBinding; + } @Override protected String committerName() { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJob.java similarity index 63% rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJob.java rename to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJob.java index 76ad4645a20..d4a351f55c7 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJob.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJob.java @@ -18,25 +18,53 @@ package org.apache.hadoop.fs.s3a.commit.staging.integration; -import org.junit.Test; +import java.io.IOException; import org.hamcrest.core.StringContains; import org.hamcrest.core.StringEndsWith; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob; -import org.apache.hadoop.fs.s3a.commit.CommitConstants; import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter; -import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.security.UserGroupInformation; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH; import static org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory; +import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.STAGING_UPLOADS; /** * Full integration test for the staging committer. */ -public class ITStagingCommitMRJob extends AbstractITCommitMRJob { +public final class ITestStagingCommitMRJob extends AbstractITCommitMRJob { + + /** + * The static cluster binding with the lifecycle of this test; served + * through instance-level methods for sharing across methods in the + * suite. + */ + @SuppressWarnings("StaticNonFinalField") + private static ClusterBinding clusterBinding; + + @BeforeClass + public static void setupClusters() throws IOException { + clusterBinding = createCluster(new JobConf()); + } + + @AfterClass + public static void teardownClusters() throws IOException { + clusterBinding.terminate(); + } + + @Override + public ClusterBinding getClusterBinding() { + return clusterBinding; + } @Override protected String committerName() { @@ -51,12 +79,12 @@ public class ITStagingCommitMRJob extends AbstractITCommitMRJob { public void testStagingDirectory() throws Throwable { FileSystem hdfs = getDFS(); Configuration conf = hdfs.getConf(); - conf.set(CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH, - "private"); + conf.set(FS_S3A_COMMITTER_STAGING_TMP_PATH, "private"); Path dir = getMultipartUploadCommitsDirectory(conf, "UUID"); - assertThat(dir.toString(), StringEndsWith.endsWith( - "UUID/" - + StagingCommitterConstants.STAGING_UPLOADS)); + assertThat("Directory " + dir + " path is wrong", + dir.toString(), + StringEndsWith.endsWith("UUID/" + + STAGING_UPLOADS)); assertTrue("path unqualified", dir.isAbsolute()); String self = UserGroupInformation.getCurrentUser().getShortUserName(); assertThat(dir.toString(), diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJobBadDest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJobBadDest.java similarity index 74% rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJobBadDest.java rename to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJobBadDest.java index be477a7de6e..68926f972ac 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJobBadDest.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJobBadDest.java @@ -20,6 +20,9 @@ package org.apache.hadoop.fs.s3a.commit.staging.integration; import java.io.IOException; +import org.junit.AfterClass; +import org.junit.BeforeClass; + import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob; @@ -33,7 +36,30 @@ import org.apache.hadoop.test.LambdaTestUtils; * This is a test to verify that the committer will fail if the destination * directory exists, and that this happens in job setup. */ -public class ITStagingCommitMRJobBadDest extends AbstractITCommitMRJob { +public final class ITestStagingCommitMRJobBadDest extends AbstractITCommitMRJob { + + /** + * The static cluster binding with the lifecycle of this test; served + * through instance-level methods for sharing across methods in the + * suite. + */ + @SuppressWarnings("StaticNonFinalField") + private static ClusterBinding clusterBinding; + + @BeforeClass + public static void setupClusters() throws IOException { + clusterBinding = createCluster(new JobConf()); + } + + @AfterClass + public static void teardownClusters() throws IOException { + clusterBinding.terminate(); + } + + @Override + public ClusterBinding getClusterBinding() { + return clusterBinding; + } @Override protected String committerName() { @@ -59,4 +85,5 @@ public class ITStagingCommitMRJobBadDest extends AbstractITCommitMRJob { "Output directory", super::testMRJob); } + } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/AbstractCommitTerasortIT.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/AbstractCommitTerasortIT.java new file mode 100644 index 00000000000..d286212aa16 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/AbstractCommitTerasortIT.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.terasort; + +import java.util.Optional; +import java.util.function.BiConsumer; + +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runners.MethodSorters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.examples.terasort.TeraGen; +import org.apache.hadoop.examples.terasort.TeraSort; +import org.apache.hadoop.examples.terasort.TeraSortConfigKeys; +import org.apache.hadoop.examples.terasort.TeraValidate; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3a.commit.AbstractYarnClusterITest; +import org.apache.hadoop.fs.s3a.commit.DurationInfo; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import static java.util.Optional.empty; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.CONFLICT_MODE_APPEND; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE; + +/** + * Runs Terasort against S3A. + * + * This is all done on a shared mini YARN and HDFS clusters, set up before + * any of the tests methods run. + * + * The tests run in sequence, so each operation is isolated. + * This also means that the test paths deleted in test + * teardown; shared variables must all be static. + */ +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +@SuppressWarnings("StaticNonFinalField") +public abstract class AbstractCommitTerasortIT extends + AbstractYarnClusterITest { + + private static final Logger LOG = + LoggerFactory.getLogger(AbstractCommitTerasortIT.class); + + // all the durations are optional as they only get filled in when + // a test run successfully completes. Failed tests don't have numbers. + private static Optional terasortDuration = empty(); + + private static Optional teragenStageDuration = empty(); + + private static Optional terasortStageDuration = empty(); + + private static Optional teravalidateStageDuration = empty(); + + private Path terasortPath; + + private Path sortInput; + + private Path sortOutput; + + private Path sortValidate; + + /** + * Not using special paths here. + * @return false + */ + @Override + public boolean useInconsistentClient() { + return false; + } + + @Override + public void setup() throws Exception { + super.setup(); + requireScaleTestsEnabled(); + prepareToTerasort(); + } + + /** + * Set up for terasorting by initializing paths. + * The paths used must be unique across parallel runs. + */ + private void prepareToTerasort() { + // small sample size for faster runs + Configuration yarnConfig = getYarn().getConfig(); + yarnConfig.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(), 1000); + yarnConfig.setBoolean( + TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(), + true); + terasortPath = new Path("/terasort-" + getClass().getSimpleName()) + .makeQualified(getFileSystem()); + sortInput = new Path(terasortPath, "sortin"); + sortOutput = new Path(terasortPath, "sortout"); + sortValidate = new Path(terasortPath, "validate"); + if (!terasortDuration.isPresent()) { + terasortDuration = Optional.of(new DurationInfo(LOG, "Terasort")); + } + } + + /** + * Execute a single stage in the terasort, + * @param stage Stage name for messages/assertions. + * @param jobConf job conf + * @param dest destination directory -the _SUCCESS File will be expected here. + * @param tool tool to run. + * @param args args for the tool. + * @throws Exception any failure + */ + private Optional executeStage( + final String stage, + final JobConf jobConf, + final Path dest, + final Tool tool, + final String[] args) throws Exception { + int result; + DurationInfo d = new DurationInfo(LOG, stage); + try { + result = ToolRunner.run(jobConf, tool, args); + } finally { + d.close(); + } + assertEquals(stage + + "(" + StringUtils.join(", ", args) + ")" + + " failed", 0, result); + validateSuccessFile(getFileSystem(), dest, committerName()); + return Optional.of(d); + } + + /** + * Set up terasort by cleaning out the destination, and note the initial + * time before any of the jobs are executed. + */ + @Test + public void test_100_terasort_setup() throws Throwable { + describe("Setting up for a terasort"); + + getFileSystem().delete(terasortPath, true); + } + + @Test + public void test_110_teragen() throws Throwable { + describe("Teragen to %s", sortInput); + + JobConf jobConf = newJobConf(); + patchConfigurationForCommitter(jobConf); + teragenStageDuration = executeStage("Teragen", + jobConf, + sortInput, + new TeraGen(), + new String[]{Integer.toString(SCALE_TEST_KEYS), sortInput.toString()}); + } + + @Test + public void test_120_terasort() throws Throwable { + describe("Terasort from %s to %s", sortInput, sortOutput); + loadSuccessFile(getFileSystem(), sortInput); + JobConf jobConf = newJobConf(); + patchConfigurationForCommitter(jobConf); + // this job adds some data, so skip it. + jobConf.set(FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND); + terasortStageDuration = executeStage("TeraSort", + jobConf, + sortOutput, + new TeraSort(), + new String[]{sortInput.toString(), sortOutput.toString()}); + } + + @Test + public void test_130_teravalidate() throws Throwable { + describe("TeraValidate from %s to %s", sortOutput, sortValidate); + loadSuccessFile(getFileSystem(), sortOutput); + JobConf jobConf = newJobConf(); + patchConfigurationForCommitter(jobConf); + teravalidateStageDuration = executeStage("TeraValidate", + jobConf, + sortValidate, + new TeraValidate(), + new String[]{sortOutput.toString(), sortValidate.toString()}); + } + + /** + * Print the results, and save to the base dir as a CSV file. + * Why there? Makes it easy to list and compare. + */ + @Test + public void test_140_teracomplete() throws Throwable { + terasortDuration.get().close(); + + final StringBuilder results = new StringBuilder(); + results.append("\"Operation\"\t\"Duration\"\n"); + + // this is how you dynamically create a function in a method + // for use afterwards. + // Works because there's no IOEs being raised in this sequence. + BiConsumer> stage = + (s, od) -> + results.append(String.format("\"%s\"\t\"%s\"\n", + s, + od.map(DurationInfo::getDurationString).orElse(""))); + + stage.accept("Generate", teragenStageDuration); + stage.accept("Terasort", terasortStageDuration); + stage.accept("Validate", teravalidateStageDuration); + stage.accept("Completed", terasortDuration); + String text = results.toString(); + Path path = new Path(terasortPath, "results.csv"); + LOG.info("Results are in {}\n{}", path, text); + ContractTestUtils.writeTextFile(getFileSystem(), path, text, true); + } + + /** + * Reset the duration so if two committer tests are run sequentially. + * Without this the total execution time is reported as from the start of + * the first test suite to the end of the second. + */ + @Test + public void test_150_teracleanup() throws Throwable { + terasortDuration = Optional.empty(); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortDirectoryCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortDirectoryCommitter.java new file mode 100644 index 00000000000..cb9cdd0f334 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortDirectoryCommitter.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.terasort; + +import java.io.IOException; + +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter; +import org.apache.hadoop.mapred.JobConf; + +/** + * Terasort with the directory committer. + */ +public final class ITestTerasortDirectoryCommitter extends AbstractCommitTerasortIT { + + /** + * The static cluster binding with the lifecycle of this test; served + * through instance-level methods for sharing across methods in the + * suite. + */ + @SuppressWarnings("StaticNonFinalField") + private static ClusterBinding clusterBinding; + + @BeforeClass + public static void setupClusters() throws IOException { + clusterBinding = createCluster(new JobConf()); + } + + @AfterClass + public static void teardownClusters() throws IOException { + clusterBinding.terminate(); + } + + @Override + public ClusterBinding getClusterBinding() { + return clusterBinding; + } + + @Override + protected String committerName() { + return DirectoryStagingCommitter.NAME; + } + +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortMagicCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortMagicCommitter.java new file mode 100644 index 00000000000..e1b4eac627a --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortMagicCommitter.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.terasort; + +import java.io.IOException; + +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter; +import org.apache.hadoop.mapred.JobConf; + +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED; + +/** + * Terasort with the magic committer. + */ +public final class ITestTerasortMagicCommitter + extends AbstractCommitTerasortIT { + + /** + * The static cluster binding with the lifecycle of this test; served + * through instance-level methods for sharing across methods in the + * suite. + */ + @SuppressWarnings("StaticNonFinalField") + private static ClusterBinding clusterBinding; + + @BeforeClass + public static void setupClusters() throws IOException { + clusterBinding = createCluster(new JobConf()); + } + + @AfterClass + public static void teardownClusters() throws IOException { + clusterBinding.terminate(); + } + + @Override + public ClusterBinding getClusterBinding() { + return clusterBinding; + } + @Override + protected String committerName() { + return MagicS3GuardCommitter.NAME; + } + + /** + * Turn on the magic commit support for the FS, else nothing will work. + * @param conf configuration + */ + @Override + protected void applyCustomConfigOptions(JobConf conf) { + conf.setBoolean(MAGIC_COMMITTER_ENABLED, true); + } + +}