YARN-8412. Move ResourceRequest.clone logic everywhere into a proper API. Contributed by Botong Huang.
This commit is contained in:
parent
59de967954
commit
99948565cb
|
@ -102,6 +102,26 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clone a ResourceRequest object (shallow copy). Please keep it loaded with
|
||||||
|
* all (new) fields
|
||||||
|
*
|
||||||
|
* @param rr the object to copy from
|
||||||
|
* @return the copied object
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Evolving
|
||||||
|
public static ResourceRequest clone(ResourceRequest rr) {
|
||||||
|
// Please keep it loaded with all (new) fields
|
||||||
|
return ResourceRequest.newBuilder().priority(rr.getPriority())
|
||||||
|
.resourceName(rr.getResourceName()).capability(rr.getCapability())
|
||||||
|
.numContainers(rr.getNumContainers())
|
||||||
|
.relaxLocality(rr.getRelaxLocality())
|
||||||
|
.nodeLabelExpression(rr.getNodeLabelExpression())
|
||||||
|
.executionTypeRequest(rr.getExecutionTypeRequest())
|
||||||
|
.allocationRequestId(rr.getAllocationRequestId()).build();
|
||||||
|
}
|
||||||
|
|
||||||
@Public
|
@Public
|
||||||
@Unstable
|
@Unstable
|
||||||
public static ResourceRequestBuilder newBuilder() {
|
public static ResourceRequestBuilder newBuilder() {
|
||||||
|
|
|
@ -451,16 +451,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
||||||
for(ResourceRequest r : ask) {
|
for(ResourceRequest r : ask) {
|
||||||
// create a copy of ResourceRequest as we might change it while the
|
// create a copy of ResourceRequest as we might change it while the
|
||||||
// RPC layer is using it to send info across
|
// RPC layer is using it to send info across
|
||||||
ResourceRequest rr =
|
askList.add(ResourceRequest.clone(r));
|
||||||
ResourceRequest.newBuilder().priority(r.getPriority())
|
|
||||||
.resourceName(r.getResourceName()).capability(r.getCapability())
|
|
||||||
.numContainers(r.getNumContainers())
|
|
||||||
.relaxLocality(r.getRelaxLocality())
|
|
||||||
.nodeLabelExpression(r.getNodeLabelExpression())
|
|
||||||
.executionTypeRequest(r.getExecutionTypeRequest())
|
|
||||||
.allocationRequestId(r.getAllocationRequestId())
|
|
||||||
.build();
|
|
||||||
askList.add(rr);
|
|
||||||
}
|
}
|
||||||
return askList;
|
return askList;
|
||||||
}
|
}
|
||||||
|
|
|
@ -570,11 +570,7 @@ public class TestAMRMClientOnRMRestart {
|
||||||
ContainerUpdates updateRequests) {
|
ContainerUpdates updateRequests) {
|
||||||
List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
|
List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
|
||||||
for (ResourceRequest req : ask) {
|
for (ResourceRequest req : ask) {
|
||||||
ResourceRequest reqCopy =
|
askCopy.add(ResourceRequest.clone(req));
|
||||||
ResourceRequest.newInstance(req.getPriority(),
|
|
||||||
req.getResourceName(), req.getCapability(),
|
|
||||||
req.getNumContainers(), req.getRelaxLocality());
|
|
||||||
askCopy.add(reqCopy);
|
|
||||||
}
|
}
|
||||||
lastAsk = ask;
|
lastAsk = ask;
|
||||||
lastRelease = release;
|
lastRelease = release;
|
||||||
|
|
|
@ -220,13 +220,7 @@ public class AMRMClientRelayer extends AbstractService
|
||||||
for (ResourceRequest r : ask) {
|
for (ResourceRequest r : ask) {
|
||||||
// create a copy of ResourceRequest as we might change it while the
|
// create a copy of ResourceRequest as we might change it while the
|
||||||
// RPC layer is using it to send info across
|
// RPC layer is using it to send info across
|
||||||
askList.add(ResourceRequest.newBuilder().priority(r.getPriority())
|
askList.add(ResourceRequest.clone(r));
|
||||||
.resourceName(r.getResourceName()).capability(r.getCapability())
|
|
||||||
.numContainers(r.getNumContainers())
|
|
||||||
.relaxLocality(r.getRelaxLocality())
|
|
||||||
.nodeLabelExpression(r.getNodeLabelExpression())
|
|
||||||
.executionTypeRequest(r.getExecutionTypeRequest())
|
|
||||||
.allocationRequestId(r.getAllocationRequestId()).build());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
allocateRequest = AllocateRequest.newBuilder()
|
allocateRequest = AllocateRequest.newBuilder()
|
||||||
|
|
|
@ -361,15 +361,7 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
|
||||||
for (SubClusterId targetId : targetSCs) {
|
for (SubClusterId targetId : targetSCs) {
|
||||||
// if the calculated request is non-empty add it to the answer
|
// if the calculated request is non-empty add it to the answer
|
||||||
if (containerNums.get(i) > 0) {
|
if (containerNums.get(i) > 0) {
|
||||||
ResourceRequest out =
|
ResourceRequest out = ResourceRequest.clone(originalResourceRequest);
|
||||||
ResourceRequest.newInstance(originalResourceRequest.getPriority(),
|
|
||||||
originalResourceRequest.getResourceName(),
|
|
||||||
originalResourceRequest.getCapability(),
|
|
||||||
originalResourceRequest.getNumContainers(),
|
|
||||||
originalResourceRequest.getRelaxLocality(),
|
|
||||||
originalResourceRequest.getNodeLabelExpression(),
|
|
||||||
originalResourceRequest.getExecutionTypeRequest());
|
|
||||||
out.setAllocationRequestId(allocationId);
|
|
||||||
out.setNumContainers(containerNums.get(i));
|
out.setNumContainers(containerNums.get(i));
|
||||||
if (ResourceRequest.isAnyLocation(out.getResourceName())) {
|
if (ResourceRequest.isAnyLocation(out.getResourceName())) {
|
||||||
allocationBookkeeper.addAnyRR(targetId, out);
|
allocationBookkeeper.addAnyRR(targetId, out);
|
||||||
|
|
|
@ -165,7 +165,7 @@ public class ResourceRequestSet {
|
||||||
// the same numContainers value
|
// the same numContainers value
|
||||||
Map<String, ResourceRequest> newAsks = new HashMap<>();
|
Map<String, ResourceRequest> newAsks = new HashMap<>();
|
||||||
for (ResourceRequest rr : this.asks.values()) {
|
for (ResourceRequest rr : this.asks.values()) {
|
||||||
ResourceRequest clone = cloneResourceRequest(rr);
|
ResourceRequest clone = ResourceRequest.clone(rr);
|
||||||
clone.setNumContainers(newValue);
|
clone.setNumContainers(newValue);
|
||||||
newAsks.put(clone.getResourceName(), clone);
|
newAsks.put(clone.getResourceName(), clone);
|
||||||
}
|
}
|
||||||
|
@ -176,22 +176,12 @@ public class ResourceRequestSet {
|
||||||
throw new YarnException(
|
throw new YarnException(
|
||||||
"No ANY RR found in requestSet with numContainers=" + oldValue);
|
"No ANY RR found in requestSet with numContainers=" + oldValue);
|
||||||
}
|
}
|
||||||
ResourceRequest clone = cloneResourceRequest(rr);
|
ResourceRequest clone = ResourceRequest.clone(rr);
|
||||||
clone.setNumContainers(newValue);
|
clone.setNumContainers(newValue);
|
||||||
this.asks.put(ResourceRequest.ANY, clone);
|
this.asks.put(ResourceRequest.ANY, clone);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private ResourceRequest cloneResourceRequest(ResourceRequest rr) {
|
|
||||||
return ResourceRequest.newBuilder().priority(rr.getPriority())
|
|
||||||
.resourceName(rr.getResourceName()).capability(rr.getCapability())
|
|
||||||
.numContainers(rr.getNumContainers())
|
|
||||||
.relaxLocality(rr.getRelaxLocality())
|
|
||||||
.nodeLabelExpression(rr.getNodeLabelExpression())
|
|
||||||
.executionTypeRequest(rr.getExecutionTypeRequest())
|
|
||||||
.allocationRequestId(rr.getAllocationRequestId()).build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
StringBuilder builder = new StringBuilder();
|
StringBuilder builder = new StringBuilder();
|
||||||
|
|
|
@ -375,18 +375,6 @@ public class BuilderUtils {
|
||||||
return request;
|
return request;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ResourceRequest newResourceRequest(ResourceRequest r) {
|
|
||||||
ResourceRequest request = recordFactory
|
|
||||||
.newRecordInstance(ResourceRequest.class);
|
|
||||||
request.setPriority(r.getPriority());
|
|
||||||
request.setResourceName(r.getResourceName());
|
|
||||||
request.setCapability(r.getCapability());
|
|
||||||
request.setNumContainers(r.getNumContainers());
|
|
||||||
request.setNodeLabelExpression(r.getNodeLabelExpression());
|
|
||||||
request.setExecutionTypeRequest(r.getExecutionTypeRequest());
|
|
||||||
return request;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static ApplicationReport newApplicationReport(
|
public static ApplicationReport newApplicationReport(
|
||||||
ApplicationId applicationId, ApplicationAttemptId applicationAttemptId,
|
ApplicationId applicationId, ApplicationAttemptId applicationAttemptId,
|
||||||
String user, String queue, String name, String host, int rpcPort,
|
String user, String queue, String name, String host, int rpcPort,
|
||||||
|
|
|
@ -251,14 +251,8 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
|
||||||
}
|
}
|
||||||
|
|
||||||
public ResourceRequest cloneResourceRequest(ResourceRequest request) {
|
public ResourceRequest cloneResourceRequest(ResourceRequest request) {
|
||||||
ResourceRequest newRequest = ResourceRequest.newBuilder()
|
ResourceRequest newRequest = ResourceRequest.clone(request);
|
||||||
.priority(request.getPriority())
|
newRequest.setNumContainers(1);
|
||||||
.allocationRequestId(request.getAllocationRequestId())
|
|
||||||
.resourceName(request.getResourceName())
|
|
||||||
.capability(request.getCapability())
|
|
||||||
.numContainers(1)
|
|
||||||
.relaxLocality(request.getRelaxLocality())
|
|
||||||
.nodeLabelExpression(request.getNodeLabelExpression()).build();
|
|
||||||
return newRequest;
|
return newRequest;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -305,9 +305,8 @@ public class Application {
|
||||||
|
|
||||||
// Note this down for next interaction with ResourceManager
|
// Note this down for next interaction with ResourceManager
|
||||||
ask.remove(request);
|
ask.remove(request);
|
||||||
ask.add(
|
// clone to ensure the RM doesn't manipulate the same obj
|
||||||
org.apache.hadoop.yarn.server.utils.BuilderUtils.newResourceRequest(
|
ask.add(ResourceRequest.clone(request));
|
||||||
request)); // clone to ensure the RM doesn't manipulate the same obj
|
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("addResourceRequest: applicationId=" + applicationId.getId()
|
LOG.debug("addResourceRequest: applicationId=" + applicationId.getId()
|
||||||
|
@ -462,9 +461,8 @@ public class Application {
|
||||||
|
|
||||||
// Note this for next interaction with ResourceManager
|
// Note this for next interaction with ResourceManager
|
||||||
ask.remove(request);
|
ask.remove(request);
|
||||||
ask.add(
|
// clone to ensure the RM doesn't manipulate the same obj
|
||||||
org.apache.hadoop.yarn.server.utils.BuilderUtils.newResourceRequest(
|
ask.add(ResourceRequest.clone(request));
|
||||||
request)); // clone to ensure the RM doesn't manipulate the same obj
|
|
||||||
|
|
||||||
if(LOG.isDebugEnabled()) {
|
if(LOG.isDebugEnabled()) {
|
||||||
LOG.debug("updateResourceRequest:" + " application=" + applicationId
|
LOG.debug("updateResourceRequest:" + " application=" + applicationId
|
||||||
|
|
|
@ -733,7 +733,7 @@ public class TestAppManager{
|
||||||
ResourceRequest.newInstance(Priority.newInstance(0),
|
ResourceRequest.newInstance(Priority.newInstance(0),
|
||||||
ResourceRequest.ANY, Resources.createResource(1025), 1, true);
|
ResourceRequest.ANY, Resources.createResource(1025), 1, true);
|
||||||
req.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
|
req.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
|
||||||
asContext.setAMContainerResourceRequest(cloneResourceRequest(req));
|
asContext.setAMContainerResourceRequest(ResourceRequest.clone(req));
|
||||||
// getAMContainerResourceRequests uses a singleton list of
|
// getAMContainerResourceRequests uses a singleton list of
|
||||||
// getAMContainerResourceRequest
|
// getAMContainerResourceRequest
|
||||||
Assert.assertEquals(req, asContext.getAMContainerResourceRequest());
|
Assert.assertEquals(req, asContext.getAMContainerResourceRequest());
|
||||||
|
@ -1099,25 +1099,11 @@ public class TestAppManager{
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ResourceRequest cloneResourceRequest(ResourceRequest req) {
|
|
||||||
return ResourceRequest.newInstance(
|
|
||||||
Priority.newInstance(req.getPriority().getPriority()),
|
|
||||||
new String(req.getResourceName()),
|
|
||||||
Resource.newInstance(req.getCapability().getMemorySize(),
|
|
||||||
req.getCapability().getVirtualCores()),
|
|
||||||
req.getNumContainers(),
|
|
||||||
req.getRelaxLocality(),
|
|
||||||
req.getNodeLabelExpression() != null
|
|
||||||
? new String(req.getNodeLabelExpression()) : null,
|
|
||||||
ExecutionTypeRequest.newInstance(
|
|
||||||
req.getExecutionTypeRequest().getExecutionType()));
|
|
||||||
}
|
|
||||||
|
|
||||||
private static List<ResourceRequest> cloneResourceRequests(
|
private static List<ResourceRequest> cloneResourceRequests(
|
||||||
List<ResourceRequest> reqs) {
|
List<ResourceRequest> reqs) {
|
||||||
List<ResourceRequest> cloneReqs = new ArrayList<>();
|
List<ResourceRequest> cloneReqs = new ArrayList<>();
|
||||||
for (ResourceRequest req : reqs) {
|
for (ResourceRequest req : reqs) {
|
||||||
cloneReqs.add(cloneResourceRequest(req));
|
cloneReqs.add(ResourceRequest.clone(req));
|
||||||
}
|
}
|
||||||
return cloneReqs;
|
return cloneReqs;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue