diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index b076bc4d70b..f8983302174 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1818,6 +1818,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-3256. Added authorization checks for the protocol between NodeManager and ApplicationMaster. (vinodkv via acmurthy) + MAPREDUCE-3274. Fixed a race condition in MRAppMaster that was causing a + task-scheduling deadlock. (Robert Joseph Evans via vinodkv) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index 66526445a9b..ba0068098e5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -23,8 +23,10 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -69,12 +71,14 @@ public class TaskAttemptListenerImpl extends CompositeService private AppContext context; private Server server; - private TaskHeartbeatHandler taskHeartbeatHandler; + protected TaskHeartbeatHandler taskHeartbeatHandler; private InetSocketAddress address; - private Map jvmIDToAttemptMap = + private Map jvmIDToActiveAttemptMap = Collections.synchronizedMap(new HashMap()); private JobTokenSecretManager jobTokenSecretManager = null; + private Set pendingJvms = + Collections.synchronizedSet(new HashSet()); public TaskAttemptListenerImpl(AppContext context, JobTokenSecretManager jobTokenSecretManager) { @@ -395,35 +399,55 @@ public class TaskAttemptListenerImpl extends CompositeService JVMId jvmId = context.jvmId; LOG.info("JVM with ID : " + jvmId + " asked for a task"); - - // TODO: Is it an authorised container to get a task? Otherwise return null. - - // TODO: Is the request for task-launch still valid? + + JvmTask jvmTask = null; + // TODO: Is it an authorized container to get a task? Otherwise return null. // TODO: Child.java's firstTaskID isn't really firstTaskID. Ask for update // to jobId and task-type. WrappedJvmID wJvmID = new WrappedJvmID(jvmId.getJobId(), jvmId.isMap, jvmId.getId()); - org.apache.hadoop.mapred.Task task = jvmIDToAttemptMap.get(wJvmID); - if (task != null) { //there may be lag in the attempt getting added here - LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID()); - JvmTask jvmTask = new JvmTask(task, false); - - //remove the task as it is no more needed and free up the memory - jvmIDToAttemptMap.remove(wJvmID); - - return jvmTask; + synchronized(this) { + if(pendingJvms.contains(wJvmID)) { + org.apache.hadoop.mapred.Task task = jvmIDToActiveAttemptMap.get(wJvmID); + if (task != null) { //there may be lag in the attempt getting added here + LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID()); + jvmTask = new JvmTask(task, false); + + //remove the task as it is no more needed and free up the memory + //Also we have already told the JVM to process a task, so it is no + //longer pending, and further request should ask it to exit. + pendingJvms.remove(wJvmID); + jvmIDToActiveAttemptMap.remove(wJvmID); + } + } else { + LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed."); + jvmTask = new JvmTask(null, true); + } } - return null; + return jvmTask; + } + + @Override + public synchronized void registerPendingTask(WrappedJvmID jvmID) { + //Save this JVM away as one that has not been handled yet + pendingJvms.add(jvmID); } @Override - public void register(org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID, + public void registerLaunchedTask( + org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID, org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) { - //create the mapping so that it is easy to look up - //when it comes back to ask for Task. - jvmIDToAttemptMap.put(jvmID, task); + synchronized(this) { + //create the mapping so that it is easy to look up + //when it comes back to ask for Task. + jvmIDToActiveAttemptMap.put(jvmID, task); + //This should not need to happen here, but just to be on the safe side + if(!pendingJvms.add(jvmID)) { + LOG.warn(jvmID+" launched without first being registered"); + } + } //register this attempt taskHeartbeatHandler.register(attemptID); } @@ -432,8 +456,9 @@ public class TaskAttemptListenerImpl extends CompositeService public void unregister(org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID, WrappedJvmID jvmID) { //remove the mapping if not already removed - jvmIDToAttemptMap.remove(jvmID); - + jvmIDToActiveAttemptMap.remove(jvmID); + //remove the pending if not already removed + pendingJvms.remove(jvmID); //unregister this attempt taskHeartbeatHandler.unregister(attemptID); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java index 9df88d633cf..b5e5cd37b24 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java @@ -24,12 +24,35 @@ import org.apache.hadoop.mapred.Task; import org.apache.hadoop.mapred.WrappedJvmID; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +/** + * This class listens for changes to the state of a Task. + */ public interface TaskAttemptListener { InetSocketAddress getAddress(); - void register(TaskAttemptId attemptID, Task task, WrappedJvmID jvmID); + /** + * register a JVM with the listener. This should be called as soon as a + * JVM ID is assigned to a task attempt, before it has been launched. + * @param jvmID The ID of the JVM . + */ + void registerPendingTask(WrappedJvmID jvmID); + + /** + * Register the task and task attempt with the JVM. This should be called + * when the JVM has been launched. + * @param attemptID the id of the attempt for this JVM. + * @param task the task itself for this JVM. + * @param jvmID the id of the JVM handling the task. + */ + void registerLaunchedTask(TaskAttemptId attemptID, Task task, WrappedJvmID jvmID); + /** + * Unregister the JVM and the attempt associated with it. This should be + * called when the attempt/JVM has finished executing and is being cleaned up. + * @param attemptID the ID of the attempt. + * @param jvmID the ID of the JVM for that attempt. + */ void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 0b3187de720..d354bda1fae 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -1012,6 +1012,7 @@ public abstract class TaskAttemptImpl implements taskAttempt.jvmID = new WrappedJvmID( taskAttempt.remoteTask.getTaskID().getJobID(), taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId()); + taskAttempt.taskAttemptListener.registerPendingTask(taskAttempt.jvmID); //launch the container //create the container object to be launched for a given Task attempt @@ -1106,7 +1107,7 @@ public abstract class TaskAttemptImpl implements // register it to TaskAttemptListener so that it start listening // for it - taskAttempt.taskAttemptListener.register( + taskAttempt.taskAttemptListener.registerLaunchedTask( taskAttempt.attemptId, taskAttempt.remoteTask, taskAttempt.jvmID); //TODO Resolve to host / IP in case of a local address. InetSocketAddress nodeHttpInetAddr = diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java new file mode 100644 index 00000000000..a5756da9934 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java @@ -0,0 +1,100 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.mapred; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler; +import org.junit.Test; + +public class TestTaskAttemptListenerImpl { + public static class MockTaskAttemptListenerImpl extends TaskAttemptListenerImpl { + + public MockTaskAttemptListenerImpl(AppContext context, + JobTokenSecretManager jobTokenSecretManager, + TaskHeartbeatHandler hbHandler) { + super(context, jobTokenSecretManager); + this.taskHeartbeatHandler = hbHandler; + } + + @Override + protected void registerHeartbeatHandler() { + //Empty + } + + @Override + protected void startRpcServer() { + //Empty + } + + @Override + protected void stopRpcServer() { + //Empty + } + } + + @Test + public void testGetTask() throws IOException { + AppContext appCtx = mock(AppContext.class); + JobTokenSecretManager secret = mock(JobTokenSecretManager.class); + TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); + MockTaskAttemptListenerImpl listener = + new MockTaskAttemptListenerImpl(appCtx, secret, hbHandler); + Configuration conf = new Configuration(); + listener.init(conf); + listener.start(); + JVMId id = new JVMId("foo",1, true, 1); + WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId()); + + //The JVM ID has not been registered yet so we should kill it. + JvmContext context = new JvmContext(); + context.jvmId = id; + JvmTask result = listener.getTask(context); + assertNotNull(result); + assertTrue(result.shouldDie); + + //Now register the JVM, and see + listener.registerPendingTask(wid); + result = listener.getTask(context); + assertNull(result); + + TaskAttemptId attemptID = mock(TaskAttemptId.class); + Task task = mock(Task.class); + //Now put a task with the ID + listener.registerLaunchedTask(attemptID, task, wid); + verify(hbHandler).register(attemptID); + result = listener.getTask(context); + assertNotNull(result); + assertFalse(result.shouldDie); + + //Verify that if we call it again a second time we are told to die. + result = listener.getTask(context); + assertNotNull(result); + assertTrue(result.shouldDie); + + listener.unregister(attemptID, wid); + listener.stop(); + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index 4fa2acf16e8..a80a2a760c3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -294,11 +294,14 @@ public class MRApp extends MRAppMaster { return null; } @Override - public void register(TaskAttemptId attemptID, + public void registerLaunchedTask(TaskAttemptId attemptID, org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {} @Override public void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID) { } + @Override + public void registerPendingTask(WrappedJvmID jvmID) { + } }; }