MAPREDUCE-3618. Fixed TaskHeartbeatHandler to not hold a global lock for all task-updates. Contributed by Siddarth Seth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1229906 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2012-01-11 06:53:10 +00:00
parent 1c8e3e8854
commit 39b8bbe663
4 changed files with 39 additions and 30 deletions

View File

@ -195,6 +195,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3511. Removed a multitude of cloned/duplicate counters in the AM
thereby reducing the AM heap size and preventing full GCs. (vinodkv)
MAPREDUCE-3618. Fixed TaskHeartbeatHandler to not hold a global lock for all
task-updates. (Siddarth Seth via vinodkv)
BUG FIXES
MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob

View File

@ -88,7 +88,7 @@ public class TaskAttemptListenerImpl extends CompositeService
@Override
public void init(Configuration conf) {
registerHeartbeatHandler();
registerHeartbeatHandler(conf);
super.init(conf);
}
@ -98,9 +98,10 @@ public class TaskAttemptListenerImpl extends CompositeService
super.start();
}
protected void registerHeartbeatHandler() {
protected void registerHeartbeatHandler(Configuration conf) {
taskHeartbeatHandler = new TaskHeartbeatHandler(context.getEventHandler(),
context.getClock());
context.getClock(), conf.getInt(MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
MRJobConfig.DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT));
addService(taskHeartbeatHandler);
}

View File

@ -18,9 +18,10 @@
package org.apache.hadoop.mapreduce.v2.app;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -56,13 +57,15 @@ public class TaskHeartbeatHandler extends AbstractService {
private final EventHandler eventHandler;
private final Clock clock;
private Map<TaskAttemptId, Long> runningAttempts
= new HashMap<TaskAttemptId, Long>();
private ConcurrentMap<TaskAttemptId, Long> runningAttempts;
public TaskHeartbeatHandler(EventHandler eventHandler, Clock clock) {
public TaskHeartbeatHandler(EventHandler eventHandler, Clock clock,
int numThreads) {
super("TaskHeartbeatHandler");
this.eventHandler = eventHandler;
this.clock = clock;
runningAttempts =
new ConcurrentHashMap<TaskAttemptId, Long>(16, 0.75f, numThreads);
}
@Override
@ -88,18 +91,16 @@ public class TaskHeartbeatHandler extends AbstractService {
super.stop();
}
public synchronized void receivedPing(TaskAttemptId attemptID) {
//only put for the registered attempts
if (runningAttempts.containsKey(attemptID)) {
runningAttempts.put(attemptID, clock.getTime());
}
public void receivedPing(TaskAttemptId attemptID) {
//only put for the registered attempts
runningAttempts.replace(attemptID, clock.getTime());
}
public synchronized void register(TaskAttemptId attemptID) {
public void register(TaskAttemptId attemptID) {
runningAttempts.put(attemptID, clock.getTime());
}
public synchronized void unregister(TaskAttemptId attemptID) {
public void unregister(TaskAttemptId attemptID) {
runningAttempts.remove(attemptID);
}
@ -108,25 +109,30 @@ public class TaskHeartbeatHandler extends AbstractService {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
synchronized (TaskHeartbeatHandler.this) {
Iterator<Map.Entry<TaskAttemptId, Long>> iterator =
Iterator<Map.Entry<TaskAttemptId, Long>> iterator =
runningAttempts.entrySet().iterator();
//avoid calculating current time everytime in loop
long currentTime = clock.getTime();
// avoid calculating current time everytime in loop
long currentTime = clock.getTime();
while (iterator.hasNext()) {
Map.Entry<TaskAttemptId, Long> entry = iterator.next();
if (currentTime > entry.getValue() + taskTimeOut) {
//task is lost, remove from the list and raise lost event
while (iterator.hasNext()) {
Map.Entry<TaskAttemptId, Long> entry = iterator.next();
if (currentTime > entry.getValue() + taskTimeOut) {
//In case the iterator isn't picking up the latest.
// Extra lookup outside of the iterator - but only if the task
// is considered to be timed out.
Long taskTime = runningAttempts.get(entry.getKey());
if (taskTime != null && currentTime > taskTime + taskTimeOut) {
// task is lost, remove from the list and raise lost event
iterator.remove();
eventHandler.handle(
new TaskAttemptDiagnosticsUpdateEvent(entry.getKey(),
"AttemptID:" + entry.getKey().toString() +
" Timed out after " + taskTimeOut/1000 + " secs"));
eventHandler.handle(new TaskAttemptEvent(entry
.getKey(), TaskAttemptEventType.TA_TIMED_OUT));
eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry
.getKey(), "AttemptID:" + entry.getKey().toString()
+ " Timed out after " + taskTimeOut / 1000 + " secs"));
eventHandler.handle(new TaskAttemptEvent(entry.getKey(),
TaskAttemptEventType.TA_TIMED_OUT));
}
}
}
try {
@ -137,7 +143,6 @@ public class TaskHeartbeatHandler extends AbstractService {
}
}
}
}
}

View File

@ -43,7 +43,7 @@ public class TestTaskAttemptListenerImpl {
}
@Override
protected void registerHeartbeatHandler() {
protected void registerHeartbeatHandler(Configuration conf) {
//Empty
}