YARN-9552. FairScheduler: NODE_UPDATE can cause NoSuchElementException. Contributed by Peter Bacsko.

This commit is contained in:
Giovanni Matteo Fumarola 2019-05-15 11:50:46 -07:00
parent 9569015802
commit 55bd35921c
3 changed files with 48 additions and 5 deletions

View File

@ -499,12 +499,15 @@ public class AppSchedulingInfo {
public PendingAsk getNextPendingAsk() { public PendingAsk getNextPendingAsk() {
readLock.lock(); readLock.lock();
try { try {
SchedulerRequestKey firstRequestKey = schedulerKeys.first(); if (!schedulerKeys.isEmpty()) {
return getPendingAsk(firstRequestKey, ResourceRequest.ANY); SchedulerRequestKey firstRequestKey = schedulerKeys.first();
return getPendingAsk(firstRequestKey, ResourceRequest.ANY);
} else {
return null;
}
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
} }
public PendingAsk getPendingAsk(SchedulerRequestKey schedulerKey) { public PendingAsk getPendingAsk(SchedulerRequestKey schedulerKey) {

View File

@ -926,8 +926,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
if (!isAmRunning() && !getUnmanagedAM()) { if (!isAmRunning() && !getUnmanagedAM()) {
// Return true if we have not ask, or queue is not be able to run app's AM // Return true if we have not ask, or queue is not be able to run app's AM
PendingAsk ask = appSchedulingInfo.getNextPendingAsk(); PendingAsk ask = appSchedulingInfo.getNextPendingAsk();
if (ask.getCount() == 0 || !getQueue().canRunAppAM( if (ask != null && (ask.getCount() == 0 || !getQueue().canRunAppAM(
ask.getPerAllocationResource())) { ask.getPerAllocationResource()))) {
return true; return true;
} }
} }

View File

@ -19,7 +19,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -33,9 +36,13 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@ -344,6 +351,39 @@ public class TestFSAppAttempt extends FairSchedulerTestBase {
assertEquals(clusterResource, spyApp.getHeadroom()); assertEquals(clusterResource, spyApp.getHeadroom());
} }
/**
* Ensure that no pending ask request inside appSchedulingInfo
* does not result in an error.
*/
@Test
public void testNoNextPendingAsk() {
FSLeafQueue queue = Mockito.mock(FSLeafQueue.class);
ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
RMContext rmContext = Mockito.mock(RMContext.class);
ConcurrentMap<ApplicationId, RMApp> rmApps = new ConcurrentHashMap<>();
RMApp rmApp = Mockito.mock(RMApp.class);
rmApps.put(applicationAttemptId.getApplicationId(), rmApp);
ApplicationSubmissionContext appContext =
Mockito.mock(ApplicationSubmissionContext.class);
Mockito.when(appContext.getUnmanagedAM()).thenReturn(false);
Mockito.when(appContext.getLogAggregationContext())
.thenReturn(Mockito.mock(LogAggregationContext.class));
Mockito.when(rmApp.getApplicationSchedulingEnvs())
.thenReturn(new HashMap<>());
Mockito.when(rmApp.getApplicationSubmissionContext())
.thenReturn(appContext);
Mockito.when(rmContext.getRMApps()).thenReturn(rmApps);
FSAppAttempt schedulerApp =
new FSAppAttempt(scheduler, applicationAttemptId, "user1", queue,
null, rmContext);
schedulerApp.setAmRunning(false);
FSSchedulerNode schedulerNode = Mockito.mock(FSSchedulerNode.class);
Resource resource = schedulerApp.assignContainer(schedulerNode);
assertEquals(Resources.none(), resource);
}
private static long min(long value1, long value2, long value3) { private static long min(long value1, long value2, long value3) {
return Math.min(Math.min(value1, value2), value3); return Math.min(Math.min(value1, value2), value3);
} }