From daaa73b657fb28b2d437bfb03821faaa0d458f4e Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Thu, 25 Apr 2013 00:54:04 +0000 Subject: [PATCH] MAPREDUCE-5166. Fix ConcurrentModificationException due to insufficient synchronization on updates to task Counters. Contributed by Sandy Ryza. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1471796 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 +++ .../apache/hadoop/mapred/LocalJobRunner.java | 23 ++++++++++++++----- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index efa52a90c22..1f1d45c79ea 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -347,6 +347,9 @@ Release 2.0.5-beta - UNRELEASED can override Mapper.run and Reducer.run to get the old (inconsistent) behaviour. (acmurthy) + MAPREDUCE-5166. Fix ConcurrentModificationException due to insufficient + synchronization on updates to task Counters. (Sandy Ryza via acmurthy) + Release 2.0.4-alpha - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java index 85f8c183fa3..d71aff518d0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java @@ -18,6 +18,10 @@ package org.apache.hadoop.mapred; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; @@ -86,8 +90,6 @@ public class LocalJobRunner implements ClientProtocol { private static final String jobDir = "localRunner/"; - private static final Counters EMPTY_COUNTERS = new Counters(); - public long getProtocolVersion(String protocol, long clientVersion) { return ClientProtocol.versionID; } @@ -273,10 +275,10 @@ public class LocalJobRunner implements ClientProtocol { this.partialMapProgress = new float[numMaps]; this.mapCounters = new Counters[numMaps]; for (int i = 0; i < numMaps; i++) { - this.mapCounters[i] = EMPTY_COUNTERS; + this.mapCounters[i] = new Counters(); } - this.reduceCounters = EMPTY_COUNTERS; + this.reduceCounters = new Counters(); } /** @@ -497,6 +499,15 @@ public class LocalJobRunner implements ClientProtocol { public synchronized boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) throws IOException, InterruptedException { + // Serialize as we would if distributed in order to make deep copy + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + taskStatus.write(dos); + dos.close(); + taskStatus = TaskStatus.createTaskStatus(taskStatus.getIsMap()); + taskStatus.readFields(new DataInputStream( + new ByteArrayInputStream(baos.toByteArray()))); + LOG.info(taskStatus.getStateString()); int taskIndex = mapIds.indexOf(taskId); if (taskIndex >= 0) { // mapping @@ -525,10 +536,10 @@ public class LocalJobRunner implements ClientProtocol { public synchronized Counters getCurrentCounters() { if (null == mapCounters) { // Counters not yet initialized for job. - return EMPTY_COUNTERS; + return new Counters(); } - Counters current = EMPTY_COUNTERS; + Counters current = new Counters(); for (Counters c : mapCounters) { current = Counters.sum(current, c); }