Merge -c 1471796 from trunk to branch-2 to fix MAPREDUCE-5166. Fix ConcurrentModificationException due to insufficient synchronization on updates to task Counters. Contributed by Sandy Ryza.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1471797 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2013-04-25 00:55:00 +00:00
parent 2838617e0a
commit c281269d0e
2 changed files with 20 additions and 6 deletions

View File

@@ -188,6 +188,9 @@ Release 2.0.5-beta - UNRELEASED
can override Mapper.run and Reducer.run to get the old (inconsistent)
behaviour. (acmurthy)
MAPREDUCE-5166. Fix ConcurrentModificationException due to insufficient
synchronization on updates to task Counters. (Sandy Ryza via acmurthy)
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES

View File

@@ -18,6 +18,10 @@
package org.apache.hadoop.mapred;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
@@ -86,8 +90,6 @@ public class LocalJobRunner implements ClientProtocol {
private static final String jobDir = "localRunner/";
private static final Counters EMPTY_COUNTERS = new Counters();
public long getProtocolVersion(String protocol, long clientVersion) {
return ClientProtocol.versionID;
}
@@ -273,10 +275,10 @@ private synchronized void initCounters(int numMaps) {
this.partialMapProgress = new float[numMaps];
this.mapCounters = new Counters[numMaps];
for (int i = 0; i < numMaps; i++) {
this.mapCounters[i] = EMPTY_COUNTERS; → this.mapCounters[i] = new Counters();
}
this.reduceCounters = EMPTY_COUNTERS; → this.reduceCounters = new Counters();
}
/**
@@ -497,6 +499,15 @@ public void run() {
public synchronized boolean statusUpdate(TaskAttemptID taskId,
TaskStatus taskStatus) throws IOException, InterruptedException {
// Serialize as we would if distributed in order to make deep copy
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
taskStatus.write(dos);
dos.close();
taskStatus = TaskStatus.createTaskStatus(taskStatus.getIsMap());
taskStatus.readFields(new DataInputStream(
new ByteArrayInputStream(baos.toByteArray())));
LOG.info(taskStatus.getStateString());
int taskIndex = mapIds.indexOf(taskId);
if (taskIndex >= 0) { // mapping
@@ -525,10 +536,10 @@ public synchronized boolean statusUpdate(TaskAttemptID taskId,
public synchronized Counters getCurrentCounters() {
if (null == mapCounters) {
// Counters not yet initialized for job.
return EMPTY_COUNTERS; → return new Counters();
}
Counters current = EMPTY_COUNTERS; → Counters current = new Counters();
for (Counters c : mapCounters) {
current = Counters.sum(current, c);
}