From 39b8bbe663abc10d2dd327f426c94c147deb36ab Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Wed, 11 Jan 2012 06:53:10 +0000 Subject: [PATCH] MAPREDUCE-3618. Fixed TaskHeartbeatHandler to not hold a global lock for all task-updates. Contributed by Siddarth Seth. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1229906 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../mapred/TaskAttemptListenerImpl.java | 7 ++- .../v2/app/TaskHeartbeatHandler.java | 57 ++++++++++--------- .../mapred/TestTaskAttemptListenerImpl.java | 2 +- 4 files changed, 39 insertions(+), 30 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index efffdc236b6..b01dacf3d16 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -195,6 +195,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3511. Removed a multitude of cloned/duplicate counters in the AM thereby reducing the AM heap size and preventing full GCs. (vinodkv) + MAPREDUCE-3618. Fixed TaskHeartbeatHandler to not hold a global lock for all + task-updates. (Siddarth Seth via vinodkv) + BUG FIXES MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index cefc7b7b9aa..0b4ea94e276 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -88,7 +88,7 @@ public class TaskAttemptListenerImpl extends CompositeService @Override public void init(Configuration conf) { - registerHeartbeatHandler(); + registerHeartbeatHandler(conf); super.init(conf); } @@ -98,9 +98,10 @@ public class TaskAttemptListenerImpl extends CompositeService super.start(); } - protected void registerHeartbeatHandler() { + protected void registerHeartbeatHandler(Configuration conf) { taskHeartbeatHandler = new TaskHeartbeatHandler(context.getEventHandler(), - context.getClock()); + context.getClock(), conf.getInt(MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT, + MRJobConfig.DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT)); addService(taskHeartbeatHandler); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java index f3bdd1eab84..a9e8be6258a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java @@ -18,9 +18,10 @@ package org.apache.hadoop.mapreduce.v2.app; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -56,13 +57,15 @@ public class TaskHeartbeatHandler extends AbstractService { private final EventHandler eventHandler; private final Clock clock; - private Map runningAttempts - = new HashMap(); + private ConcurrentMap runningAttempts; - public TaskHeartbeatHandler(EventHandler eventHandler, Clock clock) { + public TaskHeartbeatHandler(EventHandler eventHandler, Clock clock, + int numThreads) { super("TaskHeartbeatHandler"); this.eventHandler = eventHandler; this.clock = clock; + runningAttempts = + new ConcurrentHashMap(16, 0.75f, numThreads); } @Override @@ -88,18 +91,16 @@ public class TaskHeartbeatHandler extends AbstractService { super.stop(); } - public synchronized void receivedPing(TaskAttemptId attemptID) { - //only put for the registered attempts - if (runningAttempts.containsKey(attemptID)) { - runningAttempts.put(attemptID, clock.getTime()); - } + public void receivedPing(TaskAttemptId attemptID) { + //only put for the registered attempts + runningAttempts.replace(attemptID, clock.getTime()); } - public synchronized void register(TaskAttemptId attemptID) { + public void register(TaskAttemptId attemptID) { runningAttempts.put(attemptID, clock.getTime()); } - public synchronized void unregister(TaskAttemptId attemptID) { + public void unregister(TaskAttemptId attemptID) { runningAttempts.remove(attemptID); } @@ -108,25 +109,30 @@ public class TaskHeartbeatHandler extends AbstractService { @Override public void run() { while (!stopped && !Thread.currentThread().isInterrupted()) { - synchronized (TaskHeartbeatHandler.this) { - Iterator> iterator = + Iterator> iterator = runningAttempts.entrySet().iterator(); - //avoid calculating current time everytime in loop - long currentTime = clock.getTime(); + // avoid calculating current time everytime in loop + long currentTime = clock.getTime(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - if (currentTime > entry.getValue() + taskTimeOut) { - //task is lost, remove from the list and raise lost event + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (currentTime > entry.getValue() + taskTimeOut) { + + //In case the iterator isn't picking up the latest. + // Extra lookup outside of the iterator - but only if the task + // is considered to be timed out. + Long taskTime = runningAttempts.get(entry.getKey()); + if (taskTime != null && currentTime > taskTime + taskTimeOut) { + // task is lost, remove from the list and raise lost event iterator.remove(); - eventHandler.handle( - new TaskAttemptDiagnosticsUpdateEvent(entry.getKey(), - "AttemptID:" + entry.getKey().toString() + - " Timed out after " + taskTimeOut/1000 + " secs")); - eventHandler.handle(new TaskAttemptEvent(entry - .getKey(), TaskAttemptEventType.TA_TIMED_OUT)); + eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry + .getKey(), "AttemptID:" + entry.getKey().toString() + + " Timed out after " + taskTimeOut / 1000 + " secs")); + eventHandler.handle(new TaskAttemptEvent(entry.getKey(), + TaskAttemptEventType.TA_TIMED_OUT)); } + } } try { @@ -137,7 +143,6 @@ public class TaskHeartbeatHandler extends AbstractService { } } } - } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java index f26091ac648..47a0e55f586 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java @@ -43,7 +43,7 @@ public class TestTaskAttemptListenerImpl { } @Override - protected void registerHeartbeatHandler() { + protected void registerHeartbeatHandler(Configuration conf) { //Empty }