YARN-1629. IndexOutOfBoundsException in MaxRunningAppsEnforcer (Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1561998 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2014-01-28 08:36:19 +00:00
parent d225ce1ce2
commit 059262e5a5
4 changed files with 69 additions and 17 deletions

View File

@ -350,6 +350,8 @@ Release 2.4.0 - UNRELEASED
YARN-1642. RMDTRenewer#getRMClient should use ClientRMProxy (kasha)
YARN-1629. IndexOutOfBoundsException in MaxRunningAppsEnforcer (Sandy Ryza)
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -91,15 +91,6 @@ public class FSLeafQueue extends FSQueue {
}
}
/**
 * Moves an app from this queue's non-runnable collection to its runnable
 * collection.
 *
 * @param appSched the app schedulable to promote; it must currently be held
 *                 in this queue's non-runnable collection
 * @throws IllegalStateException if {@code appSched} was not found in the
 *                               non-runnable collection
 */
public void makeAppRunnable(AppSchedulable appSched) {
  // remove() doubles as the membership check: a false return means the app
  // was never registered here as non-runnable.
  if (!nonRunnableAppScheds.remove(appSched)) {
    // The ": " separator keeps the app's string form from running straight
    // into the word "non-runnable" in the message.
    throw new IllegalStateException("Can't make app runnable that does not " +
        "already exist in queue as non-runnable: " + appSched);
  }
  runnableAppScheds.add(appSched);
}
/**
 * Returns the collection of app schedulables that are currently runnable in
 * this queue.
 *
 * @return the queue's own runnable-app collection (the backing field itself,
 *         not a copy)
 */
public Collection<AppSchedulable> getRunnableAppSchedulables() {
  return this.runnableAppScheds;
}

View File

@ -24,6 +24,9 @@ import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
@ -33,6 +36,8 @@ import com.google.common.collect.ListMultimap;
* constraints
*/
public class MaxRunningAppsEnforcer {
private static final Log LOG = LogFactory.getLog(FairScheduler.class);
private final FairScheduler scheduler;
// Tracks the number of running applications by user.
@ -163,7 +168,7 @@ public class MaxRunningAppsEnforcer {
Iterator<FSSchedulerApp> iter = new MultiListStartTimeIterator(
appsNowMaybeRunnable);
FSSchedulerApp prev = null;
int numNowRunnable = 0;
List<AppSchedulable> noLongerPendingApps = new ArrayList<AppSchedulable>();
while (iter.hasNext()) {
FSSchedulerApp next = iter.next();
if (next == prev) {
@ -173,21 +178,34 @@ public class MaxRunningAppsEnforcer {
if (canAppBeRunnable(next.getQueue(), next.getUser())) {
trackRunnableApp(next);
AppSchedulable appSched = next.getAppSchedulable();
next.getQueue().makeAppRunnable(appSched);
if (!usersNonRunnableApps.remove(next.getUser(), appSched)) {
throw new IllegalStateException("Waiting app " + next
+ " expected to be in usersNonRunnableApps");
}
next.getQueue().getRunnableAppSchedulables().add(appSched);
noLongerPendingApps.add(appSched);
// No more than one app per list will be able to be made runnable, so
// we can stop looking after we've found that many
if (numNowRunnable >= appsNowMaybeRunnable.size()) {
if (noLongerPendingApps.size() >= appsNowMaybeRunnable.size()) {
break;
}
}
prev = next;
}
// We remove the apps from their pending lists afterwards so that we don't
// pull them out from under the iterator. If they are not in these lists
// in the first place, there is a bug.
for (AppSchedulable appSched : noLongerPendingApps) {
if (!appSched.getApp().getQueue().getNonRunnableAppSchedulables()
.remove(appSched)) {
LOG.error("Can't make app runnable that does not already exist in queue"
+ " as non-runnable: " + appSched + ". This should never happen.");
}
if (!usersNonRunnableApps.remove(appSched.getApp().getUser(), appSched)) {
LOG.error("Waiting app " + appSched + " expected to be in "
+ "usersNonRunnableApps, but was not. This should never happen.");
}
}
}
/**
@ -225,7 +243,7 @@ public class MaxRunningAppsEnforcer {
* This allows us to pick which list to advance in O(log(num lists)) instead
* of O(num lists) time.
*/
private static class MultiListStartTimeIterator implements
static class MultiListStartTimeIterator implements
Iterator<FSSchedulerApp> {
private List<AppSchedulable>[] appLists;

View File

@ -21,6 +21,10 @@ import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
@ -152,4 +156,41 @@ public class TestMaxRunningAppsEnforcer {
assertEquals(0, leaf2.getNonRunnableAppSchedulables().size());
}
/**
 * Several apps wait in one leaf queue while an app in a "cousin" leaf (same
 * grandparent, different parent) is running. Removing that app should let
 * exactly one of the waiting apps become runnable.
 */
@Test
public void testMultipleAppsWaitingOnCousinQueue() {
  FSLeafQueue firstLeaf =
      queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true);
  FSLeafQueue cousinLeaf =
      queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true);
  // The cap is registered on the ancestor shared by both leaves.
  queueMaxApps.put("root.queue1", 2);

  FSSchedulerApp firstLeafApp = addApp(firstLeaf, "user");
  // Three apps go to the cousin leaf; only one of them is expected to be
  // runnable right away.
  for (int i = 0; i < 3; i++) {
    addApp(cousinLeaf, "user");
  }

  assertEquals(1, firstLeaf.getRunnableAppSchedulables().size());
  assertEquals(1, cousinLeaf.getRunnableAppSchedulables().size());
  assertEquals(2, cousinLeaf.getNonRunnableAppSchedulables().size());

  // Removing the app in the first leaf should promote exactly one waiter.
  removeApp(firstLeafApp);

  assertEquals(0, firstLeaf.getRunnableAppSchedulables().size());
  assertEquals(2, cousinLeaf.getRunnableAppSchedulables().size());
  assertEquals(1, cousinLeaf.getNonRunnableAppSchedulables().size());
}
/**
 * Feeds two single-element app lists to the multi-list iterator and checks
 * that the apps come back in ascending start-time order.
 */
// NOTE(review): despite the "EmptyAppLists" name, no empty list is passed to
// the iterator here -- confirm whether an empty-list case was intended.
@Test
public void testMultiListStartTimeIteratorEmptyAppLists() {
  List<AppSchedulable> startsAtOne = Arrays.asList(mockAppSched(1));
  List<AppSchedulable> startsAtTwo = Arrays.asList(mockAppSched(2));

  List<List<AppSchedulable>> appLists = new ArrayList<List<AppSchedulable>>();
  appLists.add(startsAtOne);
  appLists.add(startsAtTwo);

  Iterator<FSSchedulerApp> merged =
      new MaxRunningAppsEnforcer.MultiListStartTimeIterator(appLists);

  FSSchedulerApp first = merged.next();
  assertEquals(1, first.getAppSchedulable().getStartTime());
  FSSchedulerApp second = merged.next();
  assertEquals(2, second.getAppSchedulable().getStartTime());
}
/**
 * Builds a mocked {@link AppSchedulable} that reports the given start time
 * and is linked in both directions to a mocked {@link FSSchedulerApp}.
 *
 * @param startTime value the mock returns from {@code getStartTime()}
 * @return the mocked schedulable; its {@code getApp()} returns a mocked app
 *         whose {@code getAppSchedulable()} returns this same schedulable
 */
private AppSchedulable mockAppSched(long startTime) {
  AppSchedulable schedulable = mock(AppSchedulable.class);
  FSSchedulerApp owningApp = mock(FSSchedulerApp.class);

  // Wire the two mocks to each other so either can be reached from the other.
  when(schedulable.getApp()).thenReturn(owningApp);
  when(owningApp.getAppSchedulable()).thenReturn(schedulable);

  when(schedulable.getStartTime()).thenReturn(startTime);
  return schedulable;
}
}