YARN-1311. Fixed app specific scheduler-events' names to be app-attempt based. Contributed by Vinod Kumar Vavilapalli
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1550579 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
288506022f
commit
38c32ed239
|
@ -165,6 +165,9 @@ Release 2.4.0 - UNRELEASED
|
||||||
YARN-1325. Modified RM HA configuration validation to also ensure that
|
YARN-1325. Modified RM HA configuration validation to also ensure that
|
||||||
multiple RMs are configured. (Xuan Gong via vinodkv)
|
multiple RMs are configured. (Xuan Gong via vinodkv)
|
||||||
|
|
||||||
|
YARN-1311. Fixed app specific scheduler-events' names to be app-attempt
|
||||||
|
based. (vinodkv via jianhe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -87,8 +87,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
|
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
|
||||||
|
@ -742,9 +742,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
new Token<AMRMTokenIdentifier>(id,
|
new Token<AMRMTokenIdentifier>(id,
|
||||||
appAttempt.rmContext.getAMRMTokenSecretManager());
|
appAttempt.rmContext.getAMRMTokenSecretManager());
|
||||||
|
|
||||||
// Add the application to the scheduler
|
// Add the applicationAttempt to the scheduler
|
||||||
appAttempt.eventHandler.handle(
|
appAttempt.eventHandler.handle(
|
||||||
new AppAddedSchedulerEvent(appAttempt.applicationAttemptId,
|
new AppAttemptAddedSchedulerEvent(appAttempt.applicationAttemptId,
|
||||||
appAttempt.submissionContext.getQueue(), appAttempt.user));
|
appAttempt.submissionContext.getQueue(), appAttempt.user));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1055,8 +1055,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
}
|
}
|
||||||
|
|
||||||
appAttempt.eventHandler.handle(appEvent);
|
appAttempt.eventHandler.handle(appEvent);
|
||||||
appAttempt.eventHandler.handle(new AppRemovedSchedulerEvent(appAttemptId,
|
appAttempt.eventHandler.handle(new AppAttemptRemovedSchedulerEvent(
|
||||||
finalAttemptState));
|
appAttemptId, finalAttemptState));
|
||||||
|
|
||||||
appAttempt.removeCredentials(appAttempt);
|
appAttempt.removeCredentials(appAttempt);
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,8 +71,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeRepo
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||||
|
@ -417,7 +417,7 @@ public class CapacityScheduler
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void
|
private synchronized void
|
||||||
addApplication(ApplicationAttemptId applicationAttemptId,
|
addApplicationAttempt(ApplicationAttemptId applicationAttemptId,
|
||||||
String queueName, String user) {
|
String queueName, String user) {
|
||||||
|
|
||||||
// Sanity checks
|
// Sanity checks
|
||||||
|
@ -466,7 +466,7 @@ public class CapacityScheduler
|
||||||
RMAppAttemptEventType.APP_ACCEPTED));
|
RMAppAttemptEventType.APP_ACCEPTED));
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void doneApplication(
|
private synchronized void doneApplicationAttempt(
|
||||||
ApplicationAttemptId applicationAttemptId,
|
ApplicationAttemptId applicationAttemptId,
|
||||||
RMAppAttemptState rmAppAttemptFinalState) {
|
RMAppAttemptState rmAppAttemptFinalState) {
|
||||||
LOG.info("Application " + applicationAttemptId + " is done." +
|
LOG.info("Application " + applicationAttemptId + " is done." +
|
||||||
|
@ -740,18 +740,20 @@ public class CapacityScheduler
|
||||||
nodeUpdate(nodeUpdatedEvent.getRMNode());
|
nodeUpdate(nodeUpdatedEvent.getRMNode());
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case APP_ADDED:
|
case APP_ATTEMPT_ADDED:
|
||||||
{
|
{
|
||||||
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent)event;
|
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
|
||||||
addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent
|
(AppAttemptAddedSchedulerEvent) event;
|
||||||
.getQueue(), appAddedEvent.getUser());
|
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
|
||||||
|
appAttemptAddedEvent.getQueue(), appAttemptAddedEvent.getUser());
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case APP_REMOVED:
|
case APP_ATTEMPT_REMOVED:
|
||||||
{
|
{
|
||||||
AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
|
AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
|
||||||
doneApplication(appRemovedEvent.getApplicationAttemptID(),
|
(AppAttemptRemovedSchedulerEvent) event;
|
||||||
appRemovedEvent.getFinalAttemptState());
|
doneApplicationAttempt(appAttemptRemovedEvent.getApplicationAttemptID(),
|
||||||
|
appAttemptRemovedEvent.getFinalAttemptState());
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case CONTAINER_EXPIRED:
|
case CONTAINER_EXPIRED:
|
||||||
|
|
|
@ -20,15 +20,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
|
||||||
public class AppAddedSchedulerEvent extends SchedulerEvent {
|
public class AppAttemptAddedSchedulerEvent extends SchedulerEvent {
|
||||||
|
|
||||||
private final ApplicationAttemptId applicationAttemptId;
|
private final ApplicationAttemptId applicationAttemptId;
|
||||||
private final String queue;
|
private final String queue;
|
||||||
private final String user;
|
private final String user;
|
||||||
|
|
||||||
public AppAddedSchedulerEvent(ApplicationAttemptId applicationAttemptId,
|
public AppAttemptAddedSchedulerEvent(
|
||||||
String queue, String user) {
|
ApplicationAttemptId applicationAttemptId, String queue, String user) {
|
||||||
super(SchedulerEventType.APP_ADDED);
|
super(SchedulerEventType.APP_ATTEMPT_ADDED);
|
||||||
this.applicationAttemptId = applicationAttemptId;
|
this.applicationAttemptId = applicationAttemptId;
|
||||||
this.queue = queue;
|
this.queue = queue;
|
||||||
this.user = user;
|
this.user = user;
|
|
@ -21,14 +21,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
|
|
||||||
public class AppRemovedSchedulerEvent extends SchedulerEvent {
|
public class AppAttemptRemovedSchedulerEvent extends SchedulerEvent {
|
||||||
|
|
||||||
private final ApplicationAttemptId applicationAttemptId;
|
private final ApplicationAttemptId applicationAttemptId;
|
||||||
private final RMAppAttemptState finalAttemptState;
|
private final RMAppAttemptState finalAttemptState;
|
||||||
|
|
||||||
public AppRemovedSchedulerEvent(ApplicationAttemptId applicationAttemptId,
|
public AppAttemptRemovedSchedulerEvent(
|
||||||
|
ApplicationAttemptId applicationAttemptId,
|
||||||
RMAppAttemptState finalAttemptState) {
|
RMAppAttemptState finalAttemptState) {
|
||||||
super(SchedulerEventType.APP_REMOVED);
|
super(SchedulerEventType.APP_ATTEMPT_REMOVED);
|
||||||
this.applicationAttemptId = applicationAttemptId;
|
this.applicationAttemptId = applicationAttemptId;
|
||||||
this.finalAttemptState = finalAttemptState;
|
this.finalAttemptState = finalAttemptState;
|
||||||
}
|
}
|
|
@ -25,9 +25,9 @@ public enum SchedulerEventType {
|
||||||
NODE_REMOVED,
|
NODE_REMOVED,
|
||||||
NODE_UPDATE,
|
NODE_UPDATE,
|
||||||
|
|
||||||
// Source: App
|
// Source: RMAppAttempt
|
||||||
APP_ADDED,
|
APP_ATTEMPT_ADDED,
|
||||||
APP_REMOVED,
|
APP_ATTEMPT_REMOVED,
|
||||||
|
|
||||||
// Source: ContainerAllocationExpirer
|
// Source: ContainerAllocationExpirer
|
||||||
CONTAINER_EXPIRED
|
CONTAINER_EXPIRED
|
||||||
|
|
|
@ -75,8 +75,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppRepor
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||||
|
@ -591,7 +591,7 @@ public class FairScheduler implements ResourceScheduler {
|
||||||
* user. This will accept a new app even if the user or queue is above
|
* user. This will accept a new app even if the user or queue is above
|
||||||
* configured limits, but the app will not be marked as runnable.
|
* configured limits, but the app will not be marked as runnable.
|
||||||
*/
|
*/
|
||||||
protected synchronized void addApplication(
|
protected synchronized void addApplicationAttempt(
|
||||||
ApplicationAttemptId applicationAttemptId, String queueName, String user) {
|
ApplicationAttemptId applicationAttemptId, String queueName, String user) {
|
||||||
if (queueName == null || queueName.isEmpty()) {
|
if (queueName == null || queueName.isEmpty()) {
|
||||||
String message = "Reject application " + applicationAttemptId +
|
String message = "Reject application " + applicationAttemptId +
|
||||||
|
@ -674,7 +674,7 @@ public class FairScheduler implements ResourceScheduler {
|
||||||
return queue;
|
return queue;
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void removeApplication(
|
private synchronized void removeApplicationAttempt(
|
||||||
ApplicationAttemptId applicationAttemptId,
|
ApplicationAttemptId applicationAttemptId,
|
||||||
RMAppAttemptState rmAppAttemptFinalState) {
|
RMAppAttemptState rmAppAttemptFinalState) {
|
||||||
LOG.info("Application " + applicationAttemptId + " is done." +
|
LOG.info("Application " + applicationAttemptId + " is done." +
|
||||||
|
@ -1090,22 +1090,24 @@ public class FairScheduler implements ResourceScheduler {
|
||||||
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
|
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
|
||||||
nodeUpdate(nodeUpdatedEvent.getRMNode());
|
nodeUpdate(nodeUpdatedEvent.getRMNode());
|
||||||
break;
|
break;
|
||||||
case APP_ADDED:
|
case APP_ATTEMPT_ADDED:
|
||||||
if (!(event instanceof AppAddedSchedulerEvent)) {
|
if (!(event instanceof AppAttemptAddedSchedulerEvent)) {
|
||||||
throw new RuntimeException("Unexpected event type: " + event);
|
throw new RuntimeException("Unexpected event type: " + event);
|
||||||
}
|
}
|
||||||
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent)event;
|
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
|
||||||
String queue = appAddedEvent.getQueue();
|
(AppAttemptAddedSchedulerEvent) event;
|
||||||
addApplication(appAddedEvent.getApplicationAttemptId(), queue,
|
String queue = appAttemptAddedEvent.getQueue();
|
||||||
appAddedEvent.getUser());
|
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
|
||||||
|
queue, appAttemptAddedEvent.getUser());
|
||||||
break;
|
break;
|
||||||
case APP_REMOVED:
|
case APP_ATTEMPT_REMOVED:
|
||||||
if (!(event instanceof AppRemovedSchedulerEvent)) {
|
if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
|
||||||
throw new RuntimeException("Unexpected event type: " + event);
|
throw new RuntimeException("Unexpected event type: " + event);
|
||||||
}
|
}
|
||||||
AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
|
AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
|
||||||
removeApplication(appRemovedEvent.getApplicationAttemptID(),
|
(AppAttemptRemovedSchedulerEvent) event;
|
||||||
appRemovedEvent.getFinalAttemptState());
|
removeApplicationAttempt(appAttemptRemovedEvent.getApplicationAttemptID(),
|
||||||
|
appAttemptRemovedEvent.getFinalAttemptState());
|
||||||
break;
|
break;
|
||||||
case CONTAINER_EXPIRED:
|
case CONTAINER_EXPIRED:
|
||||||
if (!(event instanceof ContainerExpiredSchedulerEvent)) {
|
if (!(event instanceof ContainerExpiredSchedulerEvent)) {
|
||||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -67,11 +66,20 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||||
|
@ -340,7 +348,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
|
||||||
return nodes.get(nodeId);
|
return nodes.get(nodeId);
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void addApplication(ApplicationAttemptId appAttemptId,
|
private synchronized void addApplicationAttempt(ApplicationAttemptId appAttemptId,
|
||||||
String user) {
|
String user) {
|
||||||
// TODO: Fix store
|
// TODO: Fix store
|
||||||
FiCaSchedulerApp schedulerApp =
|
FiCaSchedulerApp schedulerApp =
|
||||||
|
@ -355,7 +363,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
|
||||||
RMAppAttemptEventType.APP_ACCEPTED));
|
RMAppAttemptEventType.APP_ACCEPTED));
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void doneApplication(
|
private synchronized void doneApplicationAttempt(
|
||||||
ApplicationAttemptId applicationAttemptId,
|
ApplicationAttemptId applicationAttemptId,
|
||||||
RMAppAttemptState rmAppAttemptFinalState)
|
RMAppAttemptState rmAppAttemptFinalState)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -689,22 +697,25 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
|
||||||
nodeUpdate(nodeUpdatedEvent.getRMNode());
|
nodeUpdate(nodeUpdatedEvent.getRMNode());
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case APP_ADDED:
|
case APP_ATTEMPT_ADDED:
|
||||||
{
|
{
|
||||||
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
|
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
|
||||||
addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent
|
(AppAttemptAddedSchedulerEvent) event;
|
||||||
.getUser());
|
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
|
||||||
|
appAttemptAddedEvent.getUser());
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case APP_REMOVED:
|
case APP_ATTEMPT_REMOVED:
|
||||||
{
|
{
|
||||||
AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
|
AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
|
||||||
|
(AppAttemptRemovedSchedulerEvent) event;
|
||||||
try {
|
try {
|
||||||
doneApplication(appRemovedEvent.getApplicationAttemptID(),
|
doneApplicationAttempt(
|
||||||
appRemovedEvent.getFinalAttemptState());
|
appAttemptRemovedEvent.getApplicationAttemptID(),
|
||||||
|
appAttemptRemovedEvent.getFinalAttemptState());
|
||||||
} catch(IOException ie) {
|
} catch(IOException ie) {
|
||||||
LOG.error("Unable to remove application "
|
LOG.error("Unable to remove application "
|
||||||
+ appRemovedEvent.getApplicationAttemptID(), ie);
|
+ appAttemptRemovedEvent.getApplicationAttemptID(), ie);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -57,7 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.Task.State;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
|
@ -166,7 +166,7 @@ public class Application {
|
||||||
resourceManager.getClientRMService().submitApplication(request);
|
resourceManager.getClientRMService().submitApplication(request);
|
||||||
|
|
||||||
// Notify scheduler
|
// Notify scheduler
|
||||||
AppAddedSchedulerEvent appAddedEvent1 = new AppAddedSchedulerEvent(
|
AppAttemptAddedSchedulerEvent appAddedEvent1 = new AppAttemptAddedSchedulerEvent(
|
||||||
this.applicationAttemptId, this.queue, this.user);
|
this.applicationAttemptId, this.queue, this.user);
|
||||||
scheduler.handle(appAddedEvent1);
|
scheduler.handle(appAddedEvent1);
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,7 +44,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||||
|
@ -297,8 +297,8 @@ public class TestFifoScheduler {
|
||||||
ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
|
ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
|
||||||
ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
|
ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
|
||||||
appId1, 1);
|
appId1, 1);
|
||||||
SchedulerEvent event1 = new AppAddedSchedulerEvent(appAttemptId1, "queue",
|
SchedulerEvent event1 =
|
||||||
"user");
|
new AppAttemptAddedSchedulerEvent(appAttemptId1, "queue", "user");
|
||||||
fs.handle(event1);
|
fs.handle(event1);
|
||||||
|
|
||||||
List<ContainerId> emptyId = new ArrayList<ContainerId>();
|
List<ContainerId> emptyId = new ArrayList<ContainerId>();
|
||||||
|
@ -388,15 +388,15 @@ public class TestFifoScheduler {
|
||||||
ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
|
ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
|
||||||
ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
|
ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
|
||||||
appId1, 1);
|
appId1, 1);
|
||||||
SchedulerEvent event1 = new AppAddedSchedulerEvent(appAttemptId1, "queue",
|
SchedulerEvent event1 =
|
||||||
"user");
|
new AppAttemptAddedSchedulerEvent(appAttemptId1, "queue", "user");
|
||||||
fs.handle(event1);
|
fs.handle(event1);
|
||||||
|
|
||||||
ApplicationId appId2 = BuilderUtils.newApplicationId(200, 2);
|
ApplicationId appId2 = BuilderUtils.newApplicationId(200, 2);
|
||||||
ApplicationAttemptId appAttemptId2 = BuilderUtils.newApplicationAttemptId(
|
ApplicationAttemptId appAttemptId2 = BuilderUtils.newApplicationAttemptId(
|
||||||
appId2, 1);
|
appId2, 1);
|
||||||
SchedulerEvent event2 = new AppAddedSchedulerEvent(appAttemptId2, "queue",
|
SchedulerEvent event2 =
|
||||||
"user");
|
new AppAttemptAddedSchedulerEvent(appAttemptId2, "queue", "user");
|
||||||
fs.handle(event2);
|
fs.handle(event2);
|
||||||
|
|
||||||
List<ContainerId> emptyId = new ArrayList<ContainerId>();
|
List<ContainerId> emptyId = new ArrayList<ContainerId>();
|
||||||
|
|
|
@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
@ -159,7 +159,8 @@ public class TestResourceManager {
|
||||||
application.finishTask(t3);
|
application.finishTask(t3);
|
||||||
|
|
||||||
// Notify scheduler application is finished.
|
// Notify scheduler application is finished.
|
||||||
AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent(
|
AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
|
||||||
|
new AppAttemptRemovedSchedulerEvent(
|
||||||
application.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
|
application.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
|
||||||
resourceManager.getResourceScheduler().handle(appRemovedEvent1);
|
resourceManager.getResourceScheduler().handle(appRemovedEvent1);
|
||||||
|
|
||||||
|
|
|
@ -85,7 +85,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
||||||
|
@ -329,7 +329,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
// Check events
|
// Check events
|
||||||
verify(masterService).
|
verify(masterService).
|
||||||
registerAppAttempt(applicationAttempt.getAppAttemptId());
|
registerAppAttempt(applicationAttempt.getAppAttemptId());
|
||||||
verify(scheduler).handle(any(AppAddedSchedulerEvent.class));
|
verify(scheduler).handle(any(AppAttemptAddedSchedulerEvent.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -65,7 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||||
|
@ -555,8 +555,8 @@ public class TestCapacityScheduler {
|
||||||
ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
|
ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
|
||||||
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
|
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
|
||||||
appId, 1);
|
appId, 1);
|
||||||
SchedulerEvent event = new AppAddedSchedulerEvent(appAttemptId, "default",
|
SchedulerEvent event =
|
||||||
"user");
|
new AppAttemptAddedSchedulerEvent(appAttemptId, "default", "user");
|
||||||
cs.handle(event);
|
cs.handle(event);
|
||||||
|
|
||||||
// Verify the blacklist can be updated independent of requesting containers
|
// Verify the blacklist can be updated independent of requesting containers
|
||||||
|
|
|
@ -63,7 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
|
@ -348,7 +348,7 @@ public class TestLeafQueue {
|
||||||
a.submitApplication(app_0, user_0, B);
|
a.submitApplication(app_0, user_0, B);
|
||||||
|
|
||||||
when(cs.getApplication(appAttemptId_0)).thenReturn(app_0);
|
when(cs.getApplication(appAttemptId_0)).thenReturn(app_0);
|
||||||
AppRemovedSchedulerEvent event = new AppRemovedSchedulerEvent(
|
AppAttemptRemovedSchedulerEvent event = new AppAttemptRemovedSchedulerEvent(
|
||||||
appAttemptId_0, RMAppAttemptState.FAILED);
|
appAttemptId_0, RMAppAttemptState.FAILED);
|
||||||
cs.handle(event);
|
cs.handle(event);
|
||||||
|
|
||||||
|
@ -366,7 +366,7 @@ public class TestLeafQueue {
|
||||||
assertEquals(1, a.getMetrics().getAppsPending());
|
assertEquals(1, a.getMetrics().getAppsPending());
|
||||||
|
|
||||||
when(cs.getApplication(appAttemptId_1)).thenReturn(app_0);
|
when(cs.getApplication(appAttemptId_1)).thenReturn(app_0);
|
||||||
event = new AppRemovedSchedulerEvent(appAttemptId_0,
|
event = new AppAttemptRemovedSchedulerEvent(appAttemptId_0,
|
||||||
RMAppAttemptState.FINISHED);
|
RMAppAttemptState.FINISHED);
|
||||||
cs.handle(event);
|
cs.handle(event);
|
||||||
|
|
||||||
|
|
|
@ -19,10 +19,10 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
@ -33,11 +33,11 @@ import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import javax.xml.parsers.ParserConfigurationException;
|
import javax.xml.parsers.ParserConfigurationException;
|
||||||
|
|
||||||
|
@ -54,10 +54,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
@ -79,9 +79,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||||
|
@ -256,7 +255,7 @@ public class TestFairScheduler {
|
||||||
private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
|
private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
|
||||||
String queueId, String userId, int numContainers, int priority) {
|
String queueId, String userId, int numContainers, int priority) {
|
||||||
ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
||||||
scheduler.addApplication(id, queueId, userId);
|
scheduler.addApplicationAttempt(id, queueId, userId);
|
||||||
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
||||||
ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
|
ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
|
||||||
priority, numContainers, true);
|
priority, numContainers, true);
|
||||||
|
@ -619,8 +618,8 @@ public class TestFairScheduler {
|
||||||
null, null, null, false, false, 0, null, null), null, null, 0, null);
|
null, null, null, false, false, 0, null, null), null, null, 0, null);
|
||||||
appsMap.put(appAttemptId.getApplicationId(), rmApp);
|
appsMap.put(appAttemptId.getApplicationId(), rmApp);
|
||||||
|
|
||||||
AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(
|
AppAttemptAddedSchedulerEvent appAddedEvent =
|
||||||
appAttemptId, "default", "user1");
|
new AppAttemptAddedSchedulerEvent(appAttemptId, "default", "user1");
|
||||||
scheduler.handle(appAddedEvent);
|
scheduler.handle(appAddedEvent);
|
||||||
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
|
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
|
||||||
.getRunnableAppSchedulables().size());
|
.getRunnableAppSchedulables().size());
|
||||||
|
@ -641,8 +640,8 @@ public class TestFairScheduler {
|
||||||
null, null, null, false, false, 0, null, null), null, null, 0, null);
|
null, null, null, false, false, 0, null, null), null, null, 0, null);
|
||||||
appsMap.put(appAttemptId.getApplicationId(), rmApp);
|
appsMap.put(appAttemptId.getApplicationId(), rmApp);
|
||||||
|
|
||||||
AppAddedSchedulerEvent appAddedEvent2 = new AppAddedSchedulerEvent(
|
AppAttemptAddedSchedulerEvent appAddedEvent2 =
|
||||||
appAttemptId, "default", "user2");
|
new AppAttemptAddedSchedulerEvent(appAttemptId, "default", "user2");
|
||||||
scheduler.handle(appAddedEvent2);
|
scheduler.handle(appAddedEvent2);
|
||||||
assertEquals(0, scheduler.getQueueManager().getLeafQueue("user1", true)
|
assertEquals(0, scheduler.getQueueManager().getLeafQueue("user1", true)
|
||||||
.getRunnableAppSchedulables().size());
|
.getRunnableAppSchedulables().size());
|
||||||
|
@ -661,8 +660,8 @@ public class TestFairScheduler {
|
||||||
|
|
||||||
// submit app with empty queue
|
// submit app with empty queue
|
||||||
ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
|
ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
|
||||||
AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(
|
AppAttemptAddedSchedulerEvent appAddedEvent =
|
||||||
appAttemptId, "", "user1");
|
new AppAttemptAddedSchedulerEvent(appAttemptId, "", "user1");
|
||||||
scheduler.handle(appAddedEvent);
|
scheduler.handle(appAddedEvent);
|
||||||
|
|
||||||
// submission rejected
|
// submission rejected
|
||||||
|
@ -787,11 +786,11 @@ public class TestFairScheduler {
|
||||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
|
||||||
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
|
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
|
||||||
scheduler.addApplication(id11, "root.queue1", "user1");
|
scheduler.addApplicationAttempt(id11, "root.queue1", "user1");
|
||||||
ApplicationAttemptId id21 = createAppAttemptId(2, 1);
|
ApplicationAttemptId id21 = createAppAttemptId(2, 1);
|
||||||
scheduler.addApplication(id21, "root.queue2", "user1");
|
scheduler.addApplicationAttempt(id21, "root.queue2", "user1");
|
||||||
ApplicationAttemptId id22 = createAppAttemptId(2, 2);
|
ApplicationAttemptId id22 = createAppAttemptId(2, 2);
|
||||||
scheduler.addApplication(id22, "root.queue2", "user1");
|
scheduler.addApplicationAttempt(id22, "root.queue2", "user1");
|
||||||
|
|
||||||
int minReqSize =
|
int minReqSize =
|
||||||
FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB;
|
FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB;
|
||||||
|
@ -833,8 +832,9 @@ public class TestFairScheduler {
|
||||||
public void testAppAdditionAndRemoval() throws Exception {
|
public void testAppAdditionAndRemoval() throws Exception {
|
||||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
|
||||||
AppAddedSchedulerEvent appAddedEvent1 = new AppAddedSchedulerEvent(
|
AppAttemptAddedSchedulerEvent appAddedEvent1 =
|
||||||
createAppAttemptId(1, 1), "default", "user1");
|
new AppAttemptAddedSchedulerEvent(createAppAttemptId(1, 1), "default",
|
||||||
|
"user1");
|
||||||
scheduler.handle(appAddedEvent1);
|
scheduler.handle(appAddedEvent1);
|
||||||
|
|
||||||
// Scheduler should have two queues (the default and the one created for user1)
|
// Scheduler should have two queues (the default and the one created for user1)
|
||||||
|
@ -844,7 +844,7 @@ public class TestFairScheduler {
|
||||||
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
|
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
|
||||||
.getRunnableAppSchedulables().size());
|
.getRunnableAppSchedulables().size());
|
||||||
|
|
||||||
AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent(
|
AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
|
||||||
createAppAttemptId(1, 1), RMAppAttemptState.FINISHED);
|
createAppAttemptId(1, 1), RMAppAttemptState.FINISHED);
|
||||||
|
|
||||||
// Now remove app
|
// Now remove app
|
||||||
|
@ -1526,7 +1526,7 @@ public class TestFairScheduler {
|
||||||
scheduler.handle(nodeEvent2);
|
scheduler.handle(nodeEvent2);
|
||||||
|
|
||||||
ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
||||||
scheduler.addApplication(appId, "queue1", "user1");
|
scheduler.addApplicationAttempt(appId, "queue1", "user1");
|
||||||
|
|
||||||
// 1 request with 2 nodes on the same rack. another request with 1 node on
|
// 1 request with 2 nodes on the same rack. another request with 1 node on
|
||||||
// a different rack
|
// a different rack
|
||||||
|
@ -1764,7 +1764,7 @@ public class TestFairScheduler {
|
||||||
|
|
||||||
ApplicationAttemptId attId =
|
ApplicationAttemptId attId =
|
||||||
ApplicationAttemptId.newInstance(applicationId, this.ATTEMPT_ID++);
|
ApplicationAttemptId.newInstance(applicationId, this.ATTEMPT_ID++);
|
||||||
scheduler.addApplication(attId, queue, user);
|
scheduler.addApplicationAttempt(attId, queue, user);
|
||||||
|
|
||||||
numTries = 0;
|
numTries = 0;
|
||||||
while (application.getFinishTime() == 0 && numTries < MAX_TRIES) {
|
while (application.getFinishTime() == 0 && numTries < MAX_TRIES) {
|
||||||
|
@ -2238,8 +2238,8 @@ public class TestFairScheduler {
|
||||||
verifyQueueNumRunnable("queue1", 2, 1);
|
verifyQueueNumRunnable("queue1", 2, 1);
|
||||||
|
|
||||||
// Remove app 1 and both app 2 and app 4 should becomes runnable in its place
|
// Remove app 1 and both app 2 and app 4 should becomes runnable in its place
|
||||||
AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent(
|
AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
|
||||||
attId1, RMAppAttemptState.FINISHED);
|
new AppAttemptRemovedSchedulerEvent(attId1, RMAppAttemptState.FINISHED);
|
||||||
scheduler.handle(appRemovedEvent1);
|
scheduler.handle(appRemovedEvent1);
|
||||||
verifyAppRunnable(attId2, true);
|
verifyAppRunnable(attId2, true);
|
||||||
verifyQueueNumRunnable("queue2", 1, 0);
|
verifyQueueNumRunnable("queue2", 1, 0);
|
||||||
|
@ -2302,8 +2302,8 @@ public class TestFairScheduler {
|
||||||
|
|
||||||
// Even though the app was removed from sub3, the app from sub2 gets to go
|
// Even though the app was removed from sub3, the app from sub2 gets to go
|
||||||
// because it came in first
|
// because it came in first
|
||||||
AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent(
|
AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
|
||||||
attId2, RMAppAttemptState.FINISHED);
|
new AppAttemptRemovedSchedulerEvent(attId2, RMAppAttemptState.FINISHED);
|
||||||
scheduler.handle(appRemovedEvent1);
|
scheduler.handle(appRemovedEvent1);
|
||||||
verifyAppRunnable(attId4, true);
|
verifyAppRunnable(attId4, true);
|
||||||
verifyQueueNumRunnable("queue1.sub2", 2, 0);
|
verifyQueueNumRunnable("queue1.sub2", 2, 0);
|
||||||
|
@ -2311,16 +2311,16 @@ public class TestFairScheduler {
|
||||||
verifyQueueNumRunnable("queue1.sub3", 0, 1);
|
verifyQueueNumRunnable("queue1.sub3", 0, 1);
|
||||||
|
|
||||||
// Now test removal of a non-runnable app
|
// Now test removal of a non-runnable app
|
||||||
AppRemovedSchedulerEvent appRemovedEvent2 = new AppRemovedSchedulerEvent(
|
AppAttemptRemovedSchedulerEvent appRemovedEvent2 =
|
||||||
attId5, RMAppAttemptState.KILLED);
|
new AppAttemptRemovedSchedulerEvent(attId5, RMAppAttemptState.KILLED);
|
||||||
scheduler.handle(appRemovedEvent2);
|
scheduler.handle(appRemovedEvent2);
|
||||||
assertEquals(0, scheduler.maxRunningEnforcer.usersNonRunnableApps
|
assertEquals(0, scheduler.maxRunningEnforcer.usersNonRunnableApps
|
||||||
.get("user1").size());
|
.get("user1").size());
|
||||||
// verify app gone in queue accounting
|
// verify app gone in queue accounting
|
||||||
verifyQueueNumRunnable("queue1.sub3", 0, 0);
|
verifyQueueNumRunnable("queue1.sub3", 0, 0);
|
||||||
// verify it doesn't become runnable when there would be space for it
|
// verify it doesn't become runnable when there would be space for it
|
||||||
AppRemovedSchedulerEvent appRemovedEvent3 = new AppRemovedSchedulerEvent(
|
AppAttemptRemovedSchedulerEvent appRemovedEvent3 =
|
||||||
attId4, RMAppAttemptState.FINISHED);
|
new AppAttemptRemovedSchedulerEvent(attId4, RMAppAttemptState.FINISHED);
|
||||||
scheduler.handle(appRemovedEvent3);
|
scheduler.handle(appRemovedEvent3);
|
||||||
verifyQueueNumRunnable("queue1.sub2", 1, 0);
|
verifyQueueNumRunnable("queue1.sub2", 1, 0);
|
||||||
verifyQueueNumRunnable("queue1.sub3", 0, 0);
|
verifyQueueNumRunnable("queue1.sub3", 0, 0);
|
||||||
|
@ -2356,7 +2356,7 @@ public class TestFairScheduler {
|
||||||
// send application request
|
// send application request
|
||||||
ApplicationAttemptId appAttemptId =
|
ApplicationAttemptId appAttemptId =
|
||||||
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
||||||
fs.addApplication(appAttemptId, "queue11", "user11");
|
fs.addApplicationAttempt(appAttemptId, "queue11", "user11");
|
||||||
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
||||||
ResourceRequest request =
|
ResourceRequest request =
|
||||||
createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
|
createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
|
||||||
|
|
|
@ -64,7 +64,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppRepor
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
|
@ -150,13 +150,13 @@ public class TestFifoScheduler {
|
||||||
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
|
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
|
||||||
appId, 1);
|
appId, 1);
|
||||||
|
|
||||||
SchedulerEvent event = new AppAddedSchedulerEvent(appAttemptId, "queue",
|
SchedulerEvent event =
|
||||||
"user");
|
new AppAttemptAddedSchedulerEvent(appAttemptId, "queue", "user");
|
||||||
schedular.handle(event);
|
schedular.handle(event);
|
||||||
|
|
||||||
appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 2);
|
appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 2);
|
||||||
|
|
||||||
event = new AppAddedSchedulerEvent(appAttemptId, "queue", "user");
|
event = new AppAttemptAddedSchedulerEvent(appAttemptId, "queue", "user");
|
||||||
schedular.handle(event);
|
schedular.handle(event);
|
||||||
|
|
||||||
int afterAppsSubmitted = metrics.getAppsSubmitted();
|
int afterAppsSubmitted = metrics.getAppsSubmitted();
|
||||||
|
@ -188,8 +188,8 @@ public class TestFifoScheduler {
|
||||||
int _appAttemptId = 1;
|
int _appAttemptId = 1;
|
||||||
ApplicationAttemptId appAttemptId = createAppAttemptId(_appId,
|
ApplicationAttemptId appAttemptId = createAppAttemptId(_appId,
|
||||||
_appAttemptId);
|
_appAttemptId);
|
||||||
AppAddedSchedulerEvent appEvent1 = new AppAddedSchedulerEvent(appAttemptId,
|
AppAttemptAddedSchedulerEvent appEvent1 =
|
||||||
"queue1", "user1");
|
new AppAttemptAddedSchedulerEvent(appAttemptId, "queue1", "user1");
|
||||||
scheduler.handle(appEvent1);
|
scheduler.handle(appEvent1);
|
||||||
|
|
||||||
int memory = 64;
|
int memory = 64;
|
||||||
|
@ -274,8 +274,8 @@ public class TestFifoScheduler {
|
||||||
int _appAttemptId = 1;
|
int _appAttemptId = 1;
|
||||||
ApplicationAttemptId appAttemptId = createAppAttemptId(_appId,
|
ApplicationAttemptId appAttemptId = createAppAttemptId(_appId,
|
||||||
_appAttemptId);
|
_appAttemptId);
|
||||||
AppAddedSchedulerEvent appEvent1 = new AppAddedSchedulerEvent(appAttemptId,
|
AppAttemptAddedSchedulerEvent appEvent1 =
|
||||||
"queue1", "user1");
|
new AppAttemptAddedSchedulerEvent(appAttemptId, "queue1", "user1");
|
||||||
scheduler.handle(appEvent1);
|
scheduler.handle(appEvent1);
|
||||||
|
|
||||||
int memory = 1024;
|
int memory = 1024;
|
||||||
|
@ -541,8 +541,8 @@ public class TestFifoScheduler {
|
||||||
ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
|
ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
|
||||||
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
|
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
|
||||||
appId, 1);
|
appId, 1);
|
||||||
SchedulerEvent event = new AppAddedSchedulerEvent(appAttemptId, "default",
|
SchedulerEvent event =
|
||||||
"user");
|
new AppAttemptAddedSchedulerEvent(appAttemptId, "default", "user");
|
||||||
fs.handle(event);
|
fs.handle(event);
|
||||||
|
|
||||||
// Verify the blacklist can be updated independent of requesting containers
|
// Verify the blacklist can be updated independent of requesting containers
|
||||||
|
|
Loading…
Reference in New Issue