YARN-5351. ResourceRequest should take ExecutionType into account during comparison. (Konstantinos Karanasos via asuresh)

(cherry picked from commit 2d8d183b19)
This commit is contained in:
Arun Suresh 2016-07-26 19:08:30 -07:00
parent df2ed6b2c4
commit 0f6ff20652
4 changed files with 159 additions and 9 deletions

View File

@ -31,7 +31,8 @@ import org.apache.hadoop.yarn.util.Records;
*/
@Public
@Evolving
public abstract class ExecutionTypeRequest {
public abstract class ExecutionTypeRequest
implements Comparable<ExecutionTypeRequest> {
@Public
@Evolving
@ -39,6 +40,12 @@ public abstract class ExecutionTypeRequest {
return newInstance(ExecutionType.GUARANTEED, false);
}
@Public
@Evolving
public static ExecutionTypeRequest newInstance(ExecutionType execType) {
return newInstance(execType, false);
}
@Public
@Evolving
public static ExecutionTypeRequest newInstance(ExecutionType execType,

View File

@ -116,6 +116,10 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
String h2 = r2.getResourceName();
ret = h1.compareTo(h2);
}
if (ret == 0) {
ret = r1.getExecutionTypeRequest()
.compareTo(r2.getExecutionTypeRequest());
}
if (ret == 0) {
ret = r1.getCapability().compareTo(r2.getCapability());
}
@ -414,7 +418,8 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
if (other.getExecutionTypeRequest() != null) {
return false;
}
} else if (!execTypeRequest.equals(other.getExecutionTypeRequest())) {
} else if (!execTypeRequest.getExecutionType()
.equals(other.getExecutionTypeRequest().getExecutionType())) {
return false;
}
if (getNodeLabelExpression() == null) {
@ -441,12 +446,18 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
int hostNameComparison =
this.getResourceName().compareTo(other.getResourceName());
if (hostNameComparison == 0) {
int capabilityComparison =
this.getCapability().compareTo(other.getCapability());
if (capabilityComparison == 0) {
return this.getNumContainers() - other.getNumContainers();
int execTypeReqComparison = this.getExecutionTypeRequest()
.compareTo(other.getExecutionTypeRequest());
if (execTypeReqComparison == 0) {
int capabilityComparison =
this.getCapability().compareTo(other.getCapability());
if (capabilityComparison == 0) {
return this.getNumContainers() - other.getNumContainers();
} else {
return capabilityComparison;
}
} else {
return capabilityComparison;
return execTypeReqComparison;
}
} else {
return hostNameComparison;

View File

@ -39,7 +39,6 @@ import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
@ -62,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NMToken;
@ -120,7 +120,7 @@ public class TestAMRMClient {
static String[] nodes;
static String[] racks;
private final static int DEFAULT_ITERATION = 3;
@BeforeClass
public static void setup() throws Exception {
// start minicluster
@ -335,6 +335,133 @@ public class TestAMRMClient {
}
}
}
/**
* Test fit of both GUARANTEED and OPPORTUNISTIC containers.
*/
@Test (timeout=60000)
public void testAMRMClientMatchingFitExecType()
throws YarnException, IOException {
AMRMClient<ContainerRequest> amClient = null;
try {
// start am rm client
amClient = AMRMClient.<ContainerRequest>createAMRMClient();
amClient.init(conf);
amClient.start();
amClient.registerApplicationMaster("Host", 10000, "");
Resource capability1 = Resource.newInstance(1024, 2);
Resource capability2 = Resource.newInstance(1024, 1);
Resource capability3 = Resource.newInstance(1000, 2);
Resource capability4 = Resource.newInstance(1000, 2);
Resource capability5 = Resource.newInstance(2000, 2);
Resource capability6 = Resource.newInstance(2000, 3);
Resource capability7 = Resource.newInstance(6000, 3);
// Add 2 GUARANTEED and 7 OPPORTUNISTIC requests.
ContainerRequest storedGuarContainer1 =
new ContainerRequest(capability1, nodes, racks, priority);
ContainerRequest storedGuarContainer2 =
new ContainerRequest(capability2, nodes, racks, priority);
ContainerRequest storedOpportContainer1 =
new ContainerRequest(capability1, nodes, racks, priority, true, null,
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
ContainerRequest storedOpportContainer2 =
new ContainerRequest(capability2, nodes, racks, priority, true, null,
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
ContainerRequest storedOpportContainer3 =
new ContainerRequest(capability3, nodes, racks, priority, true, null,
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
ContainerRequest storedOpportContainer4 =
new ContainerRequest(capability4, nodes, racks, priority, true, null,
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
ContainerRequest storedOpportContainer5 =
new ContainerRequest(capability5, nodes, racks, priority, true, null,
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
ContainerRequest storedOpportContainer6 =
new ContainerRequest(capability6, nodes, racks, priority, true, null,
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
ContainerRequest storedOpportContainer7 =
new ContainerRequest(capability7, nodes, racks, priority2,
false, null,
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
amClient.addContainerRequest(storedGuarContainer1);
amClient.addContainerRequest(storedGuarContainer2);
amClient.addContainerRequest(storedOpportContainer1);
amClient.addContainerRequest(storedOpportContainer2);
amClient.addContainerRequest(storedOpportContainer3);
amClient.addContainerRequest(storedOpportContainer4);
amClient.addContainerRequest(storedOpportContainer5);
amClient.addContainerRequest(storedOpportContainer6);
amClient.addContainerRequest(storedOpportContainer7);
// Make sure 3 entries are generated in the ask list for each added
// container request of a given capability, locality, execution type and
// priority (one node-local, one rack-local, and one ANY).
assertEquals(24,
(((AMRMClientImpl<ContainerRequest>) amClient).ask.size()));
// test exact matching of GUARANTEED containers
List<? extends Collection<ContainerRequest>> matches;
ContainerRequest storedRequest;
Resource testCapability1 = Resource.newInstance(1024, 2);
matches = amClient
.getMatchingRequests(priority, node, ExecutionType.GUARANTEED,
testCapability1);
verifyMatches(matches, 1);
storedRequest = matches.get(0).iterator().next();
assertEquals(storedGuarContainer1, storedRequest);
amClient.removeContainerRequest(storedGuarContainer1);
// test exact matching of OPPORTUNISTIC containers
matches = amClient.getMatchingRequests(priority, node,
ExecutionType.OPPORTUNISTIC, testCapability1);
verifyMatches(matches, 1);
storedRequest = matches.get(0).iterator().next();
assertEquals(storedOpportContainer1, storedRequest);
amClient.removeContainerRequest(storedOpportContainer1);
// exact OPPORTUNISTIC matching with order maintained
Resource testCapability2 = Resource.newInstance(1000, 2);
matches = amClient.getMatchingRequests(priority, node,
ExecutionType.OPPORTUNISTIC, testCapability2);
verifyMatches(matches, 2);
// must be returned in the order they were made
int i = 0;
for(ContainerRequest storedRequest1 : matches.get(0)) {
if(i++ == 0) {
assertEquals(storedOpportContainer3, storedRequest1);
} else {
assertEquals(storedOpportContainer4, storedRequest1);
}
}
amClient.removeContainerRequest(storedOpportContainer3);
// matching with larger container
Resource testCapability3 = Resource.newInstance(4000, 4);
matches = amClient.getMatchingRequests(priority, node,
ExecutionType.OPPORTUNISTIC, testCapability3);
assert(matches.size() == 4);
// verify requests without relaxed locality are only returned at specific
// locations
Resource testCapability4 = Resource.newInstance(6000, 3);
matches = amClient.getMatchingRequests(priority2, ResourceRequest.ANY,
ExecutionType.OPPORTUNISTIC, testCapability4);
assert(matches.size() == 0);
matches = amClient.getMatchingRequests(priority2, node,
ExecutionType.OPPORTUNISTIC, testCapability4);
assert(matches.size() == 1);
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null);
} finally {
if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
amClient.stop();
}
}
}
private void verifyMatches(
List<? extends Collection<ContainerRequest>> matches,

View File

@ -85,6 +85,11 @@ public class ExecutionTypeRequestPBImpl extends ExecutionTypeRequest {
return p.getEnforceExecutionType();
}
@Override
public int compareTo(ExecutionTypeRequest other) {
return this.getExecutionType().compareTo(other.getExecutionType());
}
@Override
public String toString() {
return "{Execution Type: " + getExecutionType()