YARN-5486. Update OpportunisticContainerAllocatorAMService::allocate method to handle OPPORTUNISTIC container requests. (Konstantinos Karanasos via asuresh)

(cherry picked from commit 10be45986c)
(cherry picked from commit e3baa0988b4dfb592afe426ffeba3c7091b271c5)

parent 937f739804
commit f9ea753414
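For context, a minimal AM-side sketch of requesting an OPPORTUNISTIC container through this centralized RM path (it mirrors the new TestOpportunisticContainerAllocation test below; the YarnConfiguration with opportunistic container allocation enabled and an already submitted application are assumed, not introduced by this patch):

    // Register the AM and ask for one OPPORTUNISTIC container; the request is a
    // regular ContainerRequest carrying an ExecutionTypeRequest, in the same form
    // used by the test added below.
    AMRMClient<AMRMClient.ContainerRequest> amClient =
        AMRMClient.<AMRMClient.ContainerRequest>createAMRMClient();
    amClient.init(conf);   // conf has OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED = true
    amClient.start();
    amClient.registerApplicationMaster("Host", 10000, "");

    amClient.addContainerRequest(
        new AMRMClient.ContainerRequest(Resource.newInstance(1024, 1), null, null,
            Priority.newInstance(2), 0, true, null,
            ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true)));

    // With this patch, OpportunisticContainerAllocatorAMService#allocate satisfies
    // the opportunistic ask itself and returns those containers alongside the
    // guaranteed ones; they are distinguishable via Container#getExecutionType().
    AllocateResponse response = amClient.allocate(0.1f);
    for (Container c : response.getAllocatedContainers()) {
      if (c.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
        // launch via NMClient, or give it back with
        // amClient.releaseAssignedContainer(c.getId());
      }
    }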
@@ -0,0 +1,398 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.client.api.impl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

import static org.junit.Assert.assertEquals;

/**
 * Class that tests the allocation of OPPORTUNISTIC containers through the
 * centralized ResourceManager.
 */
public class TestOpportunisticContainerAllocation {
  private static Configuration conf = null;
  private static MiniYARNCluster yarnCluster = null;
  private static YarnClient yarnClient = null;
  private static List<NodeReport> nodeReports = null;
  private static ApplicationAttemptId attemptId = null;
  private static int nodeCount = 3;

  private static final int ROLLING_INTERVAL_SEC = 13;
  private static final long AM_EXPIRE_MS = 4000;

  private static Resource capability;
  private static Priority priority;
  private static Priority priority2;
  private static String node;
  private static String rack;
  private static String[] nodes;
  private static String[] racks;
  private final static int DEFAULT_ITERATION = 3;

  @BeforeClass
  public static void setup() throws Exception {
    // start minicluster
    conf = new YarnConfiguration();
    conf.setLong(
        YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
        ROLLING_INTERVAL_SEC);
    conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, AM_EXPIRE_MS);
    conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
    // set the minimum allocation so that resource decrease can go under 1024
    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
    conf.setBoolean(
        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
    conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
    yarnCluster =
        new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
    yarnCluster.init(conf);
    yarnCluster.start();

    // start rm client
    yarnClient = YarnClient.createYarnClient();
    yarnClient.init(conf);
    yarnClient.start();

    // get node info
    nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);

    priority = Priority.newInstance(1);
    priority2 = Priority.newInstance(2);
    capability = Resource.newInstance(1024, 1);

    node = nodeReports.get(0).getNodeId().getHost();
    rack = nodeReports.get(0).getRackName();
    nodes = new String[]{node};
    racks = new String[]{rack};
  }

  @Before
  public void startApp() throws Exception {
    // submit new app
    ApplicationSubmissionContext appContext =
        yarnClient.createApplication().getApplicationSubmissionContext();
    ApplicationId appId = appContext.getApplicationId();
    // set the application name
    appContext.setApplicationName("Test");
    // Set the priority for the application master
    Priority pri = Records.newRecord(Priority.class);
    pri.setPriority(0);
    appContext.setPriority(pri);
    // Set the queue to which this application is to be submitted in the RM
    appContext.setQueue("default");
    // Set up the container launch context for the application master
    ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext(
        Collections.<String, LocalResource>emptyMap(),
        new HashMap<String, String>(), Arrays.asList("sleep", "100"),
        new HashMap<String, ByteBuffer>(), null,
        new HashMap<ApplicationAccessType, String>());
    appContext.setAMContainerSpec(amContainer);
    appContext.setResource(Resource.newInstance(1024, 1));
    // Create the request to send to the applications manager
    SubmitApplicationRequest appRequest =
        Records.newRecord(SubmitApplicationRequest.class);
    appRequest.setApplicationSubmissionContext(appContext);
    // Submit the application to the applications manager
    yarnClient.submitApplication(appContext);

    // wait for app to start
    RMAppAttempt appAttempt = null;
    while (true) {
      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
      if (appReport.getYarnApplicationState() ==
          YarnApplicationState.ACCEPTED) {
        attemptId = appReport.getCurrentApplicationAttemptId();
        appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
            .get(attemptId.getApplicationId()).getCurrentAppAttempt();
        while (true) {
          if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
            break;
          }
        }
        break;
      }
    }
    // Just dig into the ResourceManager and get the AMRMToken just for the sake
    // of testing.
    UserGroupInformation.setLoginUser(UserGroupInformation
        .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));

    // emulate RM setup of AMRM token in credentials by adding the token
    // *before* setting the token service
    UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
    appAttempt.getAMRMToken()
        .setService(ClientRMProxy.getAMRMTokenService(conf));
  }

  @After
  public void cancelApp() throws YarnException, IOException {
    yarnClient.killApplication(attemptId.getApplicationId());
    attemptId = null;
  }

  @AfterClass
  public static void tearDown() {
    if (yarnClient != null &&
        yarnClient.getServiceState() == Service.STATE.STARTED) {
      yarnClient.stop();
    }
    if (yarnCluster != null &&
        yarnCluster.getServiceState() == Service.STATE.STARTED) {
      yarnCluster.stop();
    }
  }

  @Test(timeout = 60000)
  public void testAMRMClient() throws YarnException, IOException {
    AMRMClient<AMRMClient.ContainerRequest> amClient = null;
    try {
      // start am rm client
      amClient = AMRMClient.<AMRMClient.ContainerRequest>createAMRMClient();

      //setting an instance NMTokenCache
      amClient.setNMTokenCache(new NMTokenCache());
      //asserting we are not using the singleton instance cache
      Assert.assertNotSame(NMTokenCache.getSingleton(),
          amClient.getNMTokenCache());

      amClient.init(conf);
      amClient.start();

      amClient.registerApplicationMaster("Host", 10000, "");

      testAllocation((AMRMClientImpl<AMRMClient.ContainerRequest>)amClient);

      amClient
          .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
              null);

    } finally {
      if (amClient != null &&
          amClient.getServiceState() == Service.STATE.STARTED) {
        amClient.stop();
      }
    }
  }

  private void testAllocation(
      final AMRMClientImpl<AMRMClient.ContainerRequest> amClient)
      throws YarnException, IOException {
    // setup container request

    assertEquals(0, amClient.ask.size());
    assertEquals(0, amClient.release.size());

    amClient.addContainerRequest(
        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
    amClient.addContainerRequest(
        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
    amClient.addContainerRequest(
        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
    amClient.addContainerRequest(
        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
    amClient.addContainerRequest(
        new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
            true, null,
            ExecutionTypeRequest.newInstance(
                ExecutionType.OPPORTUNISTIC, true)));
    amClient.addContainerRequest(
        new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
            true, null,
            ExecutionTypeRequest.newInstance(
                ExecutionType.OPPORTUNISTIC, true)));

    amClient.removeContainerRequest(
        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
    amClient.removeContainerRequest(
        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
    amClient.removeContainerRequest(
        new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
            true, null,
            ExecutionTypeRequest.newInstance(
                ExecutionType.OPPORTUNISTIC, true)));

    int containersRequestedNode = amClient.getTable(0).get(priority,
        node, ExecutionType.GUARANTEED, capability).remoteRequest
        .getNumContainers();
    int containersRequestedRack = amClient.getTable(0).get(priority,
        rack, ExecutionType.GUARANTEED, capability).remoteRequest
        .getNumContainers();
    int containersRequestedAny = amClient.getTable(0).get(priority,
        ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
        .remoteRequest.getNumContainers();
    int oppContainersRequestedAny =
        amClient.getTable(0).get(priority2, ResourceRequest.ANY,
            ExecutionType.OPPORTUNISTIC, capability).remoteRequest
            .getNumContainers();

    assertEquals(2, containersRequestedNode);
    assertEquals(2, containersRequestedRack);
    assertEquals(2, containersRequestedAny);
    assertEquals(1, oppContainersRequestedAny);

    assertEquals(4, amClient.ask.size());
    assertEquals(0, amClient.release.size());

    // RM should allocate container within 2 calls to allocate()
    int allocatedContainerCount = 0;
    int allocatedOpportContainerCount = 0;
    int iterationsLeft = 10;
    Set<ContainerId> releases = new TreeSet<>();

    amClient.getNMTokenCache().clearCache();
    Assert.assertEquals(0,
        amClient.getNMTokenCache().numberOfTokensInCache());
    HashMap<String, Token> receivedNMTokens = new HashMap<>();

    while (allocatedContainerCount <
        containersRequestedAny + oppContainersRequestedAny
        && iterationsLeft-- > 0) {
      AllocateResponse allocResponse = amClient.allocate(0.1f);
      assertEquals(0, amClient.ask.size());
      assertEquals(0, amClient.release.size());

      allocatedContainerCount += allocResponse.getAllocatedContainers()
          .size();
      for (Container container : allocResponse.getAllocatedContainers()) {
        if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
          allocatedOpportContainerCount++;
        }
        ContainerId rejectContainerId = container.getId();
        releases.add(rejectContainerId);
      }

      for (NMToken token : allocResponse.getNMTokens()) {
        String nodeID = token.getNodeId().toString();
        receivedNMTokens.put(nodeID, token.getToken());
      }

      if (allocatedContainerCount < containersRequestedAny) {
        // sleep to let NM's heartbeat to RM and trigger allocations
        sleep(100);
      }
    }

    assertEquals(allocatedContainerCount,
        containersRequestedAny + oppContainersRequestedAny);
    assertEquals(allocatedOpportContainerCount, oppContainersRequestedAny);
    for (ContainerId rejectContainerId : releases) {
      amClient.releaseAssignedContainer(rejectContainerId);
    }
    assertEquals(3, amClient.release.size());
    assertEquals(0, amClient.ask.size());

    // need to tell the AMRMClient that we don't need these resources anymore
    amClient.removeContainerRequest(
        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
    amClient.removeContainerRequest(
        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
    amClient.removeContainerRequest(
        new AMRMClient.ContainerRequest(capability, nodes, racks, priority2, 0,
            true, null,
            ExecutionTypeRequest.newInstance(
                ExecutionType.OPPORTUNISTIC, true)));
    assertEquals(4, amClient.ask.size());

    iterationsLeft = 3;
    // do a few iterations to ensure RM is not going to send new containers
    while (iterationsLeft-- > 0) {
      // inform RM of rejection
      AllocateResponse allocResponse = amClient.allocate(0.1f);
      // RM did not send new containers because AM does not need any
      assertEquals(0, allocResponse.getAllocatedContainers().size());
      if (allocResponse.getCompletedContainersStatuses().size() > 0) {
        for (ContainerStatus cStatus : allocResponse
            .getCompletedContainersStatuses()) {
          if (releases.contains(cStatus.getContainerId())) {
            assertEquals(cStatus.getState(), ContainerState.COMPLETE);
            assertEquals(-100, cStatus.getExitStatus());
            releases.remove(cStatus.getContainerId());
          }
        }
      }
      if (iterationsLeft > 0) {
        // sleep to make sure NM's heartbeat
        sleep(100);
      }
    }
    assertEquals(0, amClient.ask.size());
    assertEquals(0, amClient.release.size());
  }

  private void sleep(int sleepTime) {
    try {
      Thread.sleep(sleepTime);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -144,15 +145,6 @@ public class OpportunisticContainerAllocator {
       this.containerIdCounter.set(containerIdStart);
     }
 
-    /**
-     * Sets the underlying Atomic Long. To be used when implementation needs to
-     * share the underlying AtomicLong of an existing counter.
-     * @param counter AtomicLong
-     */
-    public void setContainerIdCounter(AtomicLong counter) {
-      this.containerIdCounter = counter;
-    }
-
     /**
      * Generates a new long value. Default implementation increments the
      * underlying AtomicLong. Sub classes are encouraged to over-ride this
@@ -213,6 +205,10 @@ public class OpportunisticContainerAllocator {
     PartitionedResourceRequests partitionedAsks =
         partitionAskList(request.getAskList());
 
+    if (partitionedAsks.getOpportunistic().isEmpty()) {
+      return Collections.emptyList();
+    }
+
     List<ContainerId> releasedContainers = request.getReleaseList();
     int numReleasedContainers = releasedContainers.size();
     if (numReleasedContainers > 0) {
@@ -236,8 +232,8 @@ public class OpportunisticContainerAllocator {
         appContext.getOutstandingOpReqs().descendingKeySet()) {
       // Allocated containers :
       //  Key = Requested Capability,
-      //  Value = List of Containers of given Cap (The actual container size
-      //          might be different than what is requested.. which is why
+      //  Value = List of Containers of given cap (the actual container size
+      //          might be different than what is requested, which is why
       //          we need the requested capability (key) to match against
       //          the outstanding reqs)
       Map<Resource, List<Container>> allocated = allocate(rmIdentifier,
@@ -290,6 +286,10 @@ public class OpportunisticContainerAllocator {
       }
       nodesForScheduling.add(nodeEntry.getValue());
     }
+    if (nodesForScheduling.isEmpty()) {
+      LOG.warn("No nodes available for allocating opportunistic containers.");
+      return;
+    }
     int numAllocated = 0;
     int nextNodeToSchedule = 0;
     for (int numCont = 0; numCont < toAllocate; numCont++) {
@@ -18,9 +18,11 @@
 
 package org.apache.hadoop.yarn.server.scheduler;
 
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -28,9 +30,11 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -56,15 +60,13 @@ public class OpportunisticContainerContext {
   private ContainerIdGenerator containerIdGenerator =
       new ContainerIdGenerator();
 
-  private Map<String, NodeId> nodeMap = new LinkedHashMap<>();
+  private volatile List<NodeId> nodeList = new LinkedList<>();
+  private final Map<String, NodeId> nodeMap = new LinkedHashMap<>();
 
-  // Mapping of NodeId to NodeTokens. Populated either from RM response or
-  // generated locally if required.
-  private Map<NodeId, NMToken> nodeTokens = new HashMap<>();
   private final Set<String> blacklist = new HashSet<>();
 
   // This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority,
-  // Resource Name (Host/rack/any) and capability. This mapping is required
+  // Resource Name (host/rack/any) and capability. This mapping is required
   // to match a received Container to an outstanding OPPORTUNISTIC
   // ResourceRequest (ask).
   private final TreeMap<Priority, Map<Resource, ResourceRequest>>
@@ -74,7 +76,7 @@ public class OpportunisticContainerContext {
     return containersAllocated;
   }
 
-  public OpportunisticContainerAllocator.AllocationParams getAppParams() {
+  public AllocationParams getAppParams() {
     return appParams;
   }
 
@@ -88,11 +90,29 @@ public class OpportunisticContainerContext {
   }
 
   public Map<String, NodeId> getNodeMap() {
-    return nodeMap;
+    return Collections.unmodifiableMap(nodeMap);
   }
 
-  public Map<NodeId, NMToken> getNodeTokens() {
-    return nodeTokens;
+  public synchronized void updateNodeList(List<NodeId> newNodeList) {
+    // This is an optimization for centralized placement. The
+    // OppContainerAllocatorAMService has a cached list of nodes which it sets
+    // here. The nodeMap needs to be updated only if the backing node list is
+    // modified.
+    if (newNodeList != nodeList) {
+      nodeList = newNodeList;
+      nodeMap.clear();
+      for (NodeId n : nodeList) {
+        nodeMap.put(n.getHost(), n);
+      }
+    }
+  }
+
+  public void updateAllocationParams(Resource minResource, Resource maxResource,
+      Resource incrResource, int containerTokenExpiryInterval) {
+    appParams.setMinResource(minResource);
+    appParams.setMaxResource(maxResource);
+    appParams.setIncrementResource(incrResource);
+    appParams.setContainerTokenExpiryInterval(containerTokenExpiryInterval);
   }
 
   public Set<String> getBlacklist() {
@@ -104,6 +124,15 @@ public class OpportunisticContainerContext {
     return outstandingOpReqs;
   }
 
+  public void updateCompletedContainers(AllocateResponse allocateResponse) {
+    for (ContainerStatus cs :
+        allocateResponse.getCompletedContainersStatuses()) {
+      if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+        containersAllocated.remove(cs.getContainerId());
+      }
+    }
+  }
+
   /**
    * Takes a list of ResourceRequests (asks), extracts the key information viz.
    * (Priority, ResourceName, Capability) and adds to the outstanding
@@ -328,8 +328,7 @@ public class NodeManager extends CompositeService
     addService(nodeHealthChecker);
 
     boolean isDistSchedulingEnabled =
-        conf.getBoolean(YarnConfiguration.
-            OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED,
+        conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED,
             YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT);
 
     this.context = createNMContext(containerTokenSecretManager,
@@ -152,7 +152,7 @@ public final class DefaultRequestInterceptor extends
       return ((DistributedSchedulingAMProtocol)rmClient)
           .registerApplicationMasterForDistributedScheduling(request);
     } else {
-      throw new YarnException("Distributed Scheduling is not enabled !!");
+      throw new YarnException("Distributed Scheduling is not enabled.");
     }
   }
 
@@ -174,7 +174,7 @@ public final class DefaultRequestInterceptor extends
       }
       return allocateResponse;
     } else {
-      throw new YarnException("Distributed Scheduling is not enabled !!");
+      throw new YarnException("Distributed Scheduling is not enabled.");
     }
   }
 
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.scheduler;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
@@ -32,8 +33,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -48,7 +47,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * <p>The DistributedScheduler runs on the NodeManager and is modeled as an
@@ -74,6 +75,9 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
   private OpportunisticContainerContext oppContainerContext =
       new OpportunisticContainerContext();
 
+  // Mapping of NodeId to NodeTokens. Populated either from RM response or
+  // generated locally if required.
+  private Map<NodeId, NMToken> nodeTokens = new HashMap<>();
   private ApplicationAttemptId applicationAttemptId;
   private OpportunisticContainerAllocator containerAllocator;
   private NMTokenSecretManagerInNM nmSecretManager;
@@ -157,17 +161,17 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
   }
 
   /**
-   * Check if we already have a NMToken. if Not, generate the Token and
-   * add it to the response
+   * Adds all the newly allocated Containers to the allocate Response.
+   * Additionally, in case the NMToken for one of the nodes does not exist, it
+   * generates one and adds it to the response.
    */
-  private void updateResponseWithNMTokens(AllocateResponse response,
+  private void updateAllocateResponse(AllocateResponse response,
       List<NMToken> nmTokens, List<Container> allocatedContainers) {
     List<NMToken> newTokens = new ArrayList<>();
     if (allocatedContainers.size() > 0) {
       response.getAllocatedContainers().addAll(allocatedContainers);
       for (Container alloc : allocatedContainers) {
-        if (!oppContainerContext.getNodeTokens().containsKey(
-            alloc.getNodeId())) {
+        if (!nodeTokens.containsKey(alloc.getNodeId())) {
           newTokens.add(nmSecretManager.generateNMToken(appSubmitter, alloc));
         }
       }
@@ -179,17 +183,14 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
 
   private void updateParameters(
       RegisterDistributedSchedulingAMResponse registerResponse) {
-    oppContainerContext.getAppParams().setMinResource(
-        registerResponse.getMinContainerResource());
-    oppContainerContext.getAppParams().setMaxResource(
-        registerResponse.getMaxContainerResource());
-    oppContainerContext.getAppParams().setIncrementResource(
-        registerResponse.getIncrContainerResource());
-    if (oppContainerContext.getAppParams().getIncrementResource() == null) {
-      oppContainerContext.getAppParams().setIncrementResource(
-          oppContainerContext.getAppParams().getMinResource());
+    Resource incrementResource = registerResponse.getIncrContainerResource();
+    if (incrementResource == null) {
+      incrementResource = registerResponse.getMinContainerResource();
     }
-    oppContainerContext.getAppParams().setContainerTokenExpiryInterval(
+    oppContainerContext.updateAllocationParams(
+        registerResponse.getMinContainerResource(),
+        registerResponse.getMaxContainerResource(),
+        incrementResource,
         registerResponse.getContainerTokenExpiryInterval());
 
     oppContainerContext.getContainerIdGenerator()
@@ -198,14 +199,7 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
   }
 
   private void setNodeList(List<NodeId> nodeList) {
-    oppContainerContext.getNodeMap().clear();
-    addToNodeList(nodeList);
-  }
-
-  private void addToNodeList(List<NodeId> nodes) {
-    for (NodeId n : nodes) {
-      oppContainerContext.getNodeMap().put(n.getHost(), n);
-    }
+    oppContainerContext.updateNodeList(nodeList);
   }
 
   @Override
@@ -243,23 +237,14 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
     setNodeList(dsResp.getNodesForScheduling());
     List<NMToken> nmTokens = dsResp.getAllocateResponse().getNMTokens();
     for (NMToken nmToken : nmTokens) {
-      oppContainerContext.getNodeTokens().put(nmToken.getNodeId(), nmToken);
+      nodeTokens.put(nmToken.getNodeId(), nmToken);
    }
 
-    List<ContainerStatus> completedContainers =
-        dsResp.getAllocateResponse().getCompletedContainersStatuses();
-
-    // Only account for opportunistic containers
-    for (ContainerStatus cs : completedContainers) {
-      if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
-        oppContainerContext.getContainersAllocated()
-            .remove(cs.getContainerId());
-      }
-    }
+    oppContainerContext.updateCompletedContainers(dsResp.getAllocateResponse());
 
     // Check if we have NM tokens for all the allocated containers. If not
     // generate one and update the response.
-    updateResponseWithNMTokens(
+    updateAllocateResponse(
         dsResp.getAllocateResponse(), nmTokens, allocatedContainers);
 
     if (LOG.isDebugEnabled()) {
@ -24,9 +24,11 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
|
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
|
||||||
import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
|
import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
|
||||||
|
|
||||||
|
@ -65,12 +67,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
|
||||||
|
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
|
||||||
|
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The OpportunisticContainerAllocatorAMService is started instead of the
|
* The OpportunisticContainerAllocatorAMService is started instead of the
|
||||||
|
@ -88,17 +92,20 @@ public class OpportunisticContainerAllocatorAMService
|
||||||
LogFactory.getLog(OpportunisticContainerAllocatorAMService.class);
|
LogFactory.getLog(OpportunisticContainerAllocatorAMService.class);
|
||||||
|
|
||||||
private final NodeQueueLoadMonitor nodeMonitor;
|
private final NodeQueueLoadMonitor nodeMonitor;
|
||||||
|
private final OpportunisticContainerAllocator oppContainerAllocator;
|
||||||
|
|
||||||
private final ConcurrentHashMap<String, Set<NodeId>> rackToNode =
|
|
||||||
new ConcurrentHashMap<>();
|
|
||||||
private final ConcurrentHashMap<String, Set<NodeId>> hostToNode =
|
|
||||||
new ConcurrentHashMap<>();
|
|
||||||
private final int k;
|
private final int k;
|
||||||
|
|
||||||
|
private final long cacheRefreshInterval;
|
||||||
|
private List<NodeId> cachedNodeIds;
|
||||||
|
private long lastCacheUpdateTime;
|
||||||
|
|
||||||
public OpportunisticContainerAllocatorAMService(RMContext rmContext,
|
public OpportunisticContainerAllocatorAMService(RMContext rmContext,
|
||||||
YarnScheduler scheduler) {
|
YarnScheduler scheduler) {
|
||||||
super(OpportunisticContainerAllocatorAMService.class.getName(),
|
super(OpportunisticContainerAllocatorAMService.class.getName(),
|
||||||
rmContext, scheduler);
|
rmContext, scheduler);
|
||||||
|
this.oppContainerAllocator = new OpportunisticContainerAllocator(
|
||||||
|
rmContext.getContainerTokenSecretManager(), 0);
|
||||||
this.k = rmContext.getYarnConfiguration().getInt(
|
this.k = rmContext.getYarnConfiguration().getInt(
|
||||||
YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED,
|
YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED,
|
||||||
YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED_DEFAULT);
|
YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED_DEFAULT);
|
||||||
|
@ -106,6 +113,8 @@ public class OpportunisticContainerAllocatorAMService
|
||||||
YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS,
|
YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS,
|
||||||
YarnConfiguration.
|
YarnConfiguration.
|
||||||
NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT);
|
NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT);
|
||||||
|
this.cacheRefreshInterval = nodeSortInterval;
|
||||||
|
this.lastCacheUpdateTime = System.currentTimeMillis();
|
||||||
NodeQueueLoadMonitor.LoadComparator comparator =
|
NodeQueueLoadMonitor.LoadComparator comparator =
|
||||||
NodeQueueLoadMonitor.LoadComparator.valueOf(
|
NodeQueueLoadMonitor.LoadComparator.valueOf(
|
||||||
rmContext.getYarnConfiguration().get(
|
rmContext.getYarnConfiguration().get(
|
||||||
|
@ -172,6 +181,27 @@ public class OpportunisticContainerAllocatorAMService
|
||||||
public RegisterApplicationMasterResponse registerApplicationMaster
|
public RegisterApplicationMasterResponse registerApplicationMaster
|
||||||
(RegisterApplicationMasterRequest request) throws YarnException,
|
(RegisterApplicationMasterRequest request) throws YarnException,
|
||||||
IOException {
|
IOException {
|
||||||
|
final ApplicationAttemptId appAttemptId = getAppAttemptId();
|
||||||
|
final SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler)
|
||||||
|
rmContext.getScheduler()).getApplicationAttempt(appAttemptId);
|
||||||
|
if (appAttempt.getOpportunisticContainerContext() == null) {
|
||||||
|
OpportunisticContainerContext opCtx = new OpportunisticContainerContext();
|
||||||
|
opCtx.setContainerIdGenerator(new OpportunisticContainerAllocator
|
||||||
|
.ContainerIdGenerator() {
|
||||||
|
@Override
|
||||||
|
public long generateContainerId() {
|
||||||
|
return appAttempt.getAppSchedulingInfo().getNewContainerId();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
int tokenExpiryInterval = getConfig()
|
||||||
|
.getInt(YarnConfiguration.OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS,
|
||||||
|
YarnConfiguration.
|
||||||
|
OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT);
|
||||||
|
opCtx.updateAllocationParams(createMinContainerResource(),
|
||||||
|
createMaxContainerResource(), createIncrContainerResource(),
|
||||||
|
tokenExpiryInterval);
|
||||||
|
appAttempt.setOpportunisticContainerContext(opCtx);
|
||||||
|
}
|
||||||
return super.registerApplicationMaster(request);
|
return super.registerApplicationMaster(request);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,7 +215,30 @@ public class OpportunisticContainerAllocatorAMService
|
||||||
@Override
|
@Override
|
||||||
public AllocateResponse allocate(AllocateRequest request) throws
|
public AllocateResponse allocate(AllocateRequest request) throws
|
||||||
YarnException, IOException {
|
YarnException, IOException {
|
||||||
return super.allocate(request);
|
|
||||||
|
final ApplicationAttemptId appAttemptId = getAppAttemptId();
|
||||||
|
SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler)
|
||||||
|
rmContext.getScheduler()).getApplicationAttempt(appAttemptId);
|
||||||
|
OpportunisticContainerContext oppCtx =
|
||||||
|
appAttempt.getOpportunisticContainerContext();
|
||||||
|
oppCtx.updateNodeList(getLeastLoadedNodes());
|
||||||
|
List<Container> oppContainers =
|
||||||
|
oppContainerAllocator.allocateContainers(request, appAttemptId, oppCtx,
|
||||||
|
ResourceManager.getClusterTimeStamp(), appAttempt.getUser());
|
||||||
|
|
||||||
|
if (!oppContainers.isEmpty()) {
|
||||||
|
handleNewContainers(oppContainers, false);
|
||||||
|
appAttempt.updateNMTokens(oppContainers);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Allocate all guaranteed containers
|
||||||
|
AllocateResponse allocateResp = super.allocate(request);
|
||||||
|
|
||||||
|
oppCtx.updateCompletedContainers(allocateResp);
|
||||||
|
|
||||||
|
// Add all opportunistic containers
|
||||||
|
allocateResp.getAllocatedContainers().addAll(oppContainers);
|
||||||
|
return allocateResp;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -198,39 +251,9 @@ public class OpportunisticContainerAllocatorAMService
|
||||||
RegisterDistributedSchedulingAMResponse dsResp = recordFactory
|
RegisterDistributedSchedulingAMResponse dsResp = recordFactory
|
||||||
.newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
|
.newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
|
||||||
dsResp.setRegisterResponse(response);
|
dsResp.setRegisterResponse(response);
|
||||||
dsResp.setMinContainerResource(
|
dsResp.setMinContainerResource(createMinContainerResource());
|
||||||
Resource.newInstance(
|
dsResp.setMaxContainerResource(createMaxContainerResource());
|
||||||
getConfig().getInt(
|
dsResp.setIncrContainerResource(createIncrContainerResource());
|
||||||
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB,
|
|
||||||
YarnConfiguration.
|
|
||||||
OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT),
|
|
||||||
getConfig().getInt(
|
|
||||||
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES,
|
|
||||||
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT)
|
|
||||||
)
|
|
||||||
);
|
|
||||||
dsResp.setMaxContainerResource(
|
|
||||||
Resource.newInstance(
|
|
||||||
getConfig().getInt(
|
|
||||||
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB,
|
|
||||||
YarnConfiguration
|
|
||||||
.OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT),
|
|
||||||
getConfig().getInt(
|
|
||||||
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES,
|
|
||||||
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT)
|
|
||||||
)
|
|
||||||
);
|
|
||||||
dsResp.setIncrContainerResource(
|
|
||||||
Resource.newInstance(
|
|
||||||
getConfig().getInt(
|
|
||||||
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB,
|
|
||||||
YarnConfiguration.
|
|
||||||
OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT),
|
|
||||||
getConfig().getInt(
|
|
||||||
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES,
|
|
||||||
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT)
|
|
||||||
)
|
|
||||||
);
|
|
||||||
dsResp.setContainerTokenExpiryInterval(
|
dsResp.setContainerTokenExpiryInterval(
|
||||||
getConfig().getInt(
|
getConfig().getInt(
|
||||||
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS,
|
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS,
|
||||||
|
@ -240,8 +263,7 @@ public class OpportunisticContainerAllocatorAMService
|
||||||
this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
|
this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
|
||||||
|
|
||||||
// Set nodes to be used for scheduling
|
// Set nodes to be used for scheduling
|
||||||
dsResp.setNodesForScheduling(
|
dsResp.setNodesForScheduling(getLeastLoadedNodes());
|
||||||
this.nodeMonitor.selectLeastLoadedNodes(this.k));
|
|
||||||
return dsResp;
|
return dsResp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -250,47 +272,30 @@ public class OpportunisticContainerAllocatorAMService
|
||||||
DistributedSchedulingAllocateRequest request)
|
DistributedSchedulingAllocateRequest request)
|
||||||
throws YarnException, IOException {
|
throws YarnException, IOException {
|
||||||
List<Container> distAllocContainers = request.getAllocatedContainers();
|
List<Container> distAllocContainers = request.getAllocatedContainers();
|
||||||
for (Container container : distAllocContainers) {
|
handleNewContainers(distAllocContainers, true);
|
||||||
|
AllocateResponse response = allocate(request.getAllocateRequest());
|
||||||
|
DistributedSchedulingAllocateResponse dsResp = recordFactory
|
||||||
|
.newRecordInstance(DistributedSchedulingAllocateResponse.class);
|
||||||
|
dsResp.setAllocateResponse(response);
|
||||||
|
dsResp.setNodesForScheduling(getLeastLoadedNodes());
|
||||||
|
return dsResp;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleNewContainers(List<Container> allocContainers,
|
||||||
|
boolean isRemotelyAllocated) {
|
||||||
|
for (Container container : allocContainers) {
|
||||||
// Create RMContainer
|
// Create RMContainer
|
||||||
SchedulerApplicationAttempt appAttempt =
|
SchedulerApplicationAttempt appAttempt =
|
||||||
((AbstractYarnScheduler) rmContext.getScheduler())
|
((AbstractYarnScheduler) rmContext.getScheduler())
|
||||||
.getCurrentAttemptForContainer(container.getId());
|
.getCurrentAttemptForContainer(container.getId());
|
||||||
RMContainer rmContainer = new RMContainerImpl(container,
|
RMContainer rmContainer = new RMContainerImpl(container,
|
||||||
appAttempt.getApplicationAttemptId(), container.getNodeId(),
|
appAttempt.getApplicationAttemptId(), container.getNodeId(),
|
||||||
appAttempt.getUser(), rmContext, true);
|
appAttempt.getUser(), rmContext, isRemotelyAllocated);
|
||||||
appAttempt.addRMContainer(container.getId(), rmContainer);
|
appAttempt.addRMContainer(container.getId(), rmContainer);
|
||||||
rmContainer.handle(
|
rmContainer.handle(
|
||||||
new RMContainerEvent(container.getId(),
|
new RMContainerEvent(container.getId(),
|
||||||
RMContainerEventType.LAUNCHED));
|
RMContainerEventType.LAUNCHED));
|
||||||
}
|
}
|
||||||
AllocateResponse response = allocate(request.getAllocateRequest());
|
|
||||||
DistributedSchedulingAllocateResponse dsResp = recordFactory
|
|
||||||
.newRecordInstance(DistributedSchedulingAllocateResponse.class);
|
|
||||||
dsResp.setAllocateResponse(response);
|
|
||||||
dsResp.setNodesForScheduling(
|
|
||||||
this.nodeMonitor.selectLeastLoadedNodes(this.k));
|
|
||||||
return dsResp;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void addToMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
|
|
||||||
String rackName, NodeId nodeId) {
|
|
||||||
if (rackName != null) {
|
|
||||||
mapping.putIfAbsent(rackName, new HashSet<NodeId>());
|
|
||||||
Set<NodeId> nodeIds = mapping.get(rackName);
|
|
||||||
synchronized (nodeIds) {
|
|
||||||
nodeIds.add(nodeId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
-  private void removeFromMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
-      String rackName, NodeId nodeId) {
-    if (rackName != null) {
-      Set<NodeId> nodeIds = mapping.get(rackName);
-      synchronized (nodeIds) {
-        nodeIds.remove(nodeId);
-      }
-    }
-  }
   }
 
   @Override
@@ -303,10 +308,6 @@ public class OpportunisticContainerAllocatorAMService
       NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event;
       nodeMonitor.addNode(nodeAddedEvent.getContainerReports(),
           nodeAddedEvent.getAddedRMNode());
-      addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(),
-          nodeAddedEvent.getAddedRMNode().getNodeID());
-      addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(),
-          nodeAddedEvent.getAddedRMNode().getNodeID());
       break;
     case NODE_REMOVED:
       if (!(event instanceof NodeRemovedSchedulerEvent)) {
@@ -315,12 +316,6 @@ public class OpportunisticContainerAllocatorAMService
       NodeRemovedSchedulerEvent nodeRemovedEvent =
           (NodeRemovedSchedulerEvent) event;
       nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode());
-      removeFromMapping(rackToNode,
-          nodeRemovedEvent.getRemovedRMNode().getRackName(),
-          nodeRemovedEvent.getRemovedRMNode().getNodeID());
-      removeFromMapping(hostToNode,
-          nodeRemovedEvent.getRemovedRMNode().getHostName(),
-          nodeRemovedEvent.getRemovedRMNode().getNodeID());
       break;
     case NODE_UPDATE:
       if (!(event instanceof NodeUpdateSchedulerEvent)) {
@@ -364,4 +359,58 @@ public class OpportunisticContainerAllocatorAMService
   public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
     return nodeMonitor.getThresholdCalculator();
   }
+
+  private Resource createIncrContainerResource() {
+    return Resource.newInstance(
+        getConfig().getInt(
+            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB,
+            YarnConfiguration.
+                OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT),
+        getConfig().getInt(
+            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES,
+            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT)
+    );
+  }
+
+  private synchronized List<NodeId> getLeastLoadedNodes() {
+    long currTime = System.currentTimeMillis();
+    if ((currTime - lastCacheUpdateTime > cacheRefreshInterval)
+        || cachedNodeIds == null) {
+      cachedNodeIds = this.nodeMonitor.selectLeastLoadedNodes(this.k);
+      lastCacheUpdateTime = currTime;
+    }
+    return cachedNodeIds;
+  }
+
+  private Resource createMaxContainerResource() {
+    return Resource.newInstance(
+        getConfig().getInt(
+            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB,
+            YarnConfiguration
+                .OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT),
+        getConfig().getInt(
+            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES,
+            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT)
+    );
+  }
+
+  private Resource createMinContainerResource() {
+    return Resource.newInstance(
+        getConfig().getInt(
+            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB,
+            YarnConfiguration.
+                OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT),
+        getConfig().getInt(
+            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES,
+            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT)
+    );
+  }
+
+  private static ApplicationAttemptId getAppAttemptId() throws YarnException {
+    AMRMTokenIdentifier amrmTokenIdentifier =
+        YarnServerSecurityUtils.authorizeRequest();
+    ApplicationAttemptId applicationAttemptId =
+        amrmTokenIdentifier.getApplicationAttemptId();
+    return applicationAttemptId;
+  }
 }
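Note on the caching in the hunk above: the new getLeastLoadedNodes helper only re-queries the node monitor when the cached snapshot is older than cacheRefreshInterval. The following is a minimal, self-contained Java sketch of that refresh-on-expiry pattern; the class, field, and supplier names are illustrative placeholders, not the service's real members.

import java.util.List;
import java.util.function.Supplier;

// Illustrative sketch of the time-bounded caching used by getLeastLoadedNodes().
class CachedNodeList<T> {
  private final Supplier<List<T>> loader;   // e.g. a call into the node monitor
  private final long refreshIntervalMs;
  private List<T> cached;
  private long lastUpdateMs;

  CachedNodeList(Supplier<List<T>> loader, long refreshIntervalMs) {
    this.loader = loader;
    this.refreshIntervalMs = refreshIntervalMs;
  }

  synchronized List<T> get() {
    long now = System.currentTimeMillis();
    if (cached == null || now - lastUpdateMs > refreshIntervalMs) {
      cached = loader.get();   // refresh the snapshot
      lastUpdateMs = now;
    }
    return cached;
  }
}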
@@ -1160,6 +1160,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
     Configuration config = this.rmContext.getYarnConfiguration();
     if (YarnConfiguration.isOpportunisticContainerAllocationEnabled(config)
         || YarnConfiguration.isDistSchedulingEnabled(config)) {
+      if (YarnConfiguration.isDistSchedulingEnabled(config) &&
+          !YarnConfiguration
+              .isOpportunisticContainerAllocationEnabled(config)) {
+        throw new YarnRuntimeException(
+            "Invalid parameters: opportunistic container allocation has to " +
+                "be enabled when distributed scheduling is enabled.");
+      }
       OpportunisticContainerAllocatorAMService
           oppContainerAllocatingAMService =
               new OpportunisticContainerAllocatorAMService(this.rmContext,
@@ -1169,9 +1176,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
           OpportunisticContainerAllocatorAMService.class.getName());
       // Add an event dispatcher for the
       // OpportunisticContainerAllocatorAMService to handle node
-      // updates/additions and removals.
-      // Since the SchedulerEvent is currently a super set of theses,
-      // we register interest for it..
+      // additions, updates and removals. Since the SchedulerEvent is currently
+      // a super set of theses, we register interest for it.
       addService(oppContainerAllocEventDispatcher);
       rmDispatcher.register(SchedulerEventType.class,
           oppContainerAllocEventDispatcher);
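Note on the guard added above: ResourceManager now fails fast during service setup if distributed scheduling is turned on without opportunistic container allocation. A stand-alone sketch of that fail-fast check follows, with plain booleans standing in for the YarnConfiguration lookups and a generic exception type; it is illustrative only, not the RM's actual code path.

// Illustrative fail-fast validation of the two scheduling flags.
final class SchedulingConfigCheck {
  static void validate(boolean distSchedulingEnabled,
                       boolean opportunisticAllocationEnabled) {
    if (distSchedulingEnabled && !opportunisticAllocationEnabled) {
      throw new IllegalStateException(
          "opportunistic container allocation has to be enabled "
              + "when distributed scheduling is enabled");
    }
  }

  public static void main(String[] args) {
    validate(true, true);   // ok
    validate(false, false); // ok
    validate(true, false);  // throws IllegalStateException
  }
}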
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -76,6 +77,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
+
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -122,6 +125,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   private boolean isAttemptRecovering;
 
   protected ResourceUsage attemptResourceUsage = new ResourceUsage();
+  /** Resource usage of opportunistic containers. */
+  protected ResourceUsage attemptOpportunisticResourceUsage =
+      new ResourceUsage();
   /** Scheduled by a remote scheduler. */
   protected ResourceUsage attemptResourceUsageAllocatedRemotely =
       new ResourceUsage();
@@ -140,6 +146,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   // by NM should not be recovered.
   private Set<ContainerId> pendingRelease = null;
 
+  private OpportunisticContainerContext oppContainerContext;
+
   /**
    * Count how many times the application has been given an opportunity to
    * schedule a task at each priority. Each time the scheduler asks the
@@ -214,6 +222,16 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     writeLock = lock.writeLock();
   }
 
+  public void setOpportunisticContainerContext(
+      OpportunisticContainerContext oppContext) {
+    this.oppContainerContext = oppContext;
+  }
+
+  public OpportunisticContainerContext
+      getOpportunisticContainerContext() {
+    return this.oppContainerContext;
+  }
+
   /**
    * Get the live containers of the application.
    * @return live containers of the application
@@ -345,6 +363,10 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     try {
       writeLock.lock();
       liveContainers.put(id, rmContainer);
+      if (rmContainer.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+        this.attemptOpportunisticResourceUsage.incUsed(
+            rmContainer.getAllocatedResource());
+      }
       if (rmContainer.isRemotelyAllocated()) {
         this.attemptResourceUsageAllocatedRemotely.incUsed(
             rmContainer.getAllocatedResource());
@@ -358,9 +380,15 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     try {
       writeLock.lock();
       RMContainer rmContainer = liveContainers.remove(containerId);
-      if (rmContainer != null && rmContainer.isRemotelyAllocated()) {
-        this.attemptResourceUsageAllocatedRemotely.decUsed(
-            rmContainer.getAllocatedResource());
+      if (rmContainer != null) {
+        if (rmContainer.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+          this.attemptOpportunisticResourceUsage
+              .decUsed(rmContainer.getAllocatedResource());
+        }
+        if (rmContainer.isRemotelyAllocated()) {
+          this.attemptResourceUsageAllocatedRemotely
+              .decUsed(rmContainer.getAllocatedResource());
+        }
       }
     } finally {
       writeLock.unlock();
@@ -628,12 +656,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
             container.getPriority(), rmContainer.getCreationTime(),
             this.logAggregationContext, rmContainer.getNodeLabelExpression(),
             containerType));
-        NMToken nmToken =
-            rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
-                getApplicationAttemptId(), container);
-        if (nmToken != null) {
-          updatedNMTokens.add(nmToken);
-        }
+        updateNMToken(container);
       } catch (IllegalArgumentException e) {
         // DNS might be down, skip returning this container.
         LOG.error("Error trying to assign container token and NM token to"
@@ -651,6 +674,21 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     return container;
   }
 
+  public void updateNMTokens(Collection<Container> containers) {
+    for (Container container : containers) {
+      updateNMToken(container);
+    }
+  }
+
+  private void updateNMToken(Container container) {
+    NMToken nmToken =
+        rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
+            getApplicationAttemptId(), container);
+    if (nmToken != null) {
+      updatedNMTokens.add(nmToken);
+    }
+  }
+
   // Create container token and update NMToken altogether, if either of them fails for
   // some reason like DNS unavailable, do not return this container and keep it
   // in the newlyAllocatedContainers waiting to be refetched.
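Note on the bookkeeping introduced above: SchedulerApplicationAttempt now keeps opportunistic usage in its own ResourceUsage counter, incrementing it when an OPPORTUNISTIC container goes live and decrementing it when the container is removed. The toy sketch below shows the same symmetric inc/dec idea with plain longs; the class and method names are hypothetical and this is not the actual ResourceUsage API.

import java.util.HashMap;
import java.util.Map;

// Illustrative only: per-execution-type usage bookkeeping analogous to the
// attemptOpportunisticResourceUsage counter added in the hunks above.
class UsageTracker {
  enum ExecType { GUARANTEED, OPPORTUNISTIC }

  private final Map<ExecType, Long> usedMemMb = new HashMap<>();

  synchronized void containerStarted(ExecType type, long memMb) {
    usedMemMb.merge(type, memMb, Long::sum);      // incUsed analogue
  }

  synchronized void containerFinished(ExecType type, long memMb) {
    usedMemMb.merge(type, -memMb, Long::sum);     // decUsed analogue
  }

  synchronized long used(ExecType type) {
    return usedMemMb.getOrDefault(type, 0L);
  }
}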
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -37,6 +38,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * The NodeQueueLoadMonitor keeps track of load metrics (such as queue length
@@ -103,16 +105,23 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
       new ConcurrentHashMap<>();
   private final LoadComparator comparator;
   private QueueLimitCalculator thresholdCalculator;
+  private ReentrantReadWriteLock sortedNodesLock = new ReentrantReadWriteLock();
+  private ReentrantReadWriteLock clusterNodesLock =
+      new ReentrantReadWriteLock();
 
   Runnable computeTask = new Runnable() {
     @Override
     public void run() {
-      synchronized (sortedNodes) {
+      ReentrantReadWriteLock.WriteLock writeLock = sortedNodesLock.writeLock();
+      writeLock.lock();
+      try {
         sortedNodes.clear();
         sortedNodes.addAll(sortNodes());
         if (thresholdCalculator != null) {
           thresholdCalculator.update();
         }
+      } finally {
+        writeLock.unlock();
       }
     }
   };
@@ -166,9 +175,16 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
   @Override
   public void removeNode(RMNode removedRMNode) {
     LOG.debug("Node delete event for: " + removedRMNode.getNode().getName());
-    synchronized (this.clusterNodes) {
-      if (this.clusterNodes.containsKey(removedRMNode.getNodeID())) {
-        this.clusterNodes.remove(removedRMNode.getNodeID());
+    ReentrantReadWriteLock.WriteLock writeLock = clusterNodesLock.writeLock();
+    writeLock.lock();
+    ClusterNode node;
+    try {
+      node = this.clusterNodes.remove(removedRMNode.getNodeID());
+    } finally {
+      writeLock.unlock();
+    }
+    if (LOG.isDebugEnabled()) {
+      if (node != null) {
         LOG.debug("Delete ClusterNode: " + removedRMNode.getNodeID());
       } else {
         LOG.debug("Node not in list!");
@@ -186,7 +202,9 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
     int waitQueueLength = queuedContainersStatus.getWaitQueueLength();
     // Add nodes to clusterNodes. If estimatedQueueTime is -1, ignore node
     // UNLESS comparator is based on queue length.
-    synchronized (this.clusterNodes) {
+    ReentrantReadWriteLock.WriteLock writeLock = clusterNodesLock.writeLock();
+    writeLock.lock();
+    try {
       ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID());
       if (currentNode == null) {
         if (estimatedQueueWaitTime != -1
@@ -222,6 +240,8 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
               "wait queue length [" + currentNode.queueLength + "]");
         }
       }
+    } finally {
+      writeLock.unlock();
     }
   }
 
@@ -245,15 +265,22 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
    * @return ordered list of nodes
    */
   public List<NodeId> selectLeastLoadedNodes(int k) {
-    synchronized (this.sortedNodes) {
-      return ((k < this.sortedNodes.size()) && (k >= 0)) ?
+    ReentrantReadWriteLock.ReadLock readLock = sortedNodesLock.readLock();
+    readLock.lock();
+    try {
+      List<NodeId> retVal = ((k < this.sortedNodes.size()) && (k >= 0)) ?
           new ArrayList<>(this.sortedNodes).subList(0, k) :
           new ArrayList<>(this.sortedNodes);
+      return Collections.unmodifiableList(retVal);
+    } finally {
+      readLock.unlock();
     }
   }
 
   private List<NodeId> sortNodes() {
-    synchronized (this.clusterNodes) {
+    ReentrantReadWriteLock.ReadLock readLock = clusterNodesLock.readLock();
+    readLock.lock();
+    try {
       ArrayList aList = new ArrayList<>(this.clusterNodes.values());
       List<NodeId> retList = new ArrayList<>();
       Object[] nodes = aList.toArray();
@@ -267,6 +294,8 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
         retList.add(((ClusterNode)nodes[j]).nodeId);
       }
       return retList;
+    } finally {
+      readLock.unlock();
    }
   }
 
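Note on the locking change above: NodeQueueLoadMonitor swaps coarse synchronized blocks for ReentrantReadWriteLocks so that concurrent readers of the sorted node list no longer block one another, with every lock released in a finally block. A minimal, self-contained sketch of that pattern follows; the class and field names here are illustrative, not the monitor's actual API.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative only: guards a periodically rebuilt list with a read/write lock,
// mirroring the locking style introduced in the hunks above.
class SortedNodeCache {
  private final List<String> sortedNodes = new ArrayList<>();
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  // Writer: rebuild the cached ordering (e.g. from a scheduled task).
  void rebuild(List<String> freshOrdering) {
    lock.writeLock().lock();
    try {
      sortedNodes.clear();
      sortedNodes.addAll(freshOrdering);
    } finally {
      lock.writeLock().unlock();
    }
  }

  // Reader: return an immutable snapshot of at most k entries.
  List<String> leastLoaded(int k) {
    lock.readLock().lock();
    try {
      List<String> copy = new ArrayList<>(sortedNodes);
      List<String> slice = (k >= 0 && k < copy.size()) ? copy.subList(0, k) : copy;
      return Collections.unmodifiableList(slice);
    } finally {
      lock.readLock().unlock();
    }
  }
}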
@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSche
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -79,7 +80,7 @@ public class TestOpportunisticContainerAllocatorAMService {
   // DSProtocol as well as AMProtocol clients
   @Test
   public void testRPCWrapping() throws Exception {
-    Configuration conf = new Configuration();
+    final Configuration conf = new Configuration();
     conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
         .getName());
     YarnRPC rpc = YarnRPC.create(conf);
@@ -97,6 +98,11 @@ public class TestOpportunisticContainerAllocatorAMService {
       public Configuration getYarnConfiguration() {
        return new YarnConfiguration();
       }
+
+      @Override
+      public RMContainerTokenSecretManager getContainerTokenSecretManager() {
+        return new RMContainerTokenSecretManager(conf);
+      }
     };
     Container c = factory.newRecordInstance(Container.class);
     c.setExecutionType(ExecutionType.OPPORTUNISTIC);
@@ -117,8 +123,8 @@ public class TestOpportunisticContainerAllocatorAMService {
     Server server = service.getServer(rpc, conf, addr, null);
     server.start();
 
-    // Verify that the DistrubutedSchedulingService can handle vanilla
-    // ApplicationMasterProtocol clients
+    // Verify that the OpportunisticContainerAllocatorAMSercvice can handle
+    // vanilla ApplicationMasterProtocol clients
     RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class,
         ProtobufRpcEngine.class);
     ApplicationMasterProtocolPB ampProxy =