From 1eeb9d9d6784358a84020bd1e82da37ce3410364 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 18 Nov 2020 13:34:36 +0000 Subject: [PATCH] HADOOP-17318. Support concurrent S3A commit jobs with same app attempt ID. (#2399) See also [SPARK-33402]: Jobs launched in same second have duplicate MapReduce JobIDs Contributed by Steve Loughran. Change-Id: Iae65333cddc84692997aae5d902ad8765b45772a --- .../src/main/resources/core-default.xml | 13 +- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 9 +- .../hadoop/fs/s3a/WriteOperationHelper.java | 9 +- .../fs/s3a/commit/AbstractS3ACommitter.java | 376 ++++++++++++++++-- .../hadoop/fs/s3a/commit/CommitConstants.java | 68 +++- .../fs/s3a/commit/CommitOperations.java | 17 +- .../fs/s3a/commit/CommitUtilsWithMR.java | 45 ++- .../commit/InternalCommitterConstants.java | 31 +- .../fs/s3a/commit/files/PendingSet.java | 29 +- .../commit/files/PersistentCommitData.java | 2 +- .../s3a/commit/files/SinglePendingCommit.java | 2 +- .../fs/s3a/commit/files/SuccessData.java | 27 +- .../commit/magic/MagicS3GuardCommitter.java | 26 +- .../staging/PartitionedStagingCommitter.java | 5 +- .../hadoop/fs/s3a/commit/staging/Paths.java | 16 +- .../s3a/commit/staging/StagingCommitter.java | 94 +---- .../staging/StagingCommitterConstants.java | 15 - .../markdown/tools/hadoop-aws/committers.md | 322 +++++++++++++-- .../tools/hadoop-aws/troubleshooting_s3a.md | 61 ++- .../fs/s3a/commit/AbstractCommitITest.java | 17 +- .../s3a/commit/AbstractITCommitProtocol.java | 301 +++++++++++++- .../s3a/commit/LoggingTextOutputFormat.java | 2 +- .../integration/ITestS3ACommitterMRJob.java | 7 +- .../magic/ITestMagicCommitProtocol.java | 43 +- .../s3a/commit/staging/StagingTestBase.java | 4 +- .../commit/staging/TestStagingCommitter.java | 151 ++++++- .../TestStagingPartitionedJobCommit.java | 2 +- .../ITestStagingCommitProtocol.java | 14 +- .../commit/terasort/ITestTerasortOnS3A.java | 2 +- .../s3a/scale/AbstractSTestS3AHugeFiles.java | 2 +- 30 files changed, 1401 insertions(+), 311 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index cf156af1964..2c3f14a8505 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -1925,20 +1925,13 @@ - fs.s3a.committer.staging.abort.pending.uploads + fs.s3a.committer.abort.pending.uploads true - Should the staging committers abort all pending uploads to the destination + Should the committers abort all pending uploads to the destination directory? - Changing this if more than one partitioned committer is - writing to the same destination tree simultaneously; otherwise - the first job to complete will cancel all outstanding uploads from the - others. However, it may lead to leaked outstanding uploads from failed - tasks. If disabled, configure the bucket lifecycle to remove uploads - after a time period, and/or set up a workflow to explicitly delete - entries. Otherwise there is a risk that uncommitted uploads may run up - bills. + Set to false if more than one job is writing to the same directory tree. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index f8e6ab2c2b6..cc37df766fd 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -180,6 +180,8 @@ import static org.apache.hadoop.fs.s3a.auth.RolePolicies.STATEMENT_ALLOW_SSE_KMS import static org.apache.hadoop.fs.s3a.auth.RolePolicies.allowS3Operations; import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.TokenIssuingPolicy.NoTokensAvailable; import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions; import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket; @@ -314,9 +316,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, /** Add any deprecated keys. */ @SuppressWarnings("deprecation") private static void addDeprecatedKeys() { - // this is retained as a placeholder for when new deprecated keys - // need to be added. Configuration.DeprecationDelta[] deltas = { + new Configuration.DeprecationDelta( + FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS, + FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS) }; if (deltas.length > 0) { @@ -4593,7 +4596,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, */ @Retries.OnceRaw void abortMultipartUpload(String destKey, String uploadId) { - LOG.debug("Aborting multipart upload {} to {}", uploadId, destKey); + LOG.info("Aborting multipart upload {} to {}", uploadId, destKey); getAmazonS3Client().abortMultipartUpload( new AbortMultipartUploadRequest(getBucket(), destKey, diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java index 26d0942db61..1a0218acc93 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java @@ -131,6 +131,8 @@ public class WriteOperationHelper implements WriteOperations { */ void operationRetried(String text, Exception ex, int retries, boolean idempotent) { + LOG.info("{}: Retried {}: {}", text, retries, ex.toString()); + LOG.debug("Stack", ex); owner.operationRetried(text, ex, retries, idempotent); } @@ -323,7 +325,9 @@ public class WriteOperationHelper implements WriteOperations { public void abortMultipartUpload(String destKey, String uploadId, Retried retrying) throws IOException { - invoker.retry("Aborting multipart upload", destKey, true, + invoker.retry("Aborting multipart upload ID " + uploadId, + destKey, + true, retrying, () -> owner.abortMultipartUpload( destKey, @@ -585,7 +589,8 @@ public class WriteOperationHelper implements WriteOperations { @Retries.RetryTranslated public UploadPartResult uploadPart(UploadPartRequest request) throws IOException { - return retry("upload part", + return retry("upload part #" + request.getPartNumber() + + " upload ID "+ request.getUploadId(), request.getKey(), true, () -> owner.uploadPart(request)); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java index 32d00a4353e..8310b9973da 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java @@ -20,13 +20,14 @@ package org.apache.hadoop.fs.s3a.commit; import java.io.FileNotFoundException; import java.io.IOException; +import java.text.DateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import com.amazonaws.services.s3.model.MultipartUpload; import com.google.common.annotations.VisibleForTesting; @@ -35,6 +36,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -45,8 +48,10 @@ import org.apache.hadoop.fs.s3a.commit.files.PendingSet; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; import org.apache.hadoop.fs.s3a.commit.files.SuccessData; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.DurationInfo; @@ -58,6 +63,10 @@ import static org.apache.hadoop.fs.s3a.S3AUtils.*; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*; import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*; +import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.E_NO_SPARK_UUID; +import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID; +import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID_SOURCE; +import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.SPARK_WRITE_UUID; /** * Abstract base class for S3A committers; allows for any commonality @@ -86,11 +95,40 @@ import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*; * committer was large enough for more all the parallel POST requests. */ public abstract class AbstractS3ACommitter extends PathOutputCommitter { + private static final Logger LOG = LoggerFactory.getLogger(AbstractS3ACommitter.class); public static final String THREAD_PREFIX = "s3a-committer-pool-"; + /** + * Error string when task setup fails. + */ + @VisibleForTesting + public static final String E_SELF_GENERATED_JOB_UUID + = "has a self-generated job UUID"; + + /** + * Unique ID for a Job. + * In MapReduce Jobs the YARN JobID suffices. + * On Spark this only be the YARN JobID + * it is known to be creating strongly unique IDs + * (i.e. SPARK-33402 is on the branch). + */ + private final String uuid; + + /** + * Source of the {@link #uuid} value. + */ + private final JobUUIDSource uuidSource; + + /** + * Has this instance been used for job setup? + * If so then it is safe for a locally generated + * UUID to be used for task setup. + */ + private boolean jobSetup; + /** * Thread pool for task execution. */ @@ -147,14 +185,19 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter { this.jobContext = context; this.role = "Task committer " + context.getTaskAttemptID(); setConf(context.getConfiguration()); + Pair id = buildJobUUID( + conf, context.getJobID()); + this.uuid = id.getLeft(); + this.uuidSource = id.getRight(); + LOG.info("Job UUID {} source {}", getUUID(), getUUIDSource().getText()); initOutput(outputPath); LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}", role, jobName(context), jobIdString(context), outputPath); S3AFileSystem fs = getDestS3AFS(); - createJobMarker = context.getConfiguration().getBoolean( + this.createJobMarker = context.getConfiguration().getBoolean( CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, DEFAULT_CREATE_SUCCESSFUL_JOB_DIR_MARKER); - commitOperations = new CommitOperations(fs); + this.commitOperations = new CommitOperations(fs); } /** @@ -202,7 +245,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter { * @return the working path. */ @Override - public Path getWorkPath() { + public final Path getWorkPath() { return workPath; } @@ -210,16 +253,16 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter { * Set the work path for this committer. * @param workPath the work path to use. */ - protected void setWorkPath(Path workPath) { + protected final void setWorkPath(Path workPath) { LOG.debug("Setting work path to {}", workPath); this.workPath = workPath; } - public Configuration getConf() { + public final Configuration getConf() { return conf; } - protected void setConf(Configuration conf) { + protected final void setConf(Configuration conf) { this.conf = conf; } @@ -308,6 +351,24 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter { */ public abstract String getName(); + /** + * The Job UUID, as passed in or generated. + * @return the UUID for the job. + */ + @VisibleForTesting + public final String getUUID() { + return uuid; + } + + /** + * Source of the UUID. + * @return how the job UUID was retrieved/generated. + */ + @VisibleForTesting + public final JobUUIDSource getUUIDSource() { + return uuidSource; + } + @Override public String toString() { final StringBuilder sb = new StringBuilder( @@ -316,6 +377,8 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter { sb.append(", name=").append(getName()); sb.append(", outputPath=").append(getOutputPath()); sb.append(", workPath=").append(workPath); + sb.append(", uuid='").append(getUUID()).append('\''); + sb.append(", uuid source=").append(getUUIDSource()); sb.append('}'); return sb.toString(); } @@ -394,6 +457,8 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter { // create a success data structure and then save it SuccessData successData = new SuccessData(); successData.setCommitter(getName()); + successData.setJobId(uuid); + successData.setJobIdSource(uuidSource.getText()); successData.setDescription(getRole()); successData.setHostname(NetUtils.getLocalHostname()); Date now = new Date(); @@ -411,26 +476,60 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter { * be deleted; creating it now ensures there is something at the end * while the job is in progress -and if nothing is created, that * it is still there. + *

+ * The option {@link InternalCommitterConstants#FS_S3A_COMMITTER_UUID} + * is set to the job UUID; if generated locally + * {@link InternalCommitterConstants#SPARK_WRITE_UUID} is also patched. + * The field {@link #jobSetup} is set to true to note that + * this specific committer instance was used to set up a job. + *

* @param context context * @throws IOException IO failure */ @Override public void setupJob(JobContext context) throws IOException { - try (DurationInfo d = new DurationInfo(LOG, "preparing destination")) { + try (DurationInfo d = new DurationInfo(LOG, + "Job %s setting up", getUUID())) { + // record that the job has been set up + jobSetup = true; + // patch job conf with the job UUID. + Configuration c = context.getConfiguration(); + c.set(FS_S3A_COMMITTER_UUID, getUUID()); + c.set(FS_S3A_COMMITTER_UUID_SOURCE, getUUIDSource().getText()); + Path dest = getOutputPath(); if (createJobMarker){ - commitOperations.deleteSuccessMarker(getOutputPath()); + commitOperations.deleteSuccessMarker(dest); } - getDestFS().mkdirs(getOutputPath()); + getDestFS().mkdirs(dest); + // do a scan for surplus markers + warnOnActiveUploads(dest); } } + /** + * Task setup. Fails if the the UUID was generated locally, and + * the same committer wasn't used for job setup. + * {@inheritDoc} + * @throws PathCommitException if the task UUID options are unsatisfied. + */ @Override public void setupTask(TaskAttemptContext context) throws IOException { + TaskAttemptID attemptID = context.getTaskAttemptID(); try (DurationInfo d = new DurationInfo(LOG, "Setup Task %s", - context.getTaskAttemptID())) { + attemptID)) { + // reject attempts to set up the task where the output won't be + // picked up + if (!jobSetup + && getUUIDSource() == JobUUIDSource.GeneratedLocally) { + // on anything other than a test run, the context must not have been + // generated locally. + throw new PathCommitException(getOutputPath().toString(), + "Task attempt " + attemptID + + " " + E_SELF_GENERATED_JOB_UUID); + } Path taskAttemptPath = getTaskAttemptPath(context); - FileSystem fs = getTaskAttemptFilesystem(context); + FileSystem fs = taskAttemptPath.getFileSystem(getConf()); fs.mkdirs(taskAttemptPath); } } @@ -474,12 +573,12 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter { .stopOnFailure() .suppressExceptions(false) .executeWith(buildSubmitter(context)) - .abortWith(path -> - loadAndAbort(commitContext, pending, path, true, false)) - .revertWith(path -> - loadAndRevert(commitContext, pending, path)) - .run(path -> - loadAndCommit(commitContext, pending, path)); + .abortWith(status -> + loadAndAbort(commitContext, pending, status, true, false)) + .revertWith(status -> + loadAndRevert(commitContext, pending, status)) + .run(status -> + loadAndCommit(commitContext, pending, status)); } } @@ -504,7 +603,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter { .stopOnFailure() .suppressExceptions(false) .executeWith(buildSubmitter(context)) - .run(path -> PendingSet.load(sourceFS, path)); + .run(status -> PendingSet.load(sourceFS, status)); } } @@ -512,17 +611,26 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter { * Load a pendingset file and commit all of its contents. * @param commitContext context to commit through * @param activeCommit commit state - * @param path path to load + * @param status file to load * @throws IOException failure */ private void loadAndCommit( final CommitOperations.CommitContext commitContext, final ActiveCommit activeCommit, - final Path path) throws IOException { + final FileStatus status) throws IOException { + final Path path = status.getPath(); try (DurationInfo ignored = - new DurationInfo(LOG, false, "Committing %s", path)) { - PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(), path); + new DurationInfo(LOG, + "Loading and committing files in pendingset %s", path)) { + PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(), + status); + String jobId = pendingSet.getJobId(); + if (!StringUtils.isEmpty(jobId) && !getUUID().equals(jobId)) { + throw new PathCommitException(path, + String.format("Mismatch in Job ID (%s) and commit job ID (%s)", + getUUID(), jobId)); + } Tasks.foreach(pendingSet.getCommits()) .stopOnFailure() .suppressExceptions(false) @@ -543,17 +651,19 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter { * Load a pendingset file and revert all of its contents. * @param commitContext context to commit through * @param activeCommit commit state - * @param path path to load + * @param status status of file to load * @throws IOException failure */ private void loadAndRevert( final CommitOperations.CommitContext commitContext, final ActiveCommit activeCommit, - final Path path) throws IOException { + final FileStatus status) throws IOException { + final Path path = status.getPath(); try (DurationInfo ignored = new DurationInfo(LOG, false, "Committing %s", path)) { - PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(), path); + PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(), + status); Tasks.foreach(pendingSet.getCommits()) .suppressExceptions(true) .run(commitContext::revertCommit); @@ -564,21 +674,22 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter { * Load a pendingset file and abort all of its contents. * @param commitContext context to commit through * @param activeCommit commit state - * @param path path to load + * @param status status of file to load * @param deleteRemoteFiles should remote files be deleted? * @throws IOException failure */ private void loadAndAbort( final CommitOperations.CommitContext commitContext, final ActiveCommit activeCommit, - final Path path, + final FileStatus status, final boolean suppressExceptions, final boolean deleteRemoteFiles) throws IOException { + final Path path = status.getPath(); try (DurationInfo ignored = new DurationInfo(LOG, false, "Aborting %s", path)) { PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(), - path); + status); FileSystem fs = getDestFS(); Tasks.foreach(pendingSet.getCommits()) .executeWith(singleThreadSubmitter()) @@ -659,6 +770,13 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter { */ protected void abortPendingUploadsInCleanup( boolean suppressExceptions) throws IOException { + // return early if aborting is disabled. + if (!shouldAbortUploadsInCleanup()) { + LOG.debug("Not cleanup up pending uploads to {} as {} is false ", + getOutputPath(), + FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS); + return; + } Path dest = getOutputPath(); try (DurationInfo ignored = new DurationInfo(LOG, "Aborting all pending commits under %s", @@ -674,14 +792,27 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter { maybeIgnore(suppressExceptions, "aborting pending uploads", e); return; } - Tasks.foreach(pending) - .executeWith(buildSubmitter(getJobContext())) - .suppressExceptions(suppressExceptions) - .run(u -> commitContext.abortMultipartCommit( - u.getKey(), u.getUploadId())); + if (!pending.isEmpty()) { + LOG.warn("{} pending uploads were found -aborting", pending.size()); + LOG.warn("If other tasks/jobs are writing to {}," + + "this action may cause them to fail", dest); + Tasks.foreach(pending) + .executeWith(buildSubmitter(getJobContext())) + .suppressExceptions(suppressExceptions) + .run(u -> commitContext.abortMultipartCommit( + u.getKey(), u.getUploadId())); + } else { + LOG.info("No pending uploads were found"); + } } } + private boolean shouldAbortUploadsInCleanup() { + return getConf() + .getBoolean(FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS, + DEFAULT_FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS); + } + /** * Subclass-specific pre-Job-commit actions. * The staging committers all load the pending files to verify that @@ -1044,6 +1175,166 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter { } } + /** + * Scan for active uploads and list them along with a warning message. + * Errors are ignored. + * @param path output path of job. + */ + protected void warnOnActiveUploads(final Path path) { + List pending; + try { + pending = getCommitOperations() + .listPendingUploadsUnderPath(path); + } catch (IOException e) { + LOG.debug("Failed to list uploads under {}", + path, e); + return; + } + if (!pending.isEmpty()) { + // log a warning + LOG.warn("{} active upload(s) in progress under {}", + pending.size(), + path); + LOG.warn("Either jobs are running concurrently" + + " or failed jobs are not being cleaned up"); + // and the paths + timestamps + DateFormat df = DateFormat.getDateTimeInstance(); + pending.forEach(u -> + LOG.info("[{}] {}", + df.format(u.getInitiated()), + u.getKey())); + if (shouldAbortUploadsInCleanup()) { + LOG.warn("This committer will abort these uploads in job cleanup"); + } + } + } + + /** + * Build the job UUID. + * + *

+ * In MapReduce jobs, the application ID is issued by YARN, and + * unique across all jobs. + *

+ *

+ * Spark will use a fake app ID based on the current time. + * This can lead to collisions on busy clusters unless + * the specific spark release has SPARK-33402 applied. + * This appends a random long value to the timestamp, so + * is unique enough that the risk of collision is almost + * nonexistent. + *

+ *

+ * The order of selection of a uuid is + *

+ *
    + *
  1. Value of + * {@link InternalCommitterConstants#FS_S3A_COMMITTER_UUID}.
  2. + *
  3. Value of + * {@link InternalCommitterConstants#SPARK_WRITE_UUID}.
  4. + *
  5. If enabled through + * {@link CommitConstants#FS_S3A_COMMITTER_GENERATE_UUID}: + * Self-generated uuid.
  6. + *
  7. If {@link CommitConstants#FS_S3A_COMMITTER_REQUIRE_UUID} + * is not set: Application ID
  8. + *
+ * The UUID bonding takes place during construction; + * the staging committers use it to set up their wrapped + * committer to a path in the cluster FS which is unique to the + * job. + *

+ * In MapReduce jobs, the application ID is issued by YARN, and + * unique across all jobs. + *

+ * In {@link #setupJob(JobContext)} the job context's configuration + * will be patched + * be valid in all sequences where the job has been set up for the + * configuration passed in. + *

+ * If the option {@link CommitConstants#FS_S3A_COMMITTER_REQUIRE_UUID} + * is set, then an external UUID MUST be passed in. + * This can be used to verify that the spark engine is reliably setting + * unique IDs for staging. + *

+ * @param conf job/task configuration + * @param jobId job ID from YARN or spark. + * @return Job UUID and source of it. + * @throws PathCommitException no UUID was found and it was required + */ + public static Pair + buildJobUUID(Configuration conf, JobID jobId) + throws PathCommitException { + + String jobUUID = conf.getTrimmed(FS_S3A_COMMITTER_UUID, ""); + + if (!jobUUID.isEmpty()) { + return Pair.of(jobUUID, JobUUIDSource.CommitterUUIDProperty); + } + // there is no job UUID. + // look for one from spark + jobUUID = conf.getTrimmed(SPARK_WRITE_UUID, ""); + if (!jobUUID.isEmpty()) { + return Pair.of(jobUUID, JobUUIDSource.SparkWriteUUID); + } + + // there is no UUID configuration in the job/task config + + // Check the job hasn't declared a requirement for the UUID. + // This allows or fail-fast validation of Spark behavior. + if (conf.getBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, + DEFAULT_S3A_COMMITTER_REQUIRE_UUID)) { + throw new PathCommitException("", E_NO_SPARK_UUID); + } + + // see if the job can generate a random UUI` + if (conf.getBoolean(FS_S3A_COMMITTER_GENERATE_UUID, + DEFAULT_S3A_COMMITTER_GENERATE_UUID)) { + // generate a random UUID. This is OK for a job, for a task + // it means that the data may not get picked up. + String newId = UUID.randomUUID().toString(); + LOG.warn("No job ID in configuration; generating a random ID: {}", + newId); + return Pair.of(newId, JobUUIDSource.GeneratedLocally); + } + // if no other option was supplied, return the job ID. + // This is exactly what MR jobs expect, but is not what + // Spark jobs can do as there is a risk of jobID collision. + return Pair.of(jobId.toString(), JobUUIDSource.JobID); + } + + /** + * Enumeration of Job UUID source. + */ + public enum JobUUIDSource { + SparkWriteUUID(SPARK_WRITE_UUID), + CommitterUUIDProperty(FS_S3A_COMMITTER_UUID), + JobID("JobID"), + GeneratedLocally("Generated Locally"); + + private final String text; + + JobUUIDSource(final String text) { + this.text = text; + } + + /** + * Source for messages. + * @return text + */ + public String getText() { + return text; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "JobUUIDSource{"); + sb.append("text='").append(text).append('\''); + sb.append('}'); + return sb.toString(); + } + } + /** * State of the active commit operation. * @@ -1071,7 +1362,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter { = new ActiveCommit(null, new ArrayList<>()); /** All pendingset files to iterate through. */ - private final List sourceFiles; + private final List sourceFiles; /** * Filesystem for the source files. @@ -1101,8 +1392,8 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter { */ public ActiveCommit( final FileSystem sourceFS, - final List sourceFiles) { - this.sourceFiles = sourceFiles; + final List sourceFiles) { + this.sourceFiles = (List) sourceFiles; this.sourceFS = sourceFS; } @@ -1115,10 +1406,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter { public static ActiveCommit fromStatusList( final FileSystem pendingFS, final List statuses) { - return new ActiveCommit(pendingFS, - statuses.stream() - .map(FileStatus::getPath) - .collect(Collectors.toList())); + return new ActiveCommit(pendingFS, statuses); } /** @@ -1129,7 +1417,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter { return EMPTY; } - public List getSourceFiles() { + public List getSourceFiles() { return sourceFiles; } @@ -1174,8 +1462,8 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter { return sourceFiles.isEmpty(); } - public void add(Path path) { - sourceFiles.add(path); + public void add(FileStatus status) { + sourceFiles.add(status); } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java index e7c049226d7..3224a5ab36d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java @@ -240,20 +240,39 @@ public final class CommitConstants { /** - * Should the staging committers abort all pending uploads to the destination - * directory? Default: true. - * - * Changing this is if more than one partitioned committer is + * Should committers abort all pending uploads to the destination + * directory? + *

+ * Deprecated: switch to {@link #FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS}. + */ + @Deprecated + public static final String FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS = + "fs.s3a.committer.staging.abort.pending.uploads"; + + /** + * Should committers abort all pending uploads to the destination + * directory? + *

+ * Value: {@value}. + *

+ * Change this is if more than one committer is * writing to the same destination tree simultaneously; otherwise * the first job to complete will cancel all outstanding uploads from the - * others. However, it may lead to leaked outstanding uploads from failed - * tasks. If disabled, configure the bucket lifecycle to remove uploads + * others. If disabled, configure the bucket lifecycle to remove uploads * after a time period, and/or set up a workflow to explicitly delete * entries. Otherwise there is a risk that uncommitted uploads may run up * bills. */ - public static final String FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS = - "fs.s3a.committer.staging.abort.pending.uploads"; + public static final String FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS = + "fs.s3a.committer.abort.pending.uploads"; + + /** + * Default configuration value for + * {@link #FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS}. + * Value: {@value}. + */ + public static final boolean DEFAULT_FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS = + true; /** * The limit to the number of committed objects tracked during @@ -264,4 +283,37 @@ public final class CommitConstants { /** Extra Data key for task attempt in pendingset files. */ public static final String TASK_ATTEMPT_ID = "task.attempt.id"; + /** + * Require the spark UUID to be passed down: {@value}. + * This is to verify that SPARK-33230 has been applied to spark, and that + * {@link InternalCommitterConstants#SPARK_WRITE_UUID} is set. + *

+ * MUST ONLY BE SET WITH SPARK JOBS. + *

+ */ + public static final String FS_S3A_COMMITTER_REQUIRE_UUID = + "fs.s3a.committer.require.uuid"; + + /** + * Default value for {@link #FS_S3A_COMMITTER_REQUIRE_UUID}: {@value}. + */ + public static final boolean DEFAULT_S3A_COMMITTER_REQUIRE_UUID = + false; + + /** + * Generate a UUID in job setup rather than fall back to + * YARN Application attempt ID. + *

+ * MUST ONLY BE SET WITH SPARK JOBS. + *

+ */ + public static final String FS_S3A_COMMITTER_GENERATE_UUID = + "fs.s3a.committer.generate.uuid"; + + /** + * Default value for {@link #FS_S3A_COMMITTER_GENERATE_UUID}: {@value}. + */ + public static final boolean DEFAULT_S3A_COMMITTER_GENERATE_UUID = + false; + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java index 155e86a3d46..a558b09f8f3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java @@ -159,7 +159,11 @@ public class CommitOperations { LOG.debug("Committing single commit {}", commit); MaybeIOE outcome; String destKey = "unknown destination"; - try { + try (DurationInfo d = new DurationInfo(LOG, + "Committing file %s size %s", + commit.getDestinationKey(), + commit.getLength())) { + commit.validate(); destKey = commit.getDestinationKey(); long l = innerCommit(commit, operationState); @@ -273,7 +277,7 @@ public class CommitOperations { ? (" defined in " + commit.getFilename()) : ""; String uploadId = commit.getUploadId(); - LOG.info("Aborting commit to object {}{}", destKey, origin); + LOG.info("Aborting commit ID {} to object {}{}", uploadId, destKey, origin); abortMultipartCommit(destKey, uploadId); } @@ -287,7 +291,8 @@ public class CommitOperations { */ private void abortMultipartCommit(String destKey, String uploadId) throws IOException { - try { + try (DurationInfo d = new DurationInfo(LOG, + "Aborting commit ID %s to path %s", uploadId, destKey)) { writeOperations.abortMultipartCommit(destKey, uploadId); } finally { statistics.commitAborted(); @@ -462,7 +467,11 @@ public class CommitOperations { String uploadId = null; boolean threw = true; - try { + try (DurationInfo d = new DurationInfo(LOG, + "Upload staged file from %s to %s", + localFile.getAbsolutePath(), + destPath)) { + statistics.commitCreated(); uploadId = writeOperations.initiateMultiPartUpload(destKey); long length = localFile.length(); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtilsWithMR.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtilsWithMR.java index c6c0da8309c..9e5ee860e85 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtilsWithMR.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtilsWithMR.java @@ -58,7 +58,7 @@ public final class CommitUtilsWithMR { /** * Get the Application Attempt ID for this job. * @param context the context to look in - * @return the Application Attempt ID for a given job. + * @return the Application Attempt ID for a given job, or 0 */ public static int getAppAttemptId(JobContext context) { return context.getConfiguration().getInt( @@ -67,33 +67,32 @@ public final class CommitUtilsWithMR { /** * Compute the "magic" path for a job attempt. - * @param appAttemptId the ID of the application attempt for this job. + * @param jobUUID unique Job ID. * @param dest the final output directory * @return the path to store job attempt data. */ - public static Path getMagicJobAttemptPath(int appAttemptId, Path dest) { + public static Path getMagicJobAttemptPath(String jobUUID, Path dest) { return new Path(getMagicJobAttemptsPath(dest), - formatAppAttemptDir(appAttemptId)); + formatAppAttemptDir(jobUUID)); } /** * Format the application attempt directory. - * @param attemptId attempt ID + * @param jobUUID unique Job ID. * @return the directory name for the application attempt */ - public static String formatAppAttemptDir(int attemptId) { - return String.format("app-attempt-%04d", attemptId); + public static String formatAppAttemptDir(String jobUUID) { + return String.format("job-%s", jobUUID); } /** * Compute the path where the output of magic task attempts are stored. - * @param context the context of the job with magic tasks. + * @param jobUUID unique Job ID. * @param dest destination of work * @return the path where the output of magic task attempts are stored. */ - public static Path getMagicTaskAttemptsPath(JobContext context, Path dest) { - return new Path(getMagicJobAttemptPath( - getAppAttemptId(context), dest), "tasks"); + public static Path getMagicTaskAttemptsPath(String jobUUID, Path dest) { + return new Path(getMagicJobAttemptPath(jobUUID, dest), "tasks"); } /** @@ -102,48 +101,56 @@ public final class CommitUtilsWithMR { * This path is marked as a base path for relocations, so subdirectory * information is preserved. * @param context the context of the task attempt. + * @param jobUUID unique Job ID. * @param dest The output path to commit work into * @return the path where a task attempt should be stored. */ public static Path getMagicTaskAttemptPath(TaskAttemptContext context, + String jobUUID, Path dest) { - return new Path(getBaseMagicTaskAttemptPath(context, dest), BASE); + return new Path(getBaseMagicTaskAttemptPath(context, jobUUID, dest), + BASE); } /** * Get the base Magic attempt path, without any annotations to mark relative * references. * @param context task context. + * @param jobUUID unique Job ID. * @param dest The output path to commit work into * @return the path under which all attempts go */ public static Path getBaseMagicTaskAttemptPath(TaskAttemptContext context, + String jobUUID, Path dest) { - return new Path(getMagicTaskAttemptsPath(context, dest), + return new Path(getMagicTaskAttemptsPath(jobUUID, dest), String.valueOf(context.getTaskAttemptID())); } /** * Compute a path for temporary data associated with a job. * This data is not magic - * @param appAttemptId the ID of the application attempt for this job. + * @param jobUUID unique Job ID. * @param out output directory of job * @return the path to store temporary job attempt data. */ - public static Path getTempJobAttemptPath(int appAttemptId, Path out) { + public static Path getTempJobAttemptPath(String jobUUID, + Path out) { return new Path(new Path(out, TEMP_DATA), - formatAppAttemptDir(appAttemptId)); + formatAppAttemptDir(jobUUID)); } /** - * Compute the path where the output of a given job attempt will be placed. + * Compute the path where the output of a given task attempt will be placed. * @param context task context + * @param jobUUID unique Job ID. * @param out output directory of job * @return the path to store temporary job attempt data. */ public static Path getTempTaskAttemptPath(TaskAttemptContext context, - Path out) { - return new Path(getTempJobAttemptPath(getAppAttemptId(context), out), + final String jobUUID, Path out) { + return new Path( + getTempJobAttemptPath(jobUUID, out), String.valueOf(context.getTaskAttemptID())); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java index 2821fcea490..461c9a5e646 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java @@ -46,8 +46,14 @@ public final class InternalCommitterConstants { /** * A unique identifier to use for this work: {@value}. */ - public static final String FS_S3A_COMMITTER_STAGING_UUID = - "fs.s3a.committer.staging.uuid"; + public static final String FS_S3A_COMMITTER_UUID = + "fs.s3a.committer.uuid"; + + /** + * Where did the UUID come from? {@value}. + */ + public static final String FS_S3A_COMMITTER_UUID_SOURCE = + "fs.s3a.committer.uuid.source"; /** * Directory committer factory: {@value}. @@ -97,4 +103,25 @@ public final class InternalCommitterConstants { /** Error message for a path without a magic element in the list: {@value}. */ public static final String E_NO_MAGIC_PATH_ELEMENT = "No " + MAGIC + " element in path"; + + /** + * The UUID for jobs: {@value}. + * This was historically created in Spark 1.x's SQL queries, but "went away". + */ + public static final String SPARK_WRITE_UUID = + "spark.sql.sources.writeJobUUID"; + + /** + * Java temp dir: {@value}. + */ + public static final String JAVA_IO_TMPDIR = "java.io.tmpdir"; + + /** + * Incoming Job/task configuration didn't contain any option + * {@link #SPARK_WRITE_UUID}. + */ + public static final String E_NO_SPARK_UUID = + "Job/task context does not contain a unique ID in " + + SPARK_WRITE_UUID; + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java index 4793b78e63f..8ad03421058 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.commit.ValidationFailure; @@ -56,7 +57,7 @@ public class PendingSet extends PersistentCommitData { * If this is changed the value of {@link #serialVersionUID} will change, * to avoid deserialization problems. */ - public static final int VERSION = 1; + public static final int VERSION = 2; /** * Serialization ID: {@value}. @@ -67,6 +68,9 @@ public class PendingSet extends PersistentCommitData { /** Version marker. */ private int version = VERSION; + /** Job ID, if known. */ + private String jobId = ""; + /** * Commit list. */ @@ -110,6 +114,19 @@ public class PendingSet extends PersistentCommitData { return instance; } + /** + * Load an instance from a file, then validate it. + * @param fs filesystem + * @param status status of file to load + * @return the loaded instance + * @throws IOException IO failure + * @throws ValidationFailure if the data is invalid + */ + public static PendingSet load(FileSystem fs, FileStatus status) + throws IOException { + return load(fs, status.getPath()); + } + /** * Add a commit. * @param commit the single commit @@ -198,4 +215,14 @@ public class PendingSet extends PersistentCommitData { public void putExtraData(String key, String value) { extraData.put(key, value); } + + /** @return Job ID, if known. */ + public String getJobId() { + return jobId; + } + + public void setJobId(String jobId) { + this.jobId = jobId; + } + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PersistentCommitData.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PersistentCommitData.java index cc27d079075..dba44b9a011 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PersistentCommitData.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PersistentCommitData.java @@ -40,7 +40,7 @@ public abstract class PersistentCommitData implements Serializable { * If this is changed the value of {@code serialVersionUID} will change, * to avoid deserialization problems. */ - public static final int VERSION = 1; + public static final int VERSION = 2; /** * Validate the data: those fields which must be non empty, must be set. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java index c848f80b02d..f97f243839d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java @@ -207,7 +207,7 @@ public class SinglePendingCommit extends PersistentCommitData @Override public String toString() { final StringBuilder sb = new StringBuilder( - "DelayedCompleteData{"); + "SinglePendingCommit{"); sb.append("version=").append(version); sb.append(", uri='").append(uri).append('\''); sb.append(", destination='").append(destinationKey).append('\''); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java index e0273fa11a5..b7509d6714b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java @@ -68,7 +68,7 @@ public class SuccessData extends PersistentCommitData { /** * Serialization ID: {@value}. */ - private static final long serialVersionUID = 507133045258460084L; + private static final long serialVersionUID = 507133045258460083L + VERSION; /** * Name to include in persisted data, so as to differentiate from @@ -103,6 +103,14 @@ public class SuccessData extends PersistentCommitData { */ private String description; + /** Job ID, if known. */ + private String jobId = ""; + + /** + * Source of the job ID. + */ + private String jobIdSource = ""; + /** * Metrics. */ @@ -325,4 +333,21 @@ public class SuccessData extends PersistentCommitData { public void addDiagnostic(String key, String value) { diagnostics.put(key, value); } + + /** @return Job ID, if known. */ + public String getJobId() { + return jobId; + } + + public void setJobId(String jobId) { + this.jobId = jobId; + } + + public String getJobIdSource() { + return jobIdSource; + } + + public void setJobIdSource(final String jobIdSource) { + this.jobIdSource = jobIdSource; + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java index 30417ead353..b330cee4e6b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java @@ -97,6 +97,7 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter { public void setupJob(JobContext context) throws IOException { try (DurationInfo d = new DurationInfo(LOG, "Setup Job %s", jobIdString(context))) { + super.setupJob(context); Path jobAttemptPath = getJobAttemptPath(context); getDestinationFS(jobAttemptPath, context.getConfiguration()).mkdirs(jobAttemptPath); @@ -131,16 +132,6 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter { } } - @Override - public void setupTask(TaskAttemptContext context) throws IOException { - try (DurationInfo d = new DurationInfo(LOG, - "Setup Task %s", context.getTaskAttemptID())) { - Path taskAttemptPath = getTaskAttemptPath(context); - FileSystem fs = taskAttemptPath.getFileSystem(getConf()); - fs.mkdirs(taskAttemptPath); - } - } - /** * Did this task write any files in the work directory? * Probes for a task existing by looking to see if the attempt dir exists. @@ -208,13 +199,14 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter { throw failures.get(0).getValue(); } // patch in IDs - String jobId = String.valueOf(context.getJobID()); + String jobId = getUUID(); String taskId = String.valueOf(context.getTaskAttemptID()); for (SinglePendingCommit commit : pendingSet.getCommits()) { commit.setJobId(jobId); commit.setTaskId(taskId); } pendingSet.putExtraData(TASK_ATTEMPT_ID, taskId); + pendingSet.setJobId(jobId); Path jobAttemptPath = getJobAttemptPath(context); TaskAttemptID taskAttemptID = context.getTaskAttemptID(); Path taskOutcomePath = new Path(jobAttemptPath, @@ -259,11 +251,12 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter { /** * Compute the path where the output of a given job attempt will be placed. + * For the magic committer, the path includes the job UUID. * @param appAttemptId the ID of the application attempt for this job. * @return the path to store job attempt data. */ protected Path getJobAttemptPath(int appAttemptId) { - return getMagicJobAttemptPath(appAttemptId, getOutputPath()); + return getMagicJobAttemptPath(getUUID(), getOutputPath()); } /** @@ -274,12 +267,12 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter { * @return the path where a task attempt should be stored. */ public Path getTaskAttemptPath(TaskAttemptContext context) { - return getMagicTaskAttemptPath(context, getOutputPath()); + return getMagicTaskAttemptPath(context, getUUID(), getOutputPath()); } @Override protected Path getBaseTaskAttemptPath(TaskAttemptContext context) { - return getBaseMagicTaskAttemptPath(context, getOutputPath()); + return getBaseMagicTaskAttemptPath(context, getUUID(), getOutputPath()); } /** @@ -289,13 +282,16 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter { * @return a path for temporary data. */ public Path getTempTaskAttemptPath(TaskAttemptContext context) { - return CommitUtilsWithMR.getTempTaskAttemptPath(context, getOutputPath()); + return CommitUtilsWithMR.getTempTaskAttemptPath(context, + getUUID(), + getOutputPath()); } @Override public String toString() { final StringBuilder sb = new StringBuilder( "MagicCommitter{"); + sb.append(super.toString()); sb.append('}'); return sb.toString(); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java index 7be54062d28..214c7abdc73 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java @@ -198,8 +198,9 @@ public class PartitionedStagingCommitter extends StagingCommitter { .stopOnFailure() .suppressExceptions(false) .executeWith(submitter) - .run(path -> { - PendingSet pendingSet = PendingSet.load(sourceFS, path); + .run(status -> { + PendingSet pendingSet = PendingSet.load(sourceFS, + status); Path lastParent = null; for (SinglePendingCommit commit : pendingSet.getCommits()) { Path parent = commit.destinationPath().getParent(); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java index a941572f1e7..ceb03a3901f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java @@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.security.UserGroupInformation; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; +import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.JAVA_IO_TMPDIR; import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.*; /** @@ -141,14 +142,18 @@ public final class Paths { } /** - * A cache of temporary folders. There's a risk here that the cache - * gets too big + * A cache of temporary folders, using a generated ID which must be unique for + * each active task attempt. */ - private static Cache tempFolders = CacheBuilder + private static Cache tempFolders = CacheBuilder .newBuilder().build(); /** * Get the task attempt temporary directory in the local filesystem. + * This must be unique to all tasks on all jobs running on all processes + * on this host. + * It's constructed as uuid+task-attempt-ID, relying on UUID to be unique + * for each job. * @param conf configuration * @param uuid some UUID, such as a job UUID * @param attemptID attempt ID @@ -162,10 +167,11 @@ public final class Paths { try { final LocalDirAllocator allocator = new LocalDirAllocator(Constants.BUFFER_DIR); - return tempFolders.get(attemptID, + String name = uuid + "-" + attemptID; + return tempFolders.get(name, () -> { return FileSystem.getLocal(conf).makeQualified( - allocator.getLocalPathForWrite(uuid, conf)); + allocator.getLocalPathForWrite(name, conf)); }); } catch (ExecutionException | UncheckedExecutionException e) { Throwable cause = e.getCause(); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java index 9cc932b1ea8..53f811ff9e1 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java @@ -26,7 +26,6 @@ import java.util.Locale; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +45,6 @@ import org.apache.hadoop.fs.s3a.commit.Tasks; import org.apache.hadoop.fs.s3a.commit.files.PendingSet; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.util.DurationInfo; @@ -55,7 +53,6 @@ import static com.google.common.base.Preconditions.*; import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.S3AUtils.*; import static org.apache.hadoop.fs.s3a.Invoker.*; -import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.*; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*; import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*; @@ -95,7 +92,6 @@ public class StagingCommitter extends AbstractS3ACommitter { public static final String NAME = "staging"; private final Path constructorOutputPath; private final long uploadPartSize; - private final String uuid; private final boolean uniqueFilenames; private final FileOutputCommitter wrappedCommitter; @@ -118,15 +114,14 @@ public class StagingCommitter extends AbstractS3ACommitter { Configuration conf = getConf(); this.uploadPartSize = conf.getLongBytes( MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); - this.uuid = getUploadUUID(conf, context.getJobID()); this.uniqueFilenames = conf.getBoolean( FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES, DEFAULT_STAGING_COMMITTER_UNIQUE_FILENAMES); - setWorkPath(buildWorkPath(context, uuid)); + setWorkPath(buildWorkPath(context, getUUID())); this.wrappedCommitter = createWrappedCommitter(context, conf); setOutputPath(constructorOutputPath); Path finalOutputPath = getOutputPath(); - Preconditions.checkNotNull(finalOutputPath, "Output path cannot be null"); + checkNotNull(finalOutputPath, "Output path cannot be null"); S3AFileSystem fs = getS3AFileSystem(finalOutputPath, context.getConfiguration(), false); s3KeyPrefix = fs.pathToKey(finalOutputPath); @@ -156,7 +151,8 @@ public class StagingCommitter extends AbstractS3ACommitter { // explicitly choose commit algorithm initFileOutputCommitterOptions(context); - commitsDirectory = Paths.getMultipartUploadCommitsDirectory(conf, uuid); + commitsDirectory = Paths.getMultipartUploadCommitsDirectory(conf, + getUUID()); return new FileOutputCommitter(commitsDirectory, context); } @@ -175,7 +171,10 @@ public class StagingCommitter extends AbstractS3ACommitter { public String toString() { final StringBuilder sb = new StringBuilder("StagingCommitter{"); sb.append(super.toString()); + sb.append(", commitsDirectory=").append(commitsDirectory); + sb.append(", uniqueFilenames=").append(uniqueFilenames); sb.append(", conflictResolution=").append(conflictResolution); + sb.append(", uploadPartSize=").append(uploadPartSize); if (wrappedCommitter != null) { sb.append(", wrappedCommitter=").append(wrappedCommitter); } @@ -183,40 +182,6 @@ public class StagingCommitter extends AbstractS3ACommitter { return sb.toString(); } - /** - * Get the UUID of an upload; may be the job ID. - * Spark will use a fake app ID based on the current minute and job ID 0. - * To avoid collisions, the key policy is: - *
    - *
  1. Value of {@link InternalCommitterConstants#FS_S3A_COMMITTER_STAGING_UUID}.
  2. - *
  3. Value of {@code "spark.sql.sources.writeJobUUID"}.
  4. - *
  5. Value of {@code "spark.app.id"}.
  6. - *
  7. JobId passed in.
  8. - *
- * The staging UUID is set in in {@link #setupJob(JobContext)} and so will - * be valid in all sequences where the job has been set up for the - * configuration passed in. - * @param conf job/task configuration - * @param jobId Job ID - * @return an ID for use in paths. - */ - public static String getUploadUUID(Configuration conf, String jobId) { - return conf.getTrimmed( - InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID, - conf.getTrimmed(SPARK_WRITE_UUID, - conf.getTrimmed(SPARK_APP_ID, jobId))); - } - - /** - * Get the UUID of a Job. - * @param conf job/task configuration - * @param jobId Job ID - * @return an ID for use in paths. - */ - public static String getUploadUUID(Configuration conf, JobID jobId) { - return getUploadUUID(conf, jobId.toString()); - } - /** * Get the work path for a task. * @param context job/task complex @@ -309,7 +274,7 @@ public class StagingCommitter extends AbstractS3ACommitter { * @return the location of pending job attempts. */ private static Path getPendingJobAttemptsPath(Path out) { - Preconditions.checkNotNull(out, "Null 'out' path"); + checkNotNull(out, "Null 'out' path"); return new Path(out, TEMPORARY); } @@ -330,12 +295,12 @@ public class StagingCommitter extends AbstractS3ACommitter { * @param context task context */ private static void validateContext(TaskAttemptContext context) { - Preconditions.checkNotNull(context, "null context"); - Preconditions.checkNotNull(context.getTaskAttemptID(), + checkNotNull(context, "null context"); + checkNotNull(context.getTaskAttemptID(), "null task attempt ID"); - Preconditions.checkNotNull(context.getTaskAttemptID().getTaskID(), + checkNotNull(context.getTaskAttemptID().getTaskID(), "null task ID"); - Preconditions.checkNotNull(context.getTaskAttemptID().getJobID(), + checkNotNull(context.getTaskAttemptID().getJobID(), "null job ID"); } @@ -377,7 +342,7 @@ public class StagingCommitter extends AbstractS3ACommitter { // get files on the local FS in the attempt path Path attemptPath = getTaskAttemptPath(context); - Preconditions.checkNotNull(attemptPath, + checkNotNull(attemptPath, "No attemptPath path in {}", this); LOG.debug("Scanning {} for files to commit", attemptPath); @@ -401,7 +366,8 @@ public class StagingCommitter extends AbstractS3ACommitter { */ protected String getFinalKey(String relative, JobContext context) { if (uniqueFilenames) { - return getS3KeyPrefix(context) + "/" + Paths.addUUID(relative, uuid); + return getS3KeyPrefix(context) + "/" + + Paths.addUUID(relative, getUUID()); } else { return getS3KeyPrefix(context) + "/" + relative; } @@ -452,11 +418,8 @@ public class StagingCommitter extends AbstractS3ACommitter { */ @Override public void setupJob(JobContext context) throws IOException { - LOG.debug("{}, Setting up job {}", getRole(), jobIdString(context)); - context.getConfiguration().set( - InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID, uuid); - wrappedCommitter.setupJob(context); super.setupJob(context); + wrappedCommitter.setupJob(context); } /** @@ -539,19 +502,6 @@ public class StagingCommitter extends AbstractS3ACommitter { super.cleanup(context, suppressExceptions); } - @Override - protected void abortPendingUploadsInCleanup(boolean suppressExceptions) - throws IOException { - if (getConf() - .getBoolean(FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS, true)) { - super.abortPendingUploadsInCleanup(suppressExceptions); - } else { - LOG.info("Not cleanup up pending uploads to {} as {} is false ", - getOutputPath(), - FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS); - } - } - @Override protected void abortJobInternal(JobContext context, boolean suppressExceptions) throws IOException { @@ -608,8 +558,7 @@ public class StagingCommitter extends AbstractS3ACommitter { Path taskAttemptPath = getTaskAttemptPath(context); try (DurationInfo d = new DurationInfo(LOG, "%s: setup task attempt path %s ", getRole(), taskAttemptPath)) { - // create the local FS - taskAttemptPath.getFileSystem(getConf()).mkdirs(taskAttemptPath); + super.setupTask(context); wrappedCommitter.setupTask(context); } } @@ -832,15 +781,6 @@ public class StagingCommitter extends AbstractS3ACommitter { return s3KeyPrefix; } - /** - * A UUID for this upload, as calculated with. - * {@link #getUploadUUID(Configuration, String)} - * @return the UUID for files - */ - protected String getUUID() { - return uuid; - } - /** * Returns the {@link ConflictResolution} mode for this commit. * diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java index c41715bd497..ee2b9eca501 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java @@ -46,19 +46,4 @@ public final class StagingCommitterConstants { */ public static final String STAGING_UPLOADS = "staging-uploads"; - // Spark configuration keys - - /** - * The UUID for jobs: {@value}. - */ - public static final String SPARK_WRITE_UUID = - "spark.sql.sources.writeJobUUID"; - - /** - * The App ID for jobs. - */ - - public static final String SPARK_APP_ID = "spark.app.id"; - - public static final String JAVA_IO_TMPDIR = "java.io.tmpdir"; } diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md index dee77b62e6a..0a65786aa69 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md @@ -530,26 +530,33 @@ Amazon S3, that means S3Guard must *always* be enabled. Conflict management is left to the execution engine itself. -## Committer Configuration Options +## Common Committer Options -| Option | Magic | Directory | Partitioned | Meaning | Default | -|--------|-------|-----------|-------------|---------|---------| -| `mapreduce.fileoutputcommitter.marksuccessfuljobs` | X | X | X | Write a `_SUCCESS` file at the end of each job | `true` | -| `fs.s3a.committer.threads` | X | X | X | Number of threads in committers for parallel operations on files. | 8 | -| `fs.s3a.committer.staging.conflict-mode` | | X | X | Conflict resolution: `fail`, `append` or `replace`| `append` | -| `fs.s3a.committer.staging.unique-filenames` | | X | X | Generate unique filenames | `true` | -| `fs.s3a.committer.magic.enabled` | X | | | Enable "magic committer" support in the filesystem | `false` | +| Option | Meaning | Default | +|--------|---------|---------| +| `mapreduce.fileoutputcommitter.marksuccessfuljobs` | Write a `_SUCCESS` file on the successful completion of the job. | `true` | +| `fs.s3a.buffer.dir` | Local filesystem directory for data being written and/or staged. | `${hadoop.tmp.dir}/s3a` | +| `fs.s3a.committer.magic.enabled` | Enable "magic committer" support in the filesystem. | `false` | +| `fs.s3a.committer.abort.pending.uploads` | list and abort all pending uploads under the destination path when the job is committed or aborted. | `true` | +| `fs.s3a.committer.threads` | Number of threads in committers for parallel operations on files. | 8 | +| `fs.s3a.committer.generate.uuid` | Generate a Job UUID if none is passed down from Spark | `false` | +| `fs.s3a.committer.require.uuid` |Require the Job UUID to be passed down from Spark | `false` | +## Staging committer (Directory and Partitioned) options -| Option | Magic | Directory | Partitioned | Meaning | Default | -|--------|-------|-----------|-------------|---------|---------| -| `fs.s3a.buffer.dir` | X | X | X | Local filesystem directory for data being written and/or staged. | | -| `fs.s3a.committer.staging.tmp.path` | | X | X | Path in the cluster filesystem for temporary data | `tmp/staging` | +| Option | Meaning | Default | +|--------|---------|---------| +| `fs.s3a.committer.staging.conflict-mode` | Conflict resolution: `fail`, `append` or `replace`| `append` | +| `fs.s3a.committer.staging.tmp.path` | Path in the cluster filesystem for temporary data. | `tmp/staging` | +| `fs.s3a.committer.staging.unique-filenames` | Generate unique filenames. | `true` | +| `fs.s3a.committer.staging.abort.pending.uploads` | Deprecated; replaced by `fs.s3a.committer.abort.pending.uploads`. | `(false)` | +### Common Committer Options + ```xml fs.s3a.committer.name @@ -579,6 +586,60 @@ Conflict management is left to the execution engine itself. + + fs.s3a.committer.abort.pending.uploads + true + + Should the committers abort all pending uploads to the destination + directory? + + Set to false if more than one job is writing to the same directory tree. + Was: "fs.s3a.committer.staging.abort.pending.uploads" when only used + by the staging committers. + + + + + mapreduce.outputcommitter.factory.scheme.s3a + org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory + + The committer factory to use when writing data to S3A filesystems. + If mapreduce.outputcommitter.factory.class is set, it will + override this property. + + (This property is set in mapred-default.xml) + + + + + fs.s3a.committer.require.uuid + false + + Should the committer fail to initialize if a unique ID isn't set in + "spark.sql.sources.writeJobUUID" or fs.s3a.committer.staging.uuid + This helps guarantee that unique IDs for jobs are being + passed down in spark applications. + + Setting this option outside of spark will stop the S3A committer + in job setup. In MapReduce workloads the job attempt ID is unique + and so no unique ID need be passed down. + + + + + fs.s3a.committer.generate.uuid + false + + Generate a Job UUID if none is passed down from Spark. + This uuid is only generated if the fs.s3a.committer.require.uuid flag + is false. + + +``` + +### Staging Committer Options + +```xml fs.s3a.committer.staging.tmp.path tmp/staging @@ -613,38 +674,45 @@ Conflict management is left to the execution engine itself. - - s.s3a.committer.staging.abort.pending.uploads - true - - Should the staging committers abort all pending uploads to the destination - directory? - - Changing this if more than one partitioned committer is - writing to the same destination tree simultaneously; otherwise - the first job to complete will cancel all outstanding uploads from the - others. However, it may lead to leaked outstanding uploads from failed - tasks. If disabled, configure the bucket lifecycle to remove uploads - after a time period, and/or set up a workflow to explicitly delete - entries. Otherwise there is a risk that uncommitted uploads may run up - bills. - - - - - mapreduce.outputcommitter.factory.scheme.s3a - org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory - - The committer factory to use when writing data to S3A filesystems. - If mapreduce.outputcommitter.factory.class is set, it will - override this property. - - (This property is set in mapred-default.xml) - - ``` +## Concurrent Jobs writing to the same destination + +It is sometimes possible for multiple jobs to simultaneously write to the same destination path. + +Before attempting this, the committers must be set to not delete all incomplete uploads on job commit, +by setting `fs.s3a.committer.abort.pending.uploads` to `false` + +```xml + + fs.s3a.committer.abort.pending.uploads + false + +``` + +If more than one job is writing to the same destination path then every task MUST +be creating files with paths/filenames unique to the specific job. +It is not enough for them to be unique by task `part-00000.snappy.parquet`, +because each job will have tasks with the same name, so generate files with conflicting operations. + +For the staging committers, setting `fs.s3a.committer.staging.unique-filenames` to ensure unique names are +generated during the upload. Otherwise, use what configuration options are available in the specific `FileOutputFormat`. + +Note: by default, the option `mapreduce.output.basename` sets the base name for files; +changing that from the default `part` value to something unique for each job may achieve this. + +For example, for any job executed through Hadoop MapReduce, the Job ID can be used in the filename. + +```xml + + mapreduce.output.basename + part-${mapreduce.job.id} + +``` + +Even with these settings, the outcome of concurrent jobs to the same destination is +inherently nondeterministic -use with caution. ## Troubleshooting @@ -700,7 +768,7 @@ Delegation token support is disabled Exiting with status 46: 46: The magic committer is not enabled for s3a://landsat-pds ``` -## Error message: "File being created has a magic path, but the filesystem has magic file support disabled: +### Error message: "File being created has a magic path, but the filesystem has magic file support disabled" A file is being written to a path which is used for "magic" files, files which are actually written to a different destination than their stated path @@ -781,7 +849,7 @@ If you have subclassed `FileOutputCommitter` and want to move to the factory model, please get in touch. -## Job/Task fails with PathExistsException: Destination path exists and committer conflict resolution mode is "fail" +### Job/Task fails with PathExistsException: Destination path exists and committer conflict resolution mode is "fail" This surfaces when either of two conditions are met. @@ -795,7 +863,7 @@ during task commit, which will cause the entire job to fail. If you are trying to write data and want write conflicts to be rejected, this is the correct behavior: there was data at the destination so the job was aborted. -## Staging committer task fails with IOException: No space left on device +### Staging committer task fails with IOException: No space left on device There's not enough space on the local hard disk (real or virtual) to store all the uncommitted data of the active tasks on that host. @@ -821,3 +889,169 @@ generating less data each. 1. Use the magic committer. This only needs enough disk storage to buffer blocks of the currently being written file during their upload process, so can use a lot less disk space. + +### Jobs run with directory/partitioned committers complete but the output is empty. + +Make sure that `fs.s3a.committer.staging.tmp.path` is set to a path on the shared cluster +filesystem (usually HDFS). It MUST NOT be set to a local directory, as then the job committer, +running on a different host *will not see the lists of pending uploads to commit*. + +### Magic output committer task fails "The specified upload does not exist" "Error Code: NoSuchUpload" + +The magic committer is being used and a task writing data to the S3 store fails +with an error message about the upload not existing. + +``` +java.io.FileNotFoundException: upload part #1 upload + YWHTRqBaxlsutujKYS3eZHfdp6INCNXbk0JVtydX_qzL5fZcoznxRbbBZRfswOjomddy3ghRyguOqywJTfGG1Eq6wOW2gitP4fqWrBYMroasAygkmXNYF7XmUyFHYzja + on test/ITestMagicCommitProtocol-testParallelJobsToSameDestPaths/part-m-00000: + com.amazonaws.services.s3.model.AmazonS3Exception: The specified upload does not + exist. The upload ID may be invalid, or the upload may have been aborted or + completed. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload; + Request ID: EBE6A0C9F8213AC3; S3 Extended Request ID: + cQFm2N+666V/1HehZYRPTHX9tFK3ppvHSX2a8Oy3qVDyTpOFlJZQqJpSixMVyMI1D0dZkHHOI+E=), + S3 Extended Request ID: + cQFm2N+666V/1HehZYRPTHX9tFK3ppvHSX2a8Oy3qVDyTpOFlJZQqJpSixMVyMI1D0dZkHHOI+E=:NoSuchUpload + + at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:259) + at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:112) + at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$4(Invoker.java:315) + at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:407) + at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:311) + at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:286) + at org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:154) + at org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:590) + at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.lambda$uploadBlockAsync$0(S3ABlockOutputStream.java:652) + +Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: + The specified upload does not exist. + The upload ID may be invalid, or the upload may have been aborted or completed. + (Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload; Request ID: EBE6A0C9F8213AC3; S3 Extended Request ID: + cQFm2N+666V/1HehZYRPTHX9tFK3ppvHSX2a8Oy3qVDyTpOFlJZQqJpSixMVyMI1D0dZkHHOI+E=), + S3 Extended Request ID: cQFm2N+666V/1HehZYRPTHX9tFK3ppvHSX2a8Oy3qVDyTpOFlJZQqJpSixMVyMI1D0dZkHHOI+E= + at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712) + at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367) + at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113) + at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770) + at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744) + at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726) + at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686) + at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668) + at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532) + at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512) + at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4920) + at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4866) + at com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3715) + at com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3700) + at org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:2343) + at org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:594) + at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:110) + ... 15 more +``` + +The block write failed because the previously created upload was aborted before the data could be written. + +Causes + +1. Another job has written to the same directory tree with an S3A committer + -and when that job was committed, all incomplete uploads were aborted. +1. The `hadoop s3guard uploads --abort` command has being called on/above the directory. +1. Some other program is cancelling uploads to that bucket/path under it. +1. The job is lasting over 24h and a bucket lifecycle policy is aborting the uploads. + +The `_SUCCESS` file from the previous job may provide diagnostics. + +If the cause is Concurrent Jobs, see [Concurrent Jobs writing to the same destination](#concurrent-jobs). + +### Job commit fails "java.io.FileNotFoundException: Completing multipart upload" "The specified upload does not exist" + +The job commit fails with an error about the specified upload not existing. + +``` +java.io.FileNotFoundException: Completing multipart upload on + test/DELAY_LISTING_ME/ITestDirectoryCommitProtocol-testParallelJobsToSameDestPaths/part-m-00001: + com.amazonaws.services.s3.model.AmazonS3Exception: + The specified upload does not exist. + The upload ID may be invalid, or the upload may have been aborted or completed. + (Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload; + Request ID: 8E6173241D2970CB; S3 Extended Request ID: + Pg6x75Q60UrbSJgfShCFX7czFTZAHR1Cy7W0Kh+o1uj60CG9jw7hL40tSa+wa7BRLbaz3rhX8Ds=), + S3 Extended Request ID: + Pg6x75Q60UrbSJgfShCFX7czFTZAHR1Cy7W0Kh+o1uj60CG9jw7hL40tSa+wa7BRLbaz3rhX8Ds=:NoSuchUpload + + at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:259) + at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:112) + at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$4(Invoker.java:315) + at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:407) + at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:311) + at org.apache.hadoop.fs.s3a.WriteOperationHelper.finalizeMultipartUpload(WriteOperationHelper.java:261) + at org.apache.hadoop.fs.s3a.WriteOperationHelper.commitUpload(WriteOperationHelper.java:549) + at org.apache.hadoop.fs.s3a.commit.CommitOperations.innerCommit(CommitOperations.java:199) + at org.apache.hadoop.fs.s3a.commit.CommitOperations.commit(CommitOperations.java:168) + at org.apache.hadoop.fs.s3a.commit.CommitOperations.commitOrFail(CommitOperations.java:144) + at org.apache.hadoop.fs.s3a.commit.CommitOperations.access$100(CommitOperations.java:74) + at org.apache.hadoop.fs.s3a.commit.CommitOperations$CommitContext.commitOrFail(CommitOperations.java:612) + at org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter.lambda$loadAndCommit$5(AbstractS3ACommitter.java:535) + at org.apache.hadoop.fs.s3a.commit.Tasks$Builder.runSingleThreaded(Tasks.java:164) + at org.apache.hadoop.fs.s3a.commit.Tasks$Builder.run(Tasks.java:149) + at org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter.loadAndCommit(AbstractS3ACommitter.java:534) + at org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter.lambda$commitPendingUploads$2(AbstractS3ACommitter.java:482) + at org.apache.hadoop.fs.s3a.commit.Tasks$Builder$1.run(Tasks.java:253) + +Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The specified upload does not exist. + The upload ID may be invalid, or the upload may have been aborted or completed. + (Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload; Request ID: 8E6173241D2970CB; + S3 Extended Request ID: Pg6x75Q60UrbSJgfShCFX7czFTZAHR1Cy7W0Kh+o1uj60CG9jw7hL40tSa+wa7BRLbaz3rhX8Ds=), + + at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712) + at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367) + at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113) + at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770) + at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744) + at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726) + at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686) + at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668) + at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532) + at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512) + at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4920) + at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4866) + at com.amazonaws.services.s3.AmazonS3Client.completeMultipartUpload(AmazonS3Client.java:3464) + at org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$finalizeMultipartUpload$1(WriteOperationHelper.java:267) + at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:110) +``` + +The problem is likely to be that of the previous one: concurrent jobs are writing the same output directory, +or another program has cancelled all pending uploads. + +See [Concurrent Jobs writing to the same destination](#concurrent-jobs). + +### Job commit fails `java.io.FileNotFoundException` "File hdfs://.../staging-uploads/_temporary/0 does not exist" + +The Staging committer will fail in job commit if the intermediate directory on the cluster FS is missing during job commit. + +This is possible if another job used the same staging upload directory and, + after committing its work, it deleted the directory. + +A unique Job ID is required for each spark job run by a specific user. +Spark generates job IDs for its committers using the current timestamp, +and if two jobs/stages are started in the same second, they will have the same job ID. + +See [SPARK-33230](https://issues.apache.org/jira/browse/SPARK-33230). + +This is fixed in all spark releases which have the patch applied. + +You can set the property `fs.s3a.committer.staging.require.uuid` to fail +the staging committers fast if a unique Job ID isn't found in +`spark.sql.sources.writeJobUUID`. + +### Job setup fails `Job/task context does not contain a unique ID in spark.sql.sources.writeJobUUID` + +This will surface in job setup if the option `fs.s3a.committer.require.uuid` is `true`, and +one of the following conditions are met + +1. The committer is being used in a Hadoop MapReduce job, whose job attempt ID is unique + -there is no need to add this requirement. + Fix: unset `fs.s3a.committer.require.uuid`. +1. The committer is being used in spark, and the version of spark being used does not + set the `spark.sql.sources.writeJobUUID` property. + Either upgrade to a new spark release, or set `fs.s3a.committer.generate.uuid` to true. diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md index c611ad16036..3c5dfce3bdf 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md @@ -248,6 +248,57 @@ As an example, the endpoint for S3 Frankfurt is `s3.eu-central-1.amazonaws.com`: ``` +### `Class does not implement AWSCredentialsProvider` + +A credential provider listed in `fs.s3a.aws.credentials.provider` does not implement +the interface `com.amazonaws.auth.AWSCredentialsProvider`. + +``` + Cause: java.lang.RuntimeException: java.io.IOException: Class class com.amazonaws.auth.EnvironmentVariableCredentialsProvider does not implement AWSCredentialsProvider + at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:686) + at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:621) + at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:219) + at org.apache.spark.sql.hive.client.HiveClientImpl.(HiveClientImpl.scala:126) + at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) + at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) + at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) + at java.lang.reflect.Constructor.newInstance(Constructor.java:423) + at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:306) + at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:433) + ... + Cause: java.io.IOException: Class class com.amazonaws.auth.EnvironmentVariableCredentialsProvider does not implement AWSCredentialsProvider + at org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProvider(S3AUtils.java:722) + at org.apache.hadoop.fs.s3a.S3AUtils.buildAWSProviderList(S3AUtils.java:687) + at org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet(S3AUtils.java:620) + at org.apache.hadoop.fs.s3a.S3AFileSystem.bindAWSClient(S3AFileSystem.java:673) + at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:414) + at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3462) + at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:171) + at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3522) + at org.apache.hadoop.fs.FileSystem$Cache.getUnique(FileSystem.java:3496) + at org.apache.hadoop.fs.FileSystem.newInstance(FileSystem.java:591) +``` + +There's two main causes + +1. A class listed there is not an implementation of the interface. + Fix: review the settings and correct as appropriate. +1. A class listed there does implement the interface, but it has been loaded in a different + classloader, so the JVM does not consider it to be an implementation. + Fix: learn the entire JVM classloader model and see if you can then debug it. + Tip: having both the AWS Shaded SDK and individual AWS SDK modules on your classpath + may be a cause of this. + +If you see this and you are trying to use the S3A connector with Spark, then the cause can +be that the isolated classloader used to load Hive classes is interfering with the S3A +connector's dynamic loading of `com.amazonaws` classes. To fix this, declare that that +the classes in the aws SDK are loaded from the same classloader which instantiated +the S3A FileSystem instance: + +``` +spark.sql.hive.metastore.sharedPrefixes com.amazonaws. +``` + ## "The security token included in the request is invalid" You are trying to use session/temporary credentials and the session token @@ -1262,11 +1313,11 @@ Number of parts in multipart upload exceeded ``` org.apache.hadoop.fs.PathIOException: `test/testMultiPartUploadFailure': Number of parts in multipart upload exceeded. Current part count = X, Part count limit = Y - at org.apache.hadoop.fs.s3a.WriteOperationHelper.newUploadPartRequest(WriteOperationHelper.java:432) - at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.uploadBlockAsync(S3ABlockOutputStream.java:627) - at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.access$000(S3ABlockOutputStream.java:532) - at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.uploadCurrentBlock(S3ABlockOutputStream.java:316) - at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.write(S3ABlockOutputStream.java:301) + at org.apache.hadoop.fs.s3a.WriteOperationHelper.newUploadPartRequest(WriteOperationHelper.java:432) + at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.uploadBlockAsync(S3ABlockOutputStream.java:627) + at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.access$000(S3ABlockOutputStream.java:532) + at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.uploadCurrentBlock(S3ABlockOutputStream.java:316) + at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.write(S3ABlockOutputStream.java:301) ``` This is a known issue where upload fails if number of parts diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java index 1cf3fb4a3f6..9947ece0b8b 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java @@ -359,11 +359,13 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase { * and that it can be loaded. * The contents will be logged and returned. * @param dir directory to scan + * @param jobId job ID, only verified if non-empty * @return the loaded success data * @throws IOException IO Failure */ - protected SuccessData verifySuccessMarker(Path dir) throws IOException { - return validateSuccessFile(dir, "", getFileSystem(), "query", 0); + protected SuccessData verifySuccessMarker(Path dir, String jobId) + throws IOException { + return validateSuccessFile(dir, "", getFileSystem(), "query", 0, jobId); } /** @@ -442,6 +444,7 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase { * @param fs filesystem * @param origin origin (e.g. "teragen" for messages) * @param minimumFileCount minimum number of files to have been created + * @param jobId job ID, only verified if non-empty * @return the success data * @throws IOException IO failure */ @@ -449,7 +452,8 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase { final String committerName, final S3AFileSystem fs, final String origin, - final int minimumFileCount) throws IOException { + final int minimumFileCount, + final String jobId) throws IOException { SuccessData successData = loadSuccessFile(fs, outputPath, origin); String commitDetails = successData.toString(); LOG.info("Committer name " + committerName + "\n{}", @@ -463,8 +467,13 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase { committerName, successData.getCommitter()); } Assertions.assertThat(successData.getFilenames()) - .describedAs("Files committed") + .describedAs("Files committed in " + commitDetails) .hasSizeGreaterThanOrEqualTo(minimumFileCount); + if (StringUtils.isNotEmpty(jobId)) { + Assertions.assertThat(successData.getJobId()) + .describedAs("JobID in " + commitDetails) + .isEqualTo(jobId); + } return successData; } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java index 03759a5267e..89d505f20af 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -41,6 +42,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.commit.files.SuccessData; +import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.MapFile; @@ -69,7 +71,12 @@ import org.apache.hadoop.util.concurrent.HadoopExecutors; import static org.apache.hadoop.fs.contract.ContractTestUtils.*; import static org.apache.hadoop.fs.s3a.S3AUtils.*; import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; +import static org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter.E_SELF_GENERATED_JOB_UUID; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; +import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.E_NO_SPARK_UUID; +import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID; +import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID_SOURCE; +import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.SPARK_WRITE_UUID; import static org.apache.hadoop.test.LambdaTestUtils.*; /** @@ -377,6 +384,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest { private final TaskAttemptContext tContext; private final AbstractS3ACommitter committer; private final Configuration conf; + private Path writtenTextPath; // null if not written to public JobData(Job job, JobContext jContext, @@ -467,7 +475,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest { if (writeText) { // write output - writeTextOutput(tContext); + jobData.writtenTextPath = writeTextOutput(tContext); } return jobData; } @@ -659,12 +667,14 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest { * file existence and contents, as well as optionally, the success marker. * @param dir directory to scan. * @param expectSuccessMarker check the success marker? + * @param expectedJobId job ID, verified if non-empty and success data loaded * @throws Exception failure. */ - private void validateContent(Path dir, boolean expectSuccessMarker) - throws Exception { + private void validateContent(Path dir, + boolean expectSuccessMarker, + String expectedJobId) throws Exception { if (expectSuccessMarker) { - verifySuccessMarker(dir); + SuccessData successData = verifySuccessMarker(dir, expectedJobId); } Path expectedFile = getPart0000(dir); log().debug("Validating content in {}", expectedFile); @@ -793,7 +803,8 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest { // validate output describe("4. Validating content"); - validateContent(outDir, shouldExpectSuccessMarker()); + validateContent(outDir, shouldExpectSuccessMarker(), + committer.getUUID()); assertNoMultipartUploadsPending(outDir); } @@ -810,7 +821,8 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest { commit(committer, jContext, tContext); // validate output - validateContent(outDir, shouldExpectSuccessMarker()); + validateContent(outDir, shouldExpectSuccessMarker(), + committer.getUUID()); assertNoMultipartUploadsPending(outDir); @@ -875,7 +887,8 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest { // validate output S3AFileSystem fs = getFileSystem(); - SuccessData successData = validateSuccessFile(outDir, "", fs, "query", 1); + SuccessData successData = validateSuccessFile(outDir, "", fs, "query", 1, + ""); Assertions.assertThat(successData.getFilenames()) .describedAs("Files committed") .hasSize(1); @@ -911,7 +924,8 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest { commitJob(committer, jContext); // but the data got there, due to the order of operations. - validateContent(outDir, shouldExpectSuccessMarker()); + validateContent(outDir, shouldExpectSuccessMarker(), + committer.getUUID()); expectJobCommitToFail(jContext, committer); } @@ -1007,7 +1021,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest { describe("\nvalidating"); // validate output - verifySuccessMarker(outDir); + verifySuccessMarker(outDir, committer.getUUID()); describe("validate output of %s", outDir); validateMapFileOutputContent(fs, outDir); @@ -1269,7 +1283,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest { // validate output // There's no success marker in the subdirectory - validateContent(outSubDir, false); + validateContent(outSubDir, false, ""); } /** @@ -1327,7 +1341,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest { commitTask(committer, tContext); commitJob(committer, jContext); // validate output - verifySuccessMarker(outDir); + verifySuccessMarker(outDir, committer.getUUID()); } /** @@ -1387,7 +1401,9 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest { assertNotEquals(job1Dest, job2Dest); // create the second job - Job job2 = newJob(job2Dest, new JobConf(getConfiguration()), attempt20); + Job job2 = newJob(job2Dest, + unsetUUIDOptions(new JobConf(getConfiguration())), + attempt20); Configuration conf2 = job2.getConfiguration(); conf2.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1); try { @@ -1400,7 +1416,13 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest { setup(jobData2); abortInTeardown(jobData2); // make sure the directories are different - assertEquals(job2Dest, committer2.getOutputPath()); + assertNotEquals("Committer output paths", + committer1.getOutputPath(), + committer2.getOutputPath()); + + assertNotEquals("job UUIDs", + committer1.getUUID(), + committer2.getUUID()); // job2 setup, write some data there writeTextOutput(tContext2); @@ -1430,6 +1452,259 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest { } + + /** + * Run two jobs with the same destination and different output paths. + *

+ * This only works if the jobs are set to NOT delete all outstanding + * uploads under the destination path. + *

+ * See HADOOP-17318. + */ + @Test + public void testParallelJobsToSameDestination() throws Throwable { + + describe("Run two jobs to the same destination, assert they both complete"); + Configuration conf = getConfiguration(); + conf.setBoolean(FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS, false); + + // this job has a job ID generated and set as the spark UUID; + // the config is also set to require it. + // This mimics the Spark setup process. + + String stage1Id = UUID.randomUUID().toString(); + conf.set(SPARK_WRITE_UUID, stage1Id); + conf.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true); + + // create the job and write data in its task attempt + JobData jobData = startJob(true); + Job job1 = jobData.job; + AbstractS3ACommitter committer1 = jobData.committer; + JobContext jContext1 = jobData.jContext; + TaskAttemptContext tContext1 = jobData.tContext; + Path job1TaskOutputFile = jobData.writtenTextPath; + + // the write path + Assertions.assertThat(committer1.getWorkPath().toString()) + .describedAs("Work path path of %s", committer1) + .contains(stage1Id); + // now build up a second job + String jobId2 = randomJobId(); + + // second job will use same ID + String attempt2 = taskAttempt0.toString(); + TaskAttemptID taskAttempt2 = taskAttempt0; + + // create the second job + Configuration c2 = unsetUUIDOptions(new JobConf(conf)); + c2.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true); + Job job2 = newJob(outDir, + c2, + attempt2); + Configuration jobConf2 = job2.getConfiguration(); + jobConf2.set("mapreduce.output.basename", "task2"); + String stage2Id = UUID.randomUUID().toString(); + jobConf2.set(SPARK_WRITE_UUID, + stage2Id); + + JobContext jContext2 = new JobContextImpl(jobConf2, + taskAttempt2.getJobID()); + TaskAttemptContext tContext2 = + new TaskAttemptContextImpl(jobConf2, taskAttempt2); + AbstractS3ACommitter committer2 = createCommitter(outDir, tContext2); + Assertions.assertThat(committer2.getJobAttemptPath(jContext2)) + .describedAs("Job attempt path of %s", committer2) + .isNotEqualTo(committer1.getJobAttemptPath(jContext1)); + Assertions.assertThat(committer2.getTaskAttemptPath(tContext2)) + .describedAs("Task attempt path of %s", committer2) + .isNotEqualTo(committer1.getTaskAttemptPath(tContext1)); + Assertions.assertThat(committer2.getWorkPath().toString()) + .describedAs("Work path path of %s", committer2) + .isNotEqualTo(committer1.getWorkPath().toString()) + .contains(stage2Id); + Assertions.assertThat(committer2.getUUIDSource()) + .describedAs("UUID source of %s", committer2) + .isEqualTo(AbstractS3ACommitter.JobUUIDSource.SparkWriteUUID); + JobData jobData2 = new JobData(job2, jContext2, tContext2, committer2); + setup(jobData2); + abortInTeardown(jobData2); + + // the sequence is designed to ensure that job2 has active multipart + // uploads during/after job1's work + + // if the committer is a magic committer, MPUs start in the write, + // otherwise in task commit. + boolean multipartInitiatedInWrite = + committer2 instanceof MagicS3GuardCommitter; + + // job2. Here we start writing a file and have that write in progress + // when job 1 commits. + + LoggingTextOutputFormat.LoggingLineRecordWriter + recordWriter2 = new LoggingTextOutputFormat<>().getRecordWriter( + tContext2); + + LOG.info("Commit Task 1"); + commitTask(committer1, tContext1); + + if (multipartInitiatedInWrite) { + // magic committer runs -commit job1 while a job2 TA has an open + // writer (and hence: open MP Upload) + LOG.info("With Multipart Initiated In Write: Commit Job 1"); + commitJob(committer1, jContext1); + } + + // job2/task writes its output to the destination and + // closes the file + writeOutput(recordWriter2, tContext2); + + // get the output file + Path job2TaskOutputFile = recordWriter2.getDest(); + + + // commit the second task + LOG.info("Commit Task 2"); + commitTask(committer2, tContext2); + + if (!multipartInitiatedInWrite) { + // if not a magic committer, commit the job now. Because at + // this point the staging committer tasks from job2 will be pending + LOG.info("With Multipart NOT Initiated In Write: Commit Job 1"); + assertJobAttemptPathExists(committer1, jContext1); + commitJob(committer1, jContext1); + } + + // run the warning scan code, which will find output. + // this can be manually reviewed in the logs to verify + // readability + committer2.warnOnActiveUploads(outDir); + // and second job + LOG.info("Commit Job 2"); + assertJobAttemptPathExists(committer2, jContext2); + commitJob(committer2, jContext2); + + // validate the output + Path job1Output = new Path(outDir, job1TaskOutputFile.getName()); + Path job2Output = new Path(outDir, job2TaskOutputFile.getName()); + assertNotEquals("Job output file filenames must be different", + job1Output, job2Output); + + // job1 output must be there + assertPathExists("job 1 output", job1Output); + // job 2 file is there + assertPathExists("job 2 output", job2Output); + + // and nothing is pending + assertNoMultipartUploadsPending(outDir); + + } + + /** + * Verify self-generated UUID logic. + * A committer used for job setup can also use it for task setup, + * but a committer which generated a job ID but was only + * used for task setup -that is rejected. + * Task abort will still work. + */ + @Test + public void testSelfGeneratedUUID() throws Throwable { + describe("Run two jobs to the same destination, assert they both complete"); + Configuration conf = getConfiguration(); + + unsetUUIDOptions(conf); + // job is set to generate UUIDs + conf.setBoolean(FS_S3A_COMMITTER_GENERATE_UUID, true); + + // create the job. don't write anything + JobData jobData = startJob(false); + AbstractS3ACommitter committer = jobData.committer; + String uuid = committer.getUUID(); + Assertions.assertThat(committer.getUUIDSource()) + .describedAs("UUID source of %s", committer) + .isEqualTo(AbstractS3ACommitter.JobUUIDSource.GeneratedLocally); + + // examine the job configuration and verify that it has been updated + Configuration jobConf = jobData.conf; + Assertions.assertThat(jobConf.get(FS_S3A_COMMITTER_UUID, null)) + .describedAs("Config option " + FS_S3A_COMMITTER_UUID) + .isEqualTo(uuid); + Assertions.assertThat(jobConf.get(FS_S3A_COMMITTER_UUID_SOURCE, null)) + .describedAs("Config option " + FS_S3A_COMMITTER_UUID_SOURCE) + .isEqualTo(AbstractS3ACommitter.JobUUIDSource.GeneratedLocally + .getText()); + + // because the task was set up in the job, it can have task + // setup called, even though it had a random ID. + committer.setupTask(jobData.tContext); + + // but a new committer will not be set up + TaskAttemptContext tContext2 = + new TaskAttemptContextImpl(conf, taskAttempt1); + AbstractS3ACommitter committer2 = createCommitter(outDir, tContext2); + Assertions.assertThat(committer2.getUUIDSource()) + .describedAs("UUID source of %s", committer2) + .isEqualTo(AbstractS3ACommitter.JobUUIDSource.GeneratedLocally); + assertNotEquals("job UUIDs", + committer.getUUID(), + committer2.getUUID()); + // Task setup MUST fail. + intercept(PathCommitException.class, + E_SELF_GENERATED_JOB_UUID, () -> { + committer2.setupTask(tContext2); + return committer2; + }); + // task abort with the self-generated option is fine. + committer2.abortTask(tContext2); + } + + /** + * Verify the option to require a UUID applies and + * when a committer is instantiated without those options, + * it fails early. + */ + @Test + public void testRequirePropagatedUUID() throws Throwable { + Configuration conf = getConfiguration(); + + unsetUUIDOptions(conf); + conf.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true); + conf.setBoolean(FS_S3A_COMMITTER_GENERATE_UUID, true); + + // create the job, expect a failure, even if UUID generation + // is enabled. + intercept(PathCommitException.class, E_NO_SPARK_UUID, () -> + startJob(false)); + } + + /** + * Strip staging/spark UUID options. + * @param conf config + * @return the patched config + */ + protected Configuration unsetUUIDOptions(final Configuration conf) { + conf.unset(SPARK_WRITE_UUID); + conf.unset(FS_S3A_COMMITTER_UUID); + conf.unset(FS_S3A_COMMITTER_GENERATE_UUID); + conf.unset(FS_S3A_COMMITTER_REQUIRE_UUID); + return conf; + } + + /** + * Assert that a committer's job attempt path exists. + * For the staging committers, this is in the cluster FS. + * @param committer committer + * @param jobContext job context + * @throws IOException failure + */ + protected void assertJobAttemptPathExists( + final AbstractS3ACommitter committer, + final JobContext jobContext) throws IOException { + Path attemptPath = committer.getJobAttemptPath(jobContext); + ContractTestUtils.assertIsDirectory( + attemptPath.getFileSystem(committer.getConf()), + attemptPath); + } + @Test public void testS3ACommitterFactoryBinding() throws Throwable { describe("Verify that the committer factory returns this " diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/LoggingTextOutputFormat.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/LoggingTextOutputFormat.java index 1ac80388952..5d1e919d917 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/LoggingTextOutputFormat.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/LoggingTextOutputFormat.java @@ -66,7 +66,7 @@ public class LoggingTextOutputFormat extends TextOutputFormat { } Path file = getDefaultWorkFile(job, extension); FileSystem fs = file.getFileSystem(conf); - FSDataOutputStream fileOut = fs.create(file, false); + FSDataOutputStream fileOut = fs.create(file, true); LOG.debug("Creating LineRecordWriter with destination {}", file); if (isCompressed) { return new LoggingLineRecordWriter<>( diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java index caf54d1c360..bf67f07d4fa 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java @@ -77,7 +77,7 @@ import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC; import static org.apache.hadoop.fs.s3a.commit.CommitConstants._SUCCESS; -import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID; +import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID; import static org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory; import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.STAGING_UPLOADS; import static org.apache.hadoop.test.LambdaTestUtils.intercept; @@ -254,7 +254,7 @@ public class ITestS3ACommitterMRJob extends AbstractYarnClusterITest { jobConf.set("mock-results-file", committerPath); // setting up staging options is harmless for other committers - jobConf.set(FS_S3A_COMMITTER_STAGING_UUID, commitUUID); + jobConf.set(FS_S3A_COMMITTER_UUID, commitUUID); mrJob.setInputFormatClass(TextInputFormat.class); FileInputFormat.addInputPath(mrJob, @@ -310,7 +310,8 @@ public class ITestS3ACommitterMRJob extends AbstractYarnClusterITest { committerName(), fs, "MR job " + jobID, - 1); + 1, + ""); String commitData = successData.toString(); FileStatus[] results = fs.listStatus(outputPath, diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java index 057adf5341b..f6d6307b5d8 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java @@ -21,6 +21,9 @@ package org.apache.hadoop.fs.s3a.commit.magic; import java.io.IOException; import java.net.URI; +import org.assertj.core.api.Assertions; +import org.junit.Test; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -34,6 +37,7 @@ import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjectionImpl; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; import static org.hamcrest.CoreMatchers.containsString; @@ -91,14 +95,14 @@ public class ITestMagicCommitProtocol extends AbstractITCommitProtocol { } @Override - protected AbstractS3ACommitter createCommitter( + protected MagicS3GuardCommitter createCommitter( Path outputPath, TaskAttemptContext context) throws IOException { return new MagicS3GuardCommitter(outputPath, context); } - public AbstractS3ACommitter createFailingCommitter( + public MagicS3GuardCommitter createFailingCommitter( TaskAttemptContext tContext) throws IOException { return new CommitterWithFailedThenSucceed(getOutDir(), tContext); } @@ -136,6 +140,41 @@ public class ITestMagicCommitProtocol extends AbstractITCommitProtocol { containsString('/' + CommitConstants.MAGIC + '/')); } + /** + * Verify that the __magic path for the application/tasks use the + * committer UUID to ensure uniqueness in the case of more than + * one job writing to the same destination path. + */ + @Test + public void testCommittersPathsHaveUUID() throws Throwable { + TaskAttemptContext tContext = new TaskAttemptContextImpl( + getConfiguration(), + getTaskAttempt0()); + MagicS3GuardCommitter committer = createCommitter(getOutDir(), tContext); + + String ta0 = getTaskAttempt0().toString(); + // magic path for the task attempt + Path taskAttemptPath = committer.getTaskAttemptPath(tContext); + Assertions.assertThat(taskAttemptPath.toString()) + .describedAs("task path of %s", committer) + .contains(committer.getUUID()) + .contains(MAGIC) + .doesNotContain(TEMP_DATA) + .endsWith(BASE) + .contains(ta0); + + // temp path for files which the TA will create with an absolute path + // and which need renaming into place. + Path tempTaskAttemptPath = committer.getTempTaskAttemptPath(tContext); + Assertions.assertThat(tempTaskAttemptPath.toString()) + .describedAs("Temp task path of %s", committer) + .contains(committer.getUUID()) + .contains(TEMP_DATA) + .doesNotContain(MAGIC) + .doesNotContain(BASE) + .contains(ta0); + } + /** * The class provides a overridden implementation of commitJobInternal which * causes the commit failed for the first time then succeed. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java index f368bf25c77..031089e11b2 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java @@ -341,7 +341,7 @@ public class StagingTestBase { protected JobConf createJobConf() { JobConf conf = new JobConf(); - conf.set(InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID, + conf.set(InternalCommitterConstants.FS_S3A_COMMITTER_UUID, UUID.randomUUID().toString()); conf.setBoolean( CommitConstants.CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, @@ -401,7 +401,7 @@ public class StagingTestBase { // get the task's configuration copy so modifications take effect String tmp = System.getProperty( - StagingCommitterConstants.JAVA_IO_TMPDIR); + InternalCommitterConstants.JAVA_IO_TMPDIR); tempDir = new File(tmp); tac.getConfiguration().set(Constants.BUFFER_DIR, tmp + "/buffer"); tac.getConfiguration().set( diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java index 15ea75476a9..f552fa9b445 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java @@ -54,6 +54,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.AWSClientIOException; import org.apache.hadoop.fs.s3a.MockS3AFileSystem; import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter; +import org.apache.hadoop.fs.s3a.commit.PathCommitException; import org.apache.hadoop.fs.s3a.commit.files.PendingSet; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; import org.apache.hadoop.mapred.JobConf; @@ -84,8 +86,13 @@ import static org.apache.hadoop.test.LambdaTestUtils.*; public class TestStagingCommitter extends StagingTestBase.MiniDFSTest { private static final JobID JOB_ID = new JobID("job", 1); + + public static final TaskID TASK_ID = new TaskID(JOB_ID, TaskType.REDUCE, 2); + private static final TaskAttemptID AID = new TaskAttemptID( - new TaskID(JOB_ID, TaskType.REDUCE, 2), 3); + TASK_ID, 1); + private static final TaskAttemptID AID2 = new TaskAttemptID( + TASK_ID, 2); private static final Logger LOG = LoggerFactory.getLogger(TestStagingCommitter.class); @@ -141,8 +148,8 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest { jobConf.setInt(FS_S3A_COMMITTER_THREADS, numThreads); jobConf.setBoolean(FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES, uniqueFilenames); - jobConf.set(FS_S3A_COMMITTER_STAGING_UUID, - UUID.randomUUID().toString()); + jobConf.set(FS_S3A_COMMITTER_UUID, + uuid()); jobConf.set(RETRY_INTERVAL, "100ms"); jobConf.setInt(RETRY_LIMIT, 1); @@ -190,36 +197,137 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest { } } - @Test - public void testUUIDPropagation() throws Exception { - Configuration config = new Configuration(); - String jobUUID = addUUID(config); - assertEquals("Upload UUID", jobUUID, - StagingCommitter.getUploadUUID(config, JOB_ID)); + private Configuration newConfig() { + return new Configuration(false); } + @Test + public void testUUIDPropagation() throws Exception { + Configuration config = newConfig(); + String uuid = uuid(); + config.set(SPARK_WRITE_UUID, uuid); + config.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true); + Pair t3 = AbstractS3ACommitter + .buildJobUUID(config, JOB_ID); + assertEquals("Job UUID", uuid, t3.getLeft()); + assertEquals("Job UUID source: " + t3, + AbstractS3ACommitter.JobUUIDSource.SparkWriteUUID, + t3.getRight()); + } + + /** + * If the Spark UUID is required, then binding will fail + * if a UUID did not get passed in. + */ + @Test + public void testUUIDValidation() throws Exception { + Configuration config = newConfig(); + config.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true); + intercept(PathCommitException.class, E_NO_SPARK_UUID, () -> + AbstractS3ACommitter.buildJobUUID(config, JOB_ID)); + } + + /** + * Validate ordering of UUID retrieval. + */ + @Test + public void testUUIDLoadOrdering() throws Exception { + Configuration config = newConfig(); + config.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true); + String uuid = uuid(); + // MUST be picked up + config.set(FS_S3A_COMMITTER_UUID, uuid); + config.set(SPARK_WRITE_UUID, "something"); + Pair t3 = AbstractS3ACommitter + .buildJobUUID(config, JOB_ID); + assertEquals("Job UUID", uuid, t3.getLeft()); + assertEquals("Job UUID source: " + t3, + AbstractS3ACommitter.JobUUIDSource.CommitterUUIDProperty, + t3.getRight()); + } + + /** + * Verify that unless the config enables self-generation, JobIDs + * are used. + */ + @Test + public void testJobIDIsUUID() throws Exception { + Configuration config = newConfig(); + Pair t3 = AbstractS3ACommitter + .buildJobUUID(config, JOB_ID); + assertEquals("Job UUID source: " + t3, + AbstractS3ACommitter.JobUUIDSource.JobID, + t3.getRight()); + // parse it as a JobID + JobID.forName(t3.getLeft()); + } + + /** + * Verify self-generated UUIDs are supported when enabled, + * and come before JobID. + */ + @Test + public void testSelfGeneratedUUID() throws Exception { + Configuration config = newConfig(); + config.setBoolean(FS_S3A_COMMITTER_GENERATE_UUID, true); + Pair t3 = AbstractS3ACommitter + .buildJobUUID(config, JOB_ID); + assertEquals("Job UUID source: " + t3, + AbstractS3ACommitter.JobUUIDSource.GeneratedLocally, + t3.getRight()); + // parse it + UUID.fromString(t3.getLeft()); + } + + /** + * Create a UUID and add it as the staging UUID. + * @param config config to patch + * @return the UUID + */ private String addUUID(Configuration config) { - String jobUUID = UUID.randomUUID().toString(); - config.set(FS_S3A_COMMITTER_STAGING_UUID, jobUUID); + String jobUUID = uuid(); + config.set(FS_S3A_COMMITTER_UUID, jobUUID); return jobUUID; } + /** + * Create a new UUID. + * @return a uuid as a string. + */ + private String uuid() { + return UUID.randomUUID().toString(); + } + @Test public void testAttemptPathConstructionNoSchema() throws Exception { - Configuration config = new Configuration(); + Configuration config = newConfig(); final String jobUUID = addUUID(config); config.set(BUFFER_DIR, "/tmp/mr-local-0,/tmp/mr-local-1"); String commonPath = "file:/tmp/mr-local-"; + Assertions.assertThat(getLocalTaskAttemptTempDir(config, + jobUUID, tac.getTaskAttemptID()).toString()) + .describedAs("Missing scheme should produce local file paths") + .startsWith(commonPath) + .contains(jobUUID); + } - assertThat("Missing scheme should produce local file paths", - getLocalTaskAttemptTempDir(config, - jobUUID, tac.getTaskAttemptID()).toString(), - StringStartsWith.startsWith(commonPath)); + @Test + public void testAttemptPathsDifferentByTaskAttempt() throws Exception { + Configuration config = newConfig(); + final String jobUUID = addUUID(config); + config.set(BUFFER_DIR, "file:/tmp/mr-local-0"); + String attempt1Path = getLocalTaskAttemptTempDir(config, + jobUUID, AID).toString(); + String attempt2Path = getLocalTaskAttemptTempDir(config, + jobUUID, AID2).toString(); + Assertions.assertThat(attempt2Path) + .describedAs("local task attempt dir of TA1 must not match that of TA2") + .isNotEqualTo(attempt1Path); } @Test public void testAttemptPathConstructionWithSchema() throws Exception { - Configuration config = new Configuration(); + Configuration config = newConfig(); final String jobUUID = addUUID(config); String commonPath = "file:/tmp/mr-local-"; @@ -234,7 +342,7 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest { @Test public void testAttemptPathConstructionWrongSchema() throws Exception { - Configuration config = new Configuration(); + Configuration config = newConfig(); final String jobUUID = addUUID(config); config.set(BUFFER_DIR, "hdfs://nn:8020/tmp/mr-local-0,hdfs://nn:8020/tmp/mr-local-1"); @@ -270,7 +378,7 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest { assertEquals("Should name the commits file with the task ID: " + results, "task_job_0001_r_000002", stats[0].getPath().getName()); - PendingSet pending = PendingSet.load(dfs, stats[0].getPath()); + PendingSet pending = PendingSet.load(dfs, stats[0]); assertEquals("Should have one pending commit", 1, pending.size()); SinglePendingCommit commit = pending.getCommits().get(0); assertEquals("Should write to the correct bucket:" + results, @@ -310,8 +418,7 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest { assertEquals("Should name the commits file with the task ID", "task_job_0001_r_000002", stats[0].getPath().getName()); - PendingSet pending = PendingSet.load(dfs, - stats[0].getPath()); + PendingSet pending = PendingSet.load(dfs, stats[0]); assertEquals("Should have one pending commit", 1, pending.size()); } @@ -334,7 +441,7 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest { "task_job_0001_r_000002", stats[0].getPath().getName()); List pending = - PendingSet.load(dfs, stats[0].getPath()).getCommits(); + PendingSet.load(dfs, stats[0]).getCommits(); assertEquals("Should have correct number of pending commits", files.size(), pending.size()); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java index 872097ff6f0..86b677c70a3 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java @@ -110,7 +110,7 @@ public class TestStagingPartitionedJobCommit file.deleteOnExit(); Path path = new Path(file.toURI()); pendingSet.save(localFS, path, true); - activeCommit.add(path); + activeCommit.add(localFS.getFileStatus(path)); } return activeCommit; } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java index 180e7435222..a4dfacead38 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java @@ -19,7 +19,9 @@ package org.apache.hadoop.fs.s3a.commit.staging.integration; import java.io.IOException; +import java.util.UUID; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -34,6 +36,7 @@ import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants; import org.apache.hadoop.fs.s3a.commit.staging.Paths; import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -68,8 +71,15 @@ public class ITestStagingCommitProtocol extends AbstractITCommitProtocol { // identify working dir for staging and delete Configuration conf = getConfiguration(); - String uuid = StagingCommitter.getUploadUUID(conf, - getTaskAttempt0().getJobID()); + String uuid = UUID.randomUUID().toString(); + conf.set(InternalCommitterConstants.SPARK_WRITE_UUID, + uuid); + Pair t3 = AbstractS3ACommitter + .buildJobUUID(conf, JobID.forName("job_" + getJobId())); + assertEquals("Job UUID", uuid, t3.getLeft()); + assertEquals("Job UUID source: " + t3, + AbstractS3ACommitter.JobUUIDSource.SparkWriteUUID, + t3.getRight()); Path tempDir = Paths.getLocalTaskAttemptTempDir(conf, uuid, getTaskAttempt0()); rmdir(tempDir, conf); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java index dc6c6d19db9..3a28fef8efb 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java @@ -242,7 +242,7 @@ public class ITestTerasortOnS3A extends AbstractYarnClusterITest { + "(" + StringUtils.join(", ", args) + ")" + " failed", 0, result); validateSuccessFile(dest, committerName(), getFileSystem(), stage, - minimumFileCount); + minimumFileCount, ""); completedStage(stage, d); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java index 315d1fe7285..8d29c29763f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java @@ -103,7 +103,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { KEY_HUGE_PARTITION_SIZE, DEFAULT_HUGE_PARTITION_SIZE); assertTrue("Partition size too small: " + partitionSize, - partitionSize > MULTIPART_MIN_SIZE); + partitionSize >= MULTIPART_MIN_SIZE); conf.setLong(SOCKET_SEND_BUFFER, _1MB); conf.setLong(SOCKET_RECV_BUFFER, _1MB); conf.setLong(MIN_MULTIPART_THRESHOLD, partitionSize);