diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 8e4ec55b584..cc6ca36a8a3 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -484,6 +484,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3596. Fix scheduler to handle cleaned up containers, which NMs may subsequently report as running. (Vinod Kumar Vavilapalli via sseth) + MAPREDUCE-3656. Fixed a race condition in MR AM which is failing the sort + benchmark consistently. (Siddarth Seth via vinodkv) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index 0b4ea94e276..6d78a6a8c03 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -22,7 +22,9 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -77,6 +79,9 @@ public class TaskAttemptListenerImpl extends CompositeService private ConcurrentMap jvmIDToActiveAttemptMap = new ConcurrentHashMap(); + private Set launchedJVMs = Collections + .newSetFromMap(new ConcurrentHashMap()); + private JobTokenSecretManager jobTokenSecretManager = null; public TaskAttemptListenerImpl(AppContext context, @@ -412,22 +417,28 @@ public class TaskAttemptListenerImpl extends CompositeService // Try to look up the task. 
We remove it directly as we don't give // multiple tasks to a JVM - org.apache.hadoop.mapred.Task task = jvmIDToActiveAttemptMap - .remove(wJvmID); - if (task != null) { - LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID()); - jvmTask = new JvmTask(task, false); - - // remove the task as it is no more needed and free up the memory - // Also we have already told the JVM to process a task, so it is no - // longer pending, and further request should ask it to exit. - } else { + if (!jvmIDToActiveAttemptMap.containsKey(wJvmID)) { LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed."); jvmTask = TASK_FOR_INVALID_JVM; + } else { + if (!launchedJVMs.contains(wJvmID)) { + jvmTask = null; + LOG.info("JVM with ID: " + jvmId + + " asking for task before AM launch registered. Given null task"); + } else { + // remove the task as it is no more needed and free up the memory. + // Also we have already told the JVM to process a task, so it is no + // longer pending, and further request should ask it to exit. + org.apache.hadoop.mapred.Task task = + jvmIDToActiveAttemptMap.remove(wJvmID); + launchedJVMs.remove(wJvmID); + LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID()); + jvmTask = new JvmTask(task, false); + } } return jvmTask; } - + @Override public void registerPendingTask( org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) { @@ -440,13 +451,12 @@ public class TaskAttemptListenerImpl extends CompositeService @Override public void registerLaunchedTask( - org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID) { + org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID, + WrappedJvmID jvmId) { + // The AM considers the task to be launched (Has asked the NM to launch it) + // The JVM will only be given a task after this registration. + launchedJVMs.add(jvmId); - // The task is launched. Register this for expiry-tracking. 
- - // Timing can cause this to happen after the real JVM launches and gets a - // task which is still fine as we will only be tracking for expiry a little - // late than usual. taskHeartbeatHandler.register(attemptID); } @@ -459,7 +469,12 @@ public class TaskAttemptListenerImpl extends CompositeService // registration. Events are ordered at TaskAttempt, so unregistration will // always come after registration. - // remove the mapping if not already removed + // Remove from launchedJVMs before jvmIDToActiveAttemptMap to avoid + // synchronization issue with getTask(). getTask should be checking + // jvmIDToActiveAttemptMap before it checks launchedJVMs. + + // remove the mappings if not already removed + launchedJVMs.remove(jvmID); jvmIDToActiveAttemptMap.remove(jvmID); //unregister this attempt diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java index 7002e69d527..1d2a0a40614 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java @@ -45,8 +45,9 @@ public interface TaskAttemptListener { * * @param attemptID * the id of the attempt for this JVM. + * @param jvmID the ID of the JVM. */ - void registerLaunchedTask(TaskAttemptId attemptID); + void registerLaunchedTask(TaskAttemptId attemptID, WrappedJvmID jvmID); /** * Unregister the JVM and the attempt associated with it. 
This should be diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java index a9e8be6258a..b827a2cdf3f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java @@ -93,6 +93,7 @@ public class TaskHeartbeatHandler extends AbstractService { public void receivedPing(TaskAttemptId attemptID) { //only put for the registered attempts + //TODO throw an exception if the task isn't registered. runningAttempts.replace(attemptID, clock.getTime()); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index d9ed1e53f8e..b296d02d55f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -1201,7 +1201,7 @@ public abstract class TaskAttemptImpl implements // register it to TaskAttemptListener so that it can start monitoring it. 
taskAttempt.taskAttemptListener - .registerLaunchedTask(taskAttempt.attemptId); + .registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID); //TODO Resolve to host / IP in case of a local address. InetSocketAddress nodeHttpInetAddr = NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO: diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java index 47a0e55f586..8737864e413 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.mapred; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -79,21 +80,21 @@ public class TestTaskAttemptListenerImpl { assertNotNull(result); assertTrue(result.shouldDie); - // Verify ask after registration but before launch + // Verify ask after registration but before launch. + // Don't kill, should be null. TaskAttemptId attemptID = mock(TaskAttemptId.class); Task task = mock(Task.class); //Now put a task with the ID listener.registerPendingTask(task, wid); result = listener.getTask(context); - assertNotNull(result); - assertFalse(result.shouldDie); + assertNull(result); // Unregister for more testing. 
listener.unregister(attemptID, wid); // Verify ask after registration and launch //Now put a task with the ID listener.registerPendingTask(task, wid); - listener.registerLaunchedTask(attemptID); + listener.registerLaunchedTask(attemptID, wid); verify(hbHandler).register(attemptID); result = listener.getTask(context); assertNotNull(result); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index f17bf6f8af0..3eb214d79c9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -324,7 +324,9 @@ public class MRApp extends MRAppMaster { return NetUtils.createSocketAddr("localhost:54321"); } @Override - public void registerLaunchedTask(TaskAttemptId attemptID) {} + public void registerLaunchedTask(TaskAttemptId attemptID, + WrappedJvmID jvmID) { + } @Override public void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID) { } @@ -463,6 +465,7 @@ public class MRApp extends MRAppMaster { return localStateMachine; } + @SuppressWarnings("rawtypes") public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId, Configuration conf, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, Clock clock,