YARN-7258. Add Node and Rack Hints to Opportunistic Scheduler. (Kartheek Muthyala via asuresh).

(cherry picked from commit b733348dde)
This commit is contained in:
Arun Suresh 2017-10-02 18:01:51 -07:00
parent 807d8b41ad
commit b61d12e300
11 changed files with 1000 additions and 732 deletions

View File

@ -222,6 +222,22 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
return this; return this;
} }
/**
* Set the <code>executionTypeRequest</code> of the request with 'ensure
* execution type' flag set to true.
* @see ResourceRequest#setExecutionTypeRequest(
* ExecutionTypeRequest)
* @param executionType <code>executionType</code> of the request.
* @return {@link ResourceRequestBuilder}
*/
@Public
@Evolving
public ResourceRequestBuilder executionType(ExecutionType executionType) {
resourceRequest.setExecutionTypeRequest(
ExecutionTypeRequest.newInstance(executionType, true));
return this;
}
/** /**
* Set the <code>allocationRequestId</code> of the request. * Set the <code>allocationRequestId</code> of the request.
* @see ResourceRequest#setAllocationRequestId(long) * @see ResourceRequest#setAllocationRequestId(long)

View File

@ -1,645 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Validates End2End Distributed Scheduling flow which includes the AM
* specifying OPPORTUNISTIC containers in its resource requests,
* the AMRMProxyService on the NM, the DistributedScheduler RequestInterceptor
* on the NM and the DistributedSchedulingProtocol used by the framework to talk
* to the OpportunisticContainerAllocatorAMService running on the RM.
*/
public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
private static final Log LOG =
LogFactory.getLog(TestDistributedScheduling.class);
protected MiniYARNCluster cluster;
protected YarnClient rmClient;
protected ApplicationMasterProtocol client;
protected Configuration conf;
protected Configuration yarnConf;
protected ApplicationAttemptId attemptId;
protected ApplicationId appId;
@Before
public void doBefore() throws Exception {
cluster = new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1);
conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.
OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
conf.setInt(YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
10);
cluster.init(conf);
cluster.start();
yarnConf = cluster.getConfig();
// the client has to connect to AMRMProxy
yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
rmClient = YarnClient.createYarnClient();
rmClient.init(yarnConf);
rmClient.start();
// Submit application
attemptId = createApp(rmClient, cluster, conf);
appId = attemptId.getApplicationId();
client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
}
@After
public void doAfter() throws Exception {
if (client != null) {
try {
client.finishApplicationMaster(FinishApplicationMasterRequest
.newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
rmClient.killApplication(attemptId.getApplicationId());
attemptId = null;
} catch (Exception e) {
}
}
if (rmClient != null) {
try {
rmClient.stop();
} catch (Exception e) {
}
}
if (cluster != null) {
try {
cluster.stop();
} catch (Exception e) {
}
}
}
/**
* Validates if Allocate Requests containing only OPPORTUNISTIC container
* requests are satisfied instantly.
*
* @throws Exception
*/
@Test(timeout = 60000)
public void testOpportunisticExecutionTypeRequestE2E() throws Exception {
LOG.info("testDistributedSchedulingE2E - Register");
RegisterApplicationMasterResponse responseRegister =
client.registerApplicationMaster(RegisterApplicationMasterRequest
.newInstance(NetUtils.getHostname(), 1024, ""));
Assert.assertNotNull(responseRegister);
Assert.assertNotNull(responseRegister.getQueue());
Assert.assertNotNull(responseRegister.getApplicationACLs());
Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
Assert
.assertNotNull(responseRegister.getContainersFromPreviousAttempts());
Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
// Wait until the RM has been updated and verify
Map<ApplicationId, RMApp> rmApps =
cluster.getResourceManager().getRMContext().getRMApps();
boolean rmUpdated = false;
for (int i=0; i<10 && !rmUpdated; i++) {
sleep(100);
RMApp rmApp = rmApps.get(appId);
if (rmApp.getState() == RMAppState.RUNNING) {
rmUpdated = true;
}
}
RMApp rmApp = rmApps.get(appId);
Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
LOG.info("testDistributedSchedulingE2E - Allocate");
AllocateRequest request =
createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
// Replace 'ANY' requests with OPPORTUNISTIC aks and remove
// everything else
List<ResourceRequest> newAskList = new ArrayList<>();
for (ResourceRequest rr : request.getAskList()) {
if (ResourceRequest.ANY.equals(rr.getResourceName())) {
ResourceRequest newRR = ResourceRequest.newInstance(rr
.getPriority(), rr.getResourceName(),
rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
rr.getNodeLabelExpression(),
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true));
newAskList.add(newRR);
}
}
request.setAskList(newAskList);
AllocateResponse allocResponse = client.allocate(request);
Assert.assertNotNull(allocResponse);
// Ensure that all the requests are satisfied immediately
Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
// Verify that the allocated containers are OPPORTUNISTIC
for (Container allocatedContainer : allocResponse
.getAllocatedContainers()) {
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
.newContainerTokenIdentifier(
allocatedContainer.getContainerToken());
Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
containerTokenIdentifier.getExecutionType());
}
// Check that the RM sees OPPORTUNISTIC containers
ResourceScheduler scheduler = cluster.getResourceManager()
.getResourceScheduler();
for (Container allocatedContainer : allocResponse
.getAllocatedContainers()) {
ContainerId containerId = allocatedContainer.getId();
RMContainer rmContainer = scheduler.getRMContainer(containerId);
Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
rmContainer.getExecutionType());
}
LOG.info("testDistributedSchedulingE2E - Finish");
}
/**
* Validates if Allocate Requests containing both GUARANTEED and OPPORTUNISTIC
* container requests works as expected.
*
* @throws Exception
*/
@Test(timeout = 60000)
public void testMixedExecutionTypeRequestE2E() throws Exception {
LOG.info("testDistributedSchedulingE2E - Register");
RegisterApplicationMasterResponse responseRegister =
client.registerApplicationMaster(RegisterApplicationMasterRequest
.newInstance(NetUtils.getHostname(), 1024, ""));
Assert.assertNotNull(responseRegister);
Assert.assertNotNull(responseRegister.getQueue());
Assert.assertNotNull(responseRegister.getApplicationACLs());
Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
Assert
.assertNotNull(responseRegister.getContainersFromPreviousAttempts());
Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
RMApp rmApp =
cluster.getResourceManager().getRMContext().getRMApps().get(appId);
Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
LOG.info("testDistributedSchedulingE2E - Allocate");
AllocateRequest request =
createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
List<ResourceRequest> askList = request.getAskList();
List<ResourceRequest> newAskList = new ArrayList<>(askList);
// Duplicate all ANY requests marking them as opportunistic
for (ResourceRequest rr : askList) {
if (ResourceRequest.ANY.equals(rr.getResourceName())) {
ResourceRequest newRR = ResourceRequest.newInstance(rr
.getPriority(), rr.getResourceName(),
rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
rr.getNodeLabelExpression(),
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true));
newAskList.add(newRR);
}
}
request.setAskList(newAskList);
AllocateResponse allocResponse = client.allocate(request);
Assert.assertNotNull(allocResponse);
// Ensure that all the requests are satisfied immediately
Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
// Verify that the allocated containers are OPPORTUNISTIC
for (Container allocatedContainer : allocResponse
.getAllocatedContainers()) {
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
.newContainerTokenIdentifier(
allocatedContainer.getContainerToken());
Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
containerTokenIdentifier.getExecutionType());
}
request.setAskList(new ArrayList<ResourceRequest>());
request.setResponseId(request.getResponseId() + 1);
Thread.sleep(1000);
// RM should allocate GUARANTEED containers within 2 calls to allocate()
allocResponse = client.allocate(request);
Assert.assertNotNull(allocResponse);
Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
// Verify that the allocated containers are GUARANTEED
for (Container allocatedContainer : allocResponse
.getAllocatedContainers()) {
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
.newContainerTokenIdentifier(
allocatedContainer.getContainerToken());
Assert.assertEquals(ExecutionType.GUARANTEED,
containerTokenIdentifier.getExecutionType());
}
LOG.info("testDistributedSchedulingE2E - Finish");
}
/**
* Validates if AMRMClient can be used with Distributed Scheduling turned on.
*
* @throws Exception
*/
@Test(timeout = 120000)
@SuppressWarnings("unchecked")
public void testAMRMClient() throws Exception {
AMRMClientImpl<AMRMClient.ContainerRequest> amClient = null;
try {
Priority priority = Priority.newInstance(1);
Priority priority2 = Priority.newInstance(2);
Resource capability = Resource.newInstance(1024, 1);
List<NodeReport> nodeReports = rmClient.getNodeReports(NodeState.RUNNING);
String node = nodeReports.get(0).getNodeId().getHost();
String rack = nodeReports.get(0).getRackName();
String[] nodes = new String[]{node};
String[] racks = new String[]{rack};
// start am rm client
amClient = new AMRMClientImpl(client);
amClient.init(yarnConf);
amClient.start();
amClient.registerApplicationMaster(NetUtils.getHostname(), 1024, "");
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority2,
0, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority2,
0, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority2,
0, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
RemoteRequestsTable<ContainerRequest> remoteRequestsTable =
amClient.getTable(0);
int containersRequestedNode = remoteRequestsTable.get(priority,
node, ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
int containersRequestedRack = remoteRequestsTable.get(priority,
rack, ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
int containersRequestedAny = remoteRequestsTable.get(priority,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
.remoteRequest.getNumContainers();
int oppContainersRequestedAny =
remoteRequestsTable.get(priority2, ResourceRequest.ANY,
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
.getNumContainers();
assertEquals(2, containersRequestedNode);
assertEquals(2, containersRequestedRack);
assertEquals(2, containersRequestedAny);
assertEquals(1, oppContainersRequestedAny);
assertEquals(4, amClient.ask.size());
assertEquals(0, amClient.release.size());
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;
int iterationsLeft = 10;
Set<ContainerId> releases = new TreeSet<>();
amClient.getNMTokenCache().clearCache();
Assert.assertEquals(0,
amClient.getNMTokenCache().numberOfTokensInCache());
HashMap<String, Token> receivedNMTokens = new HashMap<>();
while (allocatedContainerCount <
(containersRequestedAny + oppContainersRequestedAny)
&& iterationsLeft-- > 0) {
AllocateResponse allocResponse = amClient.allocate(0.1f);
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
allocatedContainerCount += allocResponse.getAllocatedContainers()
.size();
for (Container container : allocResponse.getAllocatedContainers()) {
ContainerId rejectContainerId = container.getId();
releases.add(rejectContainerId);
}
for (NMToken token : allocResponse.getNMTokens()) {
String nodeID = token.getNodeId().toString();
receivedNMTokens.put(nodeID, token.getToken());
}
if (allocatedContainerCount < containersRequestedAny) {
// sleep to let NM's heartbeat to RM and trigger allocations
sleep(100);
}
}
assertEquals(allocatedContainerCount,
containersRequestedAny + oppContainersRequestedAny);
for (ContainerId rejectContainerId : releases) {
amClient.releaseAssignedContainer(rejectContainerId);
}
assertEquals(3, amClient.release.size());
assertEquals(0, amClient.ask.size());
// need to tell the AMRMClient that we dont need these resources anymore
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
0, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
assertEquals(4, amClient.ask.size());
// test RPC exception handling
amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability,
nodes, racks, priority));
amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability,
nodes, racks, priority));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
0, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
final AMRMClient amc = amClient;
ApplicationMasterProtocol realRM = amClient.rmClient;
try {
ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol
.class);
when(mockRM.allocate(any(AllocateRequest.class))).thenAnswer(
new Answer<AllocateResponse>() {
public AllocateResponse answer(InvocationOnMock invocation)
throws Exception {
amc.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes,
racks, priority));
amc.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks,
priority));
amc.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null,
priority2, 0, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
throw new Exception();
}
});
amClient.rmClient = mockRM;
amClient.allocate(0.1f);
} catch (Exception ioe) {
} finally {
amClient.rmClient = realRM;
}
assertEquals(3, amClient.release.size());
assertEquals(6, amClient.ask.size());
iterationsLeft = 3;
// do a few iterations to ensure RM is not going send new containers
while (iterationsLeft-- > 0) {
// inform RM of rejection
AllocateResponse allocResponse = amClient.allocate(0.1f);
// RM did not send new containers because AM does not need any
assertEquals(0, allocResponse.getAllocatedContainers().size());
if (allocResponse.getCompletedContainersStatuses().size() > 0) {
for (ContainerStatus cStatus : allocResponse
.getCompletedContainersStatuses()) {
if (releases.contains(cStatus.getContainerId())) {
assertEquals(cStatus.getState(), ContainerState.COMPLETE);
assertEquals(-100, cStatus.getExitStatus());
releases.remove(cStatus.getContainerId());
}
}
}
if (iterationsLeft > 0) {
// sleep to make sure NM's heartbeat
sleep(100);
}
}
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null);
} finally {
if (amClient != null && amClient.getServiceState() == Service.STATE
.STARTED) {
amClient.stop();
}
}
}
/**
* Check if an AM can ask for opportunistic containers and get them.
* @throws Exception
*/
@Test
public void testAMOpportunistic() throws Exception {
// Basic container to request
Resource capability = Resource.newInstance(1024, 1);
Priority priority = Priority.newInstance(1);
// Get the cluster topology
List<NodeReport> nodeReports = rmClient.getNodeReports(NodeState.RUNNING);
String node = nodeReports.get(0).getNodeId().getHost();
String rack = nodeReports.get(0).getRackName();
String[] nodes = new String[]{node};
String[] racks = new String[]{rack};
// Create an AM to request resources
AMRMClient<AMRMClient.ContainerRequest> amClient = null;
try {
amClient = new AMRMClientImpl<AMRMClient.ContainerRequest>(client);
amClient.init(yarnConf);
amClient.start();
amClient.registerApplicationMaster(NetUtils.getHostname(), 1024, "");
// AM requests an opportunistic container
ExecutionTypeRequest execTypeRequest =
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true);
ContainerRequest containerRequest = new AMRMClient.ContainerRequest(
capability, nodes, racks, priority, 0, true, null, execTypeRequest);
amClient.addContainerRequest(containerRequest);
// Wait until the container is allocated
ContainerId opportunisticContainerId = null;
for (int i=0; i<10 && opportunisticContainerId == null; i++) {
AllocateResponse allocResponse = amClient.allocate(0.1f);
List<Container> allocatedContainers =
allocResponse.getAllocatedContainers();
for (Container allocatedContainer : allocatedContainers) {
// Check that this is the container we required
assertEquals(ExecutionType.OPPORTUNISTIC,
allocatedContainer.getExecutionType());
opportunisticContainerId = allocatedContainer.getId();
}
sleep(100);
}
assertNotNull(opportunisticContainerId);
// The RM sees the container as OPPORTUNISTIC
ResourceScheduler scheduler = cluster.getResourceManager()
.getResourceScheduler();
RMContainer rmContainer = scheduler.getRMContainer(
opportunisticContainerId);
assertEquals(ExecutionType.OPPORTUNISTIC,
rmContainer.getExecutionType());
// Release the opportunistic container
amClient.releaseAssignedContainer(opportunisticContainerId);
// Wait for the release container to appear
boolean released = false;
for (int i=0; i<10 && !released; i++) {
AllocateResponse allocResponse = amClient.allocate(0.1f);
List<ContainerStatus> completedContainers =
allocResponse.getCompletedContainersStatuses();
for (ContainerStatus completedContainer : completedContainers) {
ContainerId completedContainerId =
completedContainer.getContainerId();
assertEquals(completedContainerId, opportunisticContainerId);
released = true;
}
if (!released) {
sleep(100);
}
}
assertTrue(released);
// The RM shouldn't see the container anymore
rmContainer = scheduler.getRMContainer(opportunisticContainerId);
assertNull(rmContainer);
// Clean the AM
amClient.unregisterApplicationMaster(
FinalApplicationStatus.SUCCEEDED, null, null);
} finally {
if (amClient != null &&
amClient.getServiceState() == Service.STATE.STARTED) {
amClient.close();
}
}
}
private void sleep(int sleepTime) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

View File

@ -88,7 +88,7 @@ import static org.junit.Assert.assertNotNull;
* Class that tests the allocation of OPPORTUNISTIC containers through the * Class that tests the allocation of OPPORTUNISTIC containers through the
* centralized ResourceManager. * centralized ResourceManager.
*/ */
public class TestOpportunisticContainerAllocation { public class TestOpportunisticContainerAllocationE2E {
private static Configuration conf = null; private static Configuration conf = null;
private static MiniYARNCluster yarnCluster = null; private static MiniYARNCluster yarnCluster = null;
private static YarnClient yarnClient = null; private static YarnClient yarnClient = null;

View File

@ -46,6 +46,24 @@ public abstract class RemoteNode implements Comparable<RemoteNode> {
return remoteNode; return remoteNode;
} }
/**
* Create new Instance.
* @param nodeId NodeId.
* @param httpAddress Http address.
* @param rackName Rack Name.
* @return RemoteNode instance.
*/
@Private
@Unstable
public static RemoteNode newInstance(NodeId nodeId, String httpAddress,
String rackName) {
RemoteNode remoteNode = Records.newRecord(RemoteNode.class);
remoteNode.setNodeId(nodeId);
remoteNode.setHttpAddress(httpAddress);
remoteNode.setRackName(rackName);
return remoteNode;
}
/** /**
* Get {@link NodeId}. * Get {@link NodeId}.
* @return NodeId. * @return NodeId.
@ -78,6 +96,22 @@ public abstract class RemoteNode implements Comparable<RemoteNode> {
@Unstable @Unstable
public abstract void setHttpAddress(String httpAddress); public abstract void setHttpAddress(String httpAddress);
/**
* Get Rack Name.
* @return Rack Name.
*/
@Private
@Unstable
public abstract String getRackName();
/**
* Set Rack Name.
* @param rackName Rack Name.
*/
@Private
@Unstable
public abstract void setRackName(String rackName);
/** /**
* Use the underlying {@link NodeId} comparator. * Use the underlying {@link NodeId} comparator.
* @param other RemoteNode. * @param other RemoteNode.
@ -92,6 +126,7 @@ public abstract class RemoteNode implements Comparable<RemoteNode> {
public String toString() { public String toString() {
return "RemoteNode{" + return "RemoteNode{" +
"nodeId=" + getNodeId() + ", " + "nodeId=" + getNodeId() + ", " +
"rackName=" + getRackName() + ", " +
"httpAddress=" + getHttpAddress() + "}"; "httpAddress=" + getHttpAddress() + "}";
} }
} }

View File

@ -117,6 +117,25 @@ public class RemoteNodePBImpl extends RemoteNode {
builder.setHttpAddress(httpAddress); builder.setHttpAddress(httpAddress);
} }
@Override
public String getRackName() {
RemoteNodeProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasRackName()) {
return null;
}
return (p.getRackName());
}
@Override
public void setRackName(String rackName) {
maybeInitBuilder();
if (rackName == null) {
builder.clearRackName();
return;
}
builder.setRackName(rackName);
}
@Override @Override
public int hashCode() { public int hashCode() {
return getProto().hashCode(); return getProto().hashCode();

View File

@ -45,11 +45,14 @@ import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
/** /**
@ -61,6 +64,10 @@ import java.util.concurrent.atomic.AtomicLong;
*/ */
public class OpportunisticContainerAllocator { public class OpportunisticContainerAllocator {
private static final int NODE_LOCAL_LOOP = 0;
private static final int RACK_LOCAL_LOOP = 1;
private static final int OFF_SWITCH_LOOP = 2;
/** /**
* This class encapsulates application specific parameters used to build a * This class encapsulates application specific parameters used to build a
* Container. * Container.
@ -70,6 +77,7 @@ public class OpportunisticContainerAllocator {
private Resource minResource; private Resource minResource;
private Resource incrementResource; private Resource incrementResource;
private int containerTokenExpiryInterval; private int containerTokenExpiryInterval;
private int maxAllocationsPerSchedulerKeyPerRound = 1;
/** /**
* Return Max Resource. * Return Max Resource.
@ -135,6 +143,24 @@ public class OpportunisticContainerAllocator {
int containerTokenExpiryInterval) { int containerTokenExpiryInterval) {
this.containerTokenExpiryInterval = containerTokenExpiryInterval; this.containerTokenExpiryInterval = containerTokenExpiryInterval;
} }
/**
* Get the Max Allocations per Scheduler Key per allocation round.
* @return maxAllocationsPerSchedulerKeyPerRound.
*/
public int getMaxAllocationsPerSchedulerKeyPerRound() {
return maxAllocationsPerSchedulerKeyPerRound;
}
/**
* Set the Max Allocations per Scheduler Key per allocation round.
* @param maxAllocationsPerSchedulerKeyPerRound val.
*/
public void setMaxAllocationsPerSchedulerKeyPerRound(
int maxAllocationsPerSchedulerKeyPerRound) {
this.maxAllocationsPerSchedulerKeyPerRound =
maxAllocationsPerSchedulerKeyPerRound;
}
} }
/** /**
@ -188,6 +214,72 @@ public class OpportunisticContainerAllocator {
private final BaseContainerTokenSecretManager tokenSecretManager; private final BaseContainerTokenSecretManager tokenSecretManager;
static class Allocation {
private final Container container;
private final String resourceName;
Allocation(Container container, String resourceName) {
this.container = container;
this.resourceName = resourceName;
}
Container getContainer() {
return container;
}
String getResourceName() {
return resourceName;
}
}
static class EnrichedResourceRequest {
private final Map<String, AtomicInteger> nodeLocations = new HashMap<>();
private final Map<String, AtomicInteger> rackLocations = new HashMap<>();
private final ResourceRequest request;
EnrichedResourceRequest(ResourceRequest request) {
this.request = request;
}
ResourceRequest getRequest() {
return request;
}
void addLocation(String location, int count) {
Map<String, AtomicInteger> m = rackLocations;
if (!location.startsWith("/")) {
m = nodeLocations;
}
if (count == 0) {
m.remove(location);
} else {
m.put(location, new AtomicInteger(count));
}
}
void removeLocation(String location) {
Map<String, AtomicInteger> m = rackLocations;
AtomicInteger count = m.get(location);
if (count == null) {
m = nodeLocations;
count = m.get(location);
}
if (count != null) {
if (count.decrementAndGet() == 0) {
m.remove(location);
}
}
}
Set<String> getNodeLocations() {
return nodeLocations.keySet();
}
Set<String> getRackLocations() {
return rackLocations.keySet();
}
}
/** /**
* Create a new Opportunistic Container Allocator. * Create a new Opportunistic Container Allocator.
* @param tokenSecretManager TokenSecretManager * @param tokenSecretManager TokenSecretManager
@ -223,37 +315,55 @@ public class OpportunisticContainerAllocator {
// Add OPPORTUNISTIC requests to the outstanding ones. // Add OPPORTUNISTIC requests to the outstanding ones.
opportContext.addToOutstandingReqs(oppResourceReqs); opportContext.addToOutstandingReqs(oppResourceReqs);
// Satisfy the outstanding OPPORTUNISTIC requests. Set<String> nodeBlackList = new HashSet<>(opportContext.getBlacklist());
List<Container> allocatedContainers = new ArrayList<>(); List<Container> allocatedContainers = new ArrayList<>();
for (SchedulerRequestKey schedulerKey :
opportContext.getOutstandingOpReqs().descendingKeySet()) { // Satisfy the outstanding OPPORTUNISTIC requests.
// Allocated containers : boolean continueLoop = true;
// Key = Requested Capability, while (continueLoop) {
// Value = List of Containers of given cap (the actual container size continueLoop = false;
// might be different than what is requested, which is why List<Map<Resource, List<Allocation>>> allocations = new ArrayList<>();
// we need the requested capability (key) to match against for (SchedulerRequestKey schedulerKey :
// the outstanding reqs) opportContext.getOutstandingOpReqs().descendingKeySet()) {
Map<Resource, List<Container>> allocated = allocate(rmIdentifier, // Allocated containers :
opportContext, schedulerKey, applicationAttemptId, appSubmitter); // Key = Requested Capability,
for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) { // Value = List of Containers of given cap (the actual container size
opportContext.matchAllocationToOutstandingRequest( // might be different than what is requested, which is why
e.getKey(), e.getValue()); // we need the requested capability (key) to match against
allocatedContainers.addAll(e.getValue()); // the outstanding reqs)
Map<Resource, List<Allocation>> allocation = allocate(
rmIdentifier, opportContext, schedulerKey, applicationAttemptId,
appSubmitter, nodeBlackList);
if (allocation.size() > 0) {
allocations.add(allocation);
continueLoop = true;
}
}
for (Map<Resource, List<Allocation>> allocation : allocations) {
for (Map.Entry<Resource, List<Allocation>> e : allocation.entrySet()) {
opportContext.matchAllocationToOutstandingRequest(
e.getKey(), e.getValue());
for (Allocation alloc : e.getValue()) {
allocatedContainers.add(alloc.getContainer());
}
}
} }
} }
return allocatedContainers; return allocatedContainers;
} }
private Map<Resource, List<Container>> allocate(long rmIdentifier, private Map<Resource, List<Allocation>> allocate(long rmIdentifier,
OpportunisticContainerContext appContext, SchedulerRequestKey schedKey, OpportunisticContainerContext appContext, SchedulerRequestKey schedKey,
ApplicationAttemptId appAttId, String userName) throws YarnException { ApplicationAttemptId appAttId, String userName, Set<String> blackList)
Map<Resource, List<Container>> containers = new HashMap<>(); throws YarnException {
for (ResourceRequest anyAsk : Map<Resource, List<Allocation>> containers = new HashMap<>();
for (EnrichedResourceRequest enrichedAsk :
appContext.getOutstandingOpReqs().get(schedKey).values()) { appContext.getOutstandingOpReqs().get(schedKey).values()) {
allocateContainersInternal(rmIdentifier, appContext.getAppParams(), allocateContainersInternal(rmIdentifier, appContext.getAppParams(),
appContext.getContainerIdGenerator(), appContext.getBlacklist(), appContext.getContainerIdGenerator(), blackList, appAttId,
appAttId, appContext.getNodeMap(), userName, containers, anyAsk); appContext.getNodeMap(), userName, containers, enrichedAsk);
ResourceRequest anyAsk = enrichedAsk.getRequest();
if (!containers.isEmpty()) { if (!containers.isEmpty()) {
LOG.info("Opportunistic allocation requested for [priority={}, " LOG.info("Opportunistic allocation requested for [priority={}, "
+ "allocationRequestId={}, num_containers={}, capability={}] " + "allocationRequestId={}, num_containers={}, capability={}] "
@ -269,43 +379,162 @@ public class OpportunisticContainerAllocator {
AllocationParams appParams, ContainerIdGenerator idCounter, AllocationParams appParams, ContainerIdGenerator idCounter,
Set<String> blacklist, ApplicationAttemptId id, Set<String> blacklist, ApplicationAttemptId id,
Map<String, RemoteNode> allNodes, String userName, Map<String, RemoteNode> allNodes, String userName,
Map<Resource, List<Container>> containers, ResourceRequest anyAsk) Map<Resource, List<Allocation>> allocations,
EnrichedResourceRequest enrichedAsk)
throws YarnException { throws YarnException {
int toAllocate = anyAsk.getNumContainers() if (allNodes.size() == 0) {
- (containers.isEmpty() ? 0 : LOG.info("No nodes currently available to " +
containers.get(anyAsk.getCapability()).size()); "allocate OPPORTUNISTIC containers.");
List<RemoteNode> nodesForScheduling = new ArrayList<>();
for (Entry<String, RemoteNode> nodeEntry : allNodes.entrySet()) {
// Do not use blacklisted nodes for scheduling.
if (blacklist.contains(nodeEntry.getKey())) {
continue;
}
nodesForScheduling.add(nodeEntry.getValue());
}
if (nodesForScheduling.isEmpty()) {
LOG.warn("No nodes available for allocating opportunistic containers. [" +
"allNodes={}, blacklist={}]", allNodes, blacklist);
return; return;
} }
ResourceRequest anyAsk = enrichedAsk.getRequest();
int toAllocate = anyAsk.getNumContainers()
- (allocations.isEmpty() ? 0 :
allocations.get(anyAsk.getCapability()).size());
toAllocate = Math.min(toAllocate,
appParams.getMaxAllocationsPerSchedulerKeyPerRound());
int numAllocated = 0; int numAllocated = 0;
int nextNodeToSchedule = 0; // Node Candidates are selected as follows:
for (int numCont = 0; numCont < toAllocate; numCont++) { // * Node local candidates selected in loop == 0
nextNodeToSchedule++; // * Rack local candidates selected in loop == 1
nextNodeToSchedule %= nodesForScheduling.size(); // * From loop == 2 onwards, we revert to off switch allocations.
RemoteNode node = nodesForScheduling.get(nextNodeToSchedule); int loopIndex = OFF_SWITCH_LOOP;
Container container = buildContainer(rmIdentifier, appParams, idCounter, if (enrichedAsk.getNodeLocations().size() > 0) {
anyAsk, id, userName, node); loopIndex = NODE_LOCAL_LOOP;
List<Container> cList = containers.get(anyAsk.getCapability());
if (cList == null) {
cList = new ArrayList<>();
containers.put(anyAsk.getCapability(), cList);
}
cList.add(container);
numAllocated++;
LOG.info("Allocated [{}] as opportunistic.", container.getId());
} }
LOG.info("Allocated {} opportunistic containers.", numAllocated); while (numAllocated < toAllocate) {
Collection<RemoteNode> nodeCandidates =
findNodeCandidates(loopIndex, allNodes, blacklist, enrichedAsk);
for (RemoteNode rNode : nodeCandidates) {
String rNodeHost = rNode.getNodeId().getHost();
// Ignore black list
if (blacklist.contains(rNodeHost)) {
LOG.info("Nodes for scheduling has a blacklisted node" +
" [" + rNodeHost + "]..");
continue;
}
String location = ResourceRequest.ANY;
if (loopIndex == NODE_LOCAL_LOOP) {
if (enrichedAsk.getNodeLocations().contains(rNodeHost)) {
location = rNodeHost;
} else {
continue;
}
}
if (loopIndex == RACK_LOCAL_LOOP) {
if (enrichedAsk.getRackLocations().contains(rNode.getRackName())) {
location = rNode.getRackName();
} else {
continue;
}
}
Container container = createContainer(rmIdentifier, appParams,
idCounter, id, userName, allocations, location,
anyAsk, rNode);
numAllocated++;
// Try to spread the allocations across the nodes.
// But don't add if it is a node local request.
if (loopIndex != NODE_LOCAL_LOOP) {
blacklist.add(rNode.getNodeId().getHost());
}
LOG.info("Allocated [" + container.getId() + "] as opportunistic at " +
"location [" + location + "]");
if (numAllocated >= toAllocate) {
break;
}
}
if (loopIndex == NODE_LOCAL_LOOP &&
enrichedAsk.getRackLocations().size() > 0) {
loopIndex = RACK_LOCAL_LOOP;
} else {
loopIndex++;
}
// Handle case where there are no nodes remaining after blacklist is
// considered.
if (loopIndex > OFF_SWITCH_LOOP && numAllocated == 0) {
LOG.warn("Unable to allocate any opportunistic containers.");
break;
}
}
}
private Collection<RemoteNode> findNodeCandidates(int loopIndex,
Map<String, RemoteNode> allNodes, Set<String> blackList,
EnrichedResourceRequest enrichedRR) {
if (loopIndex > 1) {
return allNodes.values();
} else {
LinkedList<RemoteNode> retList = new LinkedList<>();
int numContainers = enrichedRR.getRequest().getNumContainers();
while (numContainers > 0) {
if (loopIndex == 0) {
// Node local candidates
numContainers = collectNodeLocalCandidates(
allNodes, enrichedRR, retList, numContainers);
} else {
// Rack local candidates
numContainers = collectRackLocalCandidates(
allNodes, enrichedRR, retList, blackList, numContainers);
}
if (numContainers == enrichedRR.getRequest().getNumContainers()) {
// If there is no change in numContainers, then there is no point
// in looping again.
break;
}
}
return retList;
}
}
private int collectRackLocalCandidates(Map<String, RemoteNode> allNodes,
EnrichedResourceRequest enrichedRR, LinkedList<RemoteNode> retList,
Set<String> blackList, int numContainers) {
for (RemoteNode rNode : allNodes.values()) {
if (enrichedRR.getRackLocations().contains(rNode.getRackName())) {
if (blackList.contains(rNode.getNodeId().getHost())) {
retList.addLast(rNode);
} else {
retList.addFirst(rNode);
numContainers--;
}
}
if (numContainers == 0) {
break;
}
}
return numContainers;
}
private int collectNodeLocalCandidates(Map<String, RemoteNode> allNodes,
EnrichedResourceRequest enrichedRR, List<RemoteNode> retList,
int numContainers) {
for (String nodeName : enrichedRR.getNodeLocations()) {
RemoteNode remoteNode = allNodes.get(nodeName);
if (remoteNode != null) {
retList.add(remoteNode);
numContainers--;
}
if (numContainers == 0) {
break;
}
}
return numContainers;
}
private Container createContainer(long rmIdentifier,
AllocationParams appParams, ContainerIdGenerator idCounter,
ApplicationAttemptId id, String userName,
Map<Resource, List<Allocation>> allocations, String location,
ResourceRequest anyAsk, RemoteNode rNode) throws YarnException {
Container container = buildContainer(rmIdentifier, appParams,
idCounter, anyAsk, id, userName, rNode);
List<Allocation> allocList = allocations.get(anyAsk.getCapability());
if (allocList == null) {
allocList = new ArrayList<>();
allocations.put(anyAsk.getCapability(), allocList);
}
allocList.add(new Allocation(container, location));
return container;
} }
private Container buildContainer(long rmIdentifier, private Container buildContainer(long rmIdentifier,

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.scheduler; package org.apache.hadoop.yarn.server.scheduler;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
@ -35,8 +34,10 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.Allocation;
import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.AllocationParams; import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.AllocationParams;
import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.ContainerIdGenerator; import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.ContainerIdGenerator;
import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.EnrichedResourceRequest;
/** /**
* This encapsulates application specific information used by the * This encapsulates application specific information used by the
@ -53,7 +54,8 @@ public class OpportunisticContainerContext {
new ContainerIdGenerator(); new ContainerIdGenerator();
private volatile List<RemoteNode> nodeList = new LinkedList<>(); private volatile List<RemoteNode> nodeList = new LinkedList<>();
private final Map<String, RemoteNode> nodeMap = new LinkedHashMap<>(); private final LinkedHashMap<String, RemoteNode> nodeMap =
new LinkedHashMap<>();
private final Set<String> blacklist = new HashSet<>(); private final Set<String> blacklist = new HashSet<>();
@ -61,7 +63,8 @@ public class OpportunisticContainerContext {
// Resource Name (host/rack/any) and capability. This mapping is required // Resource Name (host/rack/any) and capability. This mapping is required
// to match a received Container to an outstanding OPPORTUNISTIC // to match a received Container to an outstanding OPPORTUNISTIC
// ResourceRequest (ask). // ResourceRequest (ask).
private final TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>> private final TreeMap
<SchedulerRequestKey, Map<Resource, EnrichedResourceRequest>>
outstandingOpReqs = new TreeMap<>(); outstandingOpReqs = new TreeMap<>();
public AllocationParams getAppParams() { public AllocationParams getAppParams() {
@ -107,7 +110,7 @@ public class OpportunisticContainerContext {
return blacklist; return blacklist;
} }
public TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>> public TreeMap<SchedulerRequestKey, Map<Resource, EnrichedResourceRequest>>
getOutstandingOpReqs() { getOutstandingOpReqs() {
return outstandingOpReqs; return outstandingOpReqs;
} }
@ -125,36 +128,32 @@ public class OpportunisticContainerContext {
for (ResourceRequest request : resourceAsks) { for (ResourceRequest request : resourceAsks) {
SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request); SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
// TODO: Extend for Node/Rack locality. We only handle ANY requests now Map<Resource, EnrichedResourceRequest> reqMap =
if (!ResourceRequest.isAnyLocation(request.getResourceName())) {
continue;
}
if (request.getNumContainers() == 0) {
continue;
}
Map<Resource, ResourceRequest> reqMap =
outstandingOpReqs.get(schedulerKey); outstandingOpReqs.get(schedulerKey);
if (reqMap == null) { if (reqMap == null) {
reqMap = new HashMap<>(); reqMap = new HashMap<>();
outstandingOpReqs.put(schedulerKey, reqMap); outstandingOpReqs.put(schedulerKey, reqMap);
} }
ResourceRequest resourceRequest = reqMap.get(request.getCapability()); EnrichedResourceRequest eReq = reqMap.get(request.getCapability());
if (resourceRequest == null) { if (eReq == null) {
resourceRequest = request; eReq = new EnrichedResourceRequest(request);
reqMap.put(request.getCapability(), request); reqMap.put(request.getCapability(), eReq);
}
// Set numContainers only for ANY request
if (ResourceRequest.isAnyLocation(request.getResourceName())) {
eReq.getRequest().setResourceName(ResourceRequest.ANY);
eReq.getRequest().setNumContainers(request.getNumContainers());
} else { } else {
resourceRequest.setNumContainers( eReq.addLocation(request.getResourceName(), request.getNumContainers());
resourceRequest.getNumContainers() + request.getNumContainers());
} }
if (ResourceRequest.isAnyLocation(request.getResourceName())) { if (ResourceRequest.isAnyLocation(request.getResourceName())) {
LOG.info("# of outstandingOpReqs in ANY (at " LOG.info("# of outstandingOpReqs in ANY (at "
+ "priority = " + schedulerKey.getPriority() + "priority = " + schedulerKey.getPriority()
+ ", allocationReqId = " + schedulerKey.getAllocationRequestId() + ", allocationReqId = " + schedulerKey.getAllocationRequestId()
+ ", with capability = " + request.getCapability() + " ) : " + ", with capability = " + request.getCapability() + " ) : "
+ resourceRequest.getNumContainers()); + ", with location = " + request.getResourceName() + " ) : "
+ ", numContainers = " + eReq.getRequest().getNumContainers());
} }
} }
} }
@ -163,25 +162,34 @@ public class OpportunisticContainerContext {
* This method matches a returned list of Container Allocations to any * This method matches a returned list of Container Allocations to any
* outstanding OPPORTUNISTIC ResourceRequest. * outstanding OPPORTUNISTIC ResourceRequest.
* @param capability Capability * @param capability Capability
* @param allocatedContainers Allocated Containers * @param allocations Allocations.
*/ */
public void matchAllocationToOutstandingRequest(Resource capability, public void matchAllocationToOutstandingRequest(Resource capability,
List<Container> allocatedContainers) { List<Allocation> allocations) {
for (Container c : allocatedContainers) { for (OpportunisticContainerAllocator.Allocation allocation : allocations) {
SchedulerRequestKey schedulerKey = SchedulerRequestKey schedulerKey =
SchedulerRequestKey.extractFrom(c); SchedulerRequestKey.extractFrom(allocation.getContainer());
Map<Resource, ResourceRequest> asks = Map<Resource, EnrichedResourceRequest> asks =
outstandingOpReqs.get(schedulerKey); outstandingOpReqs.get(schedulerKey);
if (asks == null) { if (asks == null) {
continue; continue;
} }
ResourceRequest rr = asks.get(capability); EnrichedResourceRequest err = asks.get(capability);
if (rr != null) { if (err != null) {
rr.setNumContainers(rr.getNumContainers() - 1); int numContainers = err.getRequest().getNumContainers();
if (rr.getNumContainers() == 0) { numContainers--;
err.getRequest().setNumContainers(numContainers);
if (numContainers == 0) {
asks.remove(capability); asks.remove(capability);
if (asks.size() == 0) {
outstandingOpReqs.remove(schedulerKey);
}
} else {
if (!ResourceRequest.isAnyLocation(allocation.getResourceName())) {
err.removeLocation(allocation.getResourceName());
}
} }
} }
} }

View File

@ -30,6 +30,7 @@ import "yarn_service_protos.proto";
message RemoteNodeProto { message RemoteNodeProto {
optional NodeIdProto node_id = 1; optional NodeIdProto node_id = 1;
optional string http_address = 2; optional string http_address = 2;
optional string rack_name = 3;
} }
message RegisterDistributedSchedulingAMResponseProto { message RegisterDistributedSchedulingAMResponseProto {

View File

@ -0,0 +1,599 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.scheduler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class TestOpportunisticContainerAllocator {
private static final int GB = 1024;
private OpportunisticContainerAllocator allocator = null;
private OpportunisticContainerContext oppCntxt = null;
@Before
public void setup() {
SecurityUtil.setTokenServiceUseIp(false);
final MasterKey mKey = new MasterKey() {
@Override
public int getKeyId() {
return 1;
}
@Override
public void setKeyId(int keyId) {}
@Override
public ByteBuffer getBytes() {
return ByteBuffer.allocate(8);
}
@Override
public void setBytes(ByteBuffer bytes) {}
};
BaseContainerTokenSecretManager secMan =
new BaseContainerTokenSecretManager(new Configuration()) {
@Override
public MasterKey getCurrentKey() {
return mKey;
}
@Override
public byte[] createPassword(ContainerTokenIdentifier identifier) {
return new byte[]{1, 2};
}
};
allocator = new OpportunisticContainerAllocator(secMan);
oppCntxt = new OpportunisticContainerContext();
oppCntxt.getAppParams().setMinResource(Resource.newInstance(1024, 1));
oppCntxt.getAppParams().setIncrementResource(Resource.newInstance(512, 1));
oppCntxt.getAppParams().setMaxResource(Resource.newInstance(1024, 10));
}
@Test
public void testSimpleAllocation() throws Exception {
ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(
new ArrayList<>(), new ArrayList<>());
List<ResourceRequest> reqs =
Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
"*", Resources.createResource(1 * GB), 1, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1);
oppCntxt.updateNodeList(
Arrays.asList(
RemoteNode.newInstance(
NodeId.newInstance("h1", 1234), "h1:1234", "/r1")));
List<Container> containers = allocator.allocateContainers(
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
Assert.assertEquals(1, containers.size());
Assert.assertEquals(0, oppCntxt.getOutstandingOpReqs().size());
}
@Test
public void testBlacklistRejection() throws Exception {
ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(
Arrays.asList("h1", "h2"), new ArrayList<>());
List<ResourceRequest> reqs =
Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
"*", Resources.createResource(1 * GB), 1, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1);
oppCntxt.updateNodeList(
Arrays.asList(
RemoteNode.newInstance(
NodeId.newInstance("h1", 1234), "h1:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h2", 1234), "h2:1234", "/r2")));
List<Container> containers = allocator.allocateContainers(
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
Assert.assertEquals(0, containers.size());
Assert.assertEquals(1, oppCntxt.getOutstandingOpReqs().size());
}
@Test
public void testRoundRobinSimpleAllocation() throws Exception {
ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(
new ArrayList<>(), new ArrayList<>());
List<ResourceRequest> reqs =
Arrays.asList(
ResourceRequest.newBuilder().allocationRequestId(1)
.priority(Priority.newInstance(1))
.resourceName(ResourceRequest.ANY)
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(2)
.priority(Priority.newInstance(1))
.resourceName(ResourceRequest.ANY)
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(3)
.priority(Priority.newInstance(1))
.resourceName(ResourceRequest.ANY)
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build());
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1);
oppCntxt.updateNodeList(
Arrays.asList(
RemoteNode.newInstance(
NodeId.newInstance("h1", 1234), "h1:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h3", 1234), "h3:1234", "/r1")));
List<Container> containers = allocator.allocateContainers(
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
System.out.println(containers);
Set<String> allocatedHosts = new HashSet<>();
for (Container c : containers) {
allocatedHosts.add(c.getNodeHttpAddress());
}
Assert.assertTrue(allocatedHosts.contains("h1:1234"));
Assert.assertTrue(allocatedHosts.contains("h2:1234"));
Assert.assertTrue(allocatedHosts.contains("h3:1234"));
Assert.assertEquals(3, containers.size());
}
@Test
public void testNodeLocalAllocation() throws Exception {
ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(
new ArrayList<>(), new ArrayList<>());
List<ResourceRequest> reqs =
Arrays.asList(
ResourceRequest.newBuilder().allocationRequestId(1)
.priority(Priority.newInstance(1))
.resourceName("/r1")
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(1)
.priority(Priority.newInstance(1))
.resourceName("h1")
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(1)
.priority(Priority.newInstance(1))
.resourceName(ResourceRequest.ANY)
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(2)
.priority(Priority.newInstance(1))
.resourceName("/r1")
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(2)
.priority(Priority.newInstance(1))
.resourceName("h1")
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(2)
.priority(Priority.newInstance(1))
.resourceName(ResourceRequest.ANY)
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build());
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1);
oppCntxt.updateNodeList(
Arrays.asList(
RemoteNode.newInstance(
NodeId.newInstance("h1", 1234), "h1:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h3", 1234), "h3:1234", "/r1")));
List<Container> containers = allocator.allocateContainers(
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
System.out.println(containers);
Set<String> allocatedHosts = new HashSet<>();
for (Container c : containers) {
allocatedHosts.add(c.getNodeHttpAddress());
}
Assert.assertEquals(2, containers.size());
Assert.assertTrue(allocatedHosts.contains("h1:1234"));
Assert.assertFalse(allocatedHosts.contains("h2:1234"));
Assert.assertFalse(allocatedHosts.contains("h3:1234"));
}
@Test
public void testNodeLocalAllocationSameSchedKey() throws Exception {
ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(
new ArrayList<>(), new ArrayList<>());
List<ResourceRequest> reqs =
Arrays.asList(
ResourceRequest.newBuilder().allocationRequestId(2)
.numContainers(2)
.priority(Priority.newInstance(1))
.resourceName("/r1")
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(2)
.numContainers(2)
.priority(Priority.newInstance(1))
.resourceName("h1")
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(2)
.numContainers(2)
.priority(Priority.newInstance(1))
.resourceName(ResourceRequest.ANY)
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build());
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1);
oppCntxt.updateNodeList(
Arrays.asList(
RemoteNode.newInstance(
NodeId.newInstance("h1", 1234), "h1:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h3", 1234), "h3:1234", "/r1")));
List<Container> containers = allocator.allocateContainers(
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
System.out.println(containers);
Set<String> allocatedHosts = new HashSet<>();
for (Container c : containers) {
allocatedHosts.add(c.getNodeHttpAddress());
}
Assert.assertEquals(2, containers.size());
Assert.assertTrue(allocatedHosts.contains("h1:1234"));
Assert.assertFalse(allocatedHosts.contains("h2:1234"));
Assert.assertFalse(allocatedHosts.contains("h3:1234"));
}
@Test
public void testSimpleRackLocalAllocation() throws Exception {
ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(
new ArrayList<>(), new ArrayList<>());
List<ResourceRequest> reqs =
Arrays.asList(
ResourceRequest.newInstance(Priority.newInstance(1), "*",
Resources.createResource(1 * GB), 1, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)),
ResourceRequest.newInstance(Priority.newInstance(1), "h1",
Resources.createResource(1 * GB), 1, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)),
ResourceRequest.newInstance(Priority.newInstance(1), "/r1",
Resources.createResource(1 * GB), 1, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1);
oppCntxt.updateNodeList(
Arrays.asList(
RemoteNode.newInstance(
NodeId.newInstance("h3", 1234), "h3:1234", "/r2"),
RemoteNode.newInstance(
NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h4", 1234), "h4:1234", "/r2")));
List<Container> containers = allocator.allocateContainers(
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
Set<String> allocatedHosts = new HashSet<>();
for (Container c : containers) {
allocatedHosts.add(c.getNodeHttpAddress());
}
Assert.assertTrue(allocatedHosts.contains("h2:1234"));
Assert.assertFalse(allocatedHosts.contains("h3:1234"));
Assert.assertFalse(allocatedHosts.contains("h4:1234"));
Assert.assertEquals(1, containers.size());
}
@Test
public void testRoundRobinRackLocalAllocation() throws Exception {
ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(
new ArrayList<>(), new ArrayList<>());
List<ResourceRequest> reqs =
Arrays.asList(
ResourceRequest.newBuilder().allocationRequestId(1)
.priority(Priority.newInstance(1))
.resourceName("/r1")
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(1)
.priority(Priority.newInstance(1))
.resourceName("h1")
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(1)
.priority(Priority.newInstance(1))
.resourceName(ResourceRequest.ANY)
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(2)
.priority(Priority.newInstance(1))
.resourceName("/r1")
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(2)
.priority(Priority.newInstance(1))
.resourceName("h1")
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(2)
.priority(Priority.newInstance(1))
.resourceName(ResourceRequest.ANY)
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build());
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1);
oppCntxt.updateNodeList(
Arrays.asList(
RemoteNode.newInstance(
NodeId.newInstance("h3", 1234), "h3:1234", "/r2"),
RemoteNode.newInstance(
NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h5", 1234), "h5:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h4", 1234), "h4:1234", "/r2")));
List<Container> containers = allocator.allocateContainers(
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
Set<String> allocatedHosts = new HashSet<>();
for (Container c : containers) {
allocatedHosts.add(c.getNodeHttpAddress());
}
System.out.println(containers);
Assert.assertTrue(allocatedHosts.contains("h2:1234"));
Assert.assertTrue(allocatedHosts.contains("h5:1234"));
Assert.assertFalse(allocatedHosts.contains("h3:1234"));
Assert.assertFalse(allocatedHosts.contains("h4:1234"));
Assert.assertEquals(2, containers.size());
}
@Test
public void testRoundRobinRackLocalAllocationSameSchedKey() throws Exception {
ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(
new ArrayList<>(), new ArrayList<>());
List<ResourceRequest> reqs =
Arrays.asList(
ResourceRequest.newInstance(Priority.newInstance(1), "*",
Resources.createResource(1 * GB), 2, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)),
ResourceRequest.newInstance(Priority.newInstance(1), "h1",
Resources.createResource(1 * GB), 2, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)),
ResourceRequest.newInstance(Priority.newInstance(1), "/r1",
Resources.createResource(1 * GB), 2, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1);
oppCntxt.updateNodeList(
Arrays.asList(
RemoteNode.newInstance(
NodeId.newInstance("h3", 1234), "h3:1234", "/r2"),
RemoteNode.newInstance(
NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h5", 1234), "h5:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h4", 1234), "h4:1234", "/r2")));
List<Container> containers = allocator.allocateContainers(
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
Set<String> allocatedHosts = new HashSet<>();
for (Container c : containers) {
allocatedHosts.add(c.getNodeHttpAddress());
}
System.out.println(containers);
Assert.assertTrue(allocatedHosts.contains("h2:1234"));
Assert.assertTrue(allocatedHosts.contains("h5:1234"));
Assert.assertFalse(allocatedHosts.contains("h3:1234"));
Assert.assertFalse(allocatedHosts.contains("h4:1234"));
Assert.assertEquals(2, containers.size());
}
@Test
public void testOffSwitchAllocationWhenNoNodeOrRack() throws Exception {
ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(
new ArrayList<>(), new ArrayList<>());
List<ResourceRequest> reqs =
Arrays.asList(
ResourceRequest.newInstance(Priority.newInstance(1), "*",
Resources.createResource(1 * GB), 2, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)),
ResourceRequest.newInstance(Priority.newInstance(1), "h6",
Resources.createResource(1 * GB), 2, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)),
ResourceRequest.newInstance(Priority.newInstance(1), "/r3",
Resources.createResource(1 * GB), 2, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1);
oppCntxt.updateNodeList(
Arrays.asList(
RemoteNode.newInstance(
NodeId.newInstance("h3", 1234), "h3:1234", "/r2"),
RemoteNode.newInstance(
NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h5", 1234), "h5:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h4", 1234), "h4:1234", "/r2")));
List<Container> containers = allocator.allocateContainers(
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
System.out.println(containers);
Assert.assertEquals(2, containers.size());
}
@Test
public void testLotsOfContainersRackLocalAllocationSameSchedKey()
throws Exception {
ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(
new ArrayList<>(), new ArrayList<>());
List<ResourceRequest> reqs =
Arrays.asList(
ResourceRequest.newInstance(Priority.newInstance(1), "*",
Resources.createResource(1 * GB), 1000, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)),
ResourceRequest.newInstance(Priority.newInstance(1), "h1",
Resources.createResource(1 * GB), 1000, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)),
ResourceRequest.newInstance(Priority.newInstance(1), "/r1",
Resources.createResource(1 * GB), 1000, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1);
oppCntxt.updateNodeList(
Arrays.asList(
RemoteNode.newInstance(
NodeId.newInstance("h3", 1234), "h3:1234", "/r2"),
RemoteNode.newInstance(
NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h5", 1234), "h5:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h4", 1234), "h4:1234", "/r2")));
List<Container> containers = new ArrayList<>();
for (int i = 0; i < 250; i++) {
containers.addAll(allocator.allocateContainers(
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"));
}
Assert.assertEquals(1000, containers.size());
}
@Test
public void testLotsOfContainersRackLocalAllocation()
throws Exception {
ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(
new ArrayList<>(), new ArrayList<>());
List<ResourceRequest> reqs = new ArrayList<>();
for (int i = 0; i < 100; i++) {
reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1)
.priority(Priority.newInstance(1))
.resourceName("*")
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build());
reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1)
.priority(Priority.newInstance(1))
.resourceName("h1")
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build());
reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1)
.priority(Priority.newInstance(1))
.resourceName("/r1")
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build());
}
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1);
oppCntxt.updateNodeList(
Arrays.asList(
RemoteNode.newInstance(
NodeId.newInstance("h3", 1234), "h3:1234", "/r2"),
RemoteNode.newInstance(
NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h5", 1234), "h5:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h4", 1234), "h4:1234", "/r2")));
List<Container> containers = new ArrayList<>();
for (int i = 0; i < 25; i++) {
containers.addAll(allocator.allocateContainers(
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"));
}
Assert.assertEquals(100, containers.size());
}
}

View File

@ -432,8 +432,12 @@ public class OpportunisticContainerAllocatorAMService
private RemoteNode convertToRemoteNode(NodeId nodeId) { private RemoteNode convertToRemoteNode(NodeId nodeId) {
SchedulerNode node = SchedulerNode node =
((AbstractYarnScheduler) rmContext.getScheduler()).getNode(nodeId); ((AbstractYarnScheduler) rmContext.getScheduler()).getNode(nodeId);
return node != null ? RemoteNode.newInstance(nodeId, node.getHttpAddress()) if (node != null) {
: null; RemoteNode rNode = RemoteNode.newInstance(nodeId, node.getHttpAddress());
rNode.setRackName(node.getRackName());
return rNode;
}
return null;
} }
private static ApplicationAttemptId getAppAttemptId() throws YarnException { private static ApplicationAttemptId getAppAttemptId() throws YarnException {

View File

@ -610,6 +610,8 @@ public class TestOpportunisticContainerAllocatorAMService {
.newInstance(ExecutionType.OPPORTUNISTIC, true))), null); .newInstance(ExecutionType.OPPORTUNISTIC, true))), null);
List<Container> allocatedContainers = List<Container> allocatedContainers =
allocateResponse.getAllocatedContainers(); allocateResponse.getAllocatedContainers();
allocatedContainers.addAll(
am1.allocate(null, null).getAllocatedContainers());
Assert.assertEquals(2, allocatedContainers.size()); Assert.assertEquals(2, allocatedContainers.size());
Container container = allocatedContainers.get(0); Container container = allocatedContainers.get(0);
// Start Container in NM // Start Container in NM