HADOOP-16207 Improved S3A MR tests.

Contributed by Steve Loughran.

Replaces the committer-specific terasort and MR test jobs with parameterization
of the (now single) tests, and uses file:// rather than hdfs:// as the cluster FS.

The parameterization ensures that only one of the committer-specific tests
runs at a time; overloading the test machines is less likely, so the
suites can be pulled back into the parallel phase.

There's also more detailed validation of the stage outputs of the terasorting;
if one test fails, the rest are all skipped. This, together with the fact that
job output is stored under target/yarn-${timestamp}, means failures should
be easier to debug.
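
For illustration, the parameterization pattern at the core of the change, as
it appears in the new ITestS3ACommitterMRJob below (sketch only; imports and
test bodies elided):

    @RunWith(Parameterized.class)
    @FixMethodOrder(MethodSorters.NAME_ASCENDING)
    public class ITestS3ACommitterMRJob extends AbstractYarnClusterITest {

      // one suite, instantiated once per committer binding
      @Parameterized.Parameters(name = "{0}")
      public static Collection<Object[]> params() {
        return Arrays.asList(new Object[][]{
            {new DirectoryCommitterTestBinding()},
            {new PartitionCommitterTestBinding()},
            {new MagicCommitterTestBinding()},
        });
      }

      // test_000 ... test_500 run in name order for each binding in turn
    }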

Change-Id: Iefa370ba73c6419496e6e69dd6673d00f37ff095
Steve Loughran 2019-10-04 14:11:22 +01:00
parent bca014b0e0
commit f44abc3e11
16 changed files with 987 additions and 904 deletions

View File

@@ -188,8 +188,6 @@
             <exclude>**/ITestDynamoDBMetadataStoreScale.java</exclude>
             <!-- Terasort MR jobs spawn enough processes that they use up all RAM -->
             <exclude>**/ITestTerasort*.java</exclude>
-            <!-- MR jobs spawn enough processes that they use up all RAM -->
-            <exclude>**/ITest*CommitMRJob.java</exclude>
             <!-- operations across the metastore -->
             <exclude>**/ITestS3GuardDDBRootOperations.java</exclude>
           </excludes>
@@ -231,8 +229,6 @@
             <!-- the local FS. Running them sequentially guarantees isolation -->
             <!-- and that they don't conflict with the other MR jobs for RAM -->
             <include>**/ITestTerasort*.java</include>
-            <!-- MR jobs spawn enough processes that they use up all RAM -->
-            <include>**/ITest*CommitMRJob.java</include>
             <!-- operations across the metastore -->
             <include>**/ITestS3AContractRootDir.java</include>
             <include>**/ITestS3GuardDDBRootOperations.java</include>

View File

@@ -677,7 +677,8 @@ protected int commitTaskInternal(final TaskAttemptContext context,
     // we will try to abort the ones that had already succeeded.
     int commitCount = taskOutput.size();
     final Queue<SinglePendingCommit> commits = new ConcurrentLinkedQueue<>();
-    LOG.info("{}: uploading from staging directory to S3", getRole());
+    LOG.info("{}: uploading from staging directory to S3 {}", getRole(),
+        attemptPath);
     LOG.info("{}: Saving pending data information to {}",
         getRole(), commitsAttemptPath);
     if (taskOutput.isEmpty()) {

View File

@@ -34,7 +34,7 @@ private StagingCommitterConstants() {
   /**
    * The temporary path for staging data, if not explicitly set.
    * By using an unqualified path, this will be qualified to be relative
-   * to the users' home directory, so protectec from access for others.
+   * to the users' home directory, so protected from access for others.
    */
   public static final String FILESYSTEM_TEMP_PATH = "tmp/staging";

View File

@@ -24,6 +24,7 @@
 import java.util.stream.Collectors;
 
 import com.amazonaws.services.s3.AmazonS3;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,7 +112,9 @@ protected Configuration createConfiguration() {
         MAGIC_COMMITTER_ENABLED,
         S3A_COMMITTER_FACTORY_KEY,
         FS_S3A_COMMITTER_NAME,
-        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE);
+        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE,
+        FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
+        FAST_UPLOAD_BUFFER);
     conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
     conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
@@ -209,6 +212,7 @@ public static String randomJobId() throws Exception {
    */
   @Override
   public void teardown() throws Exception {
+    Thread.currentThread().setName("teardown");
     LOG.info("AbstractCommitITest::teardown");
     waitForConsistency();
     // make sure there are no failures any more
@@ -359,7 +363,7 @@ private String pathToPrefix(Path path) {
    * @throws IOException IO Failure
    */
   protected SuccessData verifySuccessMarker(Path dir) throws IOException {
-    return validateSuccessFile(dir, "", getFileSystem(), "query");
+    return validateSuccessFile(dir, "", getFileSystem(), "query", 0);
   }
 
   /**
@@ -437,13 +441,15 @@ public static TaskAttemptContext taskAttemptForJob(JobId jobId,
    * @param committerName name of committer to match, or ""
    * @param fs filesystem
    * @param origin origin (e.g. "teragen" for messages)
+   * @param minimumFileCount minimum number of files to have been created
    * @return the success data
    * @throws IOException IO failure
    */
   public static SuccessData validateSuccessFile(final Path outputPath,
       final String committerName,
       final S3AFileSystem fs,
-      final String origin) throws IOException {
+      final String origin,
+      final int minimumFileCount) throws IOException {
     SuccessData successData = loadSuccessFile(fs, outputPath, origin);
     String commitDetails = successData.toString();
     LOG.info("Committer name " + committerName + "\n{}",
@@ -456,6 +462,9 @@ public static SuccessData validateSuccessFile(final Path outputPath,
       assertEquals("Wrong committer in " + commitDetails,
           committerName, successData.getCommitter());
     }
+    Assertions.assertThat(successData.getFilenames())
+        .describedAs("Files committed")
+        .hasSizeGreaterThanOrEqualTo(minimumFileCount);
     return successData;
   }
@@ -485,8 +494,9 @@ public static SuccessData loadSuccessFile(final S3AFileSystem fs,
         status.isFile());
     assertTrue("0 byte success file "
         + success + " from " + origin
-        + "; a s3guard committer was not used",
+        + "; an S3A committer was not used",
         status.getLen() > 0);
+    LOG.info("Loading committer success file {}", success);
     return SuccessData.load(fs, success);
   }
 }
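
The extended helper also lets callers assert a lower bound on the number of
committed files. A typical invocation against the new signature (jobID here
standing in for the MR job's ID string, as in the new MR test further down):

    SuccessData successData = validateSuccessFile(outputPath,
        committerName(), getFileSystem(), "MR job " + jobID, 1);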

View File

@@ -1,223 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import com.google.common.collect.Sets;
import org.assertj.core.api.Assertions;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.DurationInfo;
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH;
/**
* Test for an MR Job with all the different committers.
*/
public abstract class AbstractITCommitMRJob extends AbstractYarnClusterITest {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractITCommitMRJob.class);
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
disableFilesystemCaching(conf);
return conf;
}
@Rule
public final TemporaryFolder temp = new TemporaryFolder();
@Test
public void testMRJob() throws Exception {
describe("Run a simple MR Job");
S3AFileSystem fs = getFileSystem();
// final dest is in S3A
Path outputPath = path(getMethodName());
// create and delete to force in a tombstone marker -see HADOOP-16207
fs.mkdirs(outputPath);
fs.delete(outputPath, true);
String commitUUID = UUID.randomUUID().toString();
String suffix = isUniqueFilenames() ? ("-" + commitUUID) : "";
int numFiles = getTestFileCount();
List<String> expectedFiles = new ArrayList<>(numFiles);
Set<String> expectedKeys = Sets.newHashSet();
for (int i = 0; i < numFiles; i += 1) {
File file = temp.newFile(i + ".text");
try (FileOutputStream out = new FileOutputStream(file)) {
out.write(("file " + i).getBytes(StandardCharsets.UTF_8));
}
String filename = String.format("part-m-%05d%s", i, suffix);
Path path = new Path(outputPath, filename);
expectedFiles.add(path.toString());
expectedKeys.add("/" + fs.pathToKey(path));
}
Collections.sort(expectedFiles);
Job mrJob = createJob();
JobConf jobConf = (JobConf) mrJob.getConfiguration();
mrJob.setOutputFormatClass(LoggingTextOutputFormat.class);
FileOutputFormat.setOutputPath(mrJob, outputPath);
File mockResultsFile = temp.newFile("committer.bin");
mockResultsFile.delete();
String committerPath = "file:" + mockResultsFile;
jobConf.set("mock-results-file", committerPath);
jobConf.set(FS_S3A_COMMITTER_STAGING_UUID, commitUUID);
jobConf.set(FS_S3A_COMMITTER_STAGING_TMP_PATH, "/staging");
mrJob.setInputFormatClass(TextInputFormat.class);
FileInputFormat.addInputPath(mrJob, new Path(temp.getRoot().toURI()));
mrJob.setMapperClass(MapClass.class);
mrJob.setNumReduceTasks(0);
// an attempt to set up log4j properly, which clearly doesn't work
URL log4j = getClass().getClassLoader().getResource("log4j.properties");
if (log4j != null && log4j.getProtocol().equals("file")) {
Path log4jPath = new Path(log4j.toURI());
LOG.debug("Using log4j path {}", log4jPath);
mrJob.addFileToClassPath(log4jPath);
String sysprops = String.format("-Xmx256m -Dlog4j.configuration=%s",
log4j);
jobConf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, sysprops);
jobConf.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, sysprops);
jobConf.set("yarn.app.mapreduce.am.command-opts", sysprops);
}
applyCustomConfigOptions(jobConf);
// fail fast if anything goes wrong
mrJob.setMaxMapAttempts(1);
mrJob.submit();
try (DurationInfo ignore = new DurationInfo(LOG, "Job Execution")) {
boolean succeeded = mrJob.waitForCompletion(true);
assertTrue("MR job failed", succeeded);
}
waitForConsistency();
verifyPathExists(fs,
"MR job Output directory not found,"
+ " even though the job did not report a failure",
outputPath);
assertIsDirectory(outputPath);
FileStatus[] results = fs.listStatus(outputPath,
S3AUtils.HIDDEN_FILE_FILTER);
int fileCount = results.length;
List<String> actualFiles = new ArrayList<>(fileCount);
assertTrue("No files in output directory", fileCount != 0);
LOG.info("Found {} files", fileCount);
for (FileStatus result : results) {
LOG.debug("result: {}", result);
actualFiles.add(result.getPath().toString());
}
Collections.sort(actualFiles);
SuccessData successData = validateSuccessFile(outputPath, committerName(),
fs, "MR job");
List<String> successFiles = successData.getFilenames();
String commitData = successData.toString();
assertFalse("No filenames in " + commitData,
successFiles.isEmpty());
Assertions.assertThat(actualFiles)
.describedAs("Committed files in the job output directory")
.containsExactlyInAnyOrderElementsOf(expectedFiles);
Assertions.assertThat(successFiles)
.describedAs("List of committed files in %s", commitData)
.containsExactlyInAnyOrderElementsOf(expectedKeys);
assertPathDoesNotExist("temporary dir",
new Path(outputPath, CommitConstants.TEMPORARY));
customPostExecutionValidation(outputPath, successData);
}
/**
* Test Mapper.
* This is executed in separate process, and must not make any assumptions
* about external state.
*/
public static class MapClass
extends Mapper<LongWritable, Text, LongWritable, Text> {
private int operations;
private String id = "";
private LongWritable l = new LongWritable();
private Text t = new Text();
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
super.setup(context);
// force in Log4J logging
org.apache.log4j.BasicConfigurator.configure();
boolean scaleMap = context.getConfiguration()
.getBoolean(KEY_SCALE_TESTS_ENABLED, false);
operations = scaleMap ? SCALE_TEST_KEYS : BASE_TEST_KEYS;
id = context.getTaskAttemptID().toString();
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
for (int i = 0; i < operations; i++) {
l.set(i);
t.set(String.format("%s:%05d", id, i));
context.write(l, t);
}
}
}
}

View File

@@ -19,8 +19,12 @@
 package org.apache.hadoop.fs.s3a.commit;
 
 import java.io.IOException;
-import java.util.UUID;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 
+import org.junit.AfterClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,19 +37,22 @@
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.deployService;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.prepareTestConfiguration;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.terminateService;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES;
 
 /**
  * Full integration test MR jobs.
  *
- * This is all done on shared static mini YARN and HDFS clusters, set up before
- * any of the tests methods run.
+ * This is all done on shared static mini YARN and (optionally) HDFS clusters,
+ * set up before any of the tests methods run.
  *
  * To isolate tests properly for parallel test runs, that static state
  * needs to be stored in the final classes implementing the tests, and
@@ -61,38 +68,54 @@
  * If two subclasses of this class are instantiated in the same JVM, in order,
  * they are guaranteed to be isolated.
  *
+ * History: this is a superclass extracted from
+ * {@link AbstractITCommitMRJob} while adding support for testing terasorting.
+ *
  */
 public abstract class AbstractYarnClusterITest extends AbstractCommitITest {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(AbstractYarnClusterITest.class);
 
-  private static final int TEST_FILE_COUNT = 2;
-  private static final int SCALE_TEST_FILE_COUNT = 50;
+  private static final int TEST_FILE_COUNT = 1;
+  private static final int SCALE_TEST_FILE_COUNT = 10;
 
-  public static final int SCALE_TEST_KEYS = 1000;
+  public static final int SCALE_TEST_KEYS = 100;
   public static final int BASE_TEST_KEYS = 10;
 
+  public static final int NO_OF_NODEMANAGERS = 2;
+
   private boolean scaleTest;
 
-  private boolean uniqueFilenames = false;
+  /**
+   * The static cluster binding with the lifecycle of this test; served
+   * through instance-level methods for sharing across methods in the
+   * suite.
+   */
+  @SuppressWarnings("StaticNonFinalField")
+  private static ClusterBinding clusterBinding;
+
+  @AfterClass
+  public static void teardownClusters() throws IOException {
+    terminateCluster(clusterBinding);
+    clusterBinding = null;
+  }
 
   /**
    * This is the cluster binding which every subclass must create.
    */
   protected static final class ClusterBinding {
 
+    private String clusterName;
+
     private final MiniDFSClusterService hdfs;
 
     private final MiniMRYarnCluster yarn;
 
     public ClusterBinding(
+        final String clusterName,
         final MiniDFSClusterService hdfs,
         final MiniMRYarnCluster yarn) {
-      this.hdfs = checkNotNull(hdfs);
+      this.clusterName = clusterName;
+      this.hdfs = hdfs;
       this.yarn = checkNotNull(yarn);
     }
@@ -100,6 +123,18 @@ public MiniDFSClusterService getHdfs() {
       return hdfs;
     }
 
+    /**
+     * Get the cluster FS, which will either be HDFS or the local FS.
+     * @return a filesystem.
+     * @throws IOException failure
+     */
+    public FileSystem getClusterFS() throws IOException {
+      MiniDFSClusterService miniCluster = getHdfs();
+      return miniCluster != null
+          ? miniCluster.getClusterFS()
+          : FileSystem.getLocal(yarn.getConfig());
+    }
+
     public MiniMRYarnCluster getYarn() {
       return yarn;
     }
@@ -108,6 +143,10 @@ public Configuration getConf() {
       return getYarn().getConfig();
     }
 
+    public String getClusterName() {
+      return clusterName;
+    }
+
     public void terminate() {
       terminateService(getYarn());
       terminateService(getHdfs());
@@ -115,74 +154,111 @@ public void terminate() {
   }
 
   /**
-   * Create the cluster binding. This must be done in
-   * class setup of the (final) subclass.
-   * The HDFS and YARN clusters share the same configuration, so
+   * Create the cluster binding.
+   * The configuration will be patched by propagating down options
+   * from the maven build (S3Guard binding etc) and turning off unwanted
+   * YARN features.
+   *
+   * If an HDFS cluster is requested,
+   * the HDFS and YARN clusters will share the same configuration, so
    * the HDFS cluster binding is implicitly propagated to YARN.
+   * If one is not requested, the local filesystem is used as the cluster FS.
   * @param conf configuration to start with.
+   * @param useHDFS should an HDFS cluster be instantiated.
    * @return the cluster binding.
    * @throws IOException failure.
    */
-  protected static ClusterBinding createCluster(JobConf conf)
-      throws IOException {
+  protected static ClusterBinding createCluster(
+      final JobConf conf,
+      final boolean useHDFS) throws IOException {
+    prepareTestConfiguration(conf);
     conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false);
     conf.setLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, Long.MAX_VALUE);
-    // create a unique cluster name.
-    String clusterName = "yarn-" + UUID.randomUUID();
-    MiniDFSClusterService miniDFSClusterService = deployService(conf,
-        new MiniDFSClusterService());
+    // minicluster tests overreact to not enough disk space.
+    conf.setBoolean(YarnConfiguration.NM_DISK_HEALTH_CHECK_ENABLE, false);
+    conf.setInt(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 100);
+    // create a unique cluster name based on the current time in millis.
+    String timestamp = LocalDateTime.now().format(
+        DateTimeFormatter.ofPattern("yyyy-MM-dd-HH.mm.ss.SS"));
+    String clusterName = "yarn-" + timestamp;
+    MiniDFSClusterService miniDFSClusterService =
+        useHDFS
+            ? deployService(conf, new MiniDFSClusterService())
+            : null;
     MiniMRYarnCluster yarnCluster = deployService(conf,
-        new MiniMRYarnCluster(clusterName, 2));
-    return new ClusterBinding(miniDFSClusterService, yarnCluster);
+        new MiniMRYarnCluster(clusterName, NO_OF_NODEMANAGERS));
+    return new ClusterBinding(clusterName, miniDFSClusterService, yarnCluster);
   }
 
-  protected static void terminateCluster(ClusterBinding clusterBinding) {
-    if (clusterBinding != null) {
-      clusterBinding.terminate();
+  /**
+   * Terminate the cluster if it is not null.
+   * @param cluster the cluster
+   */
+  protected static void terminateCluster(ClusterBinding cluster) {
+    if (cluster != null) {
+      cluster.terminate();
     }
   }
 
   /**
-   * Get the cluster binding for this subclass
-   * @return
+   * Get the cluster binding for this subclass.
+   * @return the cluster binding
    */
-  protected abstract ClusterBinding getClusterBinding();
-
-  protected MiniDFSClusterService getHdfs() {
-    return getClusterBinding().getHdfs();
+  protected ClusterBinding getClusterBinding() {
+    return clusterBinding;
   }
 
   protected MiniMRYarnCluster getYarn() {
     return getClusterBinding().getYarn();
   }
 
-  public FileSystem getLocalFS() {
-    return getHdfs().getLocalFS();
-  }
-
-  protected FileSystem getDFS() {
-    return getHdfs().getClusterFS();
-  }
+  /**
+   * Get the cluster filesystem -hdfs or local.
+   * @return the filesystem shared across the yarn nodes.
+   */
+  protected FileSystem getClusterFS() throws IOException {
+    return getClusterBinding().getClusterFS();
+  }
+
+  /**
+   * We stage work into a temporary directory rather than directly under
+   * the user's home directory, as that is often rejected by CI test
+   * runners.
+   */
+  @Rule
+  public final TemporaryFolder stagingFilesDir = new TemporaryFolder();
 
   /**
    * The name of the committer as returned by
-   * {@link AbstractS3ACommitter#getName()} and used for committer construction.
+   * {@link AbstractS3ACommitter#getName()}
+   * and used for committer construction.
    */
   protected abstract String committerName();
 
+  /**
+   * binding on demand rather than in a BeforeClass static method.
+   * Subclasses can override this to change the binding options.
+   * @return the cluster binding
+   */
+  protected ClusterBinding demandCreateClusterBinding() throws Exception {
+    return createCluster(new JobConf(), false);
+  }
+
   @Override
   public void setup() throws Exception {
     super.setup();
-    assertNotNull("cluster is not bound",
-        getClusterBinding());
     scaleTest = getTestPropertyBool(
         getConfiguration(),
         KEY_SCALE_TESTS_ENABLED,
         DEFAULT_SCALE_TESTS_ENABLED);
+    if (getClusterBinding() == null) {
+      clusterBinding = demandCreateClusterBinding();
+    }
+    assertNotNull("cluster is not bound",
+        getClusterBinding());
   }
 
   @Override
@@ -190,28 +266,46 @@ protected int getTestTimeoutMillis() {
     return SCALE_TEST_TIMEOUT_SECONDS * 1000;
   }
 
-  protected JobConf newJobConf() {
-    return new JobConf(getYarn().getConfig());
+  /**
+   * Create a job configuration.
+   * This creates a new job conf from the yarn
+   * cluster configuration then calls
+   * {@link #applyCustomConfigOptions(JobConf)} to allow it to be customized.
+   * @return the new job configuration.
+   * @throws IOException failure
+   */
+  protected JobConf newJobConf() throws IOException {
+    JobConf jobConf = new JobConf(getYarn().getConfig());
+    jobConf.addResource(getConfiguration());
+    applyCustomConfigOptions(jobConf);
+    return jobConf;
   }
 
-  protected Job createJob() throws IOException {
-    Configuration jobConf = getClusterBinding().getConf();
-    jobConf.addResource(getConfiguration());
+  protected Job createJob(Configuration jobConf) throws IOException {
     Job mrJob = Job.getInstance(jobConf, getMethodName());
     patchConfigurationForCommitter(mrJob.getConfiguration());
     return mrJob;
   }
 
+  /**
+   * Patch the (job) configuration for this committer.
+   * @param jobConf configuration to patch
+   * @return a configuration which will run this configuration.
+   */
   protected Configuration patchConfigurationForCommitter(
       final Configuration jobConf) {
     jobConf.setBoolean(FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
-        uniqueFilenames);
+        isUniqueFilenames());
     bindCommitter(jobConf,
         CommitConstants.S3A_COMMITTER_FACTORY,
         committerName());
     // pass down the scale test flag
-    jobConf.setBoolean(KEY_SCALE_TESTS_ENABLED, scaleTest);
+    jobConf.setBoolean(KEY_SCALE_TESTS_ENABLED, isScaleTest());
+    // and fix the commit dir to the local FS across all workers.
+    String staging = stagingFilesDir.getRoot().getAbsolutePath();
+    LOG.info("Staging temp dir is {}", staging);
+    jobConf.set(FS_S3A_COMMITTER_STAGING_TMP_PATH, staging);
     return jobConf;
   }
 
@@ -220,7 +314,7 @@ protected Configuration patchConfigurationForCommitter(
    * @return the number of mappers to create.
    */
   public int getTestFileCount() {
-    return scaleTest ? SCALE_TEST_FILE_COUNT : TEST_FILE_COUNT;
+    return isScaleTest() ? SCALE_TEST_FILE_COUNT : TEST_FILE_COUNT;
  }
 
   /**
@@ -258,6 +352,6 @@ public boolean isScaleTest() {
   }
 
   public boolean isUniqueFilenames() {
-    return uniqueFilenames;
+    return false;
   }
 }
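
With the binding now created on demand and defaulting to file:// as the
cluster FS, a subclass which genuinely needs HDFS could, in principle,
override the demand-creation hook; a minimal sketch against the methods
above:

    @Override
    protected ClusterBinding demandCreateClusterBinding() throws Exception {
      // useHDFS = true: deploy a MiniDFSClusterService alongside YARN
      return createCluster(new JobConf(), true);
    }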

View File

@@ -0,0 +1,644 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit.integration;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import com.google.common.collect.Sets;
import org.assertj.core.api.Assertions;
import org.junit.FixMethodOrder;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.MethodSorters;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.commit.AbstractYarnClusterITest;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.LoggingTextOutputFormat;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.DurationInfo;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants._SUCCESS;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID;
import static org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory;
import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.STAGING_UPLOADS;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test an MR Job with all the different committers.
* <p>
* This is a fairly complex parameterization: it is designed to
* avoid the overhead of starting a Yarn cluster for
* individual committer types, and so speed up operations.
* <p>
* It also implicitly guarantees that there is never more than one of these
* MR jobs active at a time, so avoids overloading the test machine with too
* many processes.
* How the binding works:
* <ol>
* <li>
* Each parameterized suite is configured through its own
* {@link CommitterTestBinding} subclass.
* </li>
* <li>
* JUnit runs these test suites one parameterized binding at a time.
* </li>
* <li>
* The test suites are declared to be executed in ascending order, so
* that for a specific binding, the order is {@link #test_000()},
* {@link #test_100()} {@link #test_200_execute()} and finally
* {@link #test_500()}.
* </li>
* <li>
* {@link #test_000()} calls {@link CommitterTestBinding#validate()} so
* as to validate the state of the committer. This is primarily to
* verify that the binding setup mechanism is working.
* </li>
* <li>
* {@link #test_100()} is relayed to
* {@link CommitterTestBinding#test_100()},
* for any preflight tests.
* </li>
* <li>
* The {@link #test_200_execute()} test runs the MR job for that
* particular binding with standard reporting and verification of the
* outcome.
* </li>
* <li>
* {@link #test_500()} is relayed to
* {@link CommitterTestBinding#test_500()}, for any post-MR-job tests.
* </li>
* </ol>
*
* A new S3A FileSystem instance is created for each test_ method, so the
* pre-execute and post-execute validators cannot inspect the state of the
* FS as part of their tests.
* However, as the MR workers and AM all run in their own processes, there's
* generally no useful information about the job in the local S3AFileSystem
* instance.
*/
@RunWith(Parameterized.class)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ITestS3ACommitterMRJob extends AbstractYarnClusterITest {
private static final Logger LOG =
LoggerFactory.getLogger(ITestS3ACommitterMRJob.class);
/**
* Test array for parameterized test runs.
*
* @return the committer binding for this run.
*/
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
{new DirectoryCommitterTestBinding()},
{new PartitionCommitterTestBinding()},
{new MagicCommitterTestBinding()},
});
}
/**
* The committer binding for this instance.
*/
private final CommitterTestBinding committerTestBinding;
/**
* Parameterized constructor.
* @param committerTestBinding binding for the test.
*/
public ITestS3ACommitterMRJob(
final CommitterTestBinding committerTestBinding) {
this.committerTestBinding = committerTestBinding;
}
@Override
public void setup() throws Exception {
super.setup();
// configure the test binding for this specific test case.
committerTestBinding.setup(getClusterBinding(), getFileSystem());
}
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
disableFilesystemCaching(conf);
return conf;
}
@Rule
public final TemporaryFolder localFilesDir = new TemporaryFolder();
@Override
protected String committerName() {
return committerTestBinding.getCommitterName();
}
@Override
public boolean useInconsistentClient() {
return committerTestBinding.useInconsistentClient();
}
/**
* Verify that the committer binding is happy.
*/
@Test
public void test_000() throws Throwable {
committerTestBinding.validate();
}
@Test
public void test_100() throws Throwable {
committerTestBinding.test_100();
}
@Test
public void test_200_execute() throws Exception {
describe("Run an MR with committer %s", committerName());
S3AFileSystem fs = getFileSystem();
// final dest is in S3A
// we can't use the method name as the template places square braces into
// that and URI creation fails.
Path outputPath = path("ITestS3ACommitterMRJob-execute-" + committerName());
// create and delete to force in a tombstone marker -see HADOOP-16207
fs.mkdirs(outputPath);
fs.delete(outputPath, true);
String commitUUID = UUID.randomUUID().toString();
String suffix = isUniqueFilenames() ? ("-" + commitUUID) : "";
int numFiles = getTestFileCount();
// create all the input files on the local FS.
List<String> expectedFiles = new ArrayList<>(numFiles);
Set<String> expectedKeys = Sets.newHashSet();
for (int i = 0; i < numFiles; i += 1) {
File file = localFilesDir.newFile(i + ".text");
try (FileOutputStream out = new FileOutputStream(file)) {
out.write(("file " + i).getBytes(StandardCharsets.UTF_8));
}
String filename = String.format("part-m-%05d%s", i, suffix);
Path path = new Path(outputPath, filename);
expectedFiles.add(path.toString());
expectedKeys.add("/" + fs.pathToKey(path));
}
Collections.sort(expectedFiles);
Job mrJob = createJob(newJobConf());
JobConf jobConf = (JobConf) mrJob.getConfiguration();
mrJob.setOutputFormatClass(LoggingTextOutputFormat.class);
FileOutputFormat.setOutputPath(mrJob, outputPath);
File mockResultsFile = localFilesDir.newFile("committer.bin");
mockResultsFile.delete();
String committerPath = "file:" + mockResultsFile;
jobConf.set("mock-results-file", committerPath);
// setting up staging options is harmless for other committers
jobConf.set(FS_S3A_COMMITTER_STAGING_UUID, commitUUID);
mrJob.setInputFormatClass(TextInputFormat.class);
FileInputFormat.addInputPath(mrJob,
new Path(localFilesDir.getRoot().toURI()));
mrJob.setMapperClass(MapClass.class);
mrJob.setNumReduceTasks(0);
// an attempt to set up log4j properly, which clearly doesn't work
URL log4j = getClass().getClassLoader().getResource("log4j.properties");
if (log4j != null && "file".equals(log4j.getProtocol())) {
Path log4jPath = new Path(log4j.toURI());
LOG.debug("Using log4j path {}", log4jPath);
mrJob.addFileToClassPath(log4jPath);
String sysprops = String.format("-Xmx128m -Dlog4j.configuration=%s",
log4j);
jobConf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, sysprops);
jobConf.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, sysprops);
jobConf.set("yarn.app.mapreduce.am.command-opts", sysprops);
}
applyCustomConfigOptions(jobConf);
// fail fast if anything goes wrong
mrJob.setMaxMapAttempts(1);
try (DurationInfo ignore = new DurationInfo(LOG, "Job Submit")) {
mrJob.submit();
}
String jobID = mrJob.getJobID().toString();
String logLocation = "logs under "
+ getYarn().getTestWorkDir().getAbsolutePath();
try (DurationInfo ignore = new DurationInfo(LOG, "Job Execution")) {
mrJob.waitForCompletion(true);
}
JobStatus status = mrJob.getStatus();
if (!mrJob.isSuccessful()) {
// failure of job.
// be as meaningful as possible.
String message =
String.format("Job %s failed in state %s with cause %s.\n"
+ "Consult %s",
jobID,
status.getState(),
status.getFailureInfo(),
logLocation);
LOG.error(message);
fail(message);
}
waitForConsistency();
Path successPath = new Path(outputPath, _SUCCESS);
SuccessData successData = validateSuccessFile(outputPath,
committerName(),
fs,
"MR job " + jobID,
1);
String commitData = successData.toString();
FileStatus[] results = fs.listStatus(outputPath,
S3AUtils.HIDDEN_FILE_FILTER);
int fileCount = results.length;
Assertions.assertThat(fileCount)
.describedAs("No files from job %s in output directory %s; see %s",
jobID,
outputPath,
logLocation)
.isNotEqualTo(0);
List<String> actualFiles = Arrays.stream(results)
.map(s -> s.getPath().toString())
.sorted()
.collect(Collectors.toList());
Assertions.assertThat(actualFiles)
.describedAs("Files found in %s", outputPath)
.isEqualTo(expectedFiles);
Assertions.assertThat(successData.getFilenames())
.describedAs("Success files listed in %s:%s",
successPath, commitData)
.isNotEmpty()
.containsExactlyInAnyOrderElementsOf(expectedKeys);
assertPathDoesNotExist("temporary dir should only be from"
+ " classic file committers",
new Path(outputPath, CommitConstants.TEMPORARY));
customPostExecutionValidation(outputPath, successData);
}
@Override
protected void applyCustomConfigOptions(final JobConf jobConf)
throws IOException {
committerTestBinding.applyCustomConfigOptions(jobConf);
}
@Override
protected void customPostExecutionValidation(final Path destPath,
final SuccessData successData) throws Exception {
committerTestBinding.validateResult(destPath, successData);
}
/**
* This is the extra test which committer test bindings can add.
*/
@Test
public void test_500() throws Throwable {
committerTestBinding.test_500();
}
/**
* Test Mapper.
* This is executed in separate process, and must not make any assumptions
* about external state.
*/
public static class MapClass
extends Mapper<LongWritable, Text, LongWritable, Text> {
private int operations;
private String id = "";
private LongWritable l = new LongWritable();
private Text t = new Text();
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
super.setup(context);
// force in Log4J logging
org.apache.log4j.BasicConfigurator.configure();
// and pick up scale test flag as passed down
boolean scaleMap = context.getConfiguration()
.getBoolean(KEY_SCALE_TESTS_ENABLED, false);
operations = scaleMap ? SCALE_TEST_KEYS : BASE_TEST_KEYS;
id = context.getTaskAttemptID().toString();
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
for (int i = 0; i < operations; i++) {
l.set(i);
t.set(String.format("%s:%05d", id, i));
context.write(l, t);
}
}
}
/**
* A binding class for committer tests.
* Subclasses of this will be instantiated and drive the parameterized
* test suite.
*
* These classes will be instantiated in a static array of the suite, and
* not bound to a cluster binding or filesystem.
*
* The per-method test {@link #setup()} method will call
* {@link #setup(ClusterBinding, S3AFileSystem)}, to link the instance
* to the specific test cluster <i>and test filesystem</i> in use
* in that test.
*/
private abstract static class CommitterTestBinding {
/**
* Name.
*/
private final String committerName;
/**
* Cluster binding.
*/
private ClusterBinding binding;
/**
* The S3A filesystem.
*/
private S3AFileSystem remoteFS;
/**
* Constructor.
* @param committerName name of the committer for messages.
*/
protected CommitterTestBinding(final String committerName) {
this.committerName = committerName;
}
/**
* Set up the test binding: this is called during test setup.
* @param cluster the active test cluster.
* @param fs the S3A Filesystem used as a destination.
*/
private void setup(
ClusterBinding cluster,
S3AFileSystem fs) {
this.binding = cluster;
this.remoteFS = fs;
}
protected String getCommitterName() {
return committerName;
}
protected ClusterBinding getBinding() {
return binding;
}
protected S3AFileSystem getRemoteFS() {
return remoteFS;
}
protected FileSystem getClusterFS() throws IOException {
return getBinding().getClusterFS();
}
@Override
public String toString() {
return committerName;
}
/**
* Override point to let implementations tune the MR Job conf.
* @param jobConf configuration
*/
protected void applyCustomConfigOptions(JobConf jobConf)
throws IOException {
}
/**
* Should the inconsistent S3A client be used?
* @return true for inconsistent listing
*/
public abstract boolean useInconsistentClient();
/**
* Override point for any committer specific validation operations;
* called after the base assertions have all passed.
* @param destPath destination of work
* @param successData loaded success data
* @throws Exception failure
*/
protected void validateResult(Path destPath,
SuccessData successData)
throws Exception {
}
/**
* A test to run before the main {@link #test_200_execute()} test is
* invoked.
* @throws Throwable failure.
*/
void test_100() throws Throwable {
}
/**
* A test to run after the main {@link #test_200_execute()} test is
* invoked.
* @throws Throwable failure.
*/
void test_500() throws Throwable {
}
/**
* Validate the state of the binding.
* This is called in {@link #test_000()} so will
* fail independently of the other tests.
* @throws Throwable failure.
*/
public void validate() throws Throwable {
assertNotNull("Not bound to a cluster", binding);
assertNotNull("No cluster filesystem", getClusterFS());
assertNotNull("No yarn cluster", binding.getYarn());
}
}
/**
* The directory staging committer.
*/
private static final class DirectoryCommitterTestBinding
extends CommitterTestBinding {
private DirectoryCommitterTestBinding() {
super(DirectoryStagingCommitter.NAME);
}
/**
* @return true for inconsistent listing
*/
public boolean useInconsistentClient() {
return true;
}
/**
* Verify that staging commit dirs are made absolute under the user's
* home directory, so, in a secure cluster, private.
*/
@Override
void test_100() throws Throwable {
FileSystem fs = getClusterFS();
Configuration conf = fs.getConf();
String pri = "private";
conf.set(FS_S3A_COMMITTER_STAGING_TMP_PATH, pri);
Path dir = getMultipartUploadCommitsDirectory(conf, "uuid");
Assertions.assertThat(dir.isAbsolute())
.describedAs("non-absolute path")
.isTrue();
String stagingTempDir = dir.toString().toLowerCase(Locale.ENGLISH);
String self = UserGroupInformation.getCurrentUser()
.getShortUserName().toLowerCase(Locale.ENGLISH);
Assertions.assertThat(stagingTempDir)
.describedAs("Staging committer temp path in cluster")
.contains(pri + "/" + self)
.endsWith("uuid/" + STAGING_UPLOADS);
}
}
/**
* The partition committer test binding.
*/
private static final class PartitionCommitterTestBinding
extends CommitterTestBinding {
private PartitionCommitterTestBinding() {
super(PartitionedStagingCommitter.NAME);
}
/**
* @return true for inconsistent listing
*/
public boolean useInconsistentClient() {
return true;
}
}
/**
* The magic committer test binding.
* This includes extra result validation.
*/
private static final class MagicCommitterTestBinding
extends CommitterTestBinding {
private MagicCommitterTestBinding() {
super(MagicS3GuardCommitter.NAME);
}
/**
* @return we need a consistent store.
*/
public boolean useInconsistentClient() {
return false;
}
/**
* The result validation here is that there isn't a __magic directory
* any more.
* @param destPath destination of work
* @param successData loaded success data
* @throws Exception failure
*/
@Override
protected void validateResult(final Path destPath,
final SuccessData successData)
throws Exception {
Path magicDir = new Path(destPath, MAGIC);
// if an FNFE isn't raised on getFileStatus, list out the directory
// tree
S3AFileSystem fs = getRemoteFS();
// log the contents
lsR(fs, destPath, true);
intercept(FileNotFoundException.class, () -> {
final FileStatus st = fs.getFileStatus(magicDir);
StringBuilder result = new StringBuilder("Found magic dir which should"
+ " have been deleted at ").append(st).append('\n');
result.append("[");
applyLocatedFiles(fs.listFiles(magicDir, true),
(status) -> result.append(status.getPath()).append('\n'));
result.append("[");
return result.toString();
});
}
}
}

View File

@@ -1,120 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit.magic;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
import org.apache.hadoop.mapred.JobConf;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Full integration test for the Magic Committer.
*
* There's no need to disable the committer setting for the filesystem here,
* because the committers are being instantiated in their own processes;
* the settings in {@link AbstractITCommitMRJob#applyCustomConfigOptions(JobConf)} are
* passed down to these processes.
*/
public final class ITestMagicCommitMRJob extends AbstractITCommitMRJob {
/**
* The static cluster binding with the lifecycle of this test; served
* through instance-level methods for sharing across methods in the
* suite.
*/
@SuppressWarnings("StaticNonFinalField")
private static ClusterBinding clusterBinding;
@BeforeClass
public static void setupClusters() throws IOException {
clusterBinding = createCluster(new JobConf());
}
@AfterClass
public static void teardownClusters() throws IOException {
terminateCluster(clusterBinding);
}
@Override
public ClusterBinding getClusterBinding() {
return clusterBinding;
}
/**
* Need consistency here.
* @return false
*/
@Override
public boolean useInconsistentClient() {
return false;
}
@Override
protected String committerName() {
return MagicS3GuardCommitter.NAME;
}
/**
* Turn on the magic commit support for the FS, else nothing will work.
* @param conf configuration
*/
@Override
protected void applyCustomConfigOptions(JobConf conf) {
conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
}
/**
* Check that the magic dir was cleaned up.
* {@inheritDoc}
*/
@Override
protected void customPostExecutionValidation(Path destPath,
SuccessData successData) throws Exception {
Path magicDir = new Path(destPath, MAGIC);
// if an FNFE isn't raised on getFileStatus, list out the directory
// tree
S3AFileSystem fs = getFileSystem();
// log the contents
lsR(fs, destPath, true);
intercept(FileNotFoundException.class, () -> {
final FileStatus st = fs.getFileStatus(magicDir);
StringBuilder result = new StringBuilder("Found magic dir which should"
+ " have been deleted at ").append(st).append('\n');
result.append("[");
applyLocatedFiles(fs.listFiles(magicDir, true),
(status) -> result.append(status.getPath()).append('\n'));
result.append("[");
return result.toString();
});
}
}

View File

@@ -1,61 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit.staging.integration;
import java.io.IOException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
import org.apache.hadoop.mapred.JobConf;
/**
* Full integration test for the directory committer.
*/
public final class ITestDirectoryCommitMRJob extends AbstractITCommitMRJob {
/**
* The static cluster binding with the lifecycle of this test; served
* through instance-level methods for sharing across methods in the
* suite.
*/
@SuppressWarnings("StaticNonFinalField")
private static ClusterBinding clusterBinding;
@BeforeClass
public static void setupClusters() throws IOException {
clusterBinding = createCluster(new JobConf()); }
@AfterClass
public static void teardownClusters() throws IOException {
terminateCluster(clusterBinding);
}
@Override
public ClusterBinding getClusterBinding() {
return clusterBinding;
}
@Override
protected String committerName() {
return DirectoryStagingCommitter.NAME;
}
}

View File

@@ -1,62 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit.staging.integration;
import java.io.IOException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter;
import org.apache.hadoop.mapred.JobConf;
/**
* Full integration test for the partition committer.
*/
public final class ITestPartitionCommitMRJob extends AbstractITCommitMRJob {
/**
* The static cluster binding with the lifecycle of this test; served
* through instance-level methods for sharing across methods in the
* suite.
*/
@SuppressWarnings("StaticNonFinalField")
private static ClusterBinding clusterBinding;
@BeforeClass
public static void setupClusters() throws IOException {
clusterBinding = createCluster(new JobConf());
}
@AfterClass
public static void teardownClusters() throws IOException {
terminateCluster(clusterBinding);
}
@Override
public ClusterBinding getClusterBinding() {
return clusterBinding;
}
@Override
protected String committerName() {
return PartitionedStagingCommitter.NAME;
}
}

View File

@@ -1,94 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit.staging.integration;
import java.io.IOException;
import org.hamcrest.core.StringContains;
import org.hamcrest.core.StringEndsWith;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH;
import static org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory;
import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.STAGING_UPLOADS;
/**
* Full integration test for the staging committer.
*/
public final class ITestStagingCommitMRJob extends AbstractITCommitMRJob {
/**
* The static cluster binding with the lifecycle of this test; served
* through instance-level methods for sharing across methods in the
* suite.
*/
@SuppressWarnings("StaticNonFinalField")
private static ClusterBinding clusterBinding;
@BeforeClass
public static void setupClusters() throws IOException {
clusterBinding = createCluster(new JobConf());
}
@AfterClass
public static void teardownClusters() throws IOException {
terminateCluster(clusterBinding);
}
@Override
public ClusterBinding getClusterBinding() {
return clusterBinding;
}
@Override
protected String committerName() {
return StagingCommitter.NAME;
}
/**
* Verify that staging commit dirs are made absolute under the user's
* home directory, so that, in a secure cluster, they are private.
*/
@Test
public void testStagingDirectory() throws Throwable {
FileSystem hdfs = getDFS();
Configuration conf = hdfs.getConf();
conf.set(FS_S3A_COMMITTER_STAGING_TMP_PATH, "private");
Path dir = getMultipartUploadCommitsDirectory(conf, "UUID");
assertThat("Directory " + dir + " path is wrong",
dir.toString(),
StringEndsWith.endsWith("UUID/"
+ STAGING_UPLOADS));
assertTrue("path unqualified", dir.isAbsolute());
String self = UserGroupInformation.getCurrentUser().getShortUserName();
assertThat(dir.toString(),
StringContains.containsString("/user/" + self + "/private"));
}
}
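
The assertions above pin down the path-resolution contract: a relative fs.s3a.committer.staging.tmp.path is qualified against the cluster filesystem's home directory before the job UUID and the uploads subdirectory are appended. A hedged sketch of that resolution; the helper below is illustrative, not the real Paths.getMultipartUploadCommitsDirectory() implementation, and the "staging-uploads" leaf is the STAGING_UPLOADS constant:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class StagingPathSketch {
  private StagingPathSketch() {
  }

  static Path resolveStagingDir(FileSystem fs, String tmpPath, String uuid) {
    Path tmp = new Path(tmpPath);            // e.g. "private": relative
    if (!tmp.isAbsolute()) {
      // qualify under the user's home directory, e.g. /user/alice/private
      tmp = new Path(fs.getHomeDirectory(), tmp);
    }
    // per-job subtree: .../private/<uuid>/staging-uploads
    return new Path(new Path(tmp, uuid), "staging-uploads");
  }
}
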


@@ -1,89 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit.staging.integration;
import java.io.IOException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.test.LambdaTestUtils;
/**
* Verify that the committer fails if the destination directory already
* exists, and that the failure surfaces during job setup.
*/
public final class ITestStagingCommitMRJobBadDest extends AbstractITCommitMRJob {
/**
* The static cluster binding with the lifecycle of this test; served
* through instance-level methods for sharing across methods in the
* suite.
*/
@SuppressWarnings("StaticNonFinalField")
private static ClusterBinding clusterBinding;
@BeforeClass
public static void setupClusters() throws IOException {
clusterBinding = createCluster(new JobConf());
}
@AfterClass
public static void teardownClusters() throws IOException {
terminateCluster(clusterBinding);
}
@Override
public ClusterBinding getClusterBinding() {
return clusterBinding;
}
@Override
protected String committerName() {
return StagingCommitter.NAME;
}
/**
* Create the destination directory, so triggering the expected failure.
* @param conf configuration
*/
@Override
protected void applyCustomConfigOptions(JobConf conf) throws IOException {
// This is the destination in the S3 FS
String outdir = conf.get(FileOutputFormat.OUTDIR);
S3AFileSystem fs = getFileSystem();
Path outputPath = new Path(outdir);
fs.mkdirs(outputPath);
}
@Override
public void testMRJob() throws Exception {
LambdaTestUtils.intercept(FileAlreadyExistsException.class,
"Output directory",
super::testMRJob);
}
}
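
The testMRJob() override relies on LambdaTestUtils.intercept, which evaluates a closure and fails unless it raises an exception of the expected class whose text contains the expected fragment. A minimal sketch in the same spirit (not the real implementation):

import java.util.concurrent.Callable;

final class InterceptSketch {
  private InterceptSketch() {
  }

  static <E extends Throwable> E intercept(
      Class<E> clazz, String contained, Callable<?> eval) throws Exception {
    Object result;
    try {
      result = eval.call();
    } catch (Throwable t) {
      if (!clazz.isInstance(t)) {
        // not the exception we wanted: fail with the original attached
        throw new AssertionError("Wrong exception type: " + t, t);
      }
      if (contained != null
          && (t.getMessage() == null || !t.getMessage().contains(contained))) {
        throw new AssertionError("Message lacks '" + contained + "': " + t, t);
      }
      return clazz.cast(t);
    }
    // the closure returned instead of throwing: that is also a failure
    throw new AssertionError(
        "Expected " + clazz.getName() + " but the call returned " + result);
  }
}

With such a helper, the override above reduces to asserting that the inherited job run raises FileAlreadyExistsException whose text includes "Output directory".
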


@@ -1,62 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit.terasort;
import java.io.IOException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
import org.apache.hadoop.mapred.JobConf;
/**
* Terasort with the directory committer.
*/
public final class ITestTerasortDirectoryCommitter extends AbstractCommitTerasortIT {
/**
* The static cluster binding with the lifecycle of this test; served
* through instance-level methods for sharing across methods in the
* suite.
*/
@SuppressWarnings("StaticNonFinalField")
private static ClusterBinding clusterBinding;
@BeforeClass
public static void setupClusters() throws IOException {
clusterBinding = createCluster(new JobConf());
}
@AfterClass
public static void teardownClusters() throws IOException {
clusterBinding.terminate();
}
@Override
public ClusterBinding getClusterBinding() {
return clusterBinding;
}
@Override
protected String committerName() {
return DirectoryStagingCommitter.NAME;
}
}


@@ -1,73 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit.terasort;
import java.io.IOException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
import org.apache.hadoop.mapred.JobConf;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
/**
* Terasort with the magic committer.
*/
public final class ITestTerasortMagicCommitter
extends AbstractCommitTerasortIT {
/**
* The static cluster binding with the lifecycle of this test; served
* through instance-level methods for sharing across methods in the
* suite.
*/
@SuppressWarnings("StaticNonFinalField")
private static ClusterBinding clusterBinding;
@BeforeClass
public static void setupClusters() throws IOException {
clusterBinding = createCluster(new JobConf());
}
@AfterClass
public static void teardownClusters() throws IOException {
clusterBinding.terminate();
}
@Override
public ClusterBinding getClusterBinding() {
return clusterBinding;
}
@Override
protected String committerName() {
return MagicS3GuardCommitter.NAME;
}
/**
* Turn on the magic commit support for the FS, else nothing will work.
* @param conf configuration
*/
@Override
protected void applyCustomConfigOptions(JobConf conf) {
conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
}
}
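
applyCustomConfigOptions() is the hook which flips on magic-path support; without MAGIC_COMMITTER_ENABLED the __magic paths are not treated specially and, as the javadoc above says, nothing will work. A hedged sketch of the client-side settings involved, assuming the standard fs.s3a.committer.name selection key alongside the enablement flag:

import org.apache.hadoop.mapred.JobConf;

import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;

final class MagicCommitterConfSketch {
  private MagicCommitterConfSketch() {
  }

  static JobConf enableMagicCommitter(JobConf conf) {
    // the filesystem must recognise the special __magic paths
    conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
    // select the committer by name, as committerName() does above
    conf.set("fs.s3a.committer.name", "magic");
    return conf;
  }
}
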


@@ -19,24 +19,33 @@
 package org.apache.hadoop.fs.s3a.commit.terasort;
 
 import java.io.File;
-import java.nio.charset.Charset;
+import java.io.FileNotFoundException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
-import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 
+import org.junit.Assume;
 import org.junit.FixMethodOrder;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.examples.terasort.TeraGen;
 import org.apache.hadoop.examples.terasort.TeraSort;
 import org.apache.hadoop.examples.terasort.TeraSortConfigKeys;
 import org.apache.hadoop.examples.terasort.TeraValidate;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.commit.AbstractYarnClusterITest;
+import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
+import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.DurationInfo;
 import org.apache.hadoop.util.StringUtils;
@@ -44,45 +53,79 @@
 import org.apache.hadoop.util.ToolRunner;
 
 import static java.util.Optional.empty;
-import static org.apache.hadoop.fs.s3a.commit.CommitConstants.CONFLICT_MODE_APPEND;
-import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
 
 /**
  * Runs Terasort against S3A.
  *
- * This is all done on a shared mini YARN and HDFS clusters, set up before
- * any of the tests methods run.
- *
+ * Parameterized by committer name, using a YARN cluster
+ * shared across all test runs.
  * The tests run in sequence, so each operation is isolated.
- * This also means that the test paths deleted in test
+ * This also means that the test paths are deleted in test
  * teardown; shared variables must all be static.
+ *
+ * The test is a scale test; for each parameter it takes a few minutes to
+ * run the full suite.
+ * Before anyone calls that out as slow: try running the test with the file
+ * committer.
  */
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
+@RunWith(Parameterized.class)
 @SuppressWarnings("StaticNonFinalField")
-public abstract class AbstractCommitTerasortIT extends
-    AbstractYarnClusterITest {
+public class ITestTerasortOnS3A extends AbstractYarnClusterITest {
 
   private static final Logger LOG =
-      LoggerFactory.getLogger(AbstractCommitTerasortIT.class);
+      LoggerFactory.getLogger(ITestTerasortOnS3A.class);
 
-  // all the durations are optional as they only get filled in when
-  // a test run successfully completes. Failed tests don't have numbers.
+  public static final int EXPECTED_PARTITION_COUNT = 10;
+
+  public static final int PARTITION_SAMPLE_SIZE = 1000;
+
+  public static final int ROW_COUNT = 1000;
+
+  /**
+   * Duration tracker created in the first of the test cases and closed
+   * in {@link #test_140_teracomplete()}.
+   */
   private static Optional<DurationInfo> terasortDuration = empty();
 
-  private static Optional<DurationInfo> teragenStageDuration = empty();
+  /**
+   * Tracker of which stages are completed and how long they took.
+   */
+  private static Map<String, DurationInfo> completedStages = new HashMap<>();
 
-  private static Optional<DurationInfo> terasortStageDuration = empty();
+  /** Name of the committer for this run. */
+  private final String committerName;
 
-  private static Optional<DurationInfo> teravalidateStageDuration = empty();
-
+  /** Base path for all the terasort input and output paths. */
   private Path terasortPath;
 
+  /** Input (teragen) path. */
   private Path sortInput;
 
+  /** Path where sorted data goes. */
   private Path sortOutput;
 
+  /** Path for validated job's output. */
   private Path sortValidate;
 
+  /**
+   * Test array for parameterized test runs.
+   *
+   * @return the committer binding for this run.
+   */
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {DirectoryStagingCommitter.NAME},
+        {MagicS3GuardCommitter.NAME}});
+  }
+
+  public ITestTerasortOnS3A(final String committerName) {
+    this.committerName = committerName;
+  }
+
   /**
    * Not using special paths here.
    * @return false
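
The replacement for the per-committer subclasses is the JUnit 4 Parameterized runner: params() declares one row per committer, each row becomes a complete run of the suite, and name = "{0}" stamps the committer name into every test's display name. A self-contained sketch of the pattern, with hypothetical committer names:

import java.util.Arrays;
import java.util.Collection;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class ParameterizedSketchTest {

  @Parameterized.Parameters(name = "{0}")
  public static Collection<Object[]> params() {
    // each row becomes a full, independently named run of the suite
    return Arrays.asList(new Object[][]{
        {"directory"},
        {"magic"}});
  }

  private final String committerName;

  public ParameterizedSketchTest(String committerName) {
    this.committerName = committerName;
  }

  @Test
  public void testRunsOncePerCommitter() {
    // executed once with "directory" and once with "magic"
    Assert.assertNotNull(committerName);
  }
}
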
@@ -92,6 +135,11 @@ public boolean useInconsistentClient() {
     return false;
   }
 
+  @Override
+  protected String committerName() {
+    return committerName;
+  }
+
   @Override
   public void setup() throws Exception {
     super.setup();
@@ -100,44 +148,88 @@ public void setup() throws Exception {
   }
 
   /**
-   * Set up for terasorting by initializing paths.
-   * The paths used must be unique across parallel runs.
+   * Set up the job conf with the options for terasort chosen by the scale
+   * options.
+   * @param conf configuration
+   */
+  @Override
+  protected void applyCustomConfigOptions(JobConf conf) {
+    // small sample size for faster runs
+    conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
+    conf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(),
+        getSampleSizeForEachPartition());
+    conf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(),
+        getExpectedPartitionCount());
+    conf.setBoolean(
+        TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(),
+        false);
+  }
+
+  private int getExpectedPartitionCount() {
+    return EXPECTED_PARTITION_COUNT;
+  }
+
+  private int getSampleSizeForEachPartition() {
+    return PARTITION_SAMPLE_SIZE;
+  }
+
+  protected int getRowCount() {
+    return ROW_COUNT;
+  }
+
+  /**
+   * Set up the terasort by initializing the path variables.
+   * The paths used must be unique across parameterized runs but
+   * common across all test cases in a single parameterized run.
    */
   private void prepareToTerasort() {
-    // small sample size for faster runs
-    Configuration yarnConfig = getYarn().getConfig();
-    yarnConfig.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(), 1000);
-    yarnConfig.setBoolean(
-        TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(),
-        true);
-    yarnConfig.setBoolean(
-        TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(),
-        false);
-    terasortPath = new Path("/terasort-" + getClass().getSimpleName())
+    terasortPath = new Path("/terasort-" + committerName)
         .makeQualified(getFileSystem());
     sortInput = new Path(terasortPath, "sortin");
     sortOutput = new Path(terasortPath, "sortout");
     sortValidate = new Path(terasortPath, "validate");
-    if (!terasortDuration.isPresent()) {
-      terasortDuration = Optional.of(new DurationInfo(LOG, "Terasort"));
-    }
   }
 
   /**
-   * Execute a single stage in the terasort,
-   * @param stage Stage name for messages/assertions.
+   * Declare that a stage has completed.
+   * @param stage stage name/key in the map
+   * @param d duration.
+   */
+  private static void completedStage(final String stage,
+      final DurationInfo d) {
+    completedStages.put(stage, d);
+  }
+
+  /**
+   * Declare a stage which is required for this test case.
+   * @param stage stage name
+   */
+  private static void requireStage(final String stage) {
+    Assume.assumeTrue(
+        "Required stage was not completed: " + stage,
+        completedStages.get(stage) != null);
+  }
+
+  /**
+   * Execute a single stage in the terasort.
+   * Updates the completed stages map with the stage duration -if successful.
+   * @param stage Stage name for the stages map.
    * @param jobConf job conf
-   * @param dest destination directory -the _SUCCESS File will be expected here.
+   * @param dest destination directory -the _SUCCESS file will be expected here.
    * @param tool tool to run.
    * @param args args for the tool.
+   * @param minimumFileCount minimum number of files to have been created
    * @throws Exception any failure
    */
-  private Optional<DurationInfo> executeStage(
+  private void executeStage(
       final String stage,
       final JobConf jobConf,
       final Path dest,
       final Tool tool,
-      final String[] args) throws Exception {
+      final String[] args,
+      final int minimumFileCount) throws Exception {
     int result;
     DurationInfo d = new DurationInfo(LOG, stage);
     try {
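
requireStage()/completedStage() gate each stage on its predecessor: Assume.assumeTrue raises AssumptionViolatedException, which JUnit reports as a skipped test rather than a failure, so a single broken stage marks everything downstream as skipped instead of producing a cascade of misleading failures. A small, self-contained sketch of the idiom:

import java.util.HashMap;
import java.util.Map;

import org.junit.Assume;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class StageGatingSketchTest {

  private static final Map<String, Boolean> COMPLETED = new HashMap<>();

  private static void requireStage(String stage) {
    // a failed assumption is a skip, not a failure
    Assume.assumeTrue("Required stage was not completed: " + stage,
        Boolean.TRUE.equals(COMPLETED.get(stage)));
  }

  @Test
  public void test_010_first() {
    // ... do the real work; only mark the stage on success
    COMPLETED.put("first", true);
  }

  @Test
  public void test_020_second() {
    requireStage("first");   // skipped, not failed, if stage one blew up
    // ... work that consumes stage one's output
  }
}
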
@@ -145,22 +237,30 @@ private Optional<DurationInfo> executeStage(
     } finally {
       d.close();
     }
+    dumpOutputTree(dest);
     assertEquals(stage
         + "(" + StringUtils.join(", ", args) + ")"
         + " failed", 0, result);
-    validateSuccessFile(dest, committerName(), getFileSystem(), stage);
-    return Optional.of(d);
+    validateSuccessFile(dest, committerName(), getFileSystem(), stage,
+        minimumFileCount);
+    completedStage(stage, d);
   }
 
   /**
    * Set up terasort by cleaning out the destination, and note the initial
    * time before any of the jobs are executed.
+   *
+   * This is executed first <i>for each parameterized run</i>.
+   * It is where all variables which need to be reset for each run are
+   * reset.
    */
   @Test
   public void test_100_terasort_setup() throws Throwable {
     describe("Setting up for a terasort");
 
     getFileSystem().delete(terasortPath, true);
+    completedStages = new HashMap<>();
+    terasortDuration = Optional.of(new DurationInfo(LOG, false, "Terasort"));
   }
 
   @Test
@@ -170,42 +270,46 @@ public void test_110_teragen() throws Throwable {
     JobConf jobConf = newJobConf();
     patchConfigurationForCommitter(jobConf);
-    teragenStageDuration = executeStage("Teragen",
+    executeStage("teragen",
         jobConf,
         sortInput,
         new TeraGen(),
-        new String[]{Integer.toString(SCALE_TEST_KEYS), sortInput.toString()});
+        new String[]{Integer.toString(getRowCount()), sortInput.toString()},
+        1);
   }
 
   @Test
   public void test_120_terasort() throws Throwable {
     describe("Terasort from %s to %s", sortInput, sortOutput);
+    requireStage("teragen");
     getFileSystem().delete(sortOutput, true);
 
     loadSuccessFile(getFileSystem(), sortInput, "previous teragen stage");
     JobConf jobConf = newJobConf();
     patchConfigurationForCommitter(jobConf);
-    // this job adds some data, so skip it.
-    jobConf.set(FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND);
-    terasortStageDuration = executeStage("TeraSort",
+    executeStage("terasort",
         jobConf,
         sortOutput,
         new TeraSort(),
-        new String[]{sortInput.toString(), sortOutput.toString()});
+        new String[]{sortInput.toString(), sortOutput.toString()},
+        1);
   }
 
   @Test
   public void test_130_teravalidate() throws Throwable {
     describe("TeraValidate from %s to %s", sortOutput, sortValidate);
+    requireStage("terasort");
     getFileSystem().delete(sortValidate, true);
     loadSuccessFile(getFileSystem(), sortOutput, "previous terasort stage");
     JobConf jobConf = newJobConf();
     patchConfigurationForCommitter(jobConf);
-    teravalidateStageDuration = executeStage("TeraValidate",
+    executeStage("teravalidate",
         jobConf,
         sortValidate,
         new TeraValidate(),
-        new String[]{sortOutput.toString(), sortValidate.toString()});
+        new String[]{sortOutput.toString(), sortValidate.toString()},
+        1);
   }
 
   /**
@@ -214,7 +318,10 @@ public void test_130_teravalidate() throws Throwable {
    */
   @Test
   public void test_140_teracomplete() throws Throwable {
-    terasortDuration.get().close();
+    terasortDuration.ifPresent(d -> {
+      d.close();
+      completedStage("overall", d);
+    });
 
     final StringBuilder results = new StringBuilder();
     results.append("\"Operation\"\t\"Duration\"\n");
@@ -222,19 +329,20 @@ public void test_140_teracomplete() throws Throwable {
     // this is how you dynamically create a function in a method
     // for use afterwards.
     // Works because there's no IOEs being raised in this sequence.
-    BiConsumer<String, Optional<DurationInfo>> stage =
-        (s, od) ->
-            results.append(String.format("\"%s\"\t\"%s\"\n",
-                s,
-                od.map(DurationInfo::getDurationString).orElse("")));
+    Consumer<String> stage = (s) -> {
+      DurationInfo duration = completedStages.get(s);
+      results.append(String.format("\"%s\"\t\"%s\"\n",
+          s,
+          duration == null ? "" : duration));
+    };
 
-    stage.accept("Generate", teragenStageDuration);
-    stage.accept("Terasort", terasortStageDuration);
-    stage.accept("Validate", teravalidateStageDuration);
-    stage.accept("Completed", terasortDuration);
+    stage.accept("teragen");
+    stage.accept("terasort");
+    stage.accept("teravalidate");
+    stage.accept("overall");
     String text = results.toString();
     File resultsFile = File.createTempFile("results", ".csv");
-    FileUtils.write(resultsFile, text, Charset.forName("UTF-8"));
+    FileUtils.write(resultsFile, text, StandardCharsets.UTF_8);
     LOG.info("Results are in {}\n{}", resultsFile, text);
   }
@@ -252,4 +360,18 @@ public void test_150_teracleanup() throws Throwable {
   public void test_200_directory_deletion() throws Throwable {
     getFileSystem().delete(terasortPath, true);
   }
+
+  /**
+   * Dump the files under a path, without failing if the path is not present.
+   * @param path path to dump
+   * @throws Exception any failure.
+   */
+  protected void dumpOutputTree(Path path) throws Exception {
+    LOG.info("Files under output directory {}", path);
+    try {
+      lsR(getFileSystem(), path, true);
+    } catch (FileNotFoundException e) {
+      LOG.info("Output directory {} not found", path);
+    }
+  }
 }
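
The chaining between the stages rides on the _SUCCESS marker: the s3a committers write a JSON SuccessData manifest there, which loadSuccessFile/validateSuccessFile use to prove that the expected committer actually committed the previous stage's output. A hedged sketch of that check, assuming the SuccessData.load() API of the s3a committers at the time of this commit; the helper name is illustrative:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;

final class SuccessFileSketch {
  private SuccessFileSketch() {
  }

  static SuccessData requireCommitter(FileSystem fs, Path dir,
      String expectedCommitter) throws Exception {
    // _SUCCESS is zero bytes for the classic committers, JSON for s3a ones
    SuccessData success = SuccessData.load(fs, new Path(dir, "_SUCCESS"));
    if (!expectedCommitter.equals(success.getCommitter())) {
      throw new AssertionError("Wrong committer: " + success.getCommitter());
    }
    return success;
  }
}
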


@@ -58,7 +58,7 @@ log4j.logger.org.apache.hadoop.ipc.Server=WARN
 # Log S3Guard classes
 #log4j.logger.org.apache.hadoop.fs.s3a.s3guard=DEBUG
 # if set to debug, this will log the PUT/DELETE operations on a store
-#log4j.logger.org.apache.hadoop.fs.s3a.s3guard.Operations=DEBUG
+log4j.logger.org.apache.hadoop.fs.s3a.s3guard.Operations=DEBUG
 # Log Committer classes
 #log4j.logger.org.apache.hadoop.fs.s3a.commit=DEBUG