MAPREDUCE-7028. Concurrent task progress updates causing NPE in Application Master. Contributed by Gergo Repas
This commit is contained in:
parent
c9bf813c9a
commit
fe35103591
|
@ -585,33 +585,38 @@ public class TaskAttemptListenerImpl extends CompositeService
|
|||
private void coalesceStatusUpdate(TaskAttemptId yarnAttemptID,
|
||||
TaskAttemptStatus taskAttemptStatus,
|
||||
AtomicReference<TaskAttemptStatus> lastStatusRef) {
|
||||
boolean asyncUpdatedNeeded = false;
|
||||
TaskAttemptStatus lastStatus = lastStatusRef.get();
|
||||
|
||||
if (lastStatus == null) {
|
||||
lastStatusRef.set(taskAttemptStatus);
|
||||
asyncUpdatedNeeded = true;
|
||||
} else {
|
||||
List<TaskAttemptId> oldFetchFailedMaps =
|
||||
taskAttemptStatus.fetchFailedMaps;
|
||||
|
||||
// merge fetchFailedMaps from the previous update
|
||||
if (lastStatus.fetchFailedMaps != null) {
|
||||
List<TaskAttemptId> fetchFailedMaps = taskAttemptStatus.fetchFailedMaps;
|
||||
TaskAttemptStatus lastStatus = null;
|
||||
boolean done = false;
|
||||
while (!done) {
|
||||
lastStatus = lastStatusRef.get();
|
||||
if (lastStatus != null && lastStatus.fetchFailedMaps != null) {
|
||||
// merge fetchFailedMaps from the previous update
|
||||
if (taskAttemptStatus.fetchFailedMaps == null) {
|
||||
taskAttemptStatus.fetchFailedMaps = lastStatus.fetchFailedMaps;
|
||||
} else {
|
||||
taskAttemptStatus.fetchFailedMaps.addAll(lastStatus.fetchFailedMaps);
|
||||
taskAttemptStatus.fetchFailedMaps =
|
||||
new ArrayList<>(lastStatus.fetchFailedMaps.size() +
|
||||
fetchFailedMaps.size());
|
||||
taskAttemptStatus.fetchFailedMaps.addAll(
|
||||
lastStatus.fetchFailedMaps);
|
||||
taskAttemptStatus.fetchFailedMaps.addAll(
|
||||
fetchFailedMaps);
|
||||
}
|
||||
}
|
||||
|
||||
if (!lastStatusRef.compareAndSet(lastStatus, taskAttemptStatus)) {
|
||||
// update failed - async dispatcher has processed it in the meantime
|
||||
taskAttemptStatus.fetchFailedMaps = oldFetchFailedMaps;
|
||||
lastStatusRef.set(taskAttemptStatus);
|
||||
asyncUpdatedNeeded = true;
|
||||
// lastStatusRef may be changed by either the AsyncDispatcher when
|
||||
// it processes the update, or by another IPC server handler
|
||||
done = lastStatusRef.compareAndSet(lastStatus, taskAttemptStatus);
|
||||
if (!done) {
|
||||
LOG.info("TaskAttempt " + yarnAttemptID +
|
||||
": lastStatusRef changed by another thread, retrying...");
|
||||
// let's revert taskAttemptStatus.fetchFailedMaps
|
||||
taskAttemptStatus.fetchFailedMaps = fetchFailedMaps;
|
||||
}
|
||||
}
|
||||
|
||||
boolean asyncUpdatedNeeded = (lastStatus == null);
|
||||
if (asyncUpdatedNeeded) {
|
||||
context.getEventHandler().handle(
|
||||
new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,
|
||||
|
|
Loading…
Reference in New Issue