YARN-6619. AMRMClient Changes to use the PlacementConstraint and SchcedulingRequest objects. (Arun Suresh via wangda)
This commit is contained in:
parent
a5c1fc881e
commit
29d9e4d581
|
@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.client.api;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -39,7 +41,9 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.ProfileCapability;
|
import org.apache.hadoop.yarn.api.records.ProfileCapability;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
|
||||||
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
|
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
@ -553,6 +557,18 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a Collection of SchedulingRequests. The AMRMClient will ensure that
|
||||||
|
* all requests in the same batch are sent in the same allocate call.
|
||||||
|
* @param schedulingRequests Collection of Scheduling Requests.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public void addSchedulingRequests(
|
||||||
|
Collection<SchedulingRequest> schedulingRequests) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register the application master. This must be called before any
|
* Register the application master. This must be called before any
|
||||||
* other interaction
|
* other interaction
|
||||||
|
@ -568,7 +584,27 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
|
||||||
int appHostPort,
|
int appHostPort,
|
||||||
String appTrackingUrl)
|
String appTrackingUrl)
|
||||||
throws YarnException, IOException;
|
throws YarnException, IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register the application master. This must be called before any
|
||||||
|
* other interaction
|
||||||
|
* @param appHostName Name of the host on which master is running
|
||||||
|
* @param appHostPort Port master is listening on
|
||||||
|
* @param appTrackingUrl URL at which the master info can be seen
|
||||||
|
* @param placementConstraints Placement Constraints mappings.
|
||||||
|
* @return <code>RegisterApplicationMasterResponse</code>
|
||||||
|
* @throws YarnException
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||||
|
String appHostName, int appHostPort, String appTrackingUrl,
|
||||||
|
Map<Set<String>, PlacementConstraint> placementConstraints)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
throw new YarnException("Not supported");
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Request additional containers and receive new container allocations.
|
* Request additional containers and receive new container allocations.
|
||||||
* Requests made via <code>addContainerRequest</code> are sent to the
|
* Requests made via <code>addContainerRequest</code> are sent to the
|
||||||
|
|
|
@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.client.api.async;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
|
@ -38,9 +40,12 @@ import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
|
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
|
||||||
|
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
|
||||||
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
||||||
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
||||||
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
|
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
|
||||||
|
@ -205,6 +210,19 @@ extends AbstractService {
|
||||||
String resourceName,
|
String resourceName,
|
||||||
Resource capability);
|
Resource capability);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a Collection of SchedulingRequests. The AMRMClient will ensure that
|
||||||
|
* all requests in the same batch are sent in the same allocate call.
|
||||||
|
* @param schedulingRequests Collection of Scheduling Requests.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public void addSchedulingRequests(
|
||||||
|
Collection<SchedulingRequest> schedulingRequests) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns all matching ContainerRequests that match the given Priority,
|
* Returns all matching ContainerRequests that match the given Priority,
|
||||||
* ResourceName, ExecutionType and Capability.
|
* ResourceName, ExecutionType and Capability.
|
||||||
|
@ -249,6 +267,26 @@ extends AbstractService {
|
||||||
String appHostName, int appHostPort, String appTrackingUrl)
|
String appHostName, int appHostPort, String appTrackingUrl)
|
||||||
throws YarnException, IOException;
|
throws YarnException, IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register the application master. This must be called before any
|
||||||
|
* other interaction
|
||||||
|
* @param appHostName Name of the host on which master is running
|
||||||
|
* @param appHostPort Port master is listening on
|
||||||
|
* @param appTrackingUrl URL at which the master info can be seen
|
||||||
|
* @param placementConstraints Placement Constraints mappings.
|
||||||
|
* @return <code>RegisterApplicationMasterResponse</code>
|
||||||
|
* @throws YarnException
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||||
|
String appHostName, int appHostPort, String appTrackingUrl,
|
||||||
|
Map<Set<String>, PlacementConstraint> placementConstraints)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
throw new YarnException("Not supported");
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unregister the application master. This must be called in the end.
|
* Unregister the application master. This must be called in the end.
|
||||||
* @param appStatus Success/Failure status of the master
|
* @param appStatus Success/Failure status of the master
|
||||||
|
@ -494,6 +532,16 @@ extends AbstractService {
|
||||||
public void onContainersReceivedFromPreviousAttempts(
|
public void onContainersReceivedFromPreviousAttempts(
|
||||||
List<Container> containers) {
|
List<Container> containers) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called when the RM has rejected Scheduling Requests.
|
||||||
|
* @param rejectedSchedulingRequests Rejected Scheduling Requests.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public void onRequestsRejected(
|
||||||
|
List<RejectedSchedulingRequest> rejectedSchedulingRequests) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -22,6 +22,8 @@ import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
||||||
|
@ -36,9 +38,12 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
|
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
|
||||||
|
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
|
||||||
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
||||||
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
||||||
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
|
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
|
||||||
|
@ -150,18 +155,50 @@ extends AMRMClientAsync<T> {
|
||||||
Resource capability) {
|
Resource capability) {
|
||||||
return client.getMatchingRequests(priority, resourceName, capability);
|
return client.getMatchingRequests(priority, resourceName, capability);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addSchedulingRequests(
|
||||||
|
Collection<SchedulingRequest> schedulingRequests) {
|
||||||
|
client.addSchedulingRequests(schedulingRequests);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Registers this application master with the resource manager. On successful
|
* Registers this application master with the resource manager. On successful
|
||||||
* registration, starts the heartbeating thread.
|
* registration, starts the heartbeating thread.
|
||||||
|
*
|
||||||
|
* @param appHostName Name of the host on which master is running
|
||||||
|
* @param appHostPort Port master is listening on
|
||||||
|
* @param appTrackingUrl URL at which the master info can be seen
|
||||||
|
* @return Register AM Response.
|
||||||
* @throws YarnException
|
* @throws YarnException
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public RegisterApplicationMasterResponse registerApplicationMaster(
|
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||||
String appHostName, int appHostPort, String appTrackingUrl)
|
String appHostName, int appHostPort, String appTrackingUrl)
|
||||||
throws YarnException, IOException {
|
throws YarnException, IOException {
|
||||||
|
return registerApplicationMaster(
|
||||||
|
appHostName, appHostPort, appTrackingUrl, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Registers this application master with the resource manager. On successful
|
||||||
|
* registration, starts the heartbeating thread.
|
||||||
|
*
|
||||||
|
* @param appHostName Name of the host on which master is running
|
||||||
|
* @param appHostPort Port master is listening on
|
||||||
|
* @param appTrackingUrl URL at which the master info can be seen
|
||||||
|
* @param placementConstraintsMap Placement Constraints Mapping.
|
||||||
|
* @return Register AM Response.
|
||||||
|
* @throws YarnException
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||||
|
String appHostName, int appHostPort, String appTrackingUrl,
|
||||||
|
Map<Set<String>, PlacementConstraint> placementConstraintsMap)
|
||||||
|
throws YarnException, IOException {
|
||||||
RegisterApplicationMasterResponse response = client
|
RegisterApplicationMasterResponse response = client
|
||||||
.registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
|
.registerApplicationMaster(appHostName, appHostPort,
|
||||||
|
appTrackingUrl, placementConstraintsMap);
|
||||||
heartbeatThread.start();
|
heartbeatThread.start();
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
@ -366,6 +403,14 @@ extends AMRMClientAsync<T> {
|
||||||
response.getContainersFromPreviousAttempts());
|
response.getContainersFromPreviousAttempts());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
List<RejectedSchedulingRequest> rejectedSchedulingRequests =
|
||||||
|
response.getRejectedSchedulingRequests();
|
||||||
|
if (!rejectedSchedulingRequests.isEmpty()) {
|
||||||
|
if (handler instanceof AMRMClientAsync.AbstractCallbackHandler) {
|
||||||
|
((AMRMClientAsync.AbstractCallbackHandler) handler)
|
||||||
|
.onRequestsRejected(rejectedSchedulingRequests);
|
||||||
|
}
|
||||||
|
}
|
||||||
progress = handler.getProgress();
|
progress = handler.getProgress();
|
||||||
} catch (Throwable ex) {
|
} catch (Throwable ex) {
|
||||||
handler.onError(ex);
|
handler.onError(ex);
|
||||||
|
|
|
@ -30,9 +30,11 @@ import java.util.LinkedHashSet;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Queue;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
import java.util.AbstractMap.SimpleEntry;
|
import java.util.AbstractMap.SimpleEntry;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
@ -60,9 +62,11 @@ import org.apache.hadoop.yarn.api.records.ProfileCapability;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.Token;
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
|
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
|
||||||
|
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
|
||||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||||
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
||||||
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
||||||
|
@ -106,6 +110,12 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
||||||
protected final Set<String> blacklistedNodes = new HashSet<String>();
|
protected final Set<String> blacklistedNodes = new HashSet<String>();
|
||||||
protected final Set<String> blacklistAdditions = new HashSet<String>();
|
protected final Set<String> blacklistAdditions = new HashSet<String>();
|
||||||
protected final Set<String> blacklistRemovals = new HashSet<String>();
|
protected final Set<String> blacklistRemovals = new HashSet<String>();
|
||||||
|
private Map<Set<String>, PlacementConstraint> placementConstraints =
|
||||||
|
new HashMap<>();
|
||||||
|
private Queue<Collection<SchedulingRequest>> batchedSchedulingRequests =
|
||||||
|
new LinkedList<>();
|
||||||
|
private Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests =
|
||||||
|
new ConcurrentHashMap<>();
|
||||||
|
|
||||||
protected Map<String, Resource> resourceProfilesMap;
|
protected Map<String, Resource> resourceProfilesMap;
|
||||||
|
|
||||||
|
@ -218,14 +228,26 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
||||||
}
|
}
|
||||||
super.serviceStop();
|
super.serviceStop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RegisterApplicationMasterResponse registerApplicationMaster(
|
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||||
String appHostName, int appHostPort, String appTrackingUrl)
|
String appHostName, int appHostPort, String appTrackingUrl)
|
||||||
throws YarnException, IOException {
|
throws YarnException, IOException {
|
||||||
|
return registerApplicationMaster(appHostName, appHostPort, appTrackingUrl,
|
||||||
|
null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||||
|
String appHostName, int appHostPort, String appTrackingUrl,
|
||||||
|
Map<Set<String>, PlacementConstraint> placementConstraintsMap)
|
||||||
|
throws YarnException, IOException {
|
||||||
this.appHostName = appHostName;
|
this.appHostName = appHostName;
|
||||||
this.appHostPort = appHostPort;
|
this.appHostPort = appHostPort;
|
||||||
this.appTrackingUrl = appTrackingUrl;
|
this.appTrackingUrl = appTrackingUrl;
|
||||||
|
if (placementConstraintsMap != null && !placementConstraintsMap.isEmpty()) {
|
||||||
|
this.placementConstraints.putAll(placementConstraintsMap);
|
||||||
|
}
|
||||||
Preconditions.checkArgument(appHostName != null,
|
Preconditions.checkArgument(appHostName != null,
|
||||||
"The host name should not be null");
|
"The host name should not be null");
|
||||||
Preconditions.checkArgument(appHostPort >= -1, "Port number of the host"
|
Preconditions.checkArgument(appHostPort >= -1, "Port number of the host"
|
||||||
|
@ -240,6 +262,9 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
||||||
RegisterApplicationMasterRequest request =
|
RegisterApplicationMasterRequest request =
|
||||||
RegisterApplicationMasterRequest.newInstance(this.appHostName,
|
RegisterApplicationMasterRequest.newInstance(this.appHostName,
|
||||||
this.appHostPort, this.appTrackingUrl);
|
this.appHostPort, this.appTrackingUrl);
|
||||||
|
if (!this.placementConstraints.isEmpty()) {
|
||||||
|
request.setPlacementConstraints(this.placementConstraints);
|
||||||
|
}
|
||||||
RegisterApplicationMasterResponse response =
|
RegisterApplicationMasterResponse response =
|
||||||
rmClient.registerApplicationMaster(request);
|
rmClient.registerApplicationMaster(request);
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
|
@ -248,10 +273,22 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
||||||
populateNMTokens(response.getNMTokensFromPreviousAttempts());
|
populateNMTokens(response.getNMTokensFromPreviousAttempts());
|
||||||
}
|
}
|
||||||
this.resourceProfilesMap = response.getResourceProfiles();
|
this.resourceProfilesMap = response.getResourceProfiles();
|
||||||
|
List<Container> prevContainers =
|
||||||
|
response.getContainersFromPreviousAttempts();
|
||||||
|
removeFromOutstandingSchedulingRequests(prevContainers);
|
||||||
|
recreateSchedulingRequestBatch();
|
||||||
}
|
}
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addSchedulingRequests(
|
||||||
|
Collection<SchedulingRequest> schedulingRequests) {
|
||||||
|
synchronized (this.batchedSchedulingRequests) {
|
||||||
|
this.batchedSchedulingRequests.add(schedulingRequests);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AllocateResponse allocate(float progressIndicator)
|
public AllocateResponse allocate(float progressIndicator)
|
||||||
throws YarnException, IOException {
|
throws YarnException, IOException {
|
||||||
|
@ -288,6 +325,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
||||||
.responseId(lastResponseId).progress(progressIndicator)
|
.responseId(lastResponseId).progress(progressIndicator)
|
||||||
.askList(askList).resourceBlacklistRequest(blacklistRequest)
|
.askList(askList).resourceBlacklistRequest(blacklistRequest)
|
||||||
.releaseList(releaseList).updateRequests(updateList).build();
|
.releaseList(releaseList).updateRequests(updateList).build();
|
||||||
|
populateSchedulingRequests(allocateRequest);
|
||||||
// clear blacklistAdditions and blacklistRemovals before
|
// clear blacklistAdditions and blacklistRemovals before
|
||||||
// unsynchronized part
|
// unsynchronized part
|
||||||
blacklistAdditions.clear();
|
blacklistAdditions.clear();
|
||||||
|
@ -296,6 +334,10 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
allocateResponse = rmClient.allocate(allocateRequest);
|
allocateResponse = rmClient.allocate(allocateRequest);
|
||||||
|
removeFromOutstandingSchedulingRequests(
|
||||||
|
allocateResponse.getAllocatedContainers());
|
||||||
|
removeFromOutstandingSchedulingRequests(
|
||||||
|
allocateResponse.getContainersFromPreviousAttempts());
|
||||||
} catch (ApplicationMasterNotRegisteredException e) {
|
} catch (ApplicationMasterNotRegisteredException e) {
|
||||||
LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
|
LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
|
||||||
+ " hence resyncing.");
|
+ " hence resyncing.");
|
||||||
|
@ -397,6 +439,104 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
||||||
return allocateResponse;
|
return allocateResponse;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void populateSchedulingRequests(AllocateRequest allocateRequest) {
|
||||||
|
synchronized (this.batchedSchedulingRequests) {
|
||||||
|
if (!this.batchedSchedulingRequests.isEmpty()) {
|
||||||
|
List<SchedulingRequest> newReqs = new LinkedList<>();
|
||||||
|
Iterator<Collection<SchedulingRequest>> iter =
|
||||||
|
this.batchedSchedulingRequests.iterator();
|
||||||
|
while (iter.hasNext()) {
|
||||||
|
Collection<SchedulingRequest> requests = iter.next();
|
||||||
|
newReqs.addAll(requests);
|
||||||
|
addToOutstandingSchedulingRequests(requests);
|
||||||
|
iter.remove();
|
||||||
|
}
|
||||||
|
allocateRequest.setSchedulingRequests(newReqs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void recreateSchedulingRequestBatch() {
|
||||||
|
List<SchedulingRequest> batched = new ArrayList<>();
|
||||||
|
synchronized (this.outstandingSchedRequests) {
|
||||||
|
for (List<SchedulingRequest> schedReqs :
|
||||||
|
this.outstandingSchedRequests.values()) {
|
||||||
|
batched.addAll(schedReqs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
synchronized (this.batchedSchedulingRequests) {
|
||||||
|
this.batchedSchedulingRequests.add(batched);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addToOutstandingSchedulingRequests(
|
||||||
|
Collection<SchedulingRequest> requests) {
|
||||||
|
for (SchedulingRequest req : requests) {
|
||||||
|
List<SchedulingRequest> schedulingRequests =
|
||||||
|
this.outstandingSchedRequests.computeIfAbsent(
|
||||||
|
req.getAllocationTags(), x -> new LinkedList<>());
|
||||||
|
SchedulingRequest matchingReq = null;
|
||||||
|
synchronized (schedulingRequests) {
|
||||||
|
for (SchedulingRequest schedReq : schedulingRequests) {
|
||||||
|
if (isMatching(req, schedReq)) {
|
||||||
|
matchingReq = schedReq;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (matchingReq != null) {
|
||||||
|
matchingReq.getResourceSizing().setNumAllocations(
|
||||||
|
req.getResourceSizing().getNumAllocations());
|
||||||
|
} else {
|
||||||
|
schedulingRequests.add(req);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isMatching(SchedulingRequest schedReq1,
|
||||||
|
SchedulingRequest schedReq2) {
|
||||||
|
return schedReq1.getPriority().equals(schedReq2.getPriority()) &&
|
||||||
|
schedReq1.getExecutionType().getExecutionType().equals(
|
||||||
|
schedReq1.getExecutionType().getExecutionType()) &&
|
||||||
|
schedReq1.getAllocationRequestId() ==
|
||||||
|
schedReq2.getAllocationRequestId();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void removeFromOutstandingSchedulingRequests(
|
||||||
|
Collection<Container> containers) {
|
||||||
|
if (containers == null || containers.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (Container container : containers) {
|
||||||
|
if (container.getAllocationTags() != null &&
|
||||||
|
!container.getAllocationTags().isEmpty()) {
|
||||||
|
List<SchedulingRequest> schedReqs =
|
||||||
|
this.outstandingSchedRequests.get(container.getAllocationTags());
|
||||||
|
if (schedReqs != null && !schedReqs.isEmpty()) {
|
||||||
|
synchronized (schedReqs) {
|
||||||
|
Iterator<SchedulingRequest> iter = schedReqs.iterator();
|
||||||
|
while (iter.hasNext()) {
|
||||||
|
SchedulingRequest schedReq = iter.next();
|
||||||
|
if (schedReq.getPriority().equals(container.getPriority()) &&
|
||||||
|
schedReq.getAllocationRequestId() ==
|
||||||
|
container.getAllocationRequestId()) {
|
||||||
|
int numAllocations =
|
||||||
|
schedReq.getResourceSizing().getNumAllocations();
|
||||||
|
numAllocations--;
|
||||||
|
if (numAllocations == 0) {
|
||||||
|
iter.remove();
|
||||||
|
} else {
|
||||||
|
schedReq.getResourceSizing()
|
||||||
|
.setNumAllocations(numAllocations);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private List<UpdateContainerRequest> createUpdateList() {
|
private List<UpdateContainerRequest> createUpdateList() {
|
||||||
List<UpdateContainerRequest> updateList = new ArrayList<>();
|
List<UpdateContainerRequest> updateList = new ArrayList<>();
|
||||||
for (Map.Entry<ContainerId, SimpleEntry<Container,
|
for (Map.Entry<ContainerId, SimpleEntry<Container,
|
||||||
|
|
|
@ -0,0 +1,212 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.client.api.impl;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.service.Service;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
|
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||||
|
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Base class for testing AMRMClient.
|
||||||
|
*/
|
||||||
|
public class BaseAMRMClientTest {
|
||||||
|
|
||||||
|
protected Configuration conf = null;
|
||||||
|
protected MiniYARNCluster yarnCluster = null;
|
||||||
|
protected YarnClient yarnClient = null;
|
||||||
|
protected List<NodeReport> nodeReports = null;
|
||||||
|
protected ApplicationAttemptId attemptId = null;
|
||||||
|
|
||||||
|
protected String schedulerName = CapacityScheduler.class.getName();
|
||||||
|
protected boolean autoUpdate = false;
|
||||||
|
|
||||||
|
protected int nodeCount = 3;
|
||||||
|
protected long amExpireMs = 4000;
|
||||||
|
protected int rollingIntervalSec = 13;
|
||||||
|
|
||||||
|
|
||||||
|
protected Resource capability;
|
||||||
|
protected Priority priority;
|
||||||
|
protected Priority priority2;
|
||||||
|
protected String node;
|
||||||
|
protected String rack;
|
||||||
|
protected String[] nodes;
|
||||||
|
protected String[] racks;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
conf = new YarnConfiguration();
|
||||||
|
createClusterAndStartApplication(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void createClusterAndStartApplication(Configuration conf)
|
||||||
|
throws Exception {
|
||||||
|
// start minicluster
|
||||||
|
this.conf = conf;
|
||||||
|
if (autoUpdate) {
|
||||||
|
conf.setBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, true);
|
||||||
|
}
|
||||||
|
conf.set(YarnConfiguration.RM_SCHEDULER, schedulerName);
|
||||||
|
conf.setLong(
|
||||||
|
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
|
||||||
|
rollingIntervalSec);
|
||||||
|
conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, amExpireMs);
|
||||||
|
conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
|
||||||
|
// set the minimum allocation so that resource decrease can go under 1024
|
||||||
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
|
||||||
|
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
|
||||||
|
conf.setBoolean(
|
||||||
|
YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
|
||||||
|
conf.setInt(
|
||||||
|
YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
|
||||||
|
yarnCluster = new MiniYARNCluster(
|
||||||
|
TestAMRMClient.class.getName(), nodeCount, 1, 1);
|
||||||
|
yarnCluster.init(conf);
|
||||||
|
yarnCluster.start();
|
||||||
|
|
||||||
|
// start rm client
|
||||||
|
yarnClient = YarnClient.createYarnClient();
|
||||||
|
yarnClient.init(conf);
|
||||||
|
yarnClient.start();
|
||||||
|
|
||||||
|
// get node info
|
||||||
|
assertTrue("All node managers did not connect to the RM within the "
|
||||||
|
+ "allotted 5-second timeout",
|
||||||
|
yarnCluster.waitForNodeManagersToConnect(5000L));
|
||||||
|
nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
|
||||||
|
assertEquals("Not all node managers were reported running",
|
||||||
|
nodeCount, nodeReports.size());
|
||||||
|
|
||||||
|
priority = Priority.newInstance(1);
|
||||||
|
priority2 = Priority.newInstance(2);
|
||||||
|
capability = Resource.newInstance(1024, 1);
|
||||||
|
|
||||||
|
node = nodeReports.get(0).getNodeId().getHost();
|
||||||
|
rack = nodeReports.get(0).getRackName();
|
||||||
|
nodes = new String[]{ node };
|
||||||
|
racks = new String[]{ rack };
|
||||||
|
|
||||||
|
// submit new app
|
||||||
|
ApplicationSubmissionContext appContext =
|
||||||
|
yarnClient.createApplication().getApplicationSubmissionContext();
|
||||||
|
ApplicationId appId = appContext.getApplicationId();
|
||||||
|
// set the application name
|
||||||
|
appContext.setApplicationName("Test");
|
||||||
|
// Set the priority for the application master
|
||||||
|
Priority pri = Records.newRecord(Priority.class);
|
||||||
|
pri.setPriority(0);
|
||||||
|
appContext.setPriority(pri);
|
||||||
|
// Set the queue to which this application is to be submitted in the RM
|
||||||
|
appContext.setQueue("default");
|
||||||
|
// Set up the container launch context for the application master
|
||||||
|
ContainerLaunchContext amContainer =
|
||||||
|
BuilderUtils.newContainerLaunchContext(
|
||||||
|
Collections.<String, LocalResource> emptyMap(),
|
||||||
|
new HashMap<String, String>(), Arrays.asList("sleep", "100"),
|
||||||
|
new HashMap<String, ByteBuffer>(), null,
|
||||||
|
new HashMap<ApplicationAccessType, String>());
|
||||||
|
appContext.setAMContainerSpec(amContainer);
|
||||||
|
appContext.setResource(Resource.newInstance(1024, 1));
|
||||||
|
// Create the request to send to the applications manager
|
||||||
|
SubmitApplicationRequest appRequest = Records
|
||||||
|
.newRecord(SubmitApplicationRequest.class);
|
||||||
|
appRequest.setApplicationSubmissionContext(appContext);
|
||||||
|
// Submit the application to the applications manager
|
||||||
|
yarnClient.submitApplication(appContext);
|
||||||
|
|
||||||
|
// wait for app to start
|
||||||
|
RMAppAttempt appAttempt = null;
|
||||||
|
while (true) {
|
||||||
|
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
|
||||||
|
if (appReport.getYarnApplicationState() ==
|
||||||
|
YarnApplicationState.ACCEPTED) {
|
||||||
|
attemptId = appReport.getCurrentApplicationAttemptId();
|
||||||
|
appAttempt =
|
||||||
|
yarnCluster.getResourceManager().getRMContext().getRMApps()
|
||||||
|
.get(attemptId.getApplicationId()).getCurrentAppAttempt();
|
||||||
|
while (true) {
|
||||||
|
if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Just dig into the ResourceManager and get the AMRMToken just for the sake
|
||||||
|
// of testing.
|
||||||
|
UserGroupInformation.setLoginUser(UserGroupInformation
|
||||||
|
.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
|
||||||
|
|
||||||
|
// emulate RM setup of AMRM token in credentials by adding the token
|
||||||
|
// *before* setting the token service
|
||||||
|
UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
|
||||||
|
appAttempt.getAMRMToken().setService(
|
||||||
|
ClientRMProxy.getAMRMTokenService(conf));
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void teardown() throws YarnException, IOException {
|
||||||
|
yarnClient.killApplication(attemptId.getApplicationId());
|
||||||
|
attemptId = null;
|
||||||
|
|
||||||
|
if (yarnClient != null &&
|
||||||
|
yarnClient.getServiceState() == Service.STATE.STARTED) {
|
||||||
|
yarnClient.stop();
|
||||||
|
}
|
||||||
|
if (yarnCluster != null &&
|
||||||
|
yarnCluster.getServiceState() == Service.STATE.STARTED) {
|
||||||
|
yarnCluster.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -43,7 +43,6 @@ import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
|
@ -56,24 +55,18 @@ import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.records.*;
|
import org.apache.hadoop.yarn.api.records.*;
|
||||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
|
||||||
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
||||||
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
||||||
import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
|
import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
|
||||||
import org.apache.hadoop.yarn.client.api.NMClient;
|
import org.apache.hadoop.yarn.client.api.NMClient;
|
||||||
import org.apache.hadoop.yarn.client.api.NMTokenCache;
|
import org.apache.hadoop.yarn.client.api.NMTokenCache;
|
||||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||||
|
@ -81,10 +74,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedule
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Assume;
|
import org.junit.Assume;
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
|
@ -97,26 +88,8 @@ import org.eclipse.jetty.util.log.Log;
|
||||||
* Test application master client class to resource manager.
|
* Test application master client class to resource manager.
|
||||||
*/
|
*/
|
||||||
@RunWith(value = Parameterized.class)
|
@RunWith(value = Parameterized.class)
|
||||||
public class TestAMRMClient {
|
public class TestAMRMClient extends BaseAMRMClientTest{
|
||||||
private String schedulerName = null;
|
|
||||||
private boolean autoUpdate = false;
|
|
||||||
private Configuration conf = null;
|
|
||||||
private MiniYARNCluster yarnCluster = null;
|
|
||||||
private YarnClient yarnClient = null;
|
|
||||||
private List<NodeReport> nodeReports = null;
|
|
||||||
private ApplicationAttemptId attemptId = null;
|
|
||||||
private int nodeCount = 3;
|
|
||||||
|
|
||||||
static final int rolling_interval_sec = 13;
|
|
||||||
static final long am_expire_ms = 4000;
|
|
||||||
|
|
||||||
private Resource capability;
|
|
||||||
private Priority priority;
|
|
||||||
private Priority priority2;
|
|
||||||
private String node;
|
|
||||||
private String rack;
|
|
||||||
private String[] nodes;
|
|
||||||
private String[] racks;
|
|
||||||
private final static int DEFAULT_ITERATION = 3;
|
private final static int DEFAULT_ITERATION = 3;
|
||||||
|
|
||||||
public TestAMRMClient(String schedulerName, boolean autoUpdate) {
|
public TestAMRMClient(String schedulerName, boolean autoUpdate) {
|
||||||
|
@ -134,127 +107,6 @@ public class TestAMRMClient {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setup() throws Exception {
|
|
||||||
conf = new YarnConfiguration();
|
|
||||||
createClusterAndStartApplication(conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void createClusterAndStartApplication(Configuration conf)
|
|
||||||
throws Exception {
|
|
||||||
// start minicluster
|
|
||||||
this.conf = conf;
|
|
||||||
if (autoUpdate) {
|
|
||||||
conf.setBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, true);
|
|
||||||
}
|
|
||||||
conf.set(YarnConfiguration.RM_SCHEDULER, schedulerName);
|
|
||||||
conf.setLong(
|
|
||||||
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
|
|
||||||
rolling_interval_sec);
|
|
||||||
conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, am_expire_ms);
|
|
||||||
conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
|
|
||||||
// set the minimum allocation so that resource decrease can go under 1024
|
|
||||||
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
|
|
||||||
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
|
|
||||||
conf.setBoolean(
|
|
||||||
YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
|
|
||||||
conf.setInt(
|
|
||||||
YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
|
|
||||||
yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
|
|
||||||
yarnCluster.init(conf);
|
|
||||||
yarnCluster.start();
|
|
||||||
|
|
||||||
// start rm client
|
|
||||||
yarnClient = YarnClient.createYarnClient();
|
|
||||||
yarnClient.init(conf);
|
|
||||||
yarnClient.start();
|
|
||||||
|
|
||||||
// get node info
|
|
||||||
assertTrue("All node managers did not connect to the RM within the "
|
|
||||||
+ "allotted 5-second timeout",
|
|
||||||
yarnCluster.waitForNodeManagersToConnect(5000L));
|
|
||||||
nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
|
|
||||||
assertEquals("Not all node managers were reported running",
|
|
||||||
nodeCount, nodeReports.size());
|
|
||||||
|
|
||||||
priority = Priority.newInstance(1);
|
|
||||||
priority2 = Priority.newInstance(2);
|
|
||||||
capability = Resource.newInstance(1024, 1);
|
|
||||||
|
|
||||||
node = nodeReports.get(0).getNodeId().getHost();
|
|
||||||
rack = nodeReports.get(0).getRackName();
|
|
||||||
nodes = new String[]{ node };
|
|
||||||
racks = new String[]{ rack };
|
|
||||||
|
|
||||||
// submit new app
|
|
||||||
ApplicationSubmissionContext appContext =
|
|
||||||
yarnClient.createApplication().getApplicationSubmissionContext();
|
|
||||||
ApplicationId appId = appContext.getApplicationId();
|
|
||||||
// set the application name
|
|
||||||
appContext.setApplicationName("Test");
|
|
||||||
// Set the priority for the application master
|
|
||||||
Priority pri = Records.newRecord(Priority.class);
|
|
||||||
pri.setPriority(0);
|
|
||||||
appContext.setPriority(pri);
|
|
||||||
// Set the queue to which this application is to be submitted in the RM
|
|
||||||
appContext.setQueue("default");
|
|
||||||
// Set up the container launch context for the application master
|
|
||||||
ContainerLaunchContext amContainer =
|
|
||||||
BuilderUtils.newContainerLaunchContext(
|
|
||||||
Collections.<String, LocalResource> emptyMap(),
|
|
||||||
new HashMap<String, String>(), Arrays.asList("sleep", "100"),
|
|
||||||
new HashMap<String, ByteBuffer>(), null,
|
|
||||||
new HashMap<ApplicationAccessType, String>());
|
|
||||||
appContext.setAMContainerSpec(amContainer);
|
|
||||||
appContext.setResource(Resource.newInstance(1024, 1));
|
|
||||||
// Create the request to send to the applications manager
|
|
||||||
SubmitApplicationRequest appRequest = Records
|
|
||||||
.newRecord(SubmitApplicationRequest.class);
|
|
||||||
appRequest.setApplicationSubmissionContext(appContext);
|
|
||||||
// Submit the application to the applications manager
|
|
||||||
yarnClient.submitApplication(appContext);
|
|
||||||
|
|
||||||
// wait for app to start
|
|
||||||
RMAppAttempt appAttempt = null;
|
|
||||||
while (true) {
|
|
||||||
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
|
|
||||||
if (appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
|
|
||||||
attemptId = appReport.getCurrentApplicationAttemptId();
|
|
||||||
appAttempt =
|
|
||||||
yarnCluster.getResourceManager().getRMContext().getRMApps()
|
|
||||||
.get(attemptId.getApplicationId()).getCurrentAppAttempt();
|
|
||||||
while (true) {
|
|
||||||
if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Just dig into the ResourceManager and get the AMRMToken just for the sake
|
|
||||||
// of testing.
|
|
||||||
UserGroupInformation.setLoginUser(UserGroupInformation
|
|
||||||
.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
|
|
||||||
|
|
||||||
// emulate RM setup of AMRM token in credentials by adding the token
|
|
||||||
// *before* setting the token service
|
|
||||||
UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
|
|
||||||
appAttempt.getAMRMToken().setService(ClientRMProxy.getAMRMTokenService(conf));
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
|
||||||
public void teardown() throws YarnException, IOException {
|
|
||||||
yarnClient.killApplication(attemptId.getApplicationId());
|
|
||||||
attemptId = null;
|
|
||||||
|
|
||||||
if (yarnClient != null && yarnClient.getServiceState() == STATE.STARTED) {
|
|
||||||
yarnClient.stop();
|
|
||||||
}
|
|
||||||
if (yarnCluster != null && yarnCluster.getServiceState() == STATE.STARTED) {
|
|
||||||
yarnCluster.stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test (timeout = 60000)
|
@Test (timeout = 60000)
|
||||||
public void testAMRMClientNoMatchingRequests()
|
public void testAMRMClientNoMatchingRequests()
|
||||||
throws IOException, YarnException {
|
throws IOException, YarnException {
|
||||||
|
@ -905,7 +757,7 @@ public class TestAMRMClient {
|
||||||
initAMRMClientAndTest(false);
|
initAMRMClientAndTest(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initAMRMClientAndTest(boolean useAllocReqId)
|
protected void initAMRMClientAndTest(boolean useAllocReqId)
|
||||||
throws YarnException, IOException {
|
throws YarnException, IOException {
|
||||||
AMRMClient<ContainerRequest> amClient = null;
|
AMRMClient<ContainerRequest> amClient = null;
|
||||||
try {
|
try {
|
||||||
|
@ -1946,7 +1798,7 @@ public class TestAMRMClient {
|
||||||
// Wait for enough time and make sure the roll_over happens
|
// Wait for enough time and make sure the roll_over happens
|
||||||
// At mean time, the old AMRMToken should continue to work
|
// At mean time, the old AMRMToken should continue to work
|
||||||
while (System.currentTimeMillis() - startTime <
|
while (System.currentTimeMillis() - startTime <
|
||||||
rolling_interval_sec * 1000) {
|
rollingIntervalSec * 1000) {
|
||||||
amClient.allocate(0.1f);
|
amClient.allocate(0.1f);
|
||||||
sleep(1000);
|
sleep(1000);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,204 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.client.api.impl;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceSizing;
|
||||||
|
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
|
||||||
|
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
|
||||||
|
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
|
||||||
|
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
||||||
|
import org.apache.hadoop.yarn.client.api.NMTokenCache;
|
||||||
|
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
|
||||||
|
import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static java.lang.Thread.sleep;
|
||||||
|
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
|
||||||
|
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test Placement Constraints and Scheduling Requests.
|
||||||
|
*/
|
||||||
|
public class TestAMRMClientPlacementConstraints extends BaseAMRMClientTest {
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testAMRMClientWithPlacementConstraints()
|
||||||
|
throws Exception {
|
||||||
|
// we have to create a new instance of MiniYARNCluster to avoid SASL qop
|
||||||
|
// mismatches between client and server
|
||||||
|
teardown();
|
||||||
|
conf = new YarnConfiguration();
|
||||||
|
conf.setBoolean(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true);
|
||||||
|
createClusterAndStartApplication(conf);
|
||||||
|
|
||||||
|
AMRMClient<AMRMClient.ContainerRequest> amClient =
|
||||||
|
AMRMClient.<AMRMClient.ContainerRequest>createAMRMClient();
|
||||||
|
amClient.setNMTokenCache(new NMTokenCache());
|
||||||
|
//asserting we are not using the singleton instance cache
|
||||||
|
Assert.assertNotSame(NMTokenCache.getSingleton(),
|
||||||
|
amClient.getNMTokenCache());
|
||||||
|
|
||||||
|
final List<Container> allocatedContainers = new ArrayList<>();
|
||||||
|
final List<RejectedSchedulingRequest> rejectedSchedulingRequests =
|
||||||
|
new ArrayList<>();
|
||||||
|
AMRMClientAsync asyncClient = new AMRMClientAsyncImpl<>(amClient, 1000,
|
||||||
|
new AMRMClientAsync.AbstractCallbackHandler() {
|
||||||
|
@Override
|
||||||
|
public void onContainersAllocated(List<Container> containers) {
|
||||||
|
allocatedContainers.addAll(containers);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onRequestsRejected(
|
||||||
|
List<RejectedSchedulingRequest> rejReqs) {
|
||||||
|
rejectedSchedulingRequests.addAll(rejReqs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onContainersCompleted(List<ContainerStatus> statuses) {}
|
||||||
|
@Override
|
||||||
|
public void onContainersUpdated(List<UpdatedContainer> containers) {}
|
||||||
|
@Override
|
||||||
|
public void onShutdownRequest() {}
|
||||||
|
@Override
|
||||||
|
public void onNodesUpdated(List<NodeReport> updatedNodes) {}
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable e) {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public float getProgress() {
|
||||||
|
return 0.1f;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
asyncClient.init(conf);
|
||||||
|
asyncClient.start();
|
||||||
|
Map<Set<String>, PlacementConstraint> pcMapping = new HashMap<>();
|
||||||
|
pcMapping.put(Collections.singleton("foo"),
|
||||||
|
PlacementConstraints.build(
|
||||||
|
PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))));
|
||||||
|
pcMapping.put(Collections.singleton("bar"),
|
||||||
|
PlacementConstraints.build(
|
||||||
|
PlacementConstraints.targetNotIn(NODE, allocationTag("bar"))));
|
||||||
|
asyncClient.registerApplicationMaster("Host", 10000, "", pcMapping);
|
||||||
|
|
||||||
|
// Send two types of requests - 4 with source tag "foo" have numAlloc = 1
|
||||||
|
// and 1 with source tag "bar" and has numAlloc = 4. Both should be
|
||||||
|
// handled similarly. i.e: Since there are only 3 nodes,
|
||||||
|
// 2 schedulingRequests - 1 with source tag "foo" on one with source
|
||||||
|
// tag "bar" should get rejected.
|
||||||
|
asyncClient.addSchedulingRequests(
|
||||||
|
Arrays.asList(
|
||||||
|
// 4 reqs with numAlloc = 1
|
||||||
|
schedulingRequest(1, 1, 1, 1, 512, "foo"),
|
||||||
|
schedulingRequest(1, 1, 2, 1, 512, "foo"),
|
||||||
|
schedulingRequest(1, 1, 3, 1, 512, "foo"),
|
||||||
|
schedulingRequest(1, 1, 4, 1, 512, "foo"),
|
||||||
|
// 1 req with numAlloc = 4
|
||||||
|
schedulingRequest(4, 1, 5, 1, 512, "bar")));
|
||||||
|
|
||||||
|
// kick the scheduler
|
||||||
|
waitForContainerAllocation(allocatedContainers,
|
||||||
|
rejectedSchedulingRequests, 6, 2);
|
||||||
|
|
||||||
|
Assert.assertEquals(6, allocatedContainers.size());
|
||||||
|
Map<NodeId, List<Container>> containersPerNode =
|
||||||
|
allocatedContainers.stream().collect(
|
||||||
|
Collectors.groupingBy(Container::getNodeId));
|
||||||
|
|
||||||
|
// Ensure 2 containers allocated per node.
|
||||||
|
// Each node should have a "foo" and a "bar" container.
|
||||||
|
Assert.assertEquals(3, containersPerNode.entrySet().size());
|
||||||
|
HashSet<String> srcTags = new HashSet<>(Arrays.asList("foo", "bar"));
|
||||||
|
containersPerNode.entrySet().forEach(
|
||||||
|
x ->
|
||||||
|
Assert.assertEquals(
|
||||||
|
srcTags,
|
||||||
|
x.getValue()
|
||||||
|
.stream()
|
||||||
|
.map(y -> y.getAllocationTags().iterator().next())
|
||||||
|
.collect(Collectors.toSet()))
|
||||||
|
);
|
||||||
|
|
||||||
|
// Ensure 2 rejected requests - 1 of "foo" and 1 of "bar"
|
||||||
|
Assert.assertEquals(2, rejectedSchedulingRequests.size());
|
||||||
|
Assert.assertEquals(srcTags,
|
||||||
|
rejectedSchedulingRequests
|
||||||
|
.stream()
|
||||||
|
.map(x -> x.getRequest().getAllocationTags().iterator().next())
|
||||||
|
.collect(Collectors.toSet()));
|
||||||
|
|
||||||
|
asyncClient.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void waitForContainerAllocation(
|
||||||
|
List<Container> allocatedContainers,
|
||||||
|
List<RejectedSchedulingRequest> rejectedRequests,
|
||||||
|
int containerNum, int rejNum) throws Exception {
|
||||||
|
|
||||||
|
int maxCount = 10;
|
||||||
|
while (maxCount >= 0 &&
|
||||||
|
(allocatedContainers.size() < containerNum ||
|
||||||
|
rejectedRequests.size() < rejNum)) {
|
||||||
|
maxCount--;
|
||||||
|
sleep(1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static SchedulingRequest schedulingRequest(int numAllocations,
|
||||||
|
int priority, long allocReqId, int cores, int mem, String... tags) {
|
||||||
|
return schedulingRequest(numAllocations, priority, allocReqId, cores, mem,
|
||||||
|
ExecutionType.GUARANTEED, tags);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static SchedulingRequest schedulingRequest(int numAllocations,
|
||||||
|
int priority, long allocReqId, int cores, int mem,
|
||||||
|
ExecutionType execType, String... tags) {
|
||||||
|
return SchedulingRequest.newBuilder()
|
||||||
|
.priority(Priority.newInstance(priority))
|
||||||
|
.allocationRequestId(allocReqId)
|
||||||
|
.allocationTags(new HashSet<>(Arrays.asList(tags)))
|
||||||
|
.executionType(ExecutionTypeRequest.newInstance(execType, true))
|
||||||
|
.resourceSizing(
|
||||||
|
ResourceSizing.newInstance(numAllocations,
|
||||||
|
Resource.newInstance(mem, cores)))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
|
@ -262,6 +262,9 @@ public class RMContainerImpl implements RMContainer {
|
||||||
rmContext.getSystemMetricsPublisher().containerCreated(
|
rmContext.getSystemMetricsPublisher().containerCreated(
|
||||||
this, this.creationTime);
|
this, this.creationTime);
|
||||||
}
|
}
|
||||||
|
if (this.container != null) {
|
||||||
|
this.allocationTags = this.container.getAllocationTags();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -589,6 +589,7 @@ public abstract class AbstractYarnScheduler
|
||||||
container.setVersion(status.getVersion());
|
container.setVersion(status.getVersion());
|
||||||
container.setExecutionType(status.getExecutionType());
|
container.setExecutionType(status.getExecutionType());
|
||||||
container.setAllocationRequestId(status.getAllocationRequestId());
|
container.setAllocationRequestId(status.getAllocationRequestId());
|
||||||
|
container.setAllocationTags(status.getAllocationTags());
|
||||||
ApplicationAttemptId attemptId =
|
ApplicationAttemptId attemptId =
|
||||||
container.getId().getApplicationAttemptId();
|
container.getId().getApplicationAttemptId();
|
||||||
RMContainer rmContainer = new RMContainerImpl(container,
|
RMContainer rmContainer = new RMContainerImpl(container,
|
||||||
|
|
|
@ -672,6 +672,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
||||||
containerType, container.getExecutionType(),
|
containerType, container.getExecutionType(),
|
||||||
container.getAllocationRequestId(),
|
container.getAllocationRequestId(),
|
||||||
rmContainer.getAllocationTags()));
|
rmContainer.getAllocationTags()));
|
||||||
|
container.setAllocationTags(rmContainer.getAllocationTags());
|
||||||
updateNMToken(container);
|
updateNMToken(container);
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
// DNS might be down, skip returning this container.
|
// DNS might be down, skip returning this container.
|
||||||
|
|
|
@ -64,12 +64,12 @@ public final class PlacementConstraintsUtil {
|
||||||
throws InvalidAllocationTagsQueryException {
|
throws InvalidAllocationTagsQueryException {
|
||||||
long minScopeCardinality = 0;
|
long minScopeCardinality = 0;
|
||||||
long maxScopeCardinality = 0;
|
long maxScopeCardinality = 0;
|
||||||
if (sc.getScope() == PlacementConstraints.NODE) {
|
if (sc.getScope().equals(PlacementConstraints.NODE)) {
|
||||||
minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId,
|
minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId,
|
||||||
te.getTargetValues(), Long::max);
|
te.getTargetValues(), Long::max);
|
||||||
maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId,
|
maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId,
|
||||||
te.getTargetValues(), Long::min);
|
te.getTargetValues(), Long::min);
|
||||||
} else if (sc.getScope() == PlacementConstraints.RACK) {
|
} else if (sc.getScope().equals(PlacementConstraints.RACK)) {
|
||||||
minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId,
|
minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId,
|
||||||
te.getTargetValues(), Long::max);
|
te.getTargetValues(), Long::max);
|
||||||
maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId,
|
maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId,
|
||||||
|
|
Loading…
Reference in New Issue