From b92f72758bf9b93bc20445a7737346ccb02208e8 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Mon, 12 Oct 2020 13:39:15 +0100
Subject: [PATCH] HADOOP-17258. Magic S3Guard Committer to overwrite existing
 pendingSet file on task commit (#2371)

Contributed by Dongjoon Hyun and Steve Loughran

Change-Id: Ibaf8082e60eff5298ff4e6513edc386c5bae0274
---
 .../hadoop/fs/s3a/commit/CommitConstants.java |  3 +
 .../fs/s3a/commit/files/PendingSet.java       |  9 ++
 .../s3a/commit/files/SinglePendingCommit.java | 10 +++
 .../commit/magic/MagicS3GuardCommitter.java   |  6 +-
 .../s3a/commit/staging/StagingCommitter.java  |  2 +
 .../s3a/commit/AbstractITCommitProtocol.java  | 86 ++++++++++++++++++-
 6 files changed, 111 insertions(+), 5 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
index 3e28a5d2cf9..e7c049226d7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
@@ -261,4 +261,7 @@ public final class CommitConstants {
    */
   public static final int SUCCESS_MARKER_FILE_LIMIT = 100;
 
+  /** Extra Data key for task attempt in pendingset files. */
+  public static final String TASK_ATTEMPT_ID = "task.attempt.id";
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java
index c0d7415fcb9..4793b78e63f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java
@@ -189,4 +189,13 @@ public class PendingSet extends PersistentCommitData {
   public void setCommits(List<SinglePendingCommit> commits) {
     this.commits = commits;
   }
+
+  /**
+   * Set/Update an extra data entry.
+   * @param key key
+   * @param value value
+   */
+  public void putExtraData(String key, String value) {
+    extraData.put(key, value);
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java
index 596dd95685c..c848f80b02d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java
@@ -418,6 +418,15 @@ public class SinglePendingCommit extends PersistentCommitData
     this.extraData = extraData;
   }
 
+  /**
+   * Set/Update an extra data entry.
+   * @param key key
+   * @param value value
+   */
+  public void putExtraData(String key, String value) {
+    extraData.put(key, value);
+  }
+
   /**
    * Destination file size.
    * @return size of destination object
@@ -429,4 +438,5 @@ public class SinglePendingCommit extends PersistentCommitData
   public void setLength(long length) {
     this.length = length;
   }
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
index 99121730644..30417ead353 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.DurationInfo;
 
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TASK_ATTEMPT_ID;
 import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
 import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;
 import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*;
@@ -213,7 +214,7 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
       commit.setJobId(jobId);
       commit.setTaskId(taskId);
     }
-
+    pendingSet.putExtraData(TASK_ATTEMPT_ID, taskId);
     Path jobAttemptPath = getJobAttemptPath(context);
     TaskAttemptID taskAttemptID = context.getTaskAttemptID();
     Path taskOutcomePath = new Path(jobAttemptPath,
@@ -221,7 +222,8 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
             CommitConstants.PENDINGSET_SUFFIX);
     LOG.info("Saving work of {} to {}", taskAttemptID, taskOutcomePath);
     try {
-      pendingSet.save(getDestFS(), taskOutcomePath, false);
+      // We will overwrite if there exists a pendingSet file already
+      pendingSet.save(getDestFS(), taskOutcomePath, true);
     } catch (IOException e) {
       LOG.warn("Failed to save task commit data to {} ",
           taskOutcomePath, e);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
index 91e68af8bb1..9cc932b1ea8 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
@@ -695,6 +695,8 @@ public class StagingCommitter extends AbstractS3ACommitter {
       context.progress();
 
       PendingSet pendingCommits = new PendingSet(commitCount);
+      pendingCommits.putExtraData(TASK_ATTEMPT_ID,
+          context.getTaskAttemptID().toString());
       try {
         Tasks.foreach(taskOutput)
             .stopOnFailure()
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
index cacd54d12e9..03759a5267e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapFile;
@@ -307,14 +308,19 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
    * @param context task
    * @throws IOException IO failure
    * @throws InterruptedException write interrupted
+   * @return the path written to
    */
-  protected void writeTextOutput(TaskAttemptContext context)
+  protected Path writeTextOutput(TaskAttemptContext context)
       throws IOException, InterruptedException {
     describe("write output");
     try (DurationInfo d = new DurationInfo(LOG,
         "Writing Text output for task %s", context.getTaskAttemptID())) {
-      writeOutput(new LoggingTextOutputFormat().getRecordWriter(context),
+      LoggingTextOutputFormat.LoggingLineRecordWriter
+          recordWriter = new LoggingTextOutputFormat<>().getRecordWriter(
           context);
+      writeOutput(recordWriter,
+          context);
+      return recordWriter.getDest();
     }
   }
 
@@ -480,11 +486,17 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
         "setup job %s", jContext.getJobID())) {
       committer.setupJob(jContext);
     }
+    setupCommitter(committer, tContext);
+    describe("setup complete\n");
+  }
+
+  private void setupCommitter(
+      final AbstractS3ACommitter committer,
+      final TaskAttemptContext tContext) throws IOException {
     try (DurationInfo d = new DurationInfo(LOG,
         "setup task %s", tContext.getTaskAttemptID())) {
       committer.setupTask(tContext);
     }
-    describe("setup complete\n");
   }
 
   /**
@@ -806,6 +818,74 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
     expectFNFEonTaskCommit(committer, tContext);
   }
 
+  /**
+   * HADOOP-17258. If a second task attempt is committed, it
+   * must succeed, and the output of the first TA, even if already
+   * committed, MUST NOT be visible in the final output.
+   * <p></p>
+   * What's important is not just that only one TA must succeed,
+   * but it must be the last one executed. Why? because that's
+   * the one whose output must end up in the destination directory.
+   */
+  @Test
+  public void testTwoTaskAttemptsCommit() throws Exception {
+    describe("Commit two task attempts;"
+        + " expect the second attempt to succeed.");
+    JobData jobData = startJob(false);
+    JobContext jContext = jobData.jContext;
+    TaskAttemptContext tContext = jobData.tContext;
+    AbstractS3ACommitter committer = jobData.committer;
+    // do commit
+    describe("\ncommitting task");
+    // write output for TA 1,
+    Path outputTA1 = writeTextOutput(tContext);
+
+    // speculatively execute committer 2.
+
+    // jobconf with a different base to its parts.
+    Configuration conf2 = jobData.conf;
+    conf2.set("mapreduce.output.basename", "attempt2");
+    String attempt2 = "attempt_" + jobId + "_m_000000_1";
+    TaskAttemptID ta2 = TaskAttemptID.forName(attempt2);
+    TaskAttemptContext tContext2 = new TaskAttemptContextImpl(
+        conf2, ta2);
+
+    AbstractS3ACommitter committer2 = standardCommitterFactory
+        .createCommitter(tContext2);
+    setupCommitter(committer2, tContext2);
+    // write output for TA 2,
+    Path outputTA2 = writeTextOutput(tContext2);
+
+    // verify the names are different.
+    String name1 = outputTA1.getName();
+    String name2 = outputTA2.getName();
+    Assertions.assertThat(name1)
+        .describedAs("name of task attempt output %s", outputTA1)
+        .isNotEqualTo(name2);
+
+    // commit task 1
+    committer.commitTask(tContext);
+
+    // then pretend that task1 didn't respond, so
+    // commit task 2
+    committer2.commitTask(tContext2);
+
+    // and the job
+    committer2.commitJob(tContext);
+
+    // validate output
+    S3AFileSystem fs = getFileSystem();
+    SuccessData successData = validateSuccessFile(outDir, "", fs, "query", 1);
+    Assertions.assertThat(successData.getFilenames())
+        .describedAs("Files committed")
+        .hasSize(1);
+
+    assertPathExists("attempt2 output", new Path(outDir, name2));
+    assertPathDoesNotExist("attempt1 output", new Path(outDir, name1));
+
+    assertNoMultipartUploadsPending(outDir);
+  }
+
   protected boolean shouldExpectSuccessMarker() {
     return true;
   }
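
Background for readers of the patch: the magic committer saves a task's pending uploads to a single .pendingset file under the job attempt directory, and a second (speculative or retried) attempt of the same task targets the same file, so saving it with overwrite=false fails once the first attempt has committed; hence the switch to pendingSet.save(getDestFS(), taskOutcomePath, true). The sketch below is not part of the patch; it is a minimal, hypothetical illustration of that create-with-overwrite behaviour, using Hadoop's local filesystem in place of S3A. The class name, file name and payload strings are invented for the example.

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Illustrative only (not part of the patch): two attempts of the same task
 * writing the same "pendingset" path; the second write must be allowed to
 * overwrite the first, mirroring the overwrite=true save in
 * MagicS3GuardCommitter.
 */
public class PendingSetOverwriteSketch {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // The local filesystem stands in for the destination S3A filesystem.
    FileSystem fs = FileSystem.getLocal(conf);
    // Both attempts of the same task resolve to the same pendingset path.
    Path taskOutcomePath = new Path(
        System.getProperty("java.io.tmpdir"), "task_0001.pendingset");

    // First attempt commits; the file does not exist yet, so even
    // overwrite=false would succeed here.
    save(fs, taskOutcomePath, "pending uploads of attempt _0", true);

    // Second attempt commits later. With overwrite=false this create()
    // typically fails with FileAlreadyExistsException; HADOOP-17258 avoids
    // that by saving with overwrite=true.
    save(fs, taskOutcomePath, "pending uploads of attempt _1", true);
  }

  private static void save(FileSystem fs, Path path, String body,
      boolean overwrite) throws Exception {
    try (FSDataOutputStream out = fs.create(path, overwrite)) {
      out.write(body.getBytes(StandardCharsets.UTF_8));
    }
  }
}

At job commit the committer loads whatever the .pendingset file now contains, which is why the new testTwoTaskAttemptsCommit() asserts that only the second attempt's output is visible after commitJob().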