HADOOP-16570. S3A committers encounter scale issues.

Contributed by Steve Loughran.

This addresses two scale issues which have surfaced in large-scale benchmarks
of the S3A Committers.

* Thread pools are not cleaned up.
  This now happens, with tests.

* OOM on job commit for jobs with many thousands of tasks,
  each generating tens of (very large) files.

Instead of loading all pending commits into memory as a single list, only the
list of .pendingset files to load is passed around; each .pendingset file is
loaded and processed in isolation, and reloaded if necessary for any
abort/rollback operation.

The parallel commit/abort/revert operations now work at the .pendingset level,
rather than that of individual pending commit files. The existing parallelized
Tasks API is still used to commit those files, but with a null thread pool, so
as to serialize the operations.
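
A minimal sketch of that flow, built only from the calls visible in the diff
below (Tasks.foreach, PendingSet.load, CommitOperations.CommitContext); the
wrapper class and method names here are illustrative, not part of the patch:

import java.io.IOException;
import java.util.concurrent.ExecutorService;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.CommitOperations;
import org.apache.hadoop.fs.s3a.commit.Tasks;
import org.apache.hadoop.fs.s3a.commit.files.PendingSet;

/**
 * Sketch of the revised job commit: only the .pendingset paths are held in
 * memory; each file's contents are loaded, committed and then discarded
 * inside the worker thread which processes it.
 */
class PendingsetCommitSketch {

  void commitAll(FileSystem sourceFS,
      Iterable<Path> pendingsetPaths,
      ExecutorService jobPool,
      CommitOperations.CommitContext commitContext) throws IOException {
    // Outer loop: one unit of work per .pendingset file, parallelized
    // on the job committer's thread pool.
    Tasks.foreach(pendingsetPaths)
        .stopOnFailure()
        .suppressExceptions(false)
        .executeWith(jobPool)
        .run(path -> {
          // load this task's pending commits on demand
          PendingSet pendingSet = PendingSet.load(sourceFS, path);
          // Inner loop: executeWith(null) means the uploads of this one
          // task are committed sequentially in the current thread.
          Tasks.foreach(pendingSet.getCommits())
              .stopOnFailure()
              .suppressExceptions(false)
              .executeWith(null)
              .onFailure((commit, e) -> commitContext.abortSingleCommit(commit))
              .abortWith(commitContext::abortSingleCommit)
              .revertWith(commitContext::revertCommit)
              .run(commitContext::commitOrFail);
        });
  }
}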

Change-Id: I5c8240cd31800eaa83d112358770ca0eb2bca797
Steve Loughran 2019-10-04 18:53:53 +01:00
parent aa24add8f0
commit 6574f27fa3
22 changed files with 1123 additions and 196 deletions

View File

@ -837,4 +837,10 @@ public final class Constants {
public static final String AWS_SERVICE_IDENTIFIER_S3 = "S3";
public static final String AWS_SERVICE_IDENTIFIER_DDB = "DDB";
public static final String AWS_SERVICE_IDENTIFIER_STS = "STS";
/**
* How long to wait for the thread pool to terminate when cleaning up.
* Value: {@value} seconds.
*/
public static final int THREAD_POOL_SHUTDOWN_DELAY_SECONDS = 30;
}

View File

@ -154,6 +154,7 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import static org.apache.hadoop.fs.impl.AbstractFSBuilderImpl.rejectUnknownMandatoryKeys;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
@ -3062,6 +3063,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
transfers.shutdownNow(true);
transfers = null;
}
HadoopExecutors.shutdown(boundedThreadPool, LOG,
THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
boundedThreadPool = null;
HadoopExecutors.shutdown(unboundedThreadPool, LOG,
THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
unboundedThreadPool = null;
S3AUtils.closeAll(LOG, metadataStore, instrumentation);
metadataStore = null;
instrumentation = null;
@ -4064,7 +4071,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
@Retries.OnceRaw
void abortMultipartUpload(String destKey, String uploadId) {
LOG.info("Aborting multipart upload {} to {}", uploadId, destKey);
LOG.debug("Aborting multipart upload {} to {}", uploadId, destKey);
getAmazonS3Client().abortMultipartUpload(
new AbortMultipartUploadRequest(getBucket(),
destKey,
@ -4084,7 +4091,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
uploadId = upload.getUploadId();
if (LOG.isInfoEnabled()) {
DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
LOG.info("Aborting multipart upload {} to {} initiated by {} on {}",
LOG.debug("Aborting multipart upload {} to {} initiated by {} on {}",
uploadId, destKey, upload.getInitiator(),
df.format(upload.getInitiated()));
}

View File

@ -611,11 +611,14 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
public void close() {
synchronized (metricsSystemLock) {
// it is critical to close each quantile, as they start a scheduled
// task in a shared thread pool.
putLatencyQuantile.stop();
throttleRateQuantile.stop();
metricsSystem.unregisterSource(metricsSourceName);
int activeSources = --metricsSourceActiveCounter;
if (activeSources == 0) {
LOG.debug("Shutting down metrics publisher");
metricsSystem.publishMetricsNow();
metricsSystem.shutdown();
metricsSystem = null;

View File

@ -18,18 +18,18 @@
package org.apache.hadoop.fs.s3a.commit;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.amazonaws.services.s3.model.MultipartUpload;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -49,7 +49,9 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import static org.apache.hadoop.fs.s3a.Constants.THREAD_POOL_SHUTDOWN_DELAY_SECONDS;
import static org.apache.hadoop.fs.s3a.Invoker.ignoreIOExceptions;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
@ -66,11 +68,28 @@ import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*;
* to handle the creation of a committer when the destination is unknown.
*
* Requiring an output directory simplifies coding and testing.
*
* The original implementation loaded all .pendingset files
* before attempting any commit/abort operations.
While straightforward, and guaranteeing that no changes were made to the
destination until all files had successfully been loaded, it didn't scale:
the list grew until it exceeded the heap size.
*
* The second iteration builds up an {@link ActiveCommit} class with the
* list of .pendingset files to load and then commit; that can be done
* incrementally and in parallel.
* As a side effect of this change, unless/until changed,
* the commit/abort/revert of all files uploaded by a single task will be
* serialized. This may slow down these operations if there are many files
* created by a few tasks, <i>and</i> the HTTP connection pool in the S3A
committer was large enough for all the parallel POST requests.
*/
public abstract class AbstractS3ACommitter extends PathOutputCommitter {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractS3ACommitter.class);
public static final String THREAD_PREFIX = "s3a-committer-pool-";
/**
* Thread pool for task execution.
*/
@ -349,16 +368,11 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
* @throws IOException IO failure
*/
protected void maybeCreateSuccessMarkerFromCommits(JobContext context,
List<SinglePendingCommit> pending) throws IOException {
ActiveCommit pending) throws IOException {
List<String> filenames = new ArrayList<>(pending.size());
for (SinglePendingCommit commit : pending) {
String key = commit.getDestinationKey();
if (!key.startsWith("/")) {
// fix up so that FS.makeQualified() sets up the path OK
key = "/" + key;
}
filenames.add(key);
}
// The list of committed objects in pending is size limited in
// ActiveCommit.uploadCommitted.
filenames.addAll(pending.committedObjects);
maybeCreateSuccessMarker(context, filenames);
}
@ -390,22 +404,25 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
}
/**
* Base job setup deletes the success marker.
* TODO: Do we need this?
* Base job setup (optionally) deletes the success marker and
* always creates the destination directory.
When objects are committed, that dest dir marker will inevitably
be deleted; creating it now ensures there is something at the end
while the job is in progress, and if nothing is created, that
it is still there.
* @param context context
* @throws IOException IO failure
*/
/*
@Override
public void setupJob(JobContext context) throws IOException {
if (createJobMarker) {
try (DurationInfo d = new DurationInfo("Deleting _SUCCESS marker")) {
try (DurationInfo d = new DurationInfo(LOG, "preparing destination")) {
if (createJobMarker){
commitOperations.deleteSuccessMarker(getOutputPath());
}
getDestFS().mkdirs(getOutputPath());
}
}
*/
@Override
public void setupTask(TaskAttemptContext context) throws IOException {
@ -430,28 +447,152 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
}
/**
* Commit a list of pending uploads.
* Commit all the pending uploads.
* Each file listed in the ActiveCommit instance is queued for processing
* in a separate thread; its contents are loaded and then (sequentially)
* committed.
* On a failure or abort of a single file's commit, all its uploads are
* aborted.
* The revert operation lists the files already committed and deletes them.
* @param context job context
* @param pending list of pending uploads
* @param pending pending uploads
* @throws IOException on any failure
*/
protected void commitPendingUploads(JobContext context,
List<SinglePendingCommit> pending) throws IOException {
protected void commitPendingUploads(
final JobContext context,
final ActiveCommit pending) throws IOException {
if (pending.isEmpty()) {
LOG.warn("{}: No pending uploads to commit", getRole());
}
LOG.debug("{}: committing the output of {} task(s)",
getRole(), pending.size());
try(CommitOperations.CommitContext commitContext
try (DurationInfo ignored = new DurationInfo(LOG,
"committing the output of %s task(s)", pending.size());
CommitOperations.CommitContext commitContext
= initiateCommitOperation()) {
Tasks.foreach(pending)
Tasks.foreach(pending.getSourceFiles())
.stopOnFailure()
.suppressExceptions(false)
.executeWith(buildThreadPool(context))
.abortWith(path ->
loadAndAbort(commitContext, pending, path, true, false))
.revertWith(path ->
loadAndRevert(commitContext, pending, path))
.run(path ->
loadAndCommit(commitContext, pending, path));
}
}
/**
* Run a precommit check that all files are loadable.
* This check avoids the situation where the inability to read
* a file only surfaces partway through the job commit, so
* results in the destination being tainted.
* @param context job context
* @param pending the pending operations
* @throws IOException any failure
*/
protected void precommitCheckPendingFiles(
final JobContext context,
final ActiveCommit pending) throws IOException {
FileSystem sourceFS = pending.getSourceFS();
try (DurationInfo ignored =
new DurationInfo(LOG, "Preflight Load of pending files")) {
Tasks.foreach(pending.getSourceFiles())
.stopOnFailure()
.suppressExceptions(false)
.executeWith(buildThreadPool(context))
.run(path -> PendingSet.load(sourceFS, path));
}
}
/**
* Load a pendingset file and commit all of its contents.
* @param commitContext context to commit through
* @param activeCommit commit state
* @param path path to load
* @throws IOException failure
*/
private void loadAndCommit(
final CommitOperations.CommitContext commitContext,
final ActiveCommit activeCommit,
final Path path) throws IOException {
try (DurationInfo ignored =
new DurationInfo(LOG, false, "Committing %s", path)) {
PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(), path);
Tasks.foreach(pendingSet.getCommits())
.stopOnFailure()
.suppressExceptions(false)
.executeWith(singleCommitThreadPool())
.onFailure((commit, exception) ->
commitContext.abortSingleCommit(commit))
.abortWith(commitContext::abortSingleCommit)
.revertWith(commitContext::revertCommit)
.run(commitContext::commitOrFail);
.run(commit -> {
commitContext.commitOrFail(commit);
activeCommit.uploadCommitted(
commit.getDestinationKey(), commit.getLength());
});
}
}
/**
* Load a pendingset file and revert all of its contents.
* @param commitContext context to commit through
* @param activeCommit commit state
* @param path path to load
* @throws IOException failure
*/
private void loadAndRevert(
final CommitOperations.CommitContext commitContext,
final ActiveCommit activeCommit,
final Path path) throws IOException {
try (DurationInfo ignored =
new DurationInfo(LOG, false, "Committing %s", path)) {
PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(), path);
Tasks.foreach(pendingSet.getCommits())
.suppressExceptions(true)
.run(commitContext::revertCommit);
}
}
/**
* Load a pendingset file and abort all of its contents.
* @param commitContext context to commit through
* @param activeCommit commit state
* @param path path to load
* @param deleteRemoteFiles should remote files be deleted?
* @throws IOException failure
*/
private void loadAndAbort(
final CommitOperations.CommitContext commitContext,
final ActiveCommit activeCommit,
final Path path,
final boolean suppressExceptions,
final boolean deleteRemoteFiles) throws IOException {
try (DurationInfo ignored =
new DurationInfo(LOG, false, "Aborting %s", path)) {
PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(),
path);
FileSystem fs = getDestFS();
Tasks.foreach(pendingSet.getCommits())
.executeWith(singleCommitThreadPool())
.suppressExceptions(suppressExceptions)
.run(commit -> {
try {
commitContext.abortSingleCommit(commit);
} catch (FileNotFoundException e) {
// Commit ID was not known; file may exist.
// delete it if instructed to do so.
if (deleteRemoteFiles) {
fs.delete(commit.destinationPath(), false);
}
}
});
}
}
@ -465,44 +606,15 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
return getCommitOperations().initiateCommitOperation(getOutputPath());
}
/**
* Try to read every pendingset file and build a list of them.
* In the case of a failure to read the file, exceptions are held until all
* reads have been attempted.
* @param context job context
* @param suppressExceptions whether to suppress exceptions.
* @param fs job attempt fs
* @param pendingCommitFiles list of files found in the listing scan
* @return the list of commits
* @throws IOException on a failure when suppressExceptions is false.
*/
protected List<SinglePendingCommit> loadPendingsetFiles(
JobContext context,
boolean suppressExceptions,
FileSystem fs,
Iterable<? extends FileStatus> pendingCommitFiles) throws IOException {
final List<SinglePendingCommit> pending = Collections.synchronizedList(
Lists.newArrayList());
Tasks.foreach(pendingCommitFiles)
.suppressExceptions(suppressExceptions)
.executeWith(buildThreadPool(context))
.run(pendingCommitFile ->
pending.addAll(
PendingSet.load(fs, pendingCommitFile.getPath()).getCommits())
);
return pending;
}
/**
* Internal Job commit operation: where the S3 requests are made
* (potentially in parallel).
* @param context job context
* @param pending pending request
* @param pending pending commits
* @throws IOException any failure
*/
protected void commitJobInternal(JobContext context,
List<SinglePendingCommit> pending)
ActiveCommit pending)
throws IOException {
commitPendingUploads(context, pending);
@ -523,6 +635,9 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
* This must clean up operations; it is called when a commit fails, as
* well as in an {@link #abortJob(JobContext, JobStatus.State)} call.
* The base implementation calls {@link #cleanup(JobContext, boolean)}
which cleans up the filesystems and destroys the thread pool.
* Subclasses must always invoke this superclass method after their
* own operations.
* @param context job context
* @param suppressExceptions should exceptions be suppressed?
* @throws IOException any IO problem raised when suppressExceptions is false.
@ -536,13 +651,15 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
/**
* Abort all pending uploads to the destination directory during
* job cleanup operations.
Note: this instantiates the thread pool if required, so
* {@link #destroyThreadPool()} must be called after this.
* @param suppressExceptions should exceptions be suppressed
* @throws IOException IO problem
*/
protected void abortPendingUploadsInCleanup(
boolean suppressExceptions) throws IOException {
Path dest = getOutputPath();
try (DurationInfo d =
try (DurationInfo ignored =
new DurationInfo(LOG, "Aborting all pending commits under %s",
dest);
CommitOperations.CommitContext commitContext
@ -565,13 +682,18 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
}
/**
* Subclass-specific pre commit actions.
* Subclass-specific pre-Job-commit actions.
* The staging committers all load the pending files to verify that
* they can be loaded.
The Magic committer does not, because the overhead of reading files
from S3 makes it too expensive.
* @param context job context
* @param pending the pending operations
* @throws IOException any failure
*/
protected void preCommitJob(JobContext context,
List<SinglePendingCommit> pending) throws IOException {
@VisibleForTesting
public void preCommitJob(JobContext context,
ActiveCommit pending) throws IOException {
}
/**
@ -584,7 +706,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
* <p>
* Commit internal: do the final commit sequence.
* <p>
* The final commit action is to build the {@code __SUCCESS} file entry.
* The final commit action is to build the {@code _SUCCESS} file entry.
* </p>
* @param context job context
* @throws IOException any failure
@ -594,7 +716,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
String id = jobIdString(context);
try (DurationInfo d = new DurationInfo(LOG,
"%s: commitJob(%s)", getRole(), id)) {
List<SinglePendingCommit> pending
ActiveCommit pending
= listPendingUploadsToCommit(context);
preCommitJob(context, pending);
commitJobInternal(context, pending);
@ -629,12 +751,13 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
* @return a list of pending uploads.
* @throws IOException Any IO failure
*/
protected abstract List<SinglePendingCommit> listPendingUploadsToCommit(
protected abstract ActiveCommit listPendingUploadsToCommit(
JobContext context)
throws IOException;
/**
* Cleanup the job context, including aborting anything pending.
* Cleanup the job context, including aborting anything pending
* and destroying the thread pool.
* @param context job context
* @param suppressExceptions should exceptions be suppressed?
* @throws IOException any failure if exceptions were not suppressed.
@ -645,6 +768,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
"Cleanup job %s", jobIdString(context))) {
abortPendingUploadsInCleanup(suppressExceptions);
} finally {
destroyThreadPool();
cleanupStagingDirs();
}
}
@ -715,7 +839,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
/**
* Returns an {@link ExecutorService} for parallel tasks. The number of
* threads in the thread-pool is set by s3.multipart.committer.num-threads.
* threads in the thread-pool is set by fs.s3a.committer.threads.
* If num-threads is 0, this will return null;
*
* @param context the JobContext for this commit
@ -730,10 +854,10 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
DEFAULT_COMMITTER_THREADS);
LOG.debug("{}: creating thread pool of size {}", getRole(), numThreads);
if (numThreads > 0) {
threadPool = Executors.newFixedThreadPool(numThreads,
threadPool = HadoopExecutors.newFixedThreadPool(numThreads,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("s3-committer-pool-%d")
.setNameFormat(THREAD_PREFIX + context.getJobID() + "-%d")
.build());
} else {
return null;
@ -742,6 +866,40 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
return threadPool;
}
/**
* Destroy any thread pools; wait for that to finish,
* but don't overreact if it doesn't finish in time.
*/
protected synchronized void destroyThreadPool() {
if (threadPool != null) {
LOG.debug("Destroying thread pool");
HadoopExecutors.shutdown(threadPool, LOG,
THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
threadPool = null;
}
}
/**
* Get the thread pool for executing the single file commit/revert
* within the commit of all uploads of a single task.
* This is currently null; it is here to allow the Tasks class to
* provide the logic for execute/revert.
* Why not use the existing thread pool? Too much fear of deadlocking,
* and tasks are being committed in parallel anyway.
* @return null. always.
*/
protected final synchronized ExecutorService singleCommitThreadPool() {
return null;
}
/**
* Does this committer have a thread pool?
* @return true if a thread pool exists.
*/
public synchronized boolean hasThreadPool() {
return threadPool != null;
}
/**
* Delete the task attempt path without raising any errors.
* @param context task context
@ -755,6 +913,8 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
/**
* Abort all pending uploads in the list.
* This operation is used by the magic committer as part of its
* rollback after a failure during task commit.
* @param context job context
* @param pending pending uploads
* @param suppressExceptions should exceptions be suppressed
@ -779,4 +939,172 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
}
}
/**
* Abort all pending uploads in the list.
* @param context job context
* @param pending pending uploads
* @param suppressExceptions should exceptions be suppressed?
* @param deleteRemoteFiles should remote files be deleted?
* @throws IOException any exception raised
*/
protected void abortPendingUploads(
final JobContext context,
final ActiveCommit pending,
final boolean suppressExceptions,
final boolean deleteRemoteFiles) throws IOException {
if (pending.isEmpty()) {
LOG.info("{}: no pending commits to abort", getRole());
} else {
try (DurationInfo d = new DurationInfo(LOG,
"Aborting %s uploads", pending.size());
CommitOperations.CommitContext commitContext
= initiateCommitOperation()) {
Tasks.foreach(pending.getSourceFiles())
.executeWith(buildThreadPool(context))
.suppressExceptions(suppressExceptions)
.run(path ->
loadAndAbort(commitContext,
pending,
path,
suppressExceptions,
deleteRemoteFiles));
}
}
}
/**
* State of the active commit operation.
*
* It contains a list of all pendingset files to load as the source
* of outstanding commits to complete/abort,
* and tracks the files uploaded.
*
* To avoid running out of heap by loading all the source files
* simultaneously:
* <ol>
* <li>
* The list of files to load is passed round but
* the contents are only loaded on demand.
* </li>
* <li>
The number of written files tracked for logging in
the _SUCCESS file is limited to a small amount, enough
for testing only.
* </li>
* </ol>
*/
public static class ActiveCommit {
private static final AbstractS3ACommitter.ActiveCommit EMPTY
= new ActiveCommit(null, new ArrayList<>());
/** All pendingset files to iterate through. */
private final List<Path> sourceFiles;
/**
* Filesystem for the source files.
*/
private final FileSystem sourceFS;
/**
* List of committed objects; only built up until the commit limit is
* reached.
*/
private final List<String> committedObjects = new ArrayList<>();
/**
* The total number of committed objects.
*/
private int committedObjectCount;
/**
* Total number of bytes committed.
*/
private long committedBytes;
/**
* Construct from a source FS and list of files.
* @param sourceFS filesystem containing the list of pending files
* @param sourceFiles .pendingset files to load and commit.
*/
public ActiveCommit(
final FileSystem sourceFS,
final List<Path> sourceFiles) {
this.sourceFiles = sourceFiles;
this.sourceFS = sourceFS;
}
/**
* Create an active commit of the given pending files.
* @param pendingFS source filesystem.
* @param statuses list of file status or subclass to use.
* @return the commit
*/
public static ActiveCommit fromStatusList(
final FileSystem pendingFS,
final List<? extends FileStatus> statuses) {
return new ActiveCommit(pendingFS,
statuses.stream()
.map(FileStatus::getPath)
.collect(Collectors.toList()));
}
/**
* Get the empty entry.
* @return an active commit with no pending files.
*/
public static ActiveCommit empty() {
return EMPTY;
}
public List<Path> getSourceFiles() {
return sourceFiles;
}
public FileSystem getSourceFS() {
return sourceFS;
}
/**
* Note that a file was committed.
* Increase the counter of files and total size.
If there is room in the committedObjects list, the file
* will be added to the list and so end up in the _SUCCESS file.
* @param key key of the committed object.
* @param size size in bytes.
*/
public synchronized void uploadCommitted(String key, long size) {
if (committedObjects.size() < SUCCESS_MARKER_FILE_LIMIT) {
committedObjects.add(
key.startsWith("/") ? key : ("/" + key));
}
committedObjectCount++;
committedBytes += size;
}
public synchronized List<String> getCommittedObjects() {
return committedObjects;
}
public synchronized int getCommittedFileCount() {
return committedObjectCount;
}
public synchronized long getCommittedBytes() {
return committedBytes;
}
public int size() {
return sourceFiles.size();
}
public boolean isEmpty() {
return sourceFiles.isEmpty();
}
public void add(Path path) {
sourceFiles.add(path);
}
}
}

View File

@ -51,7 +51,7 @@ public abstract class AbstractS3ACommitterFactory
throw new PathCommitException(outputPath,
"Filesystem not supported by this committer");
}
LOG.info("Using Commmitter {} for {}",
LOG.info("Using Committer {} for {}",
outputCommitter,
outputPath);
return outputCommitter;

View File

@ -255,4 +255,10 @@ public final class CommitConstants {
public static final String FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS =
"fs.s3a.committer.staging.abort.pending.uploads";
/**
* The limit to the number of committed objects tracked during
* job commits and saved to the _SUCCESS file.
*/
public static final int SUCCESS_MARKER_FILE_LIMIT = 100;
}

View File

@ -52,6 +52,12 @@ import org.apache.hadoop.util.JsonSerialization;
* Applications reading this data should use/check the {@link #name} field
* to differentiate from any other JSON-based manifest and to identify
* changes in the output format.
*
Note: to deal with scale issues, the S3A committers record no more
objects than the limit defined in
* {@link org.apache.hadoop.fs.s3a.commit.CommitConstants#SUCCESS_MARKER_FILE_LIMIT}.
* This is intended to suffice for basic integration tests.
* Larger tests should examine the generated files themselves.
*/
@SuppressWarnings("unused")
@InterfaceAudience.Private
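
As a hedged illustration of the note above: a test can verify that the
recorded filename list stays within the limit. The SuccessData.load() and
getFilenames() accessors are assumed from the existing test utilities; the
helper class below is illustrative only and not part of this patch.

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;

import static org.apache.hadoop.fs.s3a.commit.CommitConstants.SUCCESS_MARKER_FILE_LIMIT;
import static org.junit.Assert.assertTrue;

final class SuccessMarkerCheck {

  private SuccessMarkerCheck() {
  }

  // Load the _SUCCESS manifest and verify the filename list was capped;
  // larger tests must examine the destination directly, as noted above.
  static void assertFilenameListIsCapped(FileSystem fs, Path outputDir)
      throws IOException {
    SuccessData success = SuccessData.load(fs, new Path(outputDir, "_SUCCESS"));
    int listed = success.getFilenames().size();
    assertTrue("more filenames recorded than the marker limit: " + listed,
        listed <= SUCCESS_MARKER_FILE_LIMIT);
  }
}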

View File

@ -109,11 +109,11 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
* @return a list of pending commits.
* @throws IOException Any IO failure
*/
protected List<SinglePendingCommit> listPendingUploadsToCommit(
protected ActiveCommit listPendingUploadsToCommit(
JobContext context)
throws IOException {
FileSystem fs = getDestFS();
return loadPendingsetFiles(context, false, fs,
return ActiveCommit.fromStatusList(fs,
listAndFilter(fs, getJobAttemptPath(context), false,
CommitOperations.PENDINGSET_FILTER));
}
@ -174,6 +174,7 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
} finally {
// delete the task attempt so there's no possibility of a second attempt
deleteTaskAttemptPathQuietly(context);
destroyThreadPool();
}
getCommitOperations().taskCompleted(true);
}
@ -181,7 +182,7 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
/**
* Inner routine for committing a task.
* The list of pending commits is loaded and then saved to the job attempt
* dir.
* dir in a single pendingset file.
* Failure to load any file or save the final file triggers an abort of
* all known pending commits.
* @param context context
@ -250,6 +251,7 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
deleteQuietly(
attemptPath.getFileSystem(context.getConfiguration()),
attemptPath, true);
destroyThreadPool();
}
}

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.fs.s3a.commit.staging;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -31,7 +30,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathExistsException;
import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@ -66,7 +64,6 @@ public class DirectoryStagingCommitter extends StagingCommitter {
@Override
public void setupJob(JobContext context) throws IOException {
super.setupJob(context);
Path outputPath = getOutputPath();
FileSystem fs = getDestFS();
ConflictResolution conflictResolution = getConflictResolutionMode(
@ -91,10 +88,10 @@ public class DirectoryStagingCommitter extends StagingCommitter {
}
} catch (FileNotFoundException ignored) {
// there is no destination path, hence, no conflict.
// make the parent directory, which also triggers a recursive directory
// creation operation
fs.mkdirs(outputPath);
}
// make the parent directory, which also triggers a recursive directory
// creation operation
super.setupJob(context);
}
/**
@ -106,8 +103,12 @@ public class DirectoryStagingCommitter extends StagingCommitter {
* @throws IOException any failure
*/
@Override
protected void preCommitJob(JobContext context,
List<SinglePendingCommit> pending) throws IOException {
public void preCommitJob(
final JobContext context,
final ActiveCommit pending) throws IOException {
// see if the files can be loaded.
super.preCommitJob(context, pending);
Path outputPath = getOutputPath();
FileSystem fs = getDestFS();
Configuration fsConf = fs.getConf();

View File

@ -20,10 +20,11 @@ package org.apache.hadoop.fs.s3a.commit.staging;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -32,11 +33,14 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.PathCommitException;
import org.apache.hadoop.fs.s3a.commit.Tasks;
import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.DurationInfo;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.COMMITTER_NAME_PARTITIONED;
/**
* Partitioned committer.
@ -52,6 +56,9 @@ import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
* <li>REPLACE: delete the destination partition in the job commit
* (i.e. after and only if all tasks have succeeded.</li>
* </ul>
* To determine the paths, the precommit process actually has to read
* in all source files, independently of the final commit phase.
* This is inefficient, though some parallelization here helps.
*/
public class PartitionedStagingCommitter extends StagingCommitter {
@ -107,6 +114,7 @@ public class PartitionedStagingCommitter extends StagingCommitter {
}
/**
* Job-side conflict resolution.
* The partition path conflict resolution actions are:
* <ol>
@ -119,13 +127,15 @@ public class PartitionedStagingCommitter extends StagingCommitter {
* @throws IOException any failure
*/
@Override
protected void preCommitJob(JobContext context,
List<SinglePendingCommit> pending) throws IOException {
public void preCommitJob(
final JobContext context,
final ActiveCommit pending) throws IOException {
FileSystem fs = getDestFS();
// enforce conflict resolution
Configuration fsConf = fs.getConf();
boolean shouldPrecheckPendingFiles = true;
switch (getConflictResolutionMode(context, fsConf)) {
case FAIL:
// FAIL checking is done on the task side, so this does nothing
@ -134,21 +144,84 @@ public class PartitionedStagingCommitter extends StagingCommitter {
// no check is needed because the output may exist for appending
break;
case REPLACE:
Set<Path> partitions = pending.stream()
.map(SinglePendingCommit::destinationPath)
.map(Path::getParent)
.collect(Collectors.toCollection(Sets::newLinkedHashSet));
for (Path partitionPath : partitions) {
LOG.debug("{}: removing partition path to be replaced: " +
getRole(), partitionPath);
fs.delete(partitionPath, true);
}
// identify and replace the destination partitions
replacePartitions(context, pending);
// and so there is no need to do another check.
shouldPrecheckPendingFiles = false;
break;
default:
throw new PathCommitException("",
getRole() + ": unknown conflict resolution mode: "
+ getConflictResolutionMode(context, fsConf));
}
if (shouldPrecheckPendingFiles) {
precommitCheckPendingFiles(context, pending);
}
}
/**
* Identify all partitions which need to be replaced and then delete them.
* The original implementation relied on all the pending commits being
* loaded, so it could simply enumerate them.
* This iteration does not do that; it has to reload all the files
* to build the set, after which it initiates the delete process.
* This is done in parallel.
* <pre>
* Set<Path> partitions = pending.stream()
* .map(Path::getParent)
* .collect(Collectors.toCollection(Sets::newLinkedHashSet));
* for (Path partitionPath : partitions) {
* LOG.debug("{}: removing partition path to be replaced: " +
* getRole(), partitionPath);
* fs.delete(partitionPath, true);
* }
* </pre>
*
* @param context job context
* @param pending the pending operations
* @throws IOException any failure
*/
private void replacePartitions(
final JobContext context,
final ActiveCommit pending) throws IOException {
Map<Path, String> partitions = new ConcurrentHashMap<>();
FileSystem sourceFS = pending.getSourceFS();
ExecutorService pool = buildThreadPool(context);
try (DurationInfo ignored =
new DurationInfo(LOG, "Replacing partitions")) {
// the parent directories are saved to a concurrent hash map.
// for a marginal optimisation, the previous parent is tracked, so
// if a task writes many files to the same dir, the synchronized map
// is updated only once.
Tasks.foreach(pending.getSourceFiles())
.stopOnFailure()
.suppressExceptions(false)
.executeWith(pool)
.run(path -> {
PendingSet pendingSet = PendingSet.load(sourceFS, path);
Path lastParent = null;
for (SinglePendingCommit commit : pendingSet.getCommits()) {
Path parent = commit.destinationPath().getParent();
if (parent != null && !parent.equals(lastParent)) {
partitions.put(parent, "");
lastParent = parent;
}
}
});
}
// now do the deletes
FileSystem fs = getDestFS();
Tasks.foreach(partitions.keySet())
.stopOnFailure()
.suppressExceptions(false)
.executeWith(pool)
.run(partitionPath -> {
LOG.debug("{}: removing partition path to be replaced: " +
getRole(), partitionPath);
fs.delete(partitionPath, true);
});
}
}

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.fs.s3a.commit.staging;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Queue;
@ -457,6 +456,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
context.getConfiguration().set(
InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID, uuid);
wrappedCommitter.setupJob(context);
super.setupJob(context);
}
/**
@ -466,7 +466,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
* @throws IOException Any IO failure
*/
@Override
protected List<SinglePendingCommit> listPendingUploadsToCommit(
protected ActiveCommit listPendingUploadsToCommit(
JobContext context)
throws IOException {
return listPendingUploads(context, false);
@ -480,7 +480,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
* then this may not match the actual set of pending operations
* @throws IOException shouldn't be raised, but retained for the compiler
*/
protected List<SinglePendingCommit> listPendingUploadsToAbort(
protected ActiveCommit listPendingUploadsToAbort(
JobContext context) throws IOException {
return listPendingUploads(context, true);
}
@ -493,13 +493,14 @@ public class StagingCommitter extends AbstractS3ACommitter {
* then this may not match the actual set of pending operations
* @throws IOException Any IO failure which wasn't swallowed.
*/
protected List<SinglePendingCommit> listPendingUploads(
protected ActiveCommit listPendingUploads(
JobContext context, boolean suppressExceptions) throws IOException {
try {
Path wrappedJobAttemptPath = wrappedCommitter.getJobAttemptPath(context);
try (DurationInfo ignored = new DurationInfo(LOG,
"Listing pending uploads")) {
Path wrappedJobAttemptPath = getJobAttemptPath(context);
final FileSystem attemptFS = wrappedJobAttemptPath.getFileSystem(
context.getConfiguration());
return loadPendingsetFiles(context, suppressExceptions, attemptFS,
return ActiveCommit.fromStatusList(attemptFS,
listAndFilter(attemptFS,
wrappedJobAttemptPath, false,
HIDDEN_FILE_FILTER));
@ -512,7 +513,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
maybeIgnore(suppressExceptions, "Listing pending uploads", e);
}
// reached iff an IOE was caught and swallowed
return new ArrayList<>(0);
return ActiveCommit.empty();
}
@Override
@ -558,8 +559,8 @@ public class StagingCommitter extends AbstractS3ACommitter {
boolean failed = false;
try (DurationInfo d = new DurationInfo(LOG,
"%s: aborting job in state %s ", r, jobIdString(context))) {
List<SinglePendingCommit> pending = listPendingUploadsToAbort(context);
abortPendingUploads(context, pending, suppressExceptions);
ActiveCommit pending = listPendingUploadsToAbort(context);
abortPendingUploads(context, pending, suppressExceptions, true);
} catch (FileNotFoundException e) {
// nothing to list
LOG.debug("No job directory to read uploads from");
@ -571,6 +572,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
}
}
/**
* Delete the working paths of a job.
* <ol>
@ -646,6 +648,8 @@ public class StagingCommitter extends AbstractS3ACommitter {
getRole(), context.getTaskAttemptID(), e);
getCommitOperations().taskCompleted(false);
throw e;
} finally {
destroyThreadPool();
}
getCommitOperations().taskCompleted(true);
}
@ -694,6 +698,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
try {
Tasks.foreach(taskOutput)
.stopOnFailure()
.suppressExceptions(false)
.executeWith(buildThreadPool(context))
.run(stat -> {
Path path = stat.getPath();
@ -779,6 +784,8 @@ public class StagingCommitter extends AbstractS3ACommitter {
LOG.error("{}: exception when aborting task {}",
getRole(), context.getTaskAttemptID(), e);
throw e;
} finally {
destroyThreadPool();
}
}
@ -901,4 +908,20 @@ public class StagingCommitter extends AbstractS3ACommitter {
defVal).toUpperCase(Locale.ENGLISH);
}
/**
* Pre-commit actions for a job.
* Loads all the pending files to verify they can be loaded
* and parsed.
* @param context job context
* @param pending pending commits
* @throws IOException any failure
*/
@Override
public void preCommitJob(
final JobContext context,
final ActiveCommit pending) throws IOException {
// see if the files can be loaded.
precommitCheckPendingFiles(context, pending);
}
}

View File

@ -19,17 +19,21 @@
package org.apache.hadoop.fs.s3a;
import java.io.IOException;
import java.util.Set;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Test;
import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getCurrentThreadNames;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.listInitialThreadsForLifecycleChecks;
import static org.apache.hadoop.test.LambdaTestUtils.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.E_FS_CLOSED;
/**
* Tests of the S3A FileSystem which is closed; just make sure
* that that basic file Ops fail meaningfully.
* Tests of the S3A FileSystem which is closed.
*/
public class ITestS3AClosedFS extends AbstractS3ATestBase {
@ -47,6 +51,16 @@ public class ITestS3AClosedFS extends AbstractS3ATestBase {
// no op, as the FS is closed
}
private static final Set<String> THREAD_SET =
listInitialThreadsForLifecycleChecks();
@AfterClass
public static void checkForThreadLeakage() {
Assertions.assertThat(getCurrentThreadNames())
.describedAs("The threads at the end of the test run")
.isSubsetOf(THREAD_SET);
}
@Test
public void testClosedGetFileStatus() throws Exception {
intercept(IOException.class, E_FS_CLOSED,

View File

@ -63,7 +63,10 @@ import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH;
@ -598,7 +601,7 @@ public final class S3ATestUtils {
String tmpDir = conf.get(HADOOP_TMP_DIR, "target/build/test");
if (testUniqueForkId != null) {
// patch temp dir for the specific branch
tmpDir = tmpDir + File.pathSeparatorChar + testUniqueForkId;
tmpDir = tmpDir + File.separator + testUniqueForkId;
conf.set(HADOOP_TMP_DIR, tmpDir);
}
conf.set(BUFFER_DIR, tmpDir);
@ -1346,4 +1349,41 @@ public final class S3ATestUtils {
STABILIZATION_TIME, PROBE_INTERVAL_MILLIS,
() -> fs.getFileStatus(testFilePath));
}
/**
* This creates a set containing all current threads and some well-known
* thread names whose existence should not fail test runs.
* They are generally static cleaner threads created by various classes
* on instantiation.
* @return a set of threads to use in later assertions.
*/
public static Set<String> listInitialThreadsForLifecycleChecks() {
Set<String> threadSet = getCurrentThreadNames();
// static filesystem statistics cleaner
threadSet.add(
"org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner");
// AWS progress callbacks
threadSet.add("java-sdk-progress-listener-callback-thread");
// another AWS thread
threadSet.add("java-sdk-http-connection-reaper");
// java.lang.UNIXProcess. maybe if chmod is called?
threadSet.add("process reaper");
// once a quantile has been scheduled, the mutable quantile thread pool
// is initialized; it has a minimum thread size of 1.
threadSet.add("MutableQuantiles-0");
// IDE?
threadSet.add("Attach Listener");
return threadSet;
}
/**
* Get a set containing the names of all active threads.
* @return the current set of threads.
*/
public static Set<String> getCurrentThreadNames() {
return Thread.getAllStackTraces().keySet()
.stream()
.map(Thread::getName)
.collect(Collectors.toCollection(TreeSet::new));
}
}

View File

@ -25,7 +25,10 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -178,6 +181,20 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
super.teardown();
}
/**
* This only looks for leakage of committer thread pools,
* and not any other leaked threads, such as those from S3A FS instances.
*/
@AfterClass
public static void checkForThreadLeakage() {
List<String> committerThreads = getCurrentThreadNames().stream()
.filter(n -> n.startsWith(AbstractS3ACommitter.THREAD_PREFIX))
.collect(Collectors.toList());
Assertions.assertThat(committerThreads)
.describedAs("Outstanding committer threads")
.isEmpty();
}
/**
* Add the specified job to the current list of jobs to abort in teardown.
* @param jobData job data.
@ -518,6 +535,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
describe("\ncommitting job");
committer.commitJob(jContext);
describe("commit complete\n");
verifyCommitterHasNoThreads(committer);
}
}
@ -574,7 +592,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
// Commit the task. This will promote data and metadata to where
// job commits will pick it up on commit or abort.
committer.commitTask(tContext);
commitTask(committer, tContext);
assertTaskAttemptPathDoesNotExist(committer, tContext);
Configuration conf2 = jobData.job.getConfiguration();
@ -600,6 +618,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
committer2.abortJob(jContext2, JobStatus.State.KILLED);
// now, state of system may still have pending data
assertNoMultipartUploadsPending(outDir);
verifyCommitterHasNoThreads(committer2);
}
protected void assertTaskAttemptPathDoesNotExist(
@ -742,7 +761,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
describe("2. Committing task");
assertTrue("No files to commit were found by " + committer,
committer.needsTaskCommit(tContext));
committer.commitTask(tContext);
commitTask(committer, tContext);
// this is only task commit; there MUST be no part- files in the dest dir
waitForConsistency();
@ -758,7 +777,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
describe("3. Committing job");
assertMultipartUploadsPending(outDir);
committer.commitJob(jContext);
commitJob(committer, jContext);
// validate output
describe("4. Validating content");
@ -809,7 +828,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
// now fail job
expectSimulatedFailureOnJobCommit(jContext, committer);
committer.commitJob(jContext);
commitJob(committer, jContext);
// but the data got there, due to the order of operations.
validateContent(outDir, shouldExpectSuccessMarker());
@ -1011,6 +1030,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
committer.abortJob(jobData.jContext, JobStatus.State.FAILED);
assertJobAbortCleanedUp(jobData);
verifyCommitterHasNoThreads(committer);
}
/**
@ -1064,6 +1084,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
// try again; expect abort to be idempotent.
committer.abortJob(jContext, JobStatus.State.FAILED);
assertNoMultipartUploadsPending(outDir);
verifyCommitterHasNoThreads(committer);
}
public void assertPart0000DoesNotExist(Path dir) throws Exception {
@ -1223,8 +1244,8 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
validateTaskAttemptPathAfterWrite(dest);
assertTrue("Committer does not have data to commit " + committer,
committer.needsTaskCommit(tContext));
committer.commitTask(tContext);
committer.commitJob(jContext);
commitTask(committer, tContext);
commitJob(committer, jContext);
// validate output
verifySuccessMarker(outDir);
}
@ -1257,6 +1278,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
AbstractS3ACommitter committer2 = (AbstractS3ACommitter)
outputFormat.getOutputCommitter(newAttempt);
committer2.abortTask(tContext);
verifyCommitterHasNoThreads(committer2);
assertNoMultipartUploadsPending(getOutDir());
}
@ -1306,19 +1328,19 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
// at this point, job1 and job2 both have uncommitted tasks
// commit tasks in order task 2, task 1.
committer2.commitTask(tContext2);
committer1.commitTask(tContext1);
commitTask(committer2, tContext2);
commitTask(committer1, tContext1);
assertMultipartUploadsPending(job1Dest);
assertMultipartUploadsPending(job2Dest);
// commit jobs in order job 1, job 2
committer1.commitJob(jContext1);
commitJob(committer1, jContext1);
assertNoMultipartUploadsPending(job1Dest);
getPart0000(job1Dest);
assertMultipartUploadsPending(job2Dest);
committer2.commitJob(jContext2);
commitJob(committer2, jContext2);
getPart0000(job2Dest);
assertNoMultipartUploadsPending(job2Dest);
} finally {
@ -1379,4 +1401,36 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
TaskAttemptContext context) throws IOException {
}
/**
* Commit a task then validate the state of the committer afterwards.
* @param committer committer
* @param tContext task context
* @throws IOException IO failure
*/
protected void commitTask(final AbstractS3ACommitter committer,
final TaskAttemptContext tContext) throws IOException {
committer.commitTask(tContext);
verifyCommitterHasNoThreads(committer);
}
/**
* Commit a job then validate the state of the committer afterwards.
* @param committer committer
* @param jContext job context
* @throws IOException IO failure
*/
protected void commitJob(final AbstractS3ACommitter committer,
final JobContext jContext) throws IOException {
committer.commitJob(jContext);
verifyCommitterHasNoThreads(committer);
}
/**
* Verify that the committer does not have a thread pool.
* @param committer committer to validate.
*/
protected void verifyCommitterHasNoThreads(AbstractS3ACommitter committer) {
assertFalse("Committer has an active thread pool",
committer.hasThreadPool());
}
}

View File

@ -73,7 +73,7 @@ public class TestTasks extends HadoopTestBase {
* more checks on single thread than parallel ops.
* @return a list of parameter tuples.
*/
@Parameterized.Parameters
@Parameterized.Parameters(name = "threads={0}")
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
{0},

View File

@ -632,10 +632,10 @@ public class ITestS3ACommitterMRJob extends AbstractYarnClusterITest {
final FileStatus st = fs.getFileStatus(magicDir);
StringBuilder result = new StringBuilder("Found magic dir which should"
+ " have been deleted at ").append(st).append('\n');
result.append("[");
result.append(" [");
applyLocatedFiles(fs.listFiles(magicDir, true),
(status) -> result.append(status.getPath()).append('\n'));
result.append("[");
(status) -> result.append(" ").append(status.getPath()).append('\n'));
result.append("]");
return result.toString();
});
}

View File

@ -251,6 +251,12 @@ public class StagingTestBase {
verify(mockS3).getFileStatus(path);
}
/**
* Verify that mkdirs was invoked once.
* @param mockS3 mock
* @param path path to check
* @throws IOException from the mkdirs signature.
*/
public static void verifyMkdirsInvoked(FileSystem mockS3, Path path)
throws IOException {
verify(mockS3).mkdirs(path);
@ -320,12 +326,7 @@ public class StagingTestBase {
@Before
public void setupJob() throws Exception {
this.jobConf = new JobConf();
jobConf.set(InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID,
UUID.randomUUID().toString());
jobConf.setBoolean(
CommitConstants.CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
false);
this.jobConf = createJobConf();
this.job = new JobContextImpl(jobConf, JOB_ID);
this.results = new StagingTestBase.ClientResults();
@ -338,6 +339,16 @@ public class StagingTestBase {
wrapperFS.setAmazonS3Client(mockClient);
}
protected JobConf createJobConf() {
JobConf conf = new JobConf();
conf.set(InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID,
UUID.randomUUID().toString());
conf.setBoolean(
CommitConstants.CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
false);
return conf;
}
public S3AFileSystem getMockS3A() {
return mockFS;
}
@ -461,6 +472,11 @@ public class StagingTestBase {
return deletes;
}
public List<String> getDeletePaths() {
return deletes.stream().map(DeleteObjectRequest::getKey).collect(
Collectors.toList());
}
public void resetDeletes() {
deletes.clear();
}
@ -478,6 +494,14 @@ public class StagingTestBase {
requests.clear();
}
public void addUpload(String id, String key) {
activeUploads.put(id, key);
}
public void addUploads(Map<String, String> uploadMap) {
activeUploads.putAll(uploadMap);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
@ -648,8 +672,9 @@ public class StagingTestBase {
}
CompleteMultipartUploadRequest req = getArgumentAt(invocation,
0, CompleteMultipartUploadRequest.class);
String uploadId = req.getUploadId();
removeUpload(results, uploadId);
results.commits.add(req);
results.activeUploads.remove(req.getUploadId());
return newResult(req);
}
@ -669,14 +694,7 @@ public class StagingTestBase {
AbortMultipartUploadRequest req = getArgumentAt(invocation,
0, AbortMultipartUploadRequest.class);
String id = req.getUploadId();
String p = results.activeUploads.remove(id);
if (p == null) {
// upload doesn't exist
AmazonS3Exception ex = new AmazonS3Exception(
"not found " + id);
ex.setStatusCode(404);
throw ex;
}
removeUpload(results, id);
results.aborts.add(req);
return null;
}
@ -729,6 +747,24 @@ public class StagingTestBase {
return mockClient;
}
/**
* Remove an upload from the upload map.
* @param results result set
* @param uploadId The upload ID to remove
* @throws AmazonS3Exception with error code 404 if the id is unknown.
*/
protected static void removeUpload(final ClientResults results,
final String uploadId) {
String removed = results.activeUploads.remove(uploadId);
if (removed == null) {
// upload doesn't exist
AmazonS3Exception ex = new AmazonS3Exception(
"not found " + uploadId);
ex.setStatusCode(404);
throw ex;
}
}
private static CompleteMultipartUploadResult newResult(
CompleteMultipartUploadRequest req) {
return new CompleteMultipartUploadResult();

View File

@ -0,0 +1,314 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit.staging;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import com.amazonaws.services.s3.model.PartETag;
import com.google.common.collect.Maps;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.DurationInfo;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.CONFLICT_MODE_APPEND;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.PENDINGSET_SUFFIX;
import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.BUCKET;
import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.outputPath;
import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.outputPathUri;
import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.pathIsDirectory;
/**
* Scale test of the directory committer: does job commit overload when
* there are many, many files?
* This is a mock test, so as to avoid the overhead of going near S3;
* it does use a lot of local filesystem files, though, to better
* simulate a real large-scale deployment.
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class TestDirectoryCommitterScale
extends StagingTestBase.JobCommitterTest<DirectoryStagingCommitter> {
private static final Logger LOG =
LoggerFactory.getLogger(TestDirectoryCommitterScale.class);
public static final int TASKS = 500;
public static final int FILES_PER_TASK = 10;
public static final int TOTAL_COMMIT_COUNT = FILES_PER_TASK * TASKS;
public static final int BLOCKS_PER_TASK = 1000;
private static File stagingDir;
private static LocalFileSystem localFS;
private static Path stagingPath;
private static Map<String, String> activeUploads =
Maps.newHashMap();
@Override
DirectoryCommitterForTesting newJobCommitter() throws Exception {
return new DirectoryCommitterForTesting(outputPath,
createTaskAttemptForJob());
}
@BeforeClass
public static void setupStaging() throws Exception {
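// create a unique local staging directory: make a temp file, delete it,
// then recreate the same path as a directory.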
stagingDir = File.createTempFile("staging", "");
stagingDir.delete();
stagingDir.mkdir();
stagingPath = new Path(stagingDir.toURI());
localFS = FileSystem.getLocal(new Configuration());
}
@AfterClass
public static void teardownStaging() throws IOException {
try {
if (stagingDir != null) {
FileUtils.deleteDirectory(stagingDir);
}
} catch (IOException ignored) {
}
}
@Override
protected JobConf createJobConf() {
JobConf conf = super.createJobConf();
conf.setInt(
CommitConstants.FS_S3A_COMMITTER_THREADS,
100);
return conf;
}
protected Configuration getJobConf() {
return getJob().getConfiguration();
}
@Test
public void test_010_createTaskFiles() throws Exception {
try (DurationInfo ignored =
new DurationInfo(LOG, "Creating %d test files in %s",
TOTAL_COMMIT_COUNT, stagingDir)) {
createTasks();
}
}
/**
* Create the mock uploads of the tasks and save
* to .pendingset files.
* @throws IOException failure.
*/
private void createTasks() throws IOException {
// create a stub multipart commit containing multiple files.
// step 1: a list of part etags.
// this tag is the md5sum of hadoop 3.2.1.tar
String tag = "9062dcf18ffaee254821303bbd11c72b";
List<PartETag> etags = IntStream.rangeClosed(1, BLOCKS_PER_TASK)
.mapToObj(i -> new PartETag(i, tag))
.collect(Collectors.toList());
SinglePendingCommit base = new SinglePendingCommit();
base.setBucket(BUCKET);
base.setJobId("0000");
base.setLength(914688000);
base.bindCommitData(etags);
// these get overwritten
base.setDestinationKey("/base");
base.setUploadId("uploadId");
base.setUri(outputPathUri.toString());
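// clone the template commit for each file by round-tripping its serialized bytes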
SinglePendingCommit[] singles = new SinglePendingCommit[FILES_PER_TASK];
byte[] bytes = base.toBytes();
for (int i = 0; i < FILES_PER_TASK; i++) {
singles[i] = SinglePendingCommit.serializer().fromBytes(bytes);
}
// now create the files, using this as the template
int uploadCount = 0;
for (int task = 0; task < TASKS; task++) {
PendingSet pending = new PendingSet();
String taskId = String.format("task-%04d", task);
for (int i = 0; i < FILES_PER_TASK; i++) {
String uploadId = String.format("%05d-task-%04d-file-%02d",
uploadCount, task, i);
// longer paths to take up more space.
Path p = new Path(outputPath,
"datasets/examples/testdirectoryscale/"
+ "year=2019/month=09/day=26/hour=20/second=53"
+ uploadId);
URI dest = p.toUri();
SinglePendingCommit commit = singles[i];
String key = dest.getPath();
commit.setDestinationKey(key);
commit.setUri(dest.toString());
commit.touch(Instant.now().toEpochMilli());
commit.setTaskId(taskId);
commit.setUploadId(uploadId);
pending.add(commit);
activeUploads.put(uploadId, key);
}
Path path = new Path(stagingPath,
String.format("task-%04d." + PENDINGSET_SUFFIX, task));
pending.save(localFS, path, true);
}
}
@Test
public void test_020_loadFilesToAttempt() throws Exception {
DirectoryStagingCommitter committer = newJobCommitter();
Configuration jobConf = getJobConf();
jobConf.set(
FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND);
FileSystem mockS3 = getMockS3A();
pathIsDirectory(mockS3, outputPath);
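// listing pending uploads should return one .pendingset path per task;
// the individual pending commits are not loaded at this point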
try (DurationInfo ignored =
new DurationInfo(LOG, "listing pending uploads")) {
AbstractS3ACommitter.ActiveCommit activeCommit
= committer.listPendingUploadsToCommit(getJob());
Assertions.assertThat(activeCommit.getSourceFiles())
.describedAs("Source files of %s", activeCommit)
.hasSize(TASKS);
}
}
@Test
public void test_030_commitFiles() throws Exception {
DirectoryCommitterForTesting committer = newJobCommitter();
StagingTestBase.ClientResults results = getMockResults();
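// register the uploads created in test_010 so the mock client accepts their completion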
results.addUploads(activeUploads);
Configuration jobConf = getJobConf();
jobConf.set(
FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND);
S3AFileSystem mockS3 = getMockS3A();
pathIsDirectory(mockS3, outputPath);
try (DurationInfo ignored =
new DurationInfo(LOG, "Committing Job")) {
committer.commitJob(getJob());
}
Assertions.assertThat(results.getCommits())
.describedAs("commit count")
.hasSize(TOTAL_COMMIT_COUNT);
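// the active commit retains at most SUCCESS_MARKER_FILE_LIMIT committed objects,
// but its counter covers every committed file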
AbstractS3ACommitter.ActiveCommit activeCommit = committer.activeCommit;
Assertions.assertThat(activeCommit.getCommittedObjects())
.describedAs("committed objects in active commit")
.hasSize(Math.min(TOTAL_COMMIT_COUNT,
CommitConstants.SUCCESS_MARKER_FILE_LIMIT));
Assertions.assertThat(activeCommit.getCommittedFileCount())
.describedAs("committed objects in active commit")
.isEqualTo(TOTAL_COMMIT_COUNT);
}
@Test
public void test_040_abortFiles() throws Exception {
DirectoryStagingCommitter committer = newJobCommitter();
getMockResults().addUploads(activeUploads);
Configuration jobConf = getJobConf();
jobConf.set(
FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND);
FileSystem mockS3 = getMockS3A();
pathIsDirectory(mockS3, outputPath);
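// abort the job; every upload registered above is expected to be aborted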
committer.abortJob(getJob(), JobStatus.State.FAILED);
}
/**
* Committer overridden for better testing.
*/
private static final class DirectoryCommitterForTesting extends
DirectoryStagingCommitter {
private ActiveCommit activeCommit;
private DirectoryCommitterForTesting(Path outputPath,
TaskAttemptContext context) throws IOException {
super(outputPath, context);
}
@Override
protected void initOutput(Path out) throws IOException {
super.initOutput(out);
setOutputPath(out);
}
/**
* Returns the mock FS without checking FS type.
* @param out output path
* @param config job/task config
* @return a filesystem.
* @throws IOException failure to get the FS
*/
@Override
protected FileSystem getDestinationFS(Path out, Configuration config)
throws IOException {
return out.getFileSystem(config);
}
@Override
public Path getJobAttemptPath(JobContext context) {
return stagingPath;
}
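/**
* Capture the active commit so the test cases can assert on what was committed.
*/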
@Override
protected void commitJobInternal(final JobContext context,
final ActiveCommit pending)
throws IOException {
activeCommit = pending;
super.commitJobInternal(context, pending);
}
}
}

View File

@ -35,6 +35,7 @@ import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.google.common.collect.Sets;
import org.assertj.core.api.Assertions;
import org.hamcrest.core.StringStartsWith;
import org.junit.After;
import org.junit.Before;
@ -535,33 +536,31 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
return jobCommitter.toString();
});
assertEquals("Should have succeeded to commit some uploads",
5, results.getCommits().size());
assertEquals("Should have deleted the files that succeeded",
5, results.getDeletes().size());
Set<String> commits = results.getCommits()
.stream()
.map((commit) -> commit.getBucketName() + commit.getKey())
.map(commit ->
"s3a://" + commit.getBucketName() + "/" + commit.getKey())
.collect(Collectors.toSet());
Set<String> deletes = results.getDeletes()
.stream()
.map((delete) -> delete.getBucketName() + delete.getKey())
.map(delete ->
"s3a://" + delete.getBucketName() + "/" + delete.getKey())
.collect(Collectors.toSet());
assertEquals("Committed and deleted objects should match",
commits, deletes);
assertEquals("Mismatch in aborted upload count",
7, results.getAborts().size());
Assertions.assertThat(commits)
.describedAs("Committed objects compared to deleted paths %s", results)
.containsExactlyInAnyOrderElementsOf(deletes);
Assertions.assertThat(results.getAborts())
.describedAs("aborted count in %s", results)
.hasSize(7);
Set<String> uploadIds = getCommittedIds(results.getCommits());
uploadIds.addAll(getAbortedIds(results.getAborts()));
assertEquals("Should have committed/deleted or aborted all uploads",
uploads, uploadIds);
Assertions.assertThat(uploadIds)
.describedAs("Combined commit/delete and aborted upload IDs")
.containsExactlyInAnyOrderElementsOf(uploads);
assertPathDoesNotExist(fs, "jobAttemptPath not deleted", jobAttemptPath);
}

View File

@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.PathExistsException;
import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
@ -84,17 +85,18 @@ public class TestStagingDirectoryOutputCommitter
() -> committer.setupJob(getJob()));
// but there are no checks in job commit (HADOOP-15469)
committer.commitJob(getJob());
// the checks are instead exercised by calling the preCommitJob method directly.
committer.preCommitJob(getJob(), AbstractS3ACommitter.ActiveCommit.empty());
reset((FileSystem) getMockS3A());
reset(getMockS3A());
pathDoesNotExist(getMockS3A(), outputPath);
committer.setupJob(getJob());
verifyExistenceChecked(getMockS3A(), outputPath);
verifyMkdirsInvoked(getMockS3A(), outputPath);
verifyNoMoreInteractions((FileSystem) getMockS3A());
verifyNoMoreInteractions(getMockS3A());
reset((FileSystem) getMockS3A());
reset(getMockS3A());
pathDoesNotExist(getMockS3A(), outputPath);
committer.commitJob(getJob());
verifyCompletion(getMockS3A());

View File

@ -18,20 +18,21 @@
package org.apache.hadoop.fs.s3a.commit.staging;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import com.google.common.collect.Lists;
import org.junit.Test;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.MockS3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.PathCommitException;
import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@ -59,37 +60,59 @@ public class TestStagingPartitionedJobCommit
/**
* Subclass of the Partitioned Staging committer used in the test cases.
*/
private static final class PartitionedStagingCommitterForTesting
private final class PartitionedStagingCommitterForTesting
extends PartitionedCommitterForTesting {
private boolean aborted = false;
private boolean aborted;
private PartitionedStagingCommitterForTesting(TaskAttemptContext context)
throws IOException {
super(StagingTestBase.outputPath, context);
}
/**
* Generate pending uploads to commit.
* This is quite complex as the mock pending uploads need to be saved
* to a filesystem for the next stage of the commit process.
* To simulate multiple commits, more than one .pendingset file is created.
* @param context job context
* @return an active commit containing a list of paths to valid pending set
* files.
* @throws IOException IO failure
*/
@Override
protected List<SinglePendingCommit> listPendingUploadsToCommit(
protected ActiveCommit listPendingUploadsToCommit(
JobContext context) throws IOException {
List<SinglePendingCommit> pending = Lists.newArrayList();
LocalFileSystem localFS = FileSystem.getLocal(getConf());
ActiveCommit activeCommit = new ActiveCommit(localFS,
new ArrayList<>(0));
// need to create some pending entries.
for (String dateint : Arrays.asList("20161115", "20161116")) {
PendingSet pendingSet = new PendingSet();
for (String hour : Arrays.asList("13", "14")) {
String uploadId = UUID.randomUUID().toString();
String key = OUTPUT_PREFIX + "/dateint=" + dateint + "/hour=" + hour +
"/" + UUID.randomUUID().toString() + ".parquet";
"/" + uploadId + ".parquet";
SinglePendingCommit commit = new SinglePendingCommit();
commit.setBucket(BUCKET);
commit.setDestinationKey(key);
commit.setUri("s3a://" + BUCKET + "/" + key);
commit.setUploadId(UUID.randomUUID().toString());
commit.setUploadId(uploadId);
ArrayList<String> etags = new ArrayList<>();
etags.add("tag1");
commit.setEtags(etags);
pending.add(commit);
pendingSet.add(commit);
// register the upload so commit operations are not rejected
getMockResults().addUpload(uploadId, key);
}
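// save this partition's pending set to a local temp file and
// record its path in the active commit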
File file = File.createTempFile("staging", ".pendingset");
file.deleteOnExit();
Path path = new Path(file.toURI());
pendingSet.save(localFS, path, true);
activeCommit.add(path);
}
return pending;
return activeCommit;
}
@Override

View File

@ -27,6 +27,7 @@ import java.util.UUID;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.assertj.core.api.Assertions;
import org.junit.BeforeClass;
import org.junit.Test;
@ -114,18 +115,7 @@ public class TestStagingPartitionedTaskCommit
reset(mockS3);
committer.commitTask(getTAC());
Set<String> files = Sets.newHashSet();
for (InitiateMultipartUploadRequest request :
getMockResults().getRequests().values()) {
assertEquals(BUCKET, request.getBucketName());
files.add(request.getKey());
}
assertEquals("Should have the right number of uploads",
relativeFiles.size(), files.size());
Set<String> expected = buildExpectedList(committer);
assertEquals("Should have correct paths", expected, files);
verifyFilesCreated(committer);
}
@Test
@ -146,18 +136,29 @@ public class TestStagingPartitionedTaskCommit
pathExists(mockS3, new Path(outputPath, relativeFiles.get(2)).getParent());
committer.commitTask(getTAC());
verifyFilesCreated(committer);
}
/**
* Verify that the files created match those expected.
* @param committer committer
*/
protected void verifyFilesCreated(
final PartitionedStagingCommitter committer) {
Set<String> files = Sets.newHashSet();
for (InitiateMultipartUploadRequest request :
getMockResults().getRequests().values()) {
assertEquals(BUCKET, request.getBucketName());
files.add(request.getKey());
}
assertEquals("Should have the right number of uploads",
relativeFiles.size(), files.size());
Assertions.assertThat(files)
.describedAs("Should have the right number of uploads")
.hasSize(relativeFiles.size());
Set<String> expected = buildExpectedList(committer);
assertEquals("Should have correct paths", expected, files);
Assertions.assertThat(files)
.describedAs("Should have correct paths")
.containsExactlyInAnyOrderElementsOf(expected);
}
@Test
@ -180,18 +181,7 @@ public class TestStagingPartitionedTaskCommit
pathExists(mockS3, new Path(outputPath, relativeFiles.get(3)).getParent());
committer.commitTask(getTAC());
Set<String> files = Sets.newHashSet();
for (InitiateMultipartUploadRequest request :
getMockResults().getRequests().values()) {
assertEquals(BUCKET, request.getBucketName());
files.add(request.getKey());
}
assertEquals("Should have the right number of uploads",
relativeFiles.size(), files.size());
Set<String> expected = buildExpectedList(committer);
assertEquals("Should have correct paths", expected, files);
verifyFilesCreated(committer);
}
public Set<String> buildExpectedList(StagingCommitter committer) {