From eee59a83723febf18daab6916a1233858a96380d Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Mon, 15 Aug 2022 11:19:56 +0100
Subject: [PATCH] Revert "HADOOP-18402. S3A committer NPE in spark job abort (#4735)"

(managed to commit through the github ui before I'd got the message done)

This reverts commit ad83e95046b92540055f7caecf652c455ed2daf9.
---
 .../fs/s3a/commit/impl/AuditContextUpdater.java  | 16 +++-------------
 .../hadoop/fs/s3a/commit/impl/CommitContext.java |  8 +-------
 2 files changed, 4 insertions(+), 20 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java
index 17f63e6dff7..20024ba601d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.fs.audit.AuditConstants;
 import org.apache.hadoop.fs.audit.CommonAuditContext;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
@@ -50,17 +49,12 @@ public final class AuditContextUpdater {
    * @param jobContext job/task context.
    */
   public AuditContextUpdater(final JobContext jobContext) {
-    JobID contextJobID = jobContext.getJobID();
-    this.jobId = contextJobID != null
-        ? contextJobID.toString()
-        : null;
+    this.jobId = jobContext.getJobID().toString();
 
     if (jobContext instanceof TaskAttemptContext) {
       // it's a task, extract info for auditing
       final TaskAttemptID tid = ((TaskAttemptContext) jobContext).getTaskAttemptID();
-      this.taskAttemptId = tid != null
-          ? tid.toString()
-          : null;
+      this.taskAttemptId = tid.toString();
     } else {
       this.taskAttemptId = null;
     }
@@ -76,11 +70,7 @@ public final class AuditContextUpdater {
    */
   public void updateCurrentAuditContext() {
     final CommonAuditContext auditCtx = currentAuditContext();
-    if (jobId != null) {
-      auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId);
-    } else {
-      currentAuditContext().remove(AuditConstants.PARAM_JOB_ID);
-    }
+    auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId);
     if (taskAttemptId != null) {
       auditCtx.put(AuditConstants.PARAM_TASK_ATTEMPT_ID, taskAttemptId);
     } else {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java
index c93d2d8f739..8ac3dcb231d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.fs.statistics.IOStatisticsContext;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.util.JsonSerialization;
 import org.apache.hadoop.util.Preconditions;
@@ -157,12 +156,7 @@ public final class CommitContext implements Closeable {
     this.commitOperations = commitOperations;
     this.jobContext = jobContext;
     this.conf = jobContext.getConfiguration();
-    JobID contextJobID = jobContext.getJobID();
-    // either the job ID or make one up as it will be
-    // used for the filename of any reports.
-    this.jobId = contextJobID != null
-        ? contextJobID.toString()
-        : ("job-without-id-at-" + System.currentTimeMillis());
+    this.jobId = jobContext.getJobID().toString();
     this.collectIOStatistics = conf.getBoolean(
         S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS,
         S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS_DEFAULT);