YARN-9313. Support asynchronized scheduling mode and multi-node lookup mechanism for scheduler activities. Contributed by Tao Yang.

This commit is contained in:
Weiwei Yang 2019-04-08 13:40:53 +08:00
parent 72f4b9cd68
commit fc05b0e70e
6 changed files with 539 additions and 57 deletions

View File

@ -63,14 +63,14 @@ public class ActivitiesLogger {
ActivitiesManager activitiesManager, SchedulerNode node,
SchedulerApplicationAttempt application, Priority priority,
String diagnostic) {
String type = "app";
if (node == null || activitiesManager == null) {
NodeId nodeId = getRecordingNodeId(activitiesManager, node);
if (nodeId == null) {
return;
}
if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
recordActivity(activitiesManager, node, application.getQueueName(),
if (activitiesManager.shouldRecordThisNode(nodeId)) {
recordActivity(activitiesManager, nodeId, application.getQueueName(),
application.getApplicationId().toString(), priority,
ActivityState.REJECTED, diagnostic, type);
ActivityState.REJECTED, diagnostic, "app");
}
finishSkippedAppAllocationRecording(activitiesManager,
application.getApplicationId(), ActivityState.REJECTED, diagnostic);
@ -85,18 +85,19 @@ public class ActivitiesLogger {
ActivitiesManager activitiesManager, SchedulerNode node,
SchedulerApplicationAttempt application, Priority priority,
String diagnostic, ActivityState appState) {
if (node == null || activitiesManager == null) {
NodeId nodeId = getRecordingNodeId(activitiesManager, node);
if (nodeId == null) {
return;
}
if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
if (activitiesManager.shouldRecordThisNode(nodeId)) {
String type = "container";
// Add application-container activity into specific node allocation.
activitiesManager.addSchedulingActivityForNode(node,
activitiesManager.addSchedulingActivityForNode(nodeId,
application.getApplicationId().toString(), null,
priority.toString(), ActivityState.SKIPPED, diagnostic, type);
type = "app";
// Add queue-application activity into specific node allocation.
activitiesManager.addSchedulingActivityForNode(node,
activitiesManager.addSchedulingActivityForNode(nodeId,
application.getQueueName(),
application.getApplicationId().toString(),
application.getPriority().toString(), ActivityState.SKIPPED,
@ -122,20 +123,21 @@ public class ActivitiesLogger {
ActivitiesManager activitiesManager, SchedulerNode node,
SchedulerApplicationAttempt application, RMContainer updatedContainer,
ActivityState activityState) {
if (node == null || activitiesManager == null) {
NodeId nodeId = getRecordingNodeId(activitiesManager, node);
if (nodeId == null) {
return;
}
if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
if (activitiesManager.shouldRecordThisNode(nodeId)) {
String type = "container";
// Add application-container activity into specific node allocation.
activitiesManager.addSchedulingActivityForNode(node,
activitiesManager.addSchedulingActivityForNode(nodeId,
application.getApplicationId().toString(),
updatedContainer.getContainer().toString(),
updatedContainer.getContainer().getPriority().toString(),
activityState, ActivityDiagnosticConstant.EMPTY, type);
type = "app";
// Add queue-application activity into specific node allocation.
activitiesManager.addSchedulingActivityForNode(node,
activitiesManager.addSchedulingActivityForNode(nodeId,
application.getQueueName(),
application.getApplicationId().toString(),
application.getPriority().toString(), ActivityState.ACCEPTED,
@ -161,11 +163,12 @@ public class ActivitiesLogger {
ActivitiesManager activitiesManager, FiCaSchedulerNode node,
long currentTime,
SchedulerApplicationAttempt application) {
if (node == null || activitiesManager == null) {
NodeId nodeId = getRecordingNodeId(activitiesManager, node);
if (nodeId == null) {
return;
}
activitiesManager
.startAppAllocationRecording(node.getNodeID(), currentTime,
.startAppAllocationRecording(nodeId, currentTime,
application);
}
@ -211,11 +214,12 @@ public class ActivitiesLogger {
public static void recordQueueActivity(ActivitiesManager activitiesManager,
SchedulerNode node, String parentQueueName, String queueName,
ActivityState state, String diagnostic) {
if (node == null || activitiesManager == null) {
NodeId nodeId = getRecordingNodeId(activitiesManager, node);
if (nodeId == null) {
return;
}
if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
recordActivity(activitiesManager, node, parentQueueName, queueName,
if (activitiesManager.shouldRecordThisNode(nodeId)) {
recordActivity(activitiesManager, nodeId, parentQueueName, queueName,
null, state, diagnostic, null);
}
}
@ -243,11 +247,12 @@ public class ActivitiesLogger {
public static void finishAllocatedNodeAllocation(
ActivitiesManager activitiesManager, SchedulerNode node,
ContainerId containerId, AllocationState containerState) {
if (node == null || activitiesManager == null) {
NodeId nodeId = getRecordingNodeId(activitiesManager, node);
if (nodeId == null) {
return;
}
if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
activitiesManager.updateAllocationFinalState(node.getNodeID(),
if (activitiesManager.shouldRecordThisNode(nodeId)) {
activitiesManager.updateAllocationFinalState(nodeId,
containerId, containerState);
}
}
@ -277,12 +282,16 @@ public class ActivitiesLogger {
// Add queue, application or container activity into specific node allocation.
private static void recordActivity(ActivitiesManager activitiesManager,
SchedulerNode node, String parentName, String childName,
NodeId nodeId, String parentName, String childName,
Priority priority, ActivityState state, String diagnostic, String type) {
activitiesManager.addSchedulingActivityForNode(node, parentName,
activitiesManager.addSchedulingActivityForNode(nodeId, parentName,
childName, priority != null ? priority.toString() : null, state,
diagnostic, type);
}
private static NodeId getRecordingNodeId(ActivitiesManager activitiesManager,
SchedulerNode node) {
return activitiesManager == null ? null :
activitiesManager.getRecordingNodeId(node);
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.service.AbstractService;
@ -46,8 +47,13 @@ import java.util.ArrayList;
public class ActivitiesManager extends AbstractService {
private static final Logger LOG =
LoggerFactory.getLogger(ActivitiesManager.class);
private ConcurrentMap<NodeId, List<NodeAllocation>> recordingNodesAllocation;
private ConcurrentMap<NodeId, List<NodeAllocation>> completedNodeAllocations;
// An empty node ID, we use this variable as a placeholder
// in the activity records when recording multiple nodes assignments.
public static final NodeId EMPTY_NODE_ID = NodeId.newInstance("", 0);
private ThreadLocal<Map<NodeId, List<NodeAllocation>>>
recordingNodesAllocation;
@VisibleForTesting
ConcurrentMap<NodeId, List<NodeAllocation>> completedNodeAllocations;
private Set<NodeId> activeRecordedNodes;
private ConcurrentMap<ApplicationId, Long>
recordingAppActivitiesUntilSpecifiedTime;
@ -63,7 +69,7 @@ public class ActivitiesManager extends AbstractService {
public ActivitiesManager(RMContext rmContext) {
super(ActivitiesManager.class.getName());
recordingNodesAllocation = new ConcurrentHashMap<>();
recordingNodesAllocation = ThreadLocal.withInitial(() -> new HashMap());
completedNodeAllocations = new ConcurrentHashMap<>();
appsAllocation = new ConcurrentHashMap<>();
completedAppAllocations = new ConcurrentHashMap<>();
@ -173,9 +179,11 @@ public class ActivitiesManager extends AbstractService {
if (recordNextAvailableNode) {
recordNextNodeUpdateActivities(nodeID.toString());
}
if (activeRecordedNodes.contains(nodeID)) {
// Removing from activeRecordedNodes immediately is to ensure that
// activities will be recorded just once in multiple threads.
if (activeRecordedNodes.remove(nodeID)) {
List<NodeAllocation> nodeAllocation = new ArrayList<>();
recordingNodesAllocation.put(nodeID, nodeAllocation);
recordingNodesAllocation.get().put(nodeID, nodeAllocation);
}
}
@ -199,12 +207,11 @@ public class ActivitiesManager extends AbstractService {
}
// Add queue, application or container activity into specific node allocation.
void addSchedulingActivityForNode(SchedulerNode node, String parentName,
void addSchedulingActivityForNode(NodeId nodeId, String parentName,
String childName, String priority, ActivityState state, String diagnostic,
String type) {
if (shouldRecordThisNode(node.getNodeID())) {
NodeAllocation nodeAllocation = getCurrentNodeAllocation(
node.getNodeID());
if (shouldRecordThisNode(nodeId)) {
NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeId);
nodeAllocation.addAllocationActivity(parentName, childName, priority,
state, diagnostic, type);
}
@ -262,7 +269,7 @@ public class ActivitiesManager extends AbstractService {
}
void finishNodeUpdateRecording(NodeId nodeID) {
List<NodeAllocation> value = recordingNodesAllocation.get(nodeID);
List<NodeAllocation> value = recordingNodesAllocation.get().get(nodeID);
long timeStamp = SystemClock.getInstance().getTime();
if (value != null) {
@ -278,9 +285,8 @@ public class ActivitiesManager extends AbstractService {
}
if (shouldRecordThisNode(nodeID)) {
recordingNodesAllocation.remove(nodeID);
recordingNodesAllocation.get().remove(nodeID);
completedNodeAllocations.put(nodeID, value);
stopRecordNodeUpdateActivities(nodeID);
}
}
}
@ -291,12 +297,15 @@ public class ActivitiesManager extends AbstractService {
}
boolean shouldRecordThisNode(NodeId nodeID) {
return activeRecordedNodes.contains(nodeID) && recordingNodesAllocation
return isRecordingMultiNodes() || recordingNodesAllocation.get()
.containsKey(nodeID);
}
private NodeAllocation getCurrentNodeAllocation(NodeId nodeID) {
List<NodeAllocation> nodeAllocations = recordingNodesAllocation.get(nodeID);
NodeId recordingKey =
isRecordingMultiNodes() ? EMPTY_NODE_ID : nodeID;
List<NodeAllocation> nodeAllocations =
recordingNodesAllocation.get().get(recordingKey);
NodeAllocation nodeAllocation;
// When this node has already stored allocation activities, get the
// last allocation for this node.
@ -323,11 +332,29 @@ public class ActivitiesManager extends AbstractService {
return nodeAllocation;
}
private void stopRecordNodeUpdateActivities(NodeId nodeId) {
activeRecordedNodes.remove(nodeId);
}
private void turnOffActivityMonitoringForApp(ApplicationId applicationId) {
recordingAppActivitiesUntilSpecifiedTime.remove(applicationId);
}
public boolean isRecordingMultiNodes() {
return recordingNodesAllocation.get().containsKey(EMPTY_NODE_ID);
}
/**
* Get recording node id:
* 1. node id of the input node if it is not null.
* 2. EMPTY_NODE_ID if input node is null and activities manager is
* recording multi-nodes.
* 3. null otherwise.
* @param node - input node
* @return recording nodeId
*/
public NodeId getRecordingNodeId(SchedulerNode node) {
if (node != null) {
return node.getNodeID();
} else if (isRecordingMultiNodes()) {
return ActivitiesManager.EMPTY_NODE_ID;
}
return null;
}
}

View File

@ -1297,17 +1297,12 @@ public class CapacityScheduler extends
if (!scheduleAsynchronously) {
writeLock.lock();
try {
ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
rmNode.getNodeID());
// reset allocation and reservation stats before we start doing any
// work
updateSchedulerHealth(lastNodeUpdateTime, rmNode.getNodeID(),
CSAssignment.NULL_ASSIGNMENT);
allocateContainersToNode(rmNode.getNodeID(), true);
ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
rmNode.getNodeID());
} finally {
writeLock.unlock();
}
@ -1706,10 +1701,18 @@ public class CapacityScheduler extends
// nodes.
CSAssignment assignment;
if (!multiNodePlacementEnabled) {
ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
node.getNodeID());
assignment = allocateContainerOnSingleNode(candidates,
node, withNodeHeartbeat);
ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
node.getNodeID());
} else{
ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
ActivitiesManager.EMPTY_NODE_ID);
assignment = allocateContainersOnMultiNodes(candidates);
ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
ActivitiesManager.EMPTY_NODE_ID);
}
if (assignment != null && assignment.getAssignmentInformation() != null

View File

@ -0,0 +1,220 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.mockito.Mockito;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
/**
* Test class for {@link ActivitiesManager}.
*/
public class TestActivitiesManager {
private final static int NUM_NODES = 5;
private final static int NUM_APPS = 5;
private final static int NUM_THREADS = 5;
private RMContext rmContext;
private TestingActivitiesManager activitiesManager;
private List<SchedulerApplicationAttempt> apps;
private List<SchedulerNode> nodes;
private ThreadPoolExecutor threadPoolExecutor;
@Before
public void setup() {
rmContext = Mockito.mock(RMContext.class);
ResourceScheduler scheduler = Mockito.mock(ResourceScheduler.class);
Mockito.when(scheduler.getMinimumResourceCapability())
.thenReturn(Resources.none());
Mockito.when(rmContext.getScheduler()).thenReturn(scheduler);
LeafQueue mockQueue = Mockito.mock(LeafQueue.class);
Map<ApplicationId, RMApp> rmApps = new ConcurrentHashMap<>();
Mockito.doReturn(rmApps).when(rmContext).getRMApps();
apps = new ArrayList<>();
for (int i = 0; i < NUM_APPS; i++) {
ApplicationAttemptId appAttemptId =
TestUtils.getMockApplicationAttemptId(i, 0);
RMApp mockApp = Mockito.mock(RMApp.class);
Mockito.doReturn(appAttemptId.getApplicationId()).when(mockApp)
.getApplicationId();
rmApps.put(appAttemptId.getApplicationId(), mockApp);
FiCaSchedulerApp app =
new FiCaSchedulerApp(appAttemptId, "user", mockQueue,
mock(ActiveUsersManager.class), rmContext);
apps.add(app);
}
nodes = new ArrayList<>();
for (int i = 0; i < NUM_NODES; i++) {
nodes.add(TestUtils.getMockNode("host" + i, "rack", 1, 10240));
}
activitiesManager = new TestingActivitiesManager(rmContext);
threadPoolExecutor =
new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 3L,
TimeUnit.SECONDS, new LinkedBlockingQueue<>());
}
/**
* Test recording activities belong to different nodes in multiple threads,
* these threads can run without interference and one activity
* should be recorded by every thread.
*/
@Test
public void testRecordingDifferentNodeActivitiesInMultiThreads()
throws Exception {
Random rand = new Random();
List<Future<Void>> futures = new ArrayList<>();
for (SchedulerNode node : nodes) {
Callable<Void> task = () -> {
SchedulerApplicationAttempt randomApp =
apps.get(rand.nextInt(NUM_APPS));
// start recording activities for random node
activitiesManager.recordNextNodeUpdateActivities(
node.getNodeID().toString());
// generate node/app activities
ActivitiesLogger.NODE
.startNodeUpdateRecording(activitiesManager, node.getNodeID());
ActivitiesLogger.APP
.recordAppActivityWithoutAllocation(activitiesManager, node,
randomApp, Priority.newInstance(0),
ActivityDiagnosticConstant.FAIL_TO_ALLOCATE,
ActivityState.REJECTED);
ActivitiesLogger.NODE
.finishNodeUpdateRecording(activitiesManager, node.getNodeID());
return null;
};
futures.add(threadPoolExecutor.submit(task));
}
for (Future<Void> future : futures) {
future.get();
}
// Check activities for all nodes should be recorded and every node should
// have only one allocation information.
Assert.assertEquals(NUM_NODES,
activitiesManager.historyNodeAllocations.size());
for (List<List<NodeAllocation>> nodeAllocationsForThisNode :
activitiesManager.historyNodeAllocations.values()) {
Assert.assertEquals(1, nodeAllocationsForThisNode.size());
Assert.assertEquals(1, nodeAllocationsForThisNode.get(0).size());
}
}
/**
* Test recording activities for multi-nodes assignment in multiple threads,
* only one activity info should be recorded by one of these threads.
*/
@Test
public void testRecordingSchedulerActivitiesForMultiNodesInMultiThreads()
throws Exception {
Random rand = new Random();
// start recording activities for multi-nodes
activitiesManager.recordNextNodeUpdateActivities(
ActivitiesManager.EMPTY_NODE_ID.toString());
List<Future<Void>> futures = new ArrayList<>();
// generate node/app activities
for (SchedulerNode node : nodes) {
Callable<Void> task = () -> {
SchedulerApplicationAttempt randomApp =
apps.get(rand.nextInt(NUM_APPS));
ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
ActivitiesManager.EMPTY_NODE_ID);
ActivitiesLogger.APP
.recordAppActivityWithoutAllocation(activitiesManager, node,
randomApp, Priority.newInstance(0),
ActivityDiagnosticConstant.FAIL_TO_ALLOCATE,
ActivityState.REJECTED);
ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
ActivitiesManager.EMPTY_NODE_ID);
return null;
};
futures.add(threadPoolExecutor.submit(task));
}
for (Future<Void> future : futures) {
future.get();
}
// Check activities for multi-nodes should be recorded only once
Assert.assertEquals(1, activitiesManager.historyNodeAllocations.size());
}
/**
* Testing activities manager which can record all history information about
* node allocations.
*/
public class TestingActivitiesManager extends ActivitiesManager {
private Map<NodeId, List<List<NodeAllocation>>> historyNodeAllocations =
new ConcurrentHashMap<>();
public TestingActivitiesManager(RMContext rmContext) {
super(rmContext);
super.completedNodeAllocations = Mockito.spy(new ConcurrentHashMap<>());
Mockito.doAnswer((invocationOnMock) -> {
NodeId nodeId = (NodeId) invocationOnMock.getArguments()[0];
List<NodeAllocation> nodeAllocations =
(List<NodeAllocation>) invocationOnMock.getArguments()[1];
List<List<NodeAllocation>> historyAllocationsForThisNode =
historyNodeAllocations.get(nodeId);
if (historyAllocationsForThisNode == null) {
historyAllocationsForThisNode = new ArrayList<>();
historyNodeAllocations.put(nodeId, historyAllocationsForThisNode);
}
historyAllocationsForThisNode.add(nodeAllocations);
return null;
}).when(completedNodeAllocations).put(any(NodeId.class),
any(List.class));
}
}
}

View File

@ -95,16 +95,12 @@ public class TestRMWebServicesSchedulerActivities
response.getType().toString());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 11);
JSONArray allocations = json.getJSONArray("allocations");
for (int i = 0; i < allocations.length(); i++) {
if (i != allocations.length() - 1) {
verifyStateOfAllocations(allocations.getJSONObject(i),
"finalAllocationState", "ALLOCATED");
verifyQueueOrder(allocations.getJSONObject(i), "root-a-b-b2-b3-b1");
}
}
// Collection logic of scheduler activities changed after YARN-9313,
// only one allocation should be recorded for all scenarios.
verifyNumberOfAllocations(json, 1);
verifyStateOfAllocations(json.getJSONObject("allocations"),
"finalAllocationState", "ALLOCATED");
verifyQueueOrder(json.getJSONObject("allocations"), "root-a-b-b2-b3-b1");
}
finally {
rm.stop();

View File

@ -0,0 +1,227 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import com.google.inject.Guice;
import com.google.inject.servlet.ServletModule;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import com.sun.jersey.test.framework.WebAppDescriptor;
import org.apache.hadoop.http.JettyUtils;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.junit.Before;
import org.junit.Test;
import javax.ws.rs.core.MediaType;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
/**
* Tests for scheduler/app activities when multi-nodes enabled.
*/
public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
extends JerseyTestBase {
private static MockRM rm;
private static CapacitySchedulerConfiguration csConf;
private static YarnConfiguration conf;
public TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled() {
super(new WebAppDescriptor.Builder(
"org.apache.hadoop.yarn.server.resourcemanager.webapp")
.contextListenerClass(GuiceServletConfig.class)
.filterClass(com.google.inject.servlet.GuiceFilter.class)
.contextPath("jersey-guice-filter").servletPath("/").build());
}
private static class WebServletModule extends ServletModule {
@Override
protected void configureServlets() {
bind(JAXBContextResolver.class);
bind(RMWebServices.class);
bind(GenericExceptionHandler.class);
csConf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(csConf);
conf = new YarnConfiguration(csConf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
// enable multi-nodes placement
conf.setBoolean(
CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED, true);
rm = new MockRM(conf);
bind(ResourceManager.class).toInstance(rm);
serve("/*").with(GuiceContainer.class);
}
}
private static void setupQueueConfiguration(
CapacitySchedulerConfiguration config) {
// Define top-level queues
config.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] {"a", "b"});
final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
config.setCapacity(queueA, 10.5f);
config.setMaximumCapacity(queueA, 50);
final String queueB = CapacitySchedulerConfiguration.ROOT + ".b";
config.setCapacity(queueB, 89.5f);
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
GuiceServletConfig.setInjector(
Guice.createInjector(new WebServletModule()));
}
@Test (timeout=30000)
public void testAssignContainer() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm = new MockNM("127.0.0.1:1234", 2 * 1024,
rm.getResourceTrackerService());
nm.registerNode();
try {
RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "127.0.0.1",
Resources.createResource(1024), 1), ResourceRequest
.newInstance(Priority.UNDEFINED, "/default-rack",
Resources.createResource(1024), 1), ResourceRequest
.newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
1)), null);
//Trigger recording for multi-nodes without params
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
//Trigger scheduling for this app
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
RMNode rmNode = rm.getRMContext().getRMNodes().get(nm.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode));
//Check scheduler activities, it should contain one allocation and
// final allocation state is ALLOCATED
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
JSONObject json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 1);
JSONObject allocations = json.getJSONObject("allocations");
verifyStateOfAllocations(allocations,
"finalAllocationState", "ALLOCATED");
} finally {
rm.stop();
}
}
@Test (timeout=30000)
public void testSchedulingWithoutPendingRequests()
throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm = new MockNM("127.0.0.1:1234", 8 * 1024,
rm.getResourceTrackerService());
nm.registerNode();
try {
//Trigger recording for multi-nodes without params
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
//Trigger scheduling for this app
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
RMNode rmNode = rm.getRMContext().getRMNodes().get(nm.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode));
//Check scheduler activities, it should contain one allocation and
// final allocation state is SKIPPED
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
JSONObject json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 1);
JSONObject allocations = json.getJSONObject("allocations");
verifyStateOfAllocations(allocations,
"finalAllocationState", "SKIPPED");
} finally {
rm.stop();
}
}
private void verifyNumberOfAllocations(JSONObject json, int realValue)
throws Exception {
if (json.isNull("allocations")) {
assertEquals("Number of allocations is wrong", 0, realValue);
} else {
Object object = json.get("allocations");
if (object.getClass() == JSONObject.class) {
assertEquals("Number of allocations is wrong", 1, realValue);
} else if (object.getClass() == JSONArray.class) {
assertEquals("Number of allocations is wrong in: " + object,
((JSONArray) object).length(), realValue);
}
}
}
private void verifyStateOfAllocations(JSONObject allocation,
String nameToCheck, String realState) throws Exception {
assertEquals("State of allocation is wrong", allocation.get(nameToCheck),
realState);
}
}