HADOOP-17258. Magic S3Guard Committer to overwrite existing pendingSet file on task commit (#2371)

Contributed by Dongjoon Hyun and Steve Loughran

Change-Id: Ibaf8082e60eff5298ff4e6513edc386c5bae0274
parent 69ef9b1ee8
commit b92f72758b
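
The core behavioural change is in the magic committer's task commit: the per-task .pendingset file is now written with overwrite enabled, so a later task attempt (speculative execution or a retry) replaces the file left by an earlier attempt instead of failing. A minimal sketch of what that overwrite flag ultimately controls, using only the generic Hadoop FileSystem API; the helper and the JSON handling below are illustrative, not the actual PendingSet.save() implementation.

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Illustrative only: the effect of the overwrite flag on file creation. */
final class OverwriteSketch {
  private OverwriteSketch() {
  }

  static void saveJson(FileSystem fs, Path path, String json, boolean overwrite)
      throws IOException {
    // overwrite == false: a second attempt writing the same path fails with
    // FileAlreadyExistsException; overwrite == true: the later attempt's data
    // replaces the earlier file, which is the behaviour HADOOP-17258 wants.
    try (FSDataOutputStream out = fs.create(path, overwrite)) {
      out.write(json.getBytes(StandardCharsets.UTF_8));
    }
  }
}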
@@ -261,4 +261,7 @@ private CommitConstants() {
    */
   public static final int SUCCESS_MARKER_FILE_LIMIT = 100;
 
+  /** Extra Data key for task attempt in pendingset files. */
+  public static final String TASK_ATTEMPT_ID = "task.attempt.id";
+
 }
@@ -189,4 +189,13 @@ public List<SinglePendingCommit> getCommits() {
   public void setCommits(List<SinglePendingCommit> commits) {
     this.commits = commits;
   }
+
+  /**
+   * Set/Update an extra data entry.
+   * @param key key
+   * @param value value
+   */
+  public void putExtraData(String key, String value) {
+    extraData.put(key, value);
+  }
 }
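
The putExtraData setter added above (and mirrored on the single-commit class in the next hunk) is what lets the committers record which task attempt produced the persisted commit data, under the TASK_ATTEMPT_ID key introduced in the first hunk. A hedged usage sketch: PendingSet, putExtraData and TASK_ATTEMPT_ID come from this patch, while the wrapper class is illustrative.

import org.apache.hadoop.fs.s3a.commit.files.PendingSet;

import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TASK_ATTEMPT_ID;

/** Illustrative only: tag a pendingset with the attempt that wrote it. */
final class ExtraDataSketch {
  private ExtraDataSketch() {
  }

  static void tagWithAttempt(PendingSet pendingSet, String taskAttemptId) {
    // Lands in the extraData map persisted with the .pendingset file, so it
    // records which attempt's commit data survived task commit.
    pendingSet.putExtraData(TASK_ATTEMPT_ID, taskAttemptId);
  }
}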
@@ -418,6 +418,15 @@ public void setExtraData(Map<String, String> extraData) {
     this.extraData = extraData;
   }
 
+  /**
+   * Set/Update an extra data entry.
+   * @param key key
+   * @param value value
+   */
+  public void putExtraData(String key, String value) {
+    extraData.put(key, value);
+  }
+
   /**
    * Destination file size.
    * @return size of destination object
@@ -429,4 +438,5 @@ public long getLength() {
   public void setLength(long length) {
     this.length = length;
   }
+
 }
@@ -43,6 +43,7 @@
 import org.apache.hadoop.util.DurationInfo;
 
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TASK_ATTEMPT_ID;
 import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
 import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;
 import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*;
@@ -213,7 +214,7 @@ private PendingSet innerCommitTask(
       commit.setJobId(jobId);
       commit.setTaskId(taskId);
     }
-
+    pendingSet.putExtraData(TASK_ATTEMPT_ID, taskId);
     Path jobAttemptPath = getJobAttemptPath(context);
     TaskAttemptID taskAttemptID = context.getTaskAttemptID();
     Path taskOutcomePath = new Path(jobAttemptPath,
@@ -221,7 +222,8 @@ private PendingSet innerCommitTask(
         CommitConstants.PENDINGSET_SUFFIX);
     LOG.info("Saving work of {} to {}", taskAttemptID, taskOutcomePath);
     try {
-      pendingSet.save(getDestFS(), taskOutcomePath, false);
+      // We will overwrite if there exists a pendingSet file already
+      pendingSet.save(getDestFS(), taskOutcomePath, true);
     } catch (IOException e) {
       LOG.warn("Failed to save task commit data to {} ",
           taskOutcomePath, e);
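
The two hunks above are the heart of the fix in innerCommitTask: the pendingset is tagged with the task attempt ID and then saved with overwrite enabled. Overwrite matters because the .pendingset file name is keyed on the task, not the task attempt, so a second attempt of the same task resolves to the same path, and with overwrite=false its commit used to fail on the existing file. A hedged sketch of that situation, assuming save(FileSystem, Path, boolean) as called above; the exact file-name concatenation is illustrative.

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.files.PendingSet;

/** Illustrative only: why a second attempt of a task hits the same path. */
final class TwoAttemptsSketch {
  private TwoAttemptsSketch() {
  }

  static void commitTaskAttempt(FileSystem fs, Path jobAttemptPath,
      String taskId, PendingSet pendingSet) throws IOException {
    // Every attempt of the same task produces the same file name here,
    // because it is derived from the task ID, not the attempt ID.
    Path taskOutcomePath = new Path(jobAttemptPath,
        taskId + CommitConstants.PENDINGSET_SUFFIX);
    // overwrite=true (HADOOP-17258): the last attempt to commit wins.
    pendingSet.save(fs, taskOutcomePath, true);
  }
}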
@@ -695,6 +695,8 @@ protected int commitTaskInternal(final TaskAttemptContext context,
     context.progress();
 
     PendingSet pendingCommits = new PendingSet(commitCount);
+    pendingCommits.putExtraData(TASK_ATTEMPT_ID,
+        context.getTaskAttemptID().toString());
     try {
       Tasks.foreach(taskOutput)
           .stopOnFailure()
@@ -40,6 +40,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapFile;
@@ -307,14 +308,19 @@ public AbstractS3ACommitter createCommitter(TaskAttemptContext context)
    * @param context task
    * @throws IOException IO failure
    * @throws InterruptedException write interrupted
+   * @return the path written to
    */
-  protected void writeTextOutput(TaskAttemptContext context)
+  protected Path writeTextOutput(TaskAttemptContext context)
       throws IOException, InterruptedException {
     describe("write output");
     try (DurationInfo d = new DurationInfo(LOG,
         "Writing Text output for task %s", context.getTaskAttemptID())) {
-      writeOutput(new LoggingTextOutputFormat().getRecordWriter(context),
-          context);
+      LoggingTextOutputFormat.LoggingLineRecordWriter<Object, Object>
+          recordWriter = new LoggingTextOutputFormat<>().getRecordWriter(
+              context);
+      writeOutput(recordWriter,
+          context);
+      return recordWriter.getDest();
     }
   }
 
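
The test helper writeTextOutput now returns the path of the file it wrote, obtained from the record writer's getDest(). The new two-attempt test below relies on this to compare the file names written by each attempt and to assert that only the second attempt's file reaches the final output. A hedged sketch of the pattern with a stand-in writer type; only getDest() is taken from the diff, everything else is illustrative.

import java.io.IOException;

import org.apache.hadoop.fs.Path;

/** Illustrative only: keep the writer so its destination can be returned. */
final class CaptureDestSketch {
  private CaptureDestSketch() {
  }

  /** Stand-in for LoggingTextOutputFormat.LoggingLineRecordWriter. */
  interface DestAwareWriter {
    void write(String key, String value) throws IOException;

    Path getDest();
  }

  static Path writeAndReturnDest(DestAwareWriter writer) throws IOException {
    writer.write("1", "first line");
    // Returning the destination lets callers (such as the new test) compare
    // the concrete files written by two different task attempts.
    return writer.getDest();
  }
}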
@@ -480,11 +486,17 @@ protected void setup(JobData jobData) throws IOException {
         "setup job %s", jContext.getJobID())) {
       committer.setupJob(jContext);
     }
+    setupCommitter(committer, tContext);
+    describe("setup complete\n");
+  }
+
+  private void setupCommitter(
+      final AbstractS3ACommitter committer,
+      final TaskAttemptContext tContext) throws IOException {
     try (DurationInfo d = new DurationInfo(LOG,
         "setup task %s", tContext.getTaskAttemptID())) {
       committer.setupTask(tContext);
     }
-    describe("setup complete\n");
   }
 
   /**
@@ -806,6 +818,74 @@ public void testCommitterWithDuplicatedCommit() throws Exception {
     expectFNFEonTaskCommit(committer, tContext);
   }
 
+  /**
+   * HADOOP-17258. If a second task attempt is committed, it
+   * must succeed, and the output of the first TA, even if already
+   * committed, MUST NOT be visible in the final output.
+   * <p></p>
+   * What's important is not just that only one TA must succeed,
+   * but it must be the last one executed. Why? because that's
+   * the one
+   */
+  @Test
+  public void testTwoTaskAttemptsCommit() throws Exception {
+    describe("Commit two task attempts;" +
+        " expect the second attempt to succeed.");
+    JobData jobData = startJob(false);
+    JobContext jContext = jobData.jContext;
+    TaskAttemptContext tContext = jobData.tContext;
+    AbstractS3ACommitter committer = jobData.committer;
+    // do commit
+    describe("\ncommitting task");
+    // write output for TA 1,
+    Path outputTA1 = writeTextOutput(tContext);
+
+    // speculatively execute committer 2.
+
+    // jobconf with a different base to its parts.
+    Configuration conf2 = jobData.conf;
+    conf2.set("mapreduce.output.basename", "attempt2");
+    String attempt2 = "attempt_" + jobId + "_m_000000_1";
+    TaskAttemptID ta2 = TaskAttemptID.forName(attempt2);
+    TaskAttemptContext tContext2 = new TaskAttemptContextImpl(
+        conf2, ta2);
+
+    AbstractS3ACommitter committer2 = standardCommitterFactory
+        .createCommitter(tContext2);
+    setupCommitter(committer2, tContext2);
+    // write output for TA 2,
+    Path outputTA2 = writeTextOutput(tContext2);
+
+    // verify the names are different.
+    String name1 = outputTA1.getName();
+    String name2 = outputTA2.getName();
+    Assertions.assertThat(name1)
+        .describedAs("name of task attempt output %s", outputTA1)
+        .isNotEqualTo(name2);
+
+    // commit task 1
+    committer.commitTask(tContext);
+
+    // then pretend that task1 didn't respond, so
+    // commit task 2
+    committer2.commitTask(tContext2);
+
+    // and the job
+    committer2.commitJob(tContext);
+
+    // validate output
+    S3AFileSystem fs = getFileSystem();
+    SuccessData successData = validateSuccessFile(outDir, "", fs, "query", 1);
+    Assertions.assertThat(successData.getFilenames())
+        .describedAs("Files committed")
+        .hasSize(1);
+
+    assertPathExists("attempt2 output", new Path(outDir, name2));
+    assertPathDoesNotExist("attempt1 output", new Path(outDir, name1));
+
+    assertNoMultipartUploadsPending(outDir);
+  }
+
   protected boolean shouldExpectSuccessMarker() {
     return true;
   }