MAPREDUCE-2705. Permits parallel multiple task launches. Contributed by Thomas Graves.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1153720 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Devaraj Das 2011-08-04 01:31:05 +00:00
parent 44b7e03114
commit 703b5f412c
1 changed files with 165 additions and 0 deletions

View File

@ -0,0 +1,165 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.TreeMap;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.mapred.TaskTracker.TaskLauncher;
import org.apache.hadoop.mapred.TaskTracker.RunningJob;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Test;
/**
* Tests {@link TaskLauncherThreaded}
*/
public class TestTaskLauncherThreaded {
private static int jobLocalizedCount = 0;
private static int jobLaunchCount = 0;
private static boolean quitWaiting = false;
private static boolean firstJobStarted = false;
private static boolean firstJobFinished = false;
private static class MyTaskTracker extends TaskTracker {
// stub out functions called from startNewTask
@Override
RunningJob localizeJob(TaskInProgress tip)
throws IOException, InterruptedException {
if (firstJobStarted == false) {
firstJobStarted = true;
while (quitWaiting == false) {
Thread.sleep(100);
}
firstJobFinished = true;
}
// mock out a RunningJob
RunningJob rjob = mock(RunningJob.class);
when(rjob.getJobConf()).thenReturn(new JobConf());
jobLocalizedCount++;
return rjob;
}
@Override
protected void launchTaskForJob(TaskInProgress tip, JobConf jobConf,
UserGroupInformation ugi) throws IOException {
jobLaunchCount++;
}
}
/**
* Tests the case "task localizing doesn't block other tasks".
*
* Launches one task that simulates a task doing large localization,
* then starts a second task and verifies that second task is not
* blocked waiting behind the first task.
*
* @throws IOException
*/
@Test
public void testLocalizationNotBlockingOtherTasks() throws IOException {
// setup a TaskTracker
JobConf ttConf = new JobConf();
ttConf.setInt("mapred.tasktracker.map.tasks.maximum", 4);
TaskTracker tt = new MyTaskTracker();
tt.runningJobs = new TreeMap<JobID, TaskTracker.RunningJob>();
tt.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
tt.setIndexCache(new IndexCache(ttConf));
tt.setTaskMemoryManagerEnabledFlag();
// start map-task launcher with four slots
TaskLauncher mapLauncher = tt.new TaskLauncher(TaskType.MAP, 4);
mapLauncher.start();
// launch a task which simulates large localization
String jtId = "test";
TaskAttemptID attemptID = new TaskAttemptID(jtId, 1, true, 0, 0);
Task task = new MapTask(null, attemptID, 0, null, 2);
mapLauncher.addToTaskQueue(new LaunchTaskAction(task));
// verify that task is added to runningTasks
TaskInProgress runningTip = tt.runningTasks.get(attemptID);
assertNotNull(runningTip);
// wait for a while for the first task to start initializing
// this loop waits at most for 30 seconds
for (int i = 0; i < 300; i++) {
if (firstJobStarted == true) {
break;
}
UtilsForTests.waitFor(100);
}
// Now start a second task and make sure it doesn't wait while first one initializes
String secondjtId = "test2";
TaskAttemptID secondAttemptID = new TaskAttemptID(secondjtId, 1, true, 0, 0);
Task secondTask = new MapTask(null, secondAttemptID, 0, null, 2);
mapLauncher.addToTaskQueue(new LaunchTaskAction(secondTask));
// verify that task is added to runningTasks
TaskInProgress secondRunningTip = tt.runningTasks.get(secondAttemptID);
assertNotNull(secondRunningTip);
// wait for a while for the second task to be launched
// this loop waits at most for 30 seconds
for (int i = 0; i < 300; i++) {
if (jobLaunchCount > 0) {
break;
}
UtilsForTests.waitFor(100);
}
assertEquals("Second task didn't run or both ran", 1, jobLocalizedCount);
assertEquals("second task didn't try to launch", 1, jobLaunchCount);
assertFalse("Second task didn't finish first task initializing", firstJobFinished);
// tell first task to stop waiting
quitWaiting = true;
// wait for a while for the first task finishes initializing
// this loop waits at most for 30 seconds
for (int i = 0; i < 300; i++) {
if (firstJobFinished == true) {
break;
}
UtilsForTests.waitFor(100);
}
assertTrue("First task didn't finish initializing", firstJobFinished);
// wait for a while for the first task finishes
// this loop waits at most for 30 seconds
for (int i = 0; i < 300; i++) {
if (jobLaunchCount > 1) {
break;
}
UtilsForTests.waitFor(100);
}
assertEquals("Both tasks didn't run", 2, jobLocalizedCount);
assertEquals("First task didn't try to launch", 2, jobLaunchCount);
}
}