From 03d46dc571bc5b0f1b3c0cb5daa52e7ee324dd54 Mon Sep 17 00:00:00 2001 From: Siddharth Seth Date: Thu, 5 Jan 2012 05:19:44 +0000 Subject: [PATCH] MAPREDUCE-3569. TaskAttemptListener holds a global lock for all task-updates. (Contributed by Vinod Kumar Vavilapalli) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1227485 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../mapred/TaskAttemptListenerImpl.java | 98 +++++++++---------- .../mapreduce/v2/app/TaskAttemptListener.java | 17 ++-- .../v2/app/job/impl/TaskAttemptImpl.java | 10 +- .../mapred/TestTaskAttemptListenerImpl.java | 39 +++++--- .../apache/hadoop/mapreduce/v2/app/MRApp.java | 8 +- 6 files changed, 98 insertions(+), 77 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index b9fe77109ae..c13fa4fa10a 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -193,6 +193,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3568. Optimized Job's progress calculations in MR AM. (vinodkv) + MAPREDUCE-3569. TaskAttemptListener holds a global lock for all + task-updates. (Vinod Kumar Vavilapalli via sseth) + BUG FIXES MAPREDUCE-3221. 
Reenabled the previously ignored test in TestSubmitJob diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index ba0068098e5..0e6e3eed04c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -19,14 +19,12 @@ package org.apache.hadoop.mapred; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -64,21 +62,22 @@ import org.apache.hadoop.yarn.service.CompositeService; * This class HAS to be in this package to access package private * methods/classes. 
*/ +@SuppressWarnings({"unchecked" , "deprecation"}) public class TaskAttemptListenerImpl extends CompositeService implements TaskUmbilicalProtocol, TaskAttemptListener { + private static final JvmTask TASK_FOR_INVALID_JVM = new JvmTask(null, true); + private static final Log LOG = LogFactory.getLog(TaskAttemptListenerImpl.class); private AppContext context; private Server server; protected TaskHeartbeatHandler taskHeartbeatHandler; private InetSocketAddress address; - private Map jvmIDToActiveAttemptMap = - Collections.synchronizedMap(new HashMap()); + private ConcurrentMap + jvmIDToActiveAttemptMap + = new ConcurrentHashMap(); private JobTokenSecretManager jobTokenSecretManager = null; - private Set pendingJvms = - Collections.synchronizedSet(new HashSet()); public TaskAttemptListenerImpl(AppContext context, JobTokenSecretManager jobTokenSecretManager) { @@ -123,10 +122,9 @@ public class TaskAttemptListenerImpl extends CompositeService server.start(); InetSocketAddress listenerAddress = server.getListenerAddress(); - this.address = - NetUtils.createSocketAddr(listenerAddress.getAddress() - .getLocalHost().getCanonicalHostName() - + ":" + listenerAddress.getPort()); + listenerAddress.getAddress(); + this.address = NetUtils.createSocketAddr(InetAddress.getLocalHost() + .getCanonicalHostName() + ":" + listenerAddress.getPort()); } catch (IOException e) { throw new YarnException(e); } @@ -408,57 +406,59 @@ public class TaskAttemptListenerImpl extends CompositeService WrappedJvmID wJvmID = new WrappedJvmID(jvmId.getJobId(), jvmId.isMap, jvmId.getId()); - synchronized(this) { - if(pendingJvms.contains(wJvmID)) { - org.apache.hadoop.mapred.Task task = jvmIDToActiveAttemptMap.get(wJvmID); - if (task != null) { //there may be lag in the attempt getting added here - LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID()); - jvmTask = new JvmTask(task, false); - //remove the task as it is no more needed and free up the memory - //Also we have already told the 
JVM to process a task, so it is no - //longer pending, and further request should ask it to exit. - pendingJvms.remove(wJvmID); - jvmIDToActiveAttemptMap.remove(wJvmID); - } - } else { - LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed."); - jvmTask = new JvmTask(null, true); - } + // Try to look up the task. We remove it directly as we don't give + // multiple tasks to a JVM + org.apache.hadoop.mapred.Task task = jvmIDToActiveAttemptMap + .remove(wJvmID); + if (task != null) { + LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID()); + jvmTask = new JvmTask(task, false); + + // The task is removed as it is no longer needed, freeing up the memory. + // Also we have already told the JVM to process a task, so it is no + // longer pending, and further requests should ask it to exit. + } else { + LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed."); + jvmTask = TASK_FOR_INVALID_JVM; } return jvmTask; } @Override - public synchronized void registerPendingTask(WrappedJvmID jvmID) { - //Save this JVM away as one that has not been handled yet - pendingJvms.add(jvmID); + public void registerPendingTask( + org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) { + // Create the mapping so that it is easy to look up + // when the jvm comes back to ask for Task. + + // A JVM not present in this map is an illegal task/JVM. + jvmIDToActiveAttemptMap.put(jvmID, task); } @Override public void registerLaunchedTask( - org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID, - org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) { - synchronized(this) { - //create the mapping so that it is easy to look up - //when it comes back to ask for Task. 
- jvmIDToActiveAttemptMap.put(jvmID, task); - //This should not need to happen here, but just to be on the safe side - if(!pendingJvms.add(jvmID)) { - LOG.warn(jvmID+" launched without first being registered"); - } - } - //register this attempt + org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID) { + + // The task is launched. Register this for expiry-tracking. + + // Timing can cause this to happen after the real JVM launches and gets a + // task, which is still fine as we will only be tracking for expiry a little + // later than usual. taskHeartbeatHandler.register(attemptID); } @Override - public void unregister(org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID, + public void unregister( + org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID, WrappedJvmID jvmID) { - //remove the mapping if not already removed + + // Unregistration also comes from the same TaskAttempt which does the + // registration. Events are ordered at TaskAttempt, so unregistration will + // always come after registration. 
+ + // remove the mapping if not already removed jvmIDToActiveAttemptMap.remove(jvmID); - //remove the pending if not already removed - pendingJvms.remove(jvmID); + //unregister this attempt taskHeartbeatHandler.unregister(attemptID); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java index b5e5cd37b24..7002e69d527 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java @@ -32,20 +32,21 @@ public interface TaskAttemptListener { InetSocketAddress getAddress(); /** - * register a JVM with the listener. This should be called as soon as a + * Register a JVM with the listener. This should be called as soon as a * JVM ID is assigned to a task attempt, before it has been launched. + * @param task the task itself for this JVM. * @param jvmID The ID of the JVM . */ - void registerPendingTask(WrappedJvmID jvmID); + void registerPendingTask(Task task, WrappedJvmID jvmID); /** - * Register the task and task attempt with the JVM. This should be called - * when the JVM has been launched. - * @param attemptID the id of the attempt for this JVM. - * @param task the task itself for this JVM. - * @param jvmID the id of the JVM handling the task. + * Register task attempt. This should be called when the JVM has been + * launched. + * + * @param attemptID + * the id of the attempt for this JVM. 
*/ - void registerLaunchedTask(TaskAttemptId attemptID, Task task, WrappedJvmID jvmID); + void registerLaunchedTask(TaskAttemptId attemptID); /** * Unregister the JVM and the attempt associated with it. This should be diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index e8689d33460..4655895dbf7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -1109,7 +1109,8 @@ public abstract class TaskAttemptImpl implements taskAttempt.jvmID = new WrappedJvmID( taskAttempt.remoteTask.getTaskID().getJobID(), taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId()); - taskAttempt.taskAttemptListener.registerPendingTask(taskAttempt.jvmID); + taskAttempt.taskAttemptListener.registerPendingTask( + taskAttempt.remoteTask, taskAttempt.jvmID); //launch the container //create the container object to be launched for a given Task attempt @@ -1198,10 +1199,9 @@ public abstract class TaskAttemptImpl implements taskAttempt.launchTime = taskAttempt.clock.getTime(); taskAttempt.shufflePort = event.getShufflePort(); - // register it to TaskAttemptListener so that it start listening - // for it - taskAttempt.taskAttemptListener.registerLaunchedTask( - taskAttempt.attemptId, taskAttempt.remoteTask, taskAttempt.jvmID); + // register it to TaskAttemptListener so that it can start monitoring it. + taskAttempt.taskAttemptListener + .registerLaunchedTask(taskAttempt.attemptId); //TODO Resolve to host / IP in case of a local address. 
InetSocketAddress nodeHttpInetAddr = NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO: diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java index a5756da9934..f26091ac648 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java @@ -17,8 +17,11 @@ */ package org.apache.hadoop.mapred; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import java.io.IOException; @@ -68,33 +71,47 @@ public class TestTaskAttemptListenerImpl { JVMId id = new JVMId("foo",1, true, 1); WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId()); + // Verify ask before registration. //The JVM ID has not been registered yet so we should kill it. 
JvmContext context = new JvmContext(); context.jvmId = id; JvmTask result = listener.getTask(context); assertNotNull(result); assertTrue(result.shouldDie); - - //Now register the JVM, and see - listener.registerPendingTask(wid); - result = listener.getTask(context); - assertNull(result); - + + // Verify ask after registration but before launch TaskAttemptId attemptID = mock(TaskAttemptId.class); Task task = mock(Task.class); //Now put a task with the ID - listener.registerLaunchedTask(attemptID, task, wid); + listener.registerPendingTask(task, wid); + result = listener.getTask(context); + assertNotNull(result); + assertFalse(result.shouldDie); + // Unregister for more testing. + listener.unregister(attemptID, wid); + + // Verify ask after registration and launch + //Now put a task with the ID + listener.registerPendingTask(task, wid); + listener.registerLaunchedTask(attemptID); verify(hbHandler).register(attemptID); result = listener.getTask(context); assertNotNull(result); assertFalse(result.shouldDie); - + // Don't unregister yet for more testing. + //Verify that if we call it again a second time we are told to die. result = listener.getTask(context); assertNotNull(result); assertTrue(result.shouldDie); - + listener.unregister(attemptID, wid); + + // Verify after unregistration. 
+ result = listener.getTask(context); + assertNotNull(result); + assertTrue(result.shouldDie); + listener.stop(); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index b4b9602b810..f17bf6f8af0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -91,6 +91,7 @@ import org.apache.hadoop.yarn.util.BuilderUtils; * Mock MRAppMaster. Doesn't start RPC servers. * No threads are started except of the event Dispatcher thread. */ +@SuppressWarnings("unchecked") public class MRApp extends MRAppMaster { private static final Log LOG = LogFactory.getLog(MRApp.class); @@ -323,13 +324,13 @@ public class MRApp extends MRAppMaster { return NetUtils.createSocketAddr("localhost:54321"); } @Override - public void registerLaunchedTask(TaskAttemptId attemptID, - org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {} + public void registerLaunchedTask(TaskAttemptId attemptID) {} @Override public void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID) { } @Override - public void registerPendingTask(WrappedJvmID jvmID) { + public void registerPendingTask(org.apache.hadoop.mapred.Task task, + WrappedJvmID jvmID) { } }; } @@ -357,7 +358,6 @@ public class MRApp extends MRAppMaster { public MockContainerLauncher() { } - @SuppressWarnings("unchecked") @Override public void handle(ContainerLauncherEvent event) { switch (event.getType()) {