HADOOP-18402. S3A committer NPE in spark job abort (#4735)

JobID.toString() and TaskID.toString() to only be called
when the IDs are not null.

This doesn't surface in MapReduce, but Spark SQL can trigger
in job abort, where it may invoke abortJob() with an
incomplete TaskContext.

This patch MUST be applied to branches containing
HADOOP-17833. "Improve Magic Committer Performance."

Contributed by Steve Loughran.
This commit is contained in:
Steve Loughran 2022-08-15 11:18:47 +01:00
parent 6b7c1329b2
commit 97763619c9
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
2 changed files with 20 additions and 4 deletions

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.fs.audit.AuditConstants;
import org.apache.hadoop.fs.audit.CommonAuditContext;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
@ -49,12 +50,17 @@ public final class AuditContextUpdater {
* @param jobContext job/task context.
*/
public AuditContextUpdater(final JobContext jobContext) {
this.jobId = jobContext.getJobID().toString();
JobID contextJobID = jobContext.getJobID();
this.jobId = contextJobID != null
? contextJobID.toString()
: null;
if (jobContext instanceof TaskAttemptContext) {
// it's a task, extract info for auditing
final TaskAttemptID tid = ((TaskAttemptContext) jobContext).getTaskAttemptID();
this.taskAttemptId = tid.toString();
this.taskAttemptId = tid != null
? tid.toString()
: null;
} else {
this.taskAttemptId = null;
}
@ -70,7 +76,11 @@ public final class AuditContextUpdater {
*/
public void updateCurrentAuditContext() {
final CommonAuditContext auditCtx = currentAuditContext();
auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId);
if (jobId != null) {
auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId);
} else {
currentAuditContext().remove(AuditConstants.PARAM_JOB_ID);
}
if (taskAttemptId != null) {
auditCtx.put(AuditConstants.PARAM_TASK_ATTEMPT_ID, taskAttemptId);
} else {

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.JsonSerialization;
import org.apache.hadoop.util.Preconditions;
@ -156,7 +157,12 @@ public final class CommitContext implements Closeable {
this.commitOperations = commitOperations;
this.jobContext = jobContext;
this.conf = jobContext.getConfiguration();
this.jobId = jobContext.getJobID().toString();
JobID contextJobID = jobContext.getJobID();
// either the job ID or make one up as it will be
// used for the filename of any reports.
this.jobId = contextJobID != null
? contextJobID.toString()
: ("job-without-id-at-" + System.currentTimeMillis());
this.collectIOStatistics = conf.getBoolean(
S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS,
S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS_DEFAULT);