YARN-1311. Fixed app specific scheduler-events' names to be app-attempt based. Contributed by Vinod Kumar Vavilapalli
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1550613 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
791e43c53b
commit
28db28d491
|
@ -17,67 +17,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.yarn.sls.scheduler;
|
||||
|
||||
import org.apache.hadoop.util.ShutdownHookManager;
|
||||
import org.apache.hadoop.yarn.sls.SLSRunner;
|
||||
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
|
||||
import org.apache.hadoop.yarn.sls.web.SLSWebApp;
|
||||
import com.codahale.metrics.Counter;
|
||||
import com.codahale.metrics.CsvReporter;
|
||||
import com.codahale.metrics.Gauge;
|
||||
import com.codahale.metrics.Histogram;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SlidingWindowReservoir;
|
||||
import com.codahale.metrics.Timer;
|
||||
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
|
||||
.UpdatedContainerInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||
.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||
.SchedulerAppReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||
.SchedulerNodeReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
|
||||
.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
|
||||
.AppAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
|
||||
.AppRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
|
||||
.NodeUpdateSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
|
||||
.SchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
|
||||
.SchedulerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
|
||||
.FairScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo
|
||||
.FifoScheduler;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
|
@ -97,6 +36,55 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.ShutdownHookManager;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||
import org.apache.hadoop.yarn.sls.SLSRunner;
|
||||
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
|
||||
import org.apache.hadoop.yarn.sls.web.SLSWebApp;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import com.codahale.metrics.Counter;
|
||||
import com.codahale.metrics.CsvReporter;
|
||||
import com.codahale.metrics.Gauge;
|
||||
import com.codahale.metrics.Histogram;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SlidingWindowReservoir;
|
||||
import com.codahale.metrics.Timer;
|
||||
|
||||
public class ResourceSchedulerWrapper implements ResourceScheduler,
|
||||
Configurable {
|
||||
private static final String EOL = System.getProperty("line.separator");
|
||||
|
@ -246,11 +234,11 @@ public class ResourceSchedulerWrapper implements ResourceScheduler,
|
|||
(NodeUpdateSchedulerEvent)schedulerEvent);
|
||||
schedulerEvent = eventWrapper;
|
||||
updateQueueWithNodeUpdate(eventWrapper);
|
||||
} else if (schedulerEvent.getType() == SchedulerEventType.APP_REMOVED
|
||||
&& schedulerEvent instanceof AppRemovedSchedulerEvent) {
|
||||
} else if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
|
||||
&& schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
|
||||
// check if having AM Container, update resource usage information
|
||||
AppRemovedSchedulerEvent appRemoveEvent =
|
||||
(AppRemovedSchedulerEvent) schedulerEvent;
|
||||
AppAttemptRemovedSchedulerEvent appRemoveEvent =
|
||||
(AppAttemptRemovedSchedulerEvent) schedulerEvent;
|
||||
ApplicationAttemptId appAttemptId =
|
||||
appRemoveEvent.getApplicationAttemptID();
|
||||
String queue = appQueueMap.get(appAttemptId);
|
||||
|
@ -275,18 +263,18 @@ public class ResourceSchedulerWrapper implements ResourceScheduler,
|
|||
schedulerHandleCounter.inc();
|
||||
schedulerHandleCounterMap.get(schedulerEvent.getType()).inc();
|
||||
|
||||
if (schedulerEvent.getType() == SchedulerEventType.APP_REMOVED
|
||||
&& schedulerEvent instanceof AppRemovedSchedulerEvent) {
|
||||
if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
|
||||
&& schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
|
||||
SLSRunner.decreaseRemainingApps();
|
||||
AppRemovedSchedulerEvent appRemoveEvent =
|
||||
(AppRemovedSchedulerEvent) schedulerEvent;
|
||||
AppAttemptRemovedSchedulerEvent appRemoveEvent =
|
||||
(AppAttemptRemovedSchedulerEvent) schedulerEvent;
|
||||
ApplicationAttemptId appAttemptId =
|
||||
appRemoveEvent.getApplicationAttemptID();
|
||||
appQueueMap.remove(appRemoveEvent.getApplicationAttemptID());
|
||||
} else if (schedulerEvent.getType() == SchedulerEventType.APP_ADDED
|
||||
&& schedulerEvent instanceof AppAddedSchedulerEvent) {
|
||||
AppAddedSchedulerEvent appAddEvent =
|
||||
(AppAddedSchedulerEvent) schedulerEvent;
|
||||
} else if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_ADDED
|
||||
&& schedulerEvent instanceof AppAttemptAddedSchedulerEvent) {
|
||||
AppAttemptAddedSchedulerEvent appAddEvent =
|
||||
(AppAttemptAddedSchedulerEvent) schedulerEvent;
|
||||
String queueName = appAddEvent.getQueue();
|
||||
appQueueMap.put(appAddEvent.getApplicationAttemptId(), queueName);
|
||||
}
|
||||
|
|
|
@ -87,8 +87,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
|
||||
|
@ -742,9 +742,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
new Token<AMRMTokenIdentifier>(id,
|
||||
appAttempt.rmContext.getAMRMTokenSecretManager());
|
||||
|
||||
// Add the application to the scheduler
|
||||
// Add the applicationAttempt to the scheduler
|
||||
appAttempt.eventHandler.handle(
|
||||
new AppAddedSchedulerEvent(appAttempt.applicationAttemptId,
|
||||
new AppAttemptAddedSchedulerEvent(appAttempt.applicationAttemptId,
|
||||
appAttempt.submissionContext.getQueue(), appAttempt.user));
|
||||
}
|
||||
}
|
||||
|
@ -1055,8 +1055,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
}
|
||||
|
||||
appAttempt.eventHandler.handle(appEvent);
|
||||
appAttempt.eventHandler.handle(new AppRemovedSchedulerEvent(appAttemptId,
|
||||
finalAttemptState));
|
||||
appAttempt.eventHandler.handle(new AppAttemptRemovedSchedulerEvent(
|
||||
appAttemptId, finalAttemptState));
|
||||
|
||||
appAttempt.removeCredentials(appAttempt);
|
||||
}
|
||||
|
|
|
@ -71,8 +71,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeRepo
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||
|
@ -417,7 +417,7 @@ public class CapacityScheduler
|
|||
}
|
||||
|
||||
private synchronized void
|
||||
addApplication(ApplicationAttemptId applicationAttemptId,
|
||||
addApplicationAttempt(ApplicationAttemptId applicationAttemptId,
|
||||
String queueName, String user) {
|
||||
|
||||
// Sanity checks
|
||||
|
@ -466,7 +466,7 @@ public class CapacityScheduler
|
|||
RMAppAttemptEventType.APP_ACCEPTED));
|
||||
}
|
||||
|
||||
private synchronized void doneApplication(
|
||||
private synchronized void doneApplicationAttempt(
|
||||
ApplicationAttemptId applicationAttemptId,
|
||||
RMAppAttemptState rmAppAttemptFinalState) {
|
||||
LOG.info("Application " + applicationAttemptId + " is done." +
|
||||
|
@ -740,18 +740,20 @@ public class CapacityScheduler
|
|||
nodeUpdate(nodeUpdatedEvent.getRMNode());
|
||||
}
|
||||
break;
|
||||
case APP_ADDED:
|
||||
case APP_ATTEMPT_ADDED:
|
||||
{
|
||||
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent)event;
|
||||
addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent
|
||||
.getQueue(), appAddedEvent.getUser());
|
||||
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
|
||||
(AppAttemptAddedSchedulerEvent) event;
|
||||
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
|
||||
appAttemptAddedEvent.getQueue(), appAttemptAddedEvent.getUser());
|
||||
}
|
||||
break;
|
||||
case APP_REMOVED:
|
||||
case APP_ATTEMPT_REMOVED:
|
||||
{
|
||||
AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
|
||||
doneApplication(appRemovedEvent.getApplicationAttemptID(),
|
||||
appRemovedEvent.getFinalAttemptState());
|
||||
AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
|
||||
(AppAttemptRemovedSchedulerEvent) event;
|
||||
doneApplicationAttempt(appAttemptRemovedEvent.getApplicationAttemptID(),
|
||||
appAttemptRemovedEvent.getFinalAttemptState());
|
||||
}
|
||||
break;
|
||||
case CONTAINER_EXPIRED:
|
||||
|
|
|
@ -20,15 +20,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
|
|||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
|
||||
public class AppAddedSchedulerEvent extends SchedulerEvent {
|
||||
public class AppAttemptAddedSchedulerEvent extends SchedulerEvent {
|
||||
|
||||
private final ApplicationAttemptId applicationAttemptId;
|
||||
private final String queue;
|
||||
private final String user;
|
||||
|
||||
public AppAddedSchedulerEvent(ApplicationAttemptId applicationAttemptId,
|
||||
String queue, String user) {
|
||||
super(SchedulerEventType.APP_ADDED);
|
||||
public AppAttemptAddedSchedulerEvent(
|
||||
ApplicationAttemptId applicationAttemptId, String queue, String user) {
|
||||
super(SchedulerEventType.APP_ATTEMPT_ADDED);
|
||||
this.applicationAttemptId = applicationAttemptId;
|
||||
this.queue = queue;
|
||||
this.user = user;
|
|
@ -21,14 +21,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
|
||||
public class AppRemovedSchedulerEvent extends SchedulerEvent {
|
||||
public class AppAttemptRemovedSchedulerEvent extends SchedulerEvent {
|
||||
|
||||
private final ApplicationAttemptId applicationAttemptId;
|
||||
private final RMAppAttemptState finalAttemptState;
|
||||
|
||||
public AppRemovedSchedulerEvent(ApplicationAttemptId applicationAttemptId,
|
||||
public AppAttemptRemovedSchedulerEvent(
|
||||
ApplicationAttemptId applicationAttemptId,
|
||||
RMAppAttemptState finalAttemptState) {
|
||||
super(SchedulerEventType.APP_REMOVED);
|
||||
super(SchedulerEventType.APP_ATTEMPT_REMOVED);
|
||||
this.applicationAttemptId = applicationAttemptId;
|
||||
this.finalAttemptState = finalAttemptState;
|
||||
}
|
|
@ -25,9 +25,9 @@ public enum SchedulerEventType {
|
|||
NODE_REMOVED,
|
||||
NODE_UPDATE,
|
||||
|
||||
// Source: App
|
||||
APP_ADDED,
|
||||
APP_REMOVED,
|
||||
// Source: RMAppAttempt
|
||||
APP_ATTEMPT_ADDED,
|
||||
APP_ATTEMPT_REMOVED,
|
||||
|
||||
// Source: ContainerAllocationExpirer
|
||||
CONTAINER_EXPIRED
|
||||
|
|
|
@ -75,8 +75,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppRepor
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||
|
@ -591,7 +591,7 @@ public class FairScheduler implements ResourceScheduler {
|
|||
* user. This will accept a new app even if the user or queue is above
|
||||
* configured limits, but the app will not be marked as runnable.
|
||||
*/
|
||||
protected synchronized void addApplication(
|
||||
protected synchronized void addApplicationAttempt(
|
||||
ApplicationAttemptId applicationAttemptId, String queueName, String user) {
|
||||
if (queueName == null || queueName.isEmpty()) {
|
||||
String message = "Reject application " + applicationAttemptId +
|
||||
|
@ -674,7 +674,7 @@ public class FairScheduler implements ResourceScheduler {
|
|||
return queue;
|
||||
}
|
||||
|
||||
private synchronized void removeApplication(
|
||||
private synchronized void removeApplicationAttempt(
|
||||
ApplicationAttemptId applicationAttemptId,
|
||||
RMAppAttemptState rmAppAttemptFinalState) {
|
||||
LOG.info("Application " + applicationAttemptId + " is done." +
|
||||
|
@ -1090,22 +1090,24 @@ public class FairScheduler implements ResourceScheduler {
|
|||
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
|
||||
nodeUpdate(nodeUpdatedEvent.getRMNode());
|
||||
break;
|
||||
case APP_ADDED:
|
||||
if (!(event instanceof AppAddedSchedulerEvent)) {
|
||||
case APP_ATTEMPT_ADDED:
|
||||
if (!(event instanceof AppAttemptAddedSchedulerEvent)) {
|
||||
throw new RuntimeException("Unexpected event type: " + event);
|
||||
}
|
||||
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent)event;
|
||||
String queue = appAddedEvent.getQueue();
|
||||
addApplication(appAddedEvent.getApplicationAttemptId(), queue,
|
||||
appAddedEvent.getUser());
|
||||
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
|
||||
(AppAttemptAddedSchedulerEvent) event;
|
||||
String queue = appAttemptAddedEvent.getQueue();
|
||||
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
|
||||
queue, appAttemptAddedEvent.getUser());
|
||||
break;
|
||||
case APP_REMOVED:
|
||||
if (!(event instanceof AppRemovedSchedulerEvent)) {
|
||||
case APP_ATTEMPT_REMOVED:
|
||||
if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
|
||||
throw new RuntimeException("Unexpected event type: " + event);
|
||||
}
|
||||
AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
|
||||
removeApplication(appRemovedEvent.getApplicationAttemptID(),
|
||||
appRemovedEvent.getFinalAttemptState());
|
||||
AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
|
||||
(AppAttemptRemovedSchedulerEvent) event;
|
||||
removeApplicationAttempt(appAttemptRemovedEvent.getApplicationAttemptID(),
|
||||
appAttemptRemovedEvent.getFinalAttemptState());
|
||||
break;
|
||||
case CONTAINER_EXPIRED:
|
||||
if (!(event instanceof ContainerExpiredSchedulerEvent)) {
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -67,11 +66,20 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||
|
@ -340,7 +348,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
|
|||
return nodes.get(nodeId);
|
||||
}
|
||||
|
||||
private synchronized void addApplication(ApplicationAttemptId appAttemptId,
|
||||
private synchronized void addApplicationAttempt(ApplicationAttemptId appAttemptId,
|
||||
String user) {
|
||||
// TODO: Fix store
|
||||
FiCaSchedulerApp schedulerApp =
|
||||
|
@ -355,7 +363,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
|
|||
RMAppAttemptEventType.APP_ACCEPTED));
|
||||
}
|
||||
|
||||
private synchronized void doneApplication(
|
||||
private synchronized void doneApplicationAttempt(
|
||||
ApplicationAttemptId applicationAttemptId,
|
||||
RMAppAttemptState rmAppAttemptFinalState)
|
||||
throws IOException {
|
||||
|
@ -689,22 +697,25 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
|
|||
nodeUpdate(nodeUpdatedEvent.getRMNode());
|
||||
}
|
||||
break;
|
||||
case APP_ADDED:
|
||||
case APP_ATTEMPT_ADDED:
|
||||
{
|
||||
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
|
||||
addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent
|
||||
.getUser());
|
||||
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
|
||||
(AppAttemptAddedSchedulerEvent) event;
|
||||
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
|
||||
appAttemptAddedEvent.getUser());
|
||||
}
|
||||
break;
|
||||
case APP_REMOVED:
|
||||
case APP_ATTEMPT_REMOVED:
|
||||
{
|
||||
AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
|
||||
AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
|
||||
(AppAttemptRemovedSchedulerEvent) event;
|
||||
try {
|
||||
doneApplication(appRemovedEvent.getApplicationAttemptID(),
|
||||
appRemovedEvent.getFinalAttemptState());
|
||||
doneApplicationAttempt(
|
||||
appAttemptRemovedEvent.getApplicationAttemptID(),
|
||||
appAttemptRemovedEvent.getFinalAttemptState());
|
||||
} catch(IOException ie) {
|
||||
LOG.error("Unable to remove application "
|
||||
+ appRemovedEvent.getApplicationAttemptID(), ie);
|
||||
+ appAttemptRemovedEvent.getApplicationAttemptID(), ie);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
|
|
@ -57,7 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.Task.State;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
|
@ -166,7 +166,7 @@ public class Application {
|
|||
resourceManager.getClientRMService().submitApplication(request);
|
||||
|
||||
// Notify scheduler
|
||||
AppAddedSchedulerEvent appAddedEvent1 = new AppAddedSchedulerEvent(
|
||||
AppAttemptAddedSchedulerEvent appAddedEvent1 = new AppAttemptAddedSchedulerEvent(
|
||||
this.applicationAttemptId, this.queue, this.user);
|
||||
scheduler.handle(appAddedEvent1);
|
||||
}
|
||||
|
|
|
@ -44,7 +44,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||
|
@ -297,8 +297,8 @@ public class TestFifoScheduler {
|
|||
ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
|
||||
ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
|
||||
appId1, 1);
|
||||
SchedulerEvent event1 = new AppAddedSchedulerEvent(appAttemptId1, "queue",
|
||||
"user");
|
||||
SchedulerEvent event1 =
|
||||
new AppAttemptAddedSchedulerEvent(appAttemptId1, "queue", "user");
|
||||
fs.handle(event1);
|
||||
|
||||
List<ContainerId> emptyId = new ArrayList<ContainerId>();
|
||||
|
@ -388,15 +388,15 @@ public class TestFifoScheduler {
|
|||
ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
|
||||
ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
|
||||
appId1, 1);
|
||||
SchedulerEvent event1 = new AppAddedSchedulerEvent(appAttemptId1, "queue",
|
||||
"user");
|
||||
SchedulerEvent event1 =
|
||||
new AppAttemptAddedSchedulerEvent(appAttemptId1, "queue", "user");
|
||||
fs.handle(event1);
|
||||
|
||||
ApplicationId appId2 = BuilderUtils.newApplicationId(200, 2);
|
||||
ApplicationAttemptId appAttemptId2 = BuilderUtils.newApplicationAttemptId(
|
||||
appId2, 1);
|
||||
SchedulerEvent event2 = new AppAddedSchedulerEvent(appAttemptId2, "queue",
|
||||
"user");
|
||||
SchedulerEvent event2 =
|
||||
new AppAttemptAddedSchedulerEvent(appAttemptId2, "queue", "user");
|
||||
fs.handle(event2);
|
||||
|
||||
List<ContainerId> emptyId = new ArrayList<ContainerId>();
|
||||
|
|
|
@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
|||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
@ -159,7 +159,8 @@ public class TestResourceManager {
|
|||
application.finishTask(t3);
|
||||
|
||||
// Notify scheduler application is finished.
|
||||
AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent(
|
||||
AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
|
||||
new AppAttemptRemovedSchedulerEvent(
|
||||
application.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
|
||||
resourceManager.getResourceScheduler().handle(appRemovedEvent1);
|
||||
|
||||
|
|
|
@ -85,7 +85,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
||||
|
@ -329,7 +329,7 @@ public class TestRMAppAttemptTransitions {
|
|||
// Check events
|
||||
verify(masterService).
|
||||
registerAppAttempt(applicationAttempt.getAppAttemptId());
|
||||
verify(scheduler).handle(any(AppAddedSchedulerEvent.class));
|
||||
verify(scheduler).handle(any(AppAttemptAddedSchedulerEvent.class));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -65,7 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||
|
@ -555,8 +555,8 @@ public class TestCapacityScheduler {
|
|||
ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
|
||||
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
|
||||
appId, 1);
|
||||
SchedulerEvent event = new AppAddedSchedulerEvent(appAttemptId, "default",
|
||||
"user");
|
||||
SchedulerEvent event =
|
||||
new AppAttemptAddedSchedulerEvent(appAttemptId, "default", "user");
|
||||
cs.handle(event);
|
||||
|
||||
// Verify the blacklist can be updated independent of requesting containers
|
||||
|
|
|
@ -63,7 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
|
@ -348,7 +348,7 @@ public class TestLeafQueue {
|
|||
a.submitApplication(app_0, user_0, B);
|
||||
|
||||
when(cs.getApplication(appAttemptId_0)).thenReturn(app_0);
|
||||
AppRemovedSchedulerEvent event = new AppRemovedSchedulerEvent(
|
||||
AppAttemptRemovedSchedulerEvent event = new AppAttemptRemovedSchedulerEvent(
|
||||
appAttemptId_0, RMAppAttemptState.FAILED);
|
||||
cs.handle(event);
|
||||
|
||||
|
@ -366,7 +366,7 @@ public class TestLeafQueue {
|
|||
assertEquals(1, a.getMetrics().getAppsPending());
|
||||
|
||||
when(cs.getApplication(appAttemptId_1)).thenReturn(app_0);
|
||||
event = new AppRemovedSchedulerEvent(appAttemptId_0,
|
||||
event = new AppAttemptRemovedSchedulerEvent(appAttemptId_0,
|
||||
RMAppAttemptState.FINISHED);
|
||||
cs.handle(event);
|
||||
|
||||
|
|
|
@ -19,10 +19,10 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.File;
|
||||
|
@ -33,11 +33,11 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Set;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.xml.parsers.ParserConfigurationException;
|
||||
|
||||
|
@ -54,10 +54,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
|
@ -79,9 +79,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||
|
@ -256,7 +255,7 @@ public class TestFairScheduler {
|
|||
private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
|
||||
String queueId, String userId, int numContainers, int priority) {
|
||||
ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
||||
scheduler.addApplication(id, queueId, userId);
|
||||
scheduler.addApplicationAttempt(id, queueId, userId);
|
||||
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
||||
ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
|
||||
priority, numContainers, true);
|
||||
|
@ -619,8 +618,8 @@ public class TestFairScheduler {
|
|||
null, null, null, false, false, 0, null, null), null, null, 0, null);
|
||||
appsMap.put(appAttemptId.getApplicationId(), rmApp);
|
||||
|
||||
AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(
|
||||
appAttemptId, "default", "user1");
|
||||
AppAttemptAddedSchedulerEvent appAddedEvent =
|
||||
new AppAttemptAddedSchedulerEvent(appAttemptId, "default", "user1");
|
||||
scheduler.handle(appAddedEvent);
|
||||
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
|
||||
.getRunnableAppSchedulables().size());
|
||||
|
@ -641,8 +640,8 @@ public class TestFairScheduler {
|
|||
null, null, null, false, false, 0, null, null), null, null, 0, null);
|
||||
appsMap.put(appAttemptId.getApplicationId(), rmApp);
|
||||
|
||||
AppAddedSchedulerEvent appAddedEvent2 = new AppAddedSchedulerEvent(
|
||||
appAttemptId, "default", "user2");
|
||||
AppAttemptAddedSchedulerEvent appAddedEvent2 =
|
||||
new AppAttemptAddedSchedulerEvent(appAttemptId, "default", "user2");
|
||||
scheduler.handle(appAddedEvent2);
|
||||
assertEquals(0, scheduler.getQueueManager().getLeafQueue("user1", true)
|
||||
.getRunnableAppSchedulables().size());
|
||||
|
@ -661,8 +660,8 @@ public class TestFairScheduler {
|
|||
|
||||
// submit app with empty queue
|
||||
ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
|
||||
AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(
|
||||
appAttemptId, "", "user1");
|
||||
AppAttemptAddedSchedulerEvent appAddedEvent =
|
||||
new AppAttemptAddedSchedulerEvent(appAttemptId, "", "user1");
|
||||
scheduler.handle(appAddedEvent);
|
||||
|
||||
// submission rejected
|
||||
|
@ -787,11 +786,11 @@ public class TestFairScheduler {
|
|||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||
|
||||
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
|
||||
scheduler.addApplication(id11, "root.queue1", "user1");
|
||||
scheduler.addApplicationAttempt(id11, "root.queue1", "user1");
|
||||
ApplicationAttemptId id21 = createAppAttemptId(2, 1);
|
||||
scheduler.addApplication(id21, "root.queue2", "user1");
|
||||
scheduler.addApplicationAttempt(id21, "root.queue2", "user1");
|
||||
ApplicationAttemptId id22 = createAppAttemptId(2, 2);
|
||||
scheduler.addApplication(id22, "root.queue2", "user1");
|
||||
scheduler.addApplicationAttempt(id22, "root.queue2", "user1");
|
||||
|
||||
int minReqSize =
|
||||
FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB;
|
||||
|
@ -833,8 +832,9 @@ public class TestFairScheduler {
|
|||
public void testAppAdditionAndRemoval() throws Exception {
|
||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||
|
||||
AppAddedSchedulerEvent appAddedEvent1 = new AppAddedSchedulerEvent(
|
||||
createAppAttemptId(1, 1), "default", "user1");
|
||||
AppAttemptAddedSchedulerEvent appAddedEvent1 =
|
||||
new AppAttemptAddedSchedulerEvent(createAppAttemptId(1, 1), "default",
|
||||
"user1");
|
||||
scheduler.handle(appAddedEvent1);
|
||||
|
||||
// Scheduler should have two queues (the default and the one created for user1)
|
||||
|
@ -844,7 +844,7 @@ public class TestFairScheduler {
|
|||
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
|
||||
.getRunnableAppSchedulables().size());
|
||||
|
||||
AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent(
|
||||
AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
|
||||
createAppAttemptId(1, 1), RMAppAttemptState.FINISHED);
|
||||
|
||||
// Now remove app
|
||||
|
@ -1526,7 +1526,7 @@ public class TestFairScheduler {
|
|||
scheduler.handle(nodeEvent2);
|
||||
|
||||
ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
||||
scheduler.addApplication(appId, "queue1", "user1");
|
||||
scheduler.addApplicationAttempt(appId, "queue1", "user1");
|
||||
|
||||
// 1 request with 2 nodes on the same rack. another request with 1 node on
|
||||
// a different rack
|
||||
|
@ -1764,7 +1764,7 @@ public class TestFairScheduler {
|
|||
|
||||
ApplicationAttemptId attId =
|
||||
ApplicationAttemptId.newInstance(applicationId, this.ATTEMPT_ID++);
|
||||
scheduler.addApplication(attId, queue, user);
|
||||
scheduler.addApplicationAttempt(attId, queue, user);
|
||||
|
||||
numTries = 0;
|
||||
while (application.getFinishTime() == 0 && numTries < MAX_TRIES) {
|
||||
|
@ -2238,8 +2238,8 @@ public class TestFairScheduler {
|
|||
verifyQueueNumRunnable("queue1", 2, 1);
|
||||
|
||||
// Remove app 1 and both app 2 and app 4 should becomes runnable in its place
|
||||
AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent(
|
||||
attId1, RMAppAttemptState.FINISHED);
|
||||
AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
|
||||
new AppAttemptRemovedSchedulerEvent(attId1, RMAppAttemptState.FINISHED);
|
||||
scheduler.handle(appRemovedEvent1);
|
||||
verifyAppRunnable(attId2, true);
|
||||
verifyQueueNumRunnable("queue2", 1, 0);
|
||||
|
@ -2302,8 +2302,8 @@ public class TestFairScheduler {
|
|||
|
||||
// Even though the app was removed from sub3, the app from sub2 gets to go
|
||||
// because it came in first
|
||||
AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent(
|
||||
attId2, RMAppAttemptState.FINISHED);
|
||||
AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
|
||||
new AppAttemptRemovedSchedulerEvent(attId2, RMAppAttemptState.FINISHED);
|
||||
scheduler.handle(appRemovedEvent1);
|
||||
verifyAppRunnable(attId4, true);
|
||||
verifyQueueNumRunnable("queue1.sub2", 2, 0);
|
||||
|
@ -2311,16 +2311,16 @@ public class TestFairScheduler {
|
|||
verifyQueueNumRunnable("queue1.sub3", 0, 1);
|
||||
|
||||
// Now test removal of a non-runnable app
|
||||
AppRemovedSchedulerEvent appRemovedEvent2 = new AppRemovedSchedulerEvent(
|
||||
attId5, RMAppAttemptState.KILLED);
|
||||
AppAttemptRemovedSchedulerEvent appRemovedEvent2 =
|
||||
new AppAttemptRemovedSchedulerEvent(attId5, RMAppAttemptState.KILLED);
|
||||
scheduler.handle(appRemovedEvent2);
|
||||
assertEquals(0, scheduler.maxRunningEnforcer.usersNonRunnableApps
|
||||
.get("user1").size());
|
||||
// verify app gone in queue accounting
|
||||
verifyQueueNumRunnable("queue1.sub3", 0, 0);
|
||||
// verify it doesn't become runnable when there would be space for it
|
||||
AppRemovedSchedulerEvent appRemovedEvent3 = new AppRemovedSchedulerEvent(
|
||||
attId4, RMAppAttemptState.FINISHED);
|
||||
AppAttemptRemovedSchedulerEvent appRemovedEvent3 =
|
||||
new AppAttemptRemovedSchedulerEvent(attId4, RMAppAttemptState.FINISHED);
|
||||
scheduler.handle(appRemovedEvent3);
|
||||
verifyQueueNumRunnable("queue1.sub2", 1, 0);
|
||||
verifyQueueNumRunnable("queue1.sub3", 0, 0);
|
||||
|
@ -2356,7 +2356,7 @@ public class TestFairScheduler {
|
|||
// send application request
|
||||
ApplicationAttemptId appAttemptId =
|
||||
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
||||
fs.addApplication(appAttemptId, "queue11", "user11");
|
||||
fs.addApplicationAttempt(appAttemptId, "queue11", "user11");
|
||||
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
||||
ResourceRequest request =
|
||||
createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
|
||||
|
|
|
@ -64,7 +64,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppRepor
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||
|
@ -150,13 +150,13 @@ public class TestFifoScheduler {
|
|||
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
|
||||
appId, 1);
|
||||
|
||||
SchedulerEvent event = new AppAddedSchedulerEvent(appAttemptId, "queue",
|
||||
"user");
|
||||
SchedulerEvent event =
|
||||
new AppAttemptAddedSchedulerEvent(appAttemptId, "queue", "user");
|
||||
schedular.handle(event);
|
||||
|
||||
appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 2);
|
||||
|
||||
event = new AppAddedSchedulerEvent(appAttemptId, "queue", "user");
|
||||
event = new AppAttemptAddedSchedulerEvent(appAttemptId, "queue", "user");
|
||||
schedular.handle(event);
|
||||
|
||||
int afterAppsSubmitted = metrics.getAppsSubmitted();
|
||||
|
@ -188,8 +188,8 @@ public class TestFifoScheduler {
|
|||
int _appAttemptId = 1;
|
||||
ApplicationAttemptId appAttemptId = createAppAttemptId(_appId,
|
||||
_appAttemptId);
|
||||
AppAddedSchedulerEvent appEvent1 = new AppAddedSchedulerEvent(appAttemptId,
|
||||
"queue1", "user1");
|
||||
AppAttemptAddedSchedulerEvent appEvent1 =
|
||||
new AppAttemptAddedSchedulerEvent(appAttemptId, "queue1", "user1");
|
||||
scheduler.handle(appEvent1);
|
||||
|
||||
int memory = 64;
|
||||
|
@ -274,8 +274,8 @@ public class TestFifoScheduler {
|
|||
int _appAttemptId = 1;
|
||||
ApplicationAttemptId appAttemptId = createAppAttemptId(_appId,
|
||||
_appAttemptId);
|
||||
AppAddedSchedulerEvent appEvent1 = new AppAddedSchedulerEvent(appAttemptId,
|
||||
"queue1", "user1");
|
||||
AppAttemptAddedSchedulerEvent appEvent1 =
|
||||
new AppAttemptAddedSchedulerEvent(appAttemptId, "queue1", "user1");
|
||||
scheduler.handle(appEvent1);
|
||||
|
||||
int memory = 1024;
|
||||
|
@ -541,8 +541,8 @@ public class TestFifoScheduler {
|
|||
ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
|
||||
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
|
||||
appId, 1);
|
||||
SchedulerEvent event = new AppAddedSchedulerEvent(appAttemptId, "default",
|
||||
"user");
|
||||
SchedulerEvent event =
|
||||
new AppAttemptAddedSchedulerEvent(appAttemptId, "default", "user");
|
||||
fs.handle(event);
|
||||
|
||||
// Verify the blacklist can be updated independent of requesting containers
|
||||
|
|
Loading…
Reference in New Issue