YARN-9752. Add support for allocation id in SLS. Contributed by Abhishek Modi

This commit is contained in:
Abhishek Modi 2019-08-21 20:39:51 +05:30
parent 7ab88dbfa6
commit 3ad1fcfc8b
7 changed files with 172 additions and 19 deletions

View File

@ -558,10 +558,15 @@ public class SLSRunner extends Configured implements Tool {
executionType = ExecutionType.valueOf(
jsonTask.get(SLSConfiguration.TASK_EXECUTION_TYPE).toString());
}
long allocationId = -1;
if (jsonTask.containsKey(SLSConfiguration.TASK_ALLOCATION_ID)) {
allocationId = Long.parseLong(
jsonTask.get(SLSConfiguration.TASK_ALLOCATION_ID).toString());
}
for (int i = 0; i < count; i++) {
containers.add(
new ContainerSimulator(res, duration, hostname, priority, type,
executionType));
executionType, allocationId));
}
}

View File

@ -272,8 +272,8 @@ public abstract class AMSimulator extends TaskRunner.Task {
}
protected ResourceRequest createResourceRequest(Resource resource,
ExecutionType executionType, String host, int priority, int
numContainers) {
ExecutionType executionType, String host, int priority, long
allocationId, int numContainers) {
ResourceRequest request = recordFactory
.newRecordInstance(ResourceRequest.class);
request.setCapability(resource);
@ -284,6 +284,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
Priority prio = recordFactory.newRecordInstance(Priority.class);
prio.setPriority(priority);
request.setPriority(prio);
request.setAllocationRequestId(allocationId);
return request;
}
@ -406,11 +407,22 @@ public abstract class AMSimulator extends TaskRunner.Task {
protected List<ResourceRequest> packageRequests(
List<ContainerSimulator> csList, int priority) {
// create requests
Map<String, ResourceRequest> rackLocalRequestMap = new HashMap<String, ResourceRequest>();
Map<String, ResourceRequest> nodeLocalRequestMap = new HashMap<String, ResourceRequest>();
ResourceRequest anyRequest = null;
Map<Long, Map<String, ResourceRequest>> rackLocalRequests =
new HashMap<>();
Map<Long, Map<String, ResourceRequest>> nodeLocalRequests =
new HashMap<>();
Map<Long, ResourceRequest> anyRequests = new HashMap<>();
for (ContainerSimulator cs : csList) {
long allocationId = cs.getAllocationId();
ResourceRequest anyRequest = anyRequests.get(allocationId);
if (cs.getHostname() != null) {
Map<String, ResourceRequest> rackLocalRequestMap;
if (rackLocalRequests.containsKey(allocationId)) {
rackLocalRequestMap = rackLocalRequests.get(allocationId);
} else {
rackLocalRequestMap = new HashMap<>();
rackLocalRequests.put(allocationId, rackLocalRequestMap);
}
String[] rackHostNames = SLSUtils.getRackHostName(cs.getHostname());
// check rack local
String rackname = "/" + rackHostNames[0];
@ -419,34 +431,49 @@ public abstract class AMSimulator extends TaskRunner.Task {
rackLocalRequestMap.get(rackname).getNumContainers() + 1);
} else {
ResourceRequest request = createResourceRequest(cs.getResource(),
cs.getExecutionType(), rackname, priority, 1);
cs.getExecutionType(), rackname, priority,
cs.getAllocationId(), 1);
rackLocalRequestMap.put(rackname, request);
}
// check node local
Map<String, ResourceRequest> nodeLocalRequestMap;
if (nodeLocalRequests.containsKey(allocationId)) {
nodeLocalRequestMap = nodeLocalRequests.get(allocationId);
} else {
nodeLocalRequestMap = new HashMap<>();
nodeLocalRequests.put(allocationId, nodeLocalRequestMap);
}
String hostname = rackHostNames[1];
if (nodeLocalRequestMap.containsKey(hostname)) {
nodeLocalRequestMap.get(hostname).setNumContainers(
nodeLocalRequestMap.get(hostname).getNumContainers() + 1);
} else {
ResourceRequest request = createResourceRequest(cs.getResource(),
cs.getExecutionType(), hostname, priority, 1);
cs.getExecutionType(), hostname, priority,
cs.getAllocationId(), 1);
nodeLocalRequestMap.put(hostname, request);
}
}
// any
if (anyRequest == null) {
anyRequest = createResourceRequest(cs.getResource(),
cs.getExecutionType(), ResourceRequest.ANY, priority, 1);
cs.getExecutionType(), ResourceRequest.ANY, priority,
cs.getAllocationId(), 1);
anyRequests.put(allocationId, anyRequest);
} else {
anyRequest.setNumContainers(anyRequest.getNumContainers() + 1);
}
}
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ask.addAll(nodeLocalRequestMap.values());
ask.addAll(rackLocalRequestMap.values());
if (anyRequest != null) {
ask.add(anyRequest);
for (Map<String, ResourceRequest> nodeLocalRequestMap :
nodeLocalRequests.values()) {
ask.addAll(nodeLocalRequestMap.values());
}
for (Map<String, ResourceRequest> rackLocalRequestMap :
rackLocalRequests.values()) {
ask.addAll(rackLocalRequestMap.values());
}
ask.addAll(anyRequests.values());
return ask;
}

View File

@ -123,5 +123,6 @@ public class SLSConfiguration {
public static final String TASK_TYPE = TASK_CONTAINER + "type";
public static final String TASK_EXECUTION_TYPE = TASK_CONTAINER
+ "execution.type";
public static final String TASK_ALLOCATION_ID = TASK_CONTAINER
+ "allocation.id";
}

View File

@ -257,7 +257,7 @@ public class NMSimulator extends TaskRunner.Task {
// normal container
ContainerSimulator cs = new ContainerSimulator(container.getId(),
container.getResource(), lifeTimeMS + System.currentTimeMillis(),
lifeTimeMS);
lifeTimeMS, container.getAllocationRequestId());
containerQueue.add(cs);
runningContainers.put(cs.getId(), cs);
} else {

View File

@ -46,6 +46,8 @@ public class ContainerSimulator implements Delayed {
private String type;
// execution type
private ExecutionType executionType = ExecutionType.GUARANTEED;
// allocation id
private long allocationId;
/**
* invoked when AM schedules containers to allocate.
@ -61,23 +63,34 @@ public class ContainerSimulator implements Delayed {
*/
public ContainerSimulator(Resource resource, long lifeTime,
String hostname, int priority, String type, ExecutionType executionType) {
this(resource, lifeTime, hostname, priority, type, executionType, -1);
}
/**
* invoked when AM schedules containers to allocate.
*/
public ContainerSimulator(Resource resource, long lifeTime,
String hostname, int priority, String type, ExecutionType executionType,
long allocationId) {
this.resource = resource;
this.lifeTime = lifeTime;
this.hostname = hostname;
this.priority = priority;
this.type = type;
this.executionType = executionType;
this.allocationId = allocationId;
}
/**
* invoke when NM schedules containers to run.
*/
public ContainerSimulator(ContainerId id, Resource resource, long endTime,
long lifeTime) {
long lifeTime, long allocationId) {
this.id = id;
this.resource = resource;
this.endTime = endTime;
this.lifeTime = lifeTime;
this.allocationId = allocationId;
}
public Resource getResource() {
@ -131,4 +144,8 @@ public class ContainerSimulator implements Delayed {
public ExecutionType getExecutionType() {
return executionType;
}
public long getAllocationId() {
return allocationId;
}
}

View File

@ -21,7 +21,10 @@ import com.codahale.metrics.MetricRegistry;
import java.util.HashMap;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.cli.RMAdminCLI;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -31,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.scheduler.*;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -189,6 +193,102 @@ public class TestAMSimulator {
}
}
@Test
public void testPackageRequests() {
MockAMSimulator app = new MockAMSimulator();
List<ContainerSimulator> containerSimulators = new ArrayList<>();
Resource resource = Resources.createResource(1024);
int priority = 1;
ExecutionType execType = ExecutionType.GUARANTEED;
String type = "map";
ContainerSimulator s1 = new ContainerSimulator(resource, 100,
"/default-rack/h1", priority, type, execType);
ContainerSimulator s2 = new ContainerSimulator(resource, 100,
"/default-rack/h1", priority, type, execType);
ContainerSimulator s3 = new ContainerSimulator(resource, 100,
"/default-rack/h2", priority, type, execType);
containerSimulators.add(s1);
containerSimulators.add(s2);
containerSimulators.add(s3);
List<ResourceRequest> res = app.packageRequests(containerSimulators,
priority);
// total 4 resource requests: any -> 1, rack -> 1, node -> 2
// All resource requests for any would be packaged into 1.
// All resource requests for racks would be packaged into 1 as all of them
// are for same rack.
// All resource requests for nodes would be packaged into 2 as there are
// two different nodes.
Assert.assertEquals(4, res.size());
int anyRequestCount = 0;
int rackRequestCount = 0;
int nodeRequestCount = 0;
for (ResourceRequest request : res) {
String resourceName = request.getResourceName();
if (resourceName.equals("*")) {
anyRequestCount++;
} else if (resourceName.equals("/default-rack")) {
rackRequestCount++;
} else {
nodeRequestCount++;
}
}
Assert.assertEquals(1, anyRequestCount);
Assert.assertEquals(1, rackRequestCount);
Assert.assertEquals(2, nodeRequestCount);
containerSimulators.clear();
s1 = new ContainerSimulator(resource, 100,
"/default-rack/h1", priority, type, execType, 1);
s2 = new ContainerSimulator(resource, 100,
"/default-rack/h1", priority, type, execType, 2);
s3 = new ContainerSimulator(resource, 100,
"/default-rack/h2", priority, type, execType, 1);
containerSimulators.add(s1);
containerSimulators.add(s2);
containerSimulators.add(s3);
res = app.packageRequests(containerSimulators, priority);
// total 7 resource requests: any -> 2, rack -> 2, node -> 3
// All resource requests for any would be packaged into 2 as there are
// two different allocation id.
// All resource requests for racks would be packaged into 2 as all of them
// are for same rack but for two different allocation id.
// All resource requests for nodes would be packaged into 3 as either node
// or allocation id is different for each request.
Assert.assertEquals(7, res.size());
anyRequestCount = 0;
rackRequestCount = 0;
nodeRequestCount = 0;
for (ResourceRequest request : res) {
String resourceName = request.getResourceName();
long allocationId = request.getAllocationRequestId();
// allocation id should be either 1 or 2
Assert.assertTrue(allocationId == 1 || allocationId == 2);
if (resourceName.equals("*")) {
anyRequestCount++;
} else if (resourceName.equals("/default-rack")) {
rackRequestCount++;
} else {
nodeRequestCount++;
}
}
Assert.assertEquals(2, anyRequestCount);
Assert.assertEquals(2, rackRequestCount);
Assert.assertEquals(3, nodeRequestCount);
}
@After
public void tearDown() {
if (rm != null) {

View File

@ -12,7 +12,8 @@
"container.end.ms": 23707,
"container.priority": 20,
"container.type": "map",
"container.execution.type": "GUARANTEED"
"container.execution.type": "GUARANTEED",
"container.allocation.id": 1
},
{
"container.host": "/default-rack/node3",
@ -20,7 +21,8 @@
"container.end.ms": 21593,
"container.priority": 20,
"container.type": "map",
"container.execution.type": "GUARANTEED"
"container.execution.type": "GUARANTEED",
"container.allocation.id": 2
},
{
"container.host": "/default-rack/node2",
@ -28,7 +30,8 @@
"container.end.ms": 86613,
"container.priority": 20,
"container.type": "map",
"container.execution.type": "GUARANTEED"
"container.execution.type": "GUARANTEED",
"container.allocation.id": 2
}
]
}