YARN-5966. AMRMClient changes to support ExecutionType update. (asuresh)
This commit is contained in:
parent
4164a2032a
commit
aaf106fde3
|
@ -58,6 +58,22 @@ public abstract class UpdateContainerError {
|
|||
@InterfaceStability.Unstable
|
||||
public abstract void setReason(String reason);
|
||||
|
||||
/**
|
||||
* Get current container version.
|
||||
* @return Current container Version.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Unstable
|
||||
public abstract int getCurrentContainerVersion();
|
||||
|
||||
/**
|
||||
* Set current container version.
|
||||
* @param currentVersion Current container version.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Unstable
|
||||
public abstract void setCurrentContainerVersion(int currentVersion);
|
||||
|
||||
/**
|
||||
* Get the {@code UpdateContainerRequest} that was not satisfiable.
|
||||
* @return UpdateContainerRequest
|
||||
|
@ -89,6 +105,7 @@ public abstract class UpdateContainerError {
|
|||
@Override
|
||||
public String toString() {
|
||||
return "UpdateContainerError{reason=" + getReason() + ", "
|
||||
+ "currentVersion=" + getCurrentContainerVersion() + ", "
|
||||
+ "req=" + getUpdateContainerRequest() + "}";
|
||||
}
|
||||
|
||||
|
@ -120,6 +137,6 @@ public abstract class UpdateContainerError {
|
|||
} else if (!req.equals(other.getUpdateContainerRequest())) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
return getCurrentContainerVersion() == other.getCurrentContainerVersion();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -78,6 +78,7 @@ message UpdateContainerRequestProto {
|
|||
message UpdateContainerErrorProto {
|
||||
optional string reason = 1;
|
||||
optional UpdateContainerRequestProto update_request = 2;
|
||||
optional int32 current_container_version = 3;
|
||||
}
|
||||
|
||||
message AllocateRequestProto {
|
||||
|
|
|
@ -33,17 +33,20 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
|
||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Stable
|
||||
|
@ -518,12 +521,38 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
|
|||
* ResourceManager to change the existing resource allocation to the target
|
||||
* resource allocation.
|
||||
*
|
||||
* @deprecated use
|
||||
* {@link #requestContainerUpdate(Container, UpdateContainerRequest)}
|
||||
*
|
||||
* @param container The container returned from the last successful resource
|
||||
* allocation or resource change
|
||||
* @param capability The target resource capability of the container
|
||||
*/
|
||||
public abstract void requestContainerResourceChange(
|
||||
Container container, Resource capability);
|
||||
@Deprecated
|
||||
public void requestContainerResourceChange(
|
||||
Container container, Resource capability) {
|
||||
Preconditions.checkNotNull(container, "Container cannot be null!!");
|
||||
Preconditions.checkNotNull(capability,
|
||||
"UpdateContainerRequest cannot be null!!");
|
||||
requestContainerUpdate(container, UpdateContainerRequest.newInstance(
|
||||
container.getVersion(), container.getId(),
|
||||
Resources.fitsIn(capability, container.getResource()) ?
|
||||
ContainerUpdateType.DECREASE_RESOURCE :
|
||||
ContainerUpdateType.INCREASE_RESOURCE,
|
||||
capability, null));
|
||||
}
|
||||
|
||||
/**
|
||||
* Request a container update before calling <code>allocate</code>.
|
||||
* Any previous pending update request of the same container will be
|
||||
* removed.
|
||||
*
|
||||
* @param container The container returned from the last successful resource
|
||||
* allocation or update
|
||||
* @param updateContainerRequest The <code>UpdateContainerRequest</code>.
|
||||
*/
|
||||
public abstract void requestContainerUpdate(
|
||||
Container container, UpdateContainerRequest updateContainerRequest);
|
||||
|
||||
/**
|
||||
* Release containers assigned by the Resource Manager. If the app cannot use
|
||||
|
|
|
@ -36,11 +36,13 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
|
|||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
|
||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
|
||||
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
||||
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
||||
|
@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.client.api.TimelineClient;
|
|||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
/**
|
||||
* <code>AMRMClientAsync</code> handles communication with the ResourceManager
|
||||
|
@ -284,12 +287,38 @@ extends AbstractService {
|
|||
* ResourceManager to change the existing resource allocation to the target
|
||||
* resource allocation.
|
||||
*
|
||||
* @deprecated use
|
||||
* {@link #requestContainerUpdate(Container, UpdateContainerRequest)}
|
||||
*
|
||||
* @param container The container returned from the last successful resource
|
||||
* allocation or resource change
|
||||
* @param capability The target resource capability of the container
|
||||
*/
|
||||
public abstract void requestContainerResourceChange(
|
||||
Container container, Resource capability);
|
||||
@Deprecated
|
||||
public void requestContainerResourceChange(
|
||||
Container container, Resource capability) {
|
||||
Preconditions.checkNotNull(container, "Container cannot be null!!");
|
||||
Preconditions.checkNotNull(capability,
|
||||
"UpdateContainerRequest cannot be null!!");
|
||||
requestContainerUpdate(container, UpdateContainerRequest.newInstance(
|
||||
container.getVersion(), container.getId(),
|
||||
Resources.fitsIn(capability, container.getResource()) ?
|
||||
ContainerUpdateType.DECREASE_RESOURCE :
|
||||
ContainerUpdateType.INCREASE_RESOURCE,
|
||||
capability, null));
|
||||
}
|
||||
|
||||
/**
|
||||
* Request a container update before calling <code>allocate</code>.
|
||||
* Any previous pending update request of the same container will be
|
||||
* removed.
|
||||
*
|
||||
* @param container The container returned from the last successful resource
|
||||
* allocation or update
|
||||
* @param updateContainerRequest The <code>UpdateContainerRequest</code>.
|
||||
*/
|
||||
public abstract void requestContainerUpdate(
|
||||
Container container, UpdateContainerRequest updateContainerRequest);
|
||||
|
||||
/**
|
||||
* Release containers assigned by the Resource Manager. If the app cannot use
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
|
||||
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
||||
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
||||
|
@ -207,9 +208,9 @@ extends AMRMClientAsync<T> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void requestContainerResourceChange(
|
||||
Container container, Resource capability) {
|
||||
client.requestContainerResourceChange(container, capability);
|
||||
public void requestContainerUpdate(Container container,
|
||||
UpdateContainerRequest updateContainerRequest) {
|
||||
client.requestContainerUpdate(container, updateContainerRequest);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -169,15 +169,16 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
|
||||
// change map holds container resource change requests between two allocate()
|
||||
// calls, and are cleared after each successful allocate() call.
|
||||
protected final Map<ContainerId, SimpleEntry<Container, Resource>> change =
|
||||
new HashMap<>();
|
||||
protected final Map<ContainerId,
|
||||
SimpleEntry<Container, UpdateContainerRequest>> change = new HashMap<>();
|
||||
// pendingChange map holds history of container resource change requests in
|
||||
// case AM needs to reregister with the ResourceManager.
|
||||
// Change requests are removed from this map if RM confirms the change
|
||||
// through allocate response, or if RM confirms that the container has been
|
||||
// completed.
|
||||
protected final Map<ContainerId, SimpleEntry<Container, Resource>>
|
||||
pendingChange = new HashMap<>();
|
||||
protected final Map<ContainerId,
|
||||
SimpleEntry<Container, UpdateContainerRequest>> pendingChange =
|
||||
new HashMap<>();
|
||||
|
||||
public AMRMClientImpl() {
|
||||
super(AMRMClientImpl.class.getName());
|
||||
|
@ -259,7 +260,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
AllocateRequest allocateRequest = null;
|
||||
List<String> blacklistToAdd = new ArrayList<String>();
|
||||
List<String> blacklistToRemove = new ArrayList<String>();
|
||||
Map<ContainerId, SimpleEntry<Container, Resource>> oldChange =
|
||||
Map<ContainerId, SimpleEntry<Container, UpdateContainerRequest>> oldChange =
|
||||
new HashMap<>();
|
||||
try {
|
||||
synchronized (this) {
|
||||
|
@ -374,14 +375,15 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
//
|
||||
// Only insert entries from the cached oldChange map
|
||||
// that do not exist in the current change map:
|
||||
for (Map.Entry<ContainerId, SimpleEntry<Container, Resource>> entry :
|
||||
for (Map.Entry<ContainerId,
|
||||
SimpleEntry<Container, UpdateContainerRequest>> entry :
|
||||
oldChange.entrySet()) {
|
||||
ContainerId oldContainerId = entry.getKey();
|
||||
Container oldContainer = entry.getValue().getKey();
|
||||
Resource oldResource = entry.getValue().getValue();
|
||||
UpdateContainerRequest oldupdate = entry.getValue().getValue();
|
||||
if (change.get(oldContainerId) == null) {
|
||||
change.put(
|
||||
oldContainerId, new SimpleEntry<>(oldContainer, oldResource));
|
||||
oldContainerId, new SimpleEntry<>(oldContainer, oldupdate));
|
||||
}
|
||||
}
|
||||
blacklistAdditions.addAll(blacklistToAdd);
|
||||
|
@ -394,19 +396,17 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
|
||||
private List<UpdateContainerRequest> createUpdateList() {
|
||||
List<UpdateContainerRequest> updateList = new ArrayList<>();
|
||||
for (Map.Entry<ContainerId, SimpleEntry<Container, Resource>> entry :
|
||||
change.entrySet()) {
|
||||
Resource targetCapability = entry.getValue().getValue();
|
||||
Resource currCapability = entry.getValue().getKey().getResource();
|
||||
int version = entry.getValue().getKey().getVersion();
|
||||
for (Map.Entry<ContainerId, SimpleEntry<Container,
|
||||
UpdateContainerRequest>> entry : change.entrySet()) {
|
||||
Resource targetCapability = entry.getValue().getValue().getCapability();
|
||||
ExecutionType targetExecType =
|
||||
entry.getValue().getValue().getExecutionType();
|
||||
ContainerUpdateType updateType =
|
||||
ContainerUpdateType.INCREASE_RESOURCE;
|
||||
if (Resources.fitsIn(targetCapability, currCapability)) {
|
||||
updateType = ContainerUpdateType.DECREASE_RESOURCE;
|
||||
}
|
||||
entry.getValue().getValue().getContainerUpdateType();
|
||||
int version = entry.getValue().getKey().getVersion();
|
||||
updateList.add(
|
||||
UpdateContainerRequest.newInstance(version, entry.getKey(),
|
||||
updateType, targetCapability, null));
|
||||
updateType, targetCapability, targetExecType));
|
||||
}
|
||||
return updateList;
|
||||
}
|
||||
|
@ -591,21 +591,47 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void requestContainerResourceChange(
|
||||
Container container, Resource capability) {
|
||||
validateContainerResourceChangeRequest(
|
||||
container.getId(), container.getResource(), capability);
|
||||
public synchronized void requestContainerUpdate(
|
||||
Container container, UpdateContainerRequest updateContainerRequest) {
|
||||
Preconditions.checkNotNull(container, "Container cannot be null!!");
|
||||
Preconditions.checkNotNull(updateContainerRequest,
|
||||
"UpdateContainerRequest cannot be null!!");
|
||||
LOG.info("Requesting Container update : " +
|
||||
"container=" + container + ", " +
|
||||
"updateType=" + updateContainerRequest.getContainerUpdateType() + ", " +
|
||||
"targetCapability=" + updateContainerRequest.getCapability() + ", " +
|
||||
"targetExecType=" + updateContainerRequest.getExecutionType());
|
||||
if (updateContainerRequest.getCapability() != null &&
|
||||
updateContainerRequest.getExecutionType() == null) {
|
||||
validateContainerResourceChangeRequest(
|
||||
updateContainerRequest.getContainerUpdateType(),
|
||||
container.getId(), container.getResource(),
|
||||
updateContainerRequest.getCapability());
|
||||
} else if (updateContainerRequest.getExecutionType() != null &&
|
||||
updateContainerRequest.getCapability() == null) {
|
||||
validateContainerExecTypeChangeRequest(
|
||||
updateContainerRequest.getContainerUpdateType(),
|
||||
container.getId(), container.getExecutionType(),
|
||||
updateContainerRequest.getExecutionType());
|
||||
} else if (updateContainerRequest.getExecutionType() == null &&
|
||||
updateContainerRequest.getCapability() == null) {
|
||||
throw new IllegalArgumentException("Both target Capability and" +
|
||||
"target Execution Type are null");
|
||||
} else {
|
||||
throw new IllegalArgumentException("Support currently exists only for" +
|
||||
" EITHER update of Capability OR update of Execution Type NOT both");
|
||||
}
|
||||
if (change.get(container.getId()) == null) {
|
||||
change.put(container.getId(),
|
||||
new SimpleEntry<>(container, capability));
|
||||
new SimpleEntry<>(container, updateContainerRequest));
|
||||
} else {
|
||||
change.get(container.getId()).setValue(capability);
|
||||
change.get(container.getId()).setValue(updateContainerRequest);
|
||||
}
|
||||
if (pendingChange.get(container.getId()) == null) {
|
||||
pendingChange.put(container.getId(),
|
||||
new SimpleEntry<>(container, capability));
|
||||
new SimpleEntry<>(container, updateContainerRequest));
|
||||
} else {
|
||||
pendingChange.get(container.getId()).setValue(capability);
|
||||
pendingChange.get(container.getId()).setValue(updateContainerRequest);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -755,7 +781,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
}
|
||||
|
||||
private void validateContainerResourceChangeRequest(
|
||||
ContainerId containerId, Resource original, Resource target) {
|
||||
ContainerUpdateType updateType, ContainerId containerId,
|
||||
Resource original, Resource target) {
|
||||
Preconditions.checkArgument(containerId != null,
|
||||
"ContainerId cannot be null");
|
||||
Preconditions.checkArgument(original != null,
|
||||
|
@ -768,6 +795,36 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
Preconditions.checkArgument(!Resources.equals(Resources.none(), target)
|
||||
&& Resources.fitsIn(Resources.none(), target),
|
||||
"Target resource capability must be greater than 0");
|
||||
if (ContainerUpdateType.DECREASE_RESOURCE == updateType) {
|
||||
Preconditions.checkArgument(Resources.fitsIn(target, original),
|
||||
"Target resource capability must fit in Original capability");
|
||||
} else {
|
||||
Preconditions.checkArgument(Resources.fitsIn(original, target),
|
||||
"Target resource capability must be more than Original capability");
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private void validateContainerExecTypeChangeRequest(
|
||||
ContainerUpdateType updateType, ContainerId containerId,
|
||||
ExecutionType original, ExecutionType target) {
|
||||
Preconditions.checkArgument(containerId != null,
|
||||
"ContainerId cannot be null");
|
||||
Preconditions.checkArgument(original != null,
|
||||
"Original Execution Type cannot be null");
|
||||
Preconditions.checkArgument(target != null,
|
||||
"Target Execution Type cannot be null");
|
||||
if (ContainerUpdateType.DEMOTE_EXECUTION_TYPE == updateType) {
|
||||
Preconditions.checkArgument(target == ExecutionType.OPPORTUNISTIC
|
||||
&& original == ExecutionType.GUARANTEED,
|
||||
"Incorrect Container update request, target should be" +
|
||||
" OPPORTUNISTIC and original should be GUARANTEED");
|
||||
} else {
|
||||
Preconditions.checkArgument(target == ExecutionType.GUARANTEED
|
||||
&& original == ExecutionType.OPPORTUNISTIC,
|
||||
"Incorrect Container update request, target should be" +
|
||||
" GUARANTEED and original should be OPPORTUNISTIC");
|
||||
}
|
||||
}
|
||||
|
||||
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
|
||||
|
|
|
@ -51,29 +51,7 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.NMToken;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.records.*;
|
||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
||||
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
||||
|
@ -1058,26 +1036,36 @@ public class TestAMRMClient {
|
|||
(AMRMClientImpl<ContainerRequest>) amClient;
|
||||
Assert.assertEquals(0, amClientImpl.change.size());
|
||||
// verify newer request overwrites older request for the container1
|
||||
amClientImpl.requestContainerResourceChange(
|
||||
container1, Resource.newInstance(2048, 1));
|
||||
amClientImpl.requestContainerResourceChange(
|
||||
container1, Resource.newInstance(4096, 1));
|
||||
amClientImpl.requestContainerUpdate(container1,
|
||||
UpdateContainerRequest.newInstance(container1.getVersion(),
|
||||
container1.getId(), ContainerUpdateType.INCREASE_RESOURCE,
|
||||
Resource.newInstance(2048, 1), null));
|
||||
amClientImpl.requestContainerUpdate(container1,
|
||||
UpdateContainerRequest.newInstance(container1.getVersion(),
|
||||
container1.getId(), ContainerUpdateType.INCREASE_RESOURCE,
|
||||
Resource.newInstance(4096, 1), null));
|
||||
Assert.assertEquals(Resource.newInstance(4096, 1),
|
||||
amClientImpl.change.get(container1.getId()).getValue());
|
||||
amClientImpl.change.get(container1.getId()).getValue().getCapability());
|
||||
// verify new decrease request cancels old increase request for container1
|
||||
amClientImpl.requestContainerResourceChange(
|
||||
container1, Resource.newInstance(512, 1));
|
||||
amClientImpl.requestContainerUpdate(container1,
|
||||
UpdateContainerRequest.newInstance(container1.getVersion(),
|
||||
container1.getId(), ContainerUpdateType.DECREASE_RESOURCE,
|
||||
Resource.newInstance(512, 1), null));
|
||||
Assert.assertEquals(Resource.newInstance(512, 1),
|
||||
amClientImpl.change.get(container1.getId()).getValue());
|
||||
amClientImpl.change.get(container1.getId()).getValue().getCapability());
|
||||
// request resource increase for container2
|
||||
amClientImpl.requestContainerResourceChange(
|
||||
container2, Resource.newInstance(2048, 1));
|
||||
amClientImpl.requestContainerUpdate(container2,
|
||||
UpdateContainerRequest.newInstance(container2.getVersion(),
|
||||
container2.getId(), ContainerUpdateType.INCREASE_RESOURCE,
|
||||
Resource.newInstance(2048, 1), null));
|
||||
Assert.assertEquals(Resource.newInstance(2048, 1),
|
||||
amClientImpl.change.get(container2.getId()).getValue());
|
||||
amClientImpl.change.get(container2.getId()).getValue().getCapability());
|
||||
// verify release request will cancel pending change requests for the same
|
||||
// container
|
||||
amClientImpl.requestContainerResourceChange(
|
||||
container3, Resource.newInstance(2048, 1));
|
||||
amClientImpl.requestContainerUpdate(container3,
|
||||
UpdateContainerRequest.newInstance(container3.getVersion(),
|
||||
container3.getId(), ContainerUpdateType.INCREASE_RESOURCE,
|
||||
Resource.newInstance(2048, 1), null));
|
||||
Assert.assertEquals(3, amClientImpl.pendingChange.size());
|
||||
amClientImpl.releaseAssignedContainer(container3.getId());
|
||||
Assert.assertEquals(2, amClientImpl.pendingChange.size());
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
|
@ -233,8 +234,11 @@ public class TestAMRMClientOnRMRestart {
|
|||
nm1.nodeHeartbeat(containerId.getApplicationAttemptId(),
|
||||
containerId.getContainerId(), ContainerState.RUNNING);
|
||||
dispatcher.await();
|
||||
amClient.requestContainerResourceChange(
|
||||
container, Resource.newInstance(2048, 1));
|
||||
amClient.requestContainerUpdate(
|
||||
container, UpdateContainerRequest.newInstance(
|
||||
container.getVersion(), container.getId(),
|
||||
ContainerUpdateType.INCREASE_RESOURCE,
|
||||
Resource.newInstance(2048, 1), null));
|
||||
it.remove();
|
||||
|
||||
allocateResponse = amClient.allocate(0.3f);
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
|
||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
|
@ -44,6 +45,8 @@ import org.apache.hadoop.yarn.api.records.Priority;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
||||
|
@ -54,6 +57,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
|||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||
.AbstractYarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.After;
|
||||
|
@ -66,13 +72,17 @@ import org.junit.Test;
|
|||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
/**
|
||||
* Class that tests the allocation of OPPORTUNISTIC containers through the
|
||||
|
@ -83,7 +93,6 @@ public class TestOpportunisticContainerAllocation {
|
|||
private static MiniYARNCluster yarnCluster = null;
|
||||
private static YarnClient yarnClient = null;
|
||||
private static List<NodeReport> nodeReports = null;
|
||||
private static ApplicationAttemptId attemptId = null;
|
||||
private static int nodeCount = 3;
|
||||
|
||||
private static final int ROLLING_INTERVAL_SEC = 13;
|
||||
|
@ -92,12 +101,22 @@ public class TestOpportunisticContainerAllocation {
|
|||
private static Resource capability;
|
||||
private static Priority priority;
|
||||
private static Priority priority2;
|
||||
private static Priority priority3;
|
||||
private static Priority priority4;
|
||||
private static String node;
|
||||
private static String rack;
|
||||
private static String[] nodes;
|
||||
private static String[] racks;
|
||||
private final static int DEFAULT_ITERATION = 3;
|
||||
|
||||
// Per test..
|
||||
private ApplicationAttemptId attemptId = null;
|
||||
private AMRMClientImpl<AMRMClient.ContainerRequest> amClient = null;
|
||||
private long availMB;
|
||||
private int availVCores;
|
||||
private long allocMB;
|
||||
private int allocVCores;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws Exception {
|
||||
// start minicluster
|
||||
|
@ -106,7 +125,7 @@ public class TestOpportunisticContainerAllocation {
|
|||
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
|
||||
ROLLING_INTERVAL_SEC);
|
||||
conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, AM_EXPIRE_MS);
|
||||
conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
|
||||
conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 1000);
|
||||
// set the minimum allocation so that resource decrease can go under 1024
|
||||
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
|
||||
conf.setBoolean(
|
||||
|
@ -129,7 +148,9 @@ public class TestOpportunisticContainerAllocation {
|
|||
|
||||
priority = Priority.newInstance(1);
|
||||
priority2 = Priority.newInstance(2);
|
||||
capability = Resource.newInstance(1024, 1);
|
||||
priority3 = Priority.newInstance(3);
|
||||
priority4 = Priority.newInstance(4);
|
||||
capability = Resource.newInstance(512, 1);
|
||||
|
||||
node = nodeReports.get(0).getNodeId().getHost();
|
||||
rack = nodeReports.get(0).getRackName();
|
||||
|
@ -193,10 +214,35 @@ public class TestOpportunisticContainerAllocation {
|
|||
UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
|
||||
appAttempt.getAMRMToken()
|
||||
.setService(ClientRMProxy.getAMRMTokenService(conf));
|
||||
|
||||
// start am rm client
|
||||
amClient = (AMRMClientImpl<AMRMClient.ContainerRequest>)AMRMClient
|
||||
.createAMRMClient();
|
||||
|
||||
//setting an instance NMTokenCache
|
||||
amClient.setNMTokenCache(new NMTokenCache());
|
||||
//asserting we are not using the singleton instance cache
|
||||
Assert.assertNotSame(NMTokenCache.getSingleton(),
|
||||
amClient.getNMTokenCache());
|
||||
|
||||
amClient.init(conf);
|
||||
amClient.start();
|
||||
|
||||
amClient.registerApplicationMaster("Host", 10000, "");
|
||||
}
|
||||
|
||||
@After
|
||||
public void cancelApp() throws YarnException, IOException {
|
||||
try {
|
||||
amClient
|
||||
.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
|
||||
null);
|
||||
} finally {
|
||||
if (amClient != null &&
|
||||
amClient.getServiceState() == Service.STATE.STARTED) {
|
||||
amClient.stop();
|
||||
}
|
||||
}
|
||||
yarnClient.killApplication(attemptId.getApplicationId());
|
||||
attemptId = null;
|
||||
}
|
||||
|
@ -214,43 +260,254 @@ public class TestOpportunisticContainerAllocation {
|
|||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testAMRMClient() throws YarnException, IOException {
|
||||
AMRMClient<AMRMClient.ContainerRequest> amClient = null;
|
||||
try {
|
||||
// start am rm client
|
||||
amClient = AMRMClient.<AMRMClient.ContainerRequest>createAMRMClient();
|
||||
public void testPromotionFromAcquired() throws YarnException, IOException {
|
||||
// setup container request
|
||||
assertEquals(0, amClient.ask.size());
|
||||
assertEquals(0, amClient.release.size());
|
||||
|
||||
//setting an instance NMTokenCache
|
||||
amClient.setNMTokenCache(new NMTokenCache());
|
||||
//asserting we are not using the singleton instance cache
|
||||
Assert.assertNotSame(NMTokenCache.getSingleton(),
|
||||
amClient.getNMTokenCache());
|
||||
amClient.addContainerRequest(
|
||||
new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
|
||||
true, null,
|
||||
ExecutionTypeRequest.newInstance(
|
||||
ExecutionType.OPPORTUNISTIC, true)));
|
||||
|
||||
amClient.init(conf);
|
||||
amClient.start();
|
||||
int oppContainersRequestedAny =
|
||||
amClient.getTable(0).get(priority2, ResourceRequest.ANY,
|
||||
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
|
||||
.getNumContainers();
|
||||
|
||||
amClient.registerApplicationMaster("Host", 10000, "");
|
||||
assertEquals(1, oppContainersRequestedAny);
|
||||
|
||||
testOpportunisticAllocation(
|
||||
(AMRMClientImpl<AMRMClient.ContainerRequest>) amClient);
|
||||
assertEquals(1, amClient.ask.size());
|
||||
assertEquals(0, amClient.release.size());
|
||||
|
||||
testAllocation((AMRMClientImpl<AMRMClient.ContainerRequest>)amClient);
|
||||
// RM should allocate container within 2 calls to allocate()
|
||||
int allocatedContainerCount = 0;
|
||||
Map<ContainerId, Container> allocatedOpportContainers = new HashMap<>();
|
||||
int iterationsLeft = 50;
|
||||
|
||||
amClient
|
||||
.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
|
||||
null);
|
||||
amClient.getNMTokenCache().clearCache();
|
||||
Assert.assertEquals(0,
|
||||
amClient.getNMTokenCache().numberOfTokensInCache());
|
||||
HashMap<String, Token> receivedNMTokens = new HashMap<>();
|
||||
|
||||
} finally {
|
||||
if (amClient != null &&
|
||||
amClient.getServiceState() == Service.STATE.STARTED) {
|
||||
amClient.stop();
|
||||
updateMetrics("Before Opp Allocation");
|
||||
|
||||
while (allocatedContainerCount < oppContainersRequestedAny
|
||||
&& iterationsLeft-- > 0) {
|
||||
AllocateResponse allocResponse = amClient.allocate(0.1f);
|
||||
assertEquals(0, amClient.ask.size());
|
||||
assertEquals(0, amClient.release.size());
|
||||
|
||||
allocatedContainerCount +=
|
||||
allocResponse.getAllocatedContainers().size();
|
||||
for (Container container : allocResponse.getAllocatedContainers()) {
|
||||
if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
|
||||
allocatedOpportContainers.put(container.getId(), container);
|
||||
removeCR(container);
|
||||
}
|
||||
}
|
||||
|
||||
for (NMToken token : allocResponse.getNMTokens()) {
|
||||
String nodeID = token.getNodeId().toString();
|
||||
receivedNMTokens.put(nodeID, token.getToken());
|
||||
}
|
||||
|
||||
if (allocatedContainerCount < oppContainersRequestedAny) {
|
||||
// sleep to let NM's heartbeat to RM and trigger allocations
|
||||
sleep(100);
|
||||
}
|
||||
}
|
||||
|
||||
assertEquals(oppContainersRequestedAny, allocatedContainerCount);
|
||||
assertEquals(oppContainersRequestedAny, allocatedOpportContainers.size());
|
||||
|
||||
updateMetrics("After Opp Allocation / Before Promotion");
|
||||
|
||||
try {
|
||||
Container c = allocatedOpportContainers.values().iterator().next();
|
||||
amClient.requestContainerUpdate(
|
||||
c, UpdateContainerRequest.newInstance(c.getVersion(),
|
||||
c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
|
||||
null, ExecutionType.OPPORTUNISTIC));
|
||||
Assert.fail("Should throw Exception..");
|
||||
} catch (IllegalArgumentException e) {
|
||||
System.out.println("## " + e.getMessage());
|
||||
Assert.assertTrue(e.getMessage().contains(
|
||||
"target should be GUARANTEED and original should be OPPORTUNISTIC"));
|
||||
}
|
||||
|
||||
Container c = allocatedOpportContainers.values().iterator().next();
|
||||
amClient.requestContainerUpdate(
|
||||
c, UpdateContainerRequest.newInstance(c.getVersion(),
|
||||
c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
|
||||
null, ExecutionType.GUARANTEED));
|
||||
iterationsLeft = 120;
|
||||
Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
|
||||
// do a few iterations to ensure RM is not going to send new containers
|
||||
while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
|
||||
// inform RM of rejection
|
||||
AllocateResponse allocResponse = amClient.allocate(0.1f);
|
||||
// RM did not send new containers because AM does not need any
|
||||
if (allocResponse.getUpdatedContainers() != null) {
|
||||
for (UpdatedContainer updatedContainer : allocResponse
|
||||
.getUpdatedContainers()) {
|
||||
System.out.println("Got update..");
|
||||
updatedContainers.put(updatedContainer.getContainer().getId(),
|
||||
updatedContainer);
|
||||
}
|
||||
}
|
||||
if (iterationsLeft > 0) {
|
||||
// sleep to make sure NM's heartbeat
|
||||
sleep(100);
|
||||
}
|
||||
}
|
||||
|
||||
updateMetrics("After Promotion");
|
||||
|
||||
assertEquals(1, updatedContainers.size());
|
||||
for (ContainerId cId : allocatedOpportContainers.keySet()) {
|
||||
Container orig = allocatedOpportContainers.get(cId);
|
||||
UpdatedContainer updatedContainer = updatedContainers.get(cId);
|
||||
assertNotNull(updatedContainer);
|
||||
assertEquals(ExecutionType.GUARANTEED,
|
||||
updatedContainer.getContainer().getExecutionType());
|
||||
assertEquals(orig.getResource(),
|
||||
updatedContainer.getContainer().getResource());
|
||||
assertEquals(orig.getNodeId(),
|
||||
updatedContainer.getContainer().getNodeId());
|
||||
assertEquals(orig.getVersion() + 1,
|
||||
updatedContainer.getContainer().getVersion());
|
||||
}
|
||||
assertEquals(0, amClient.ask.size());
|
||||
assertEquals(0, amClient.release.size());
|
||||
amClient.ask.clear();
|
||||
}
|
||||
|
||||
private void testAllocation(
|
||||
final AMRMClientImpl<AMRMClient.ContainerRequest> amClient)
|
||||
throws YarnException, IOException {
|
||||
@Test(timeout = 60000)
|
||||
public void testDemotionFromAcquired() throws YarnException, IOException {
|
||||
// setup container request
|
||||
assertEquals(0, amClient.ask.size());
|
||||
assertEquals(0, amClient.release.size());
|
||||
|
||||
amClient.addContainerRequest(
|
||||
new AMRMClient.ContainerRequest(capability, null, null, priority3));
|
||||
|
||||
int guarContainersRequestedAny = amClient.getTable(0).get(priority3,
|
||||
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
|
||||
.remoteRequest.getNumContainers();
|
||||
|
||||
assertEquals(1, guarContainersRequestedAny);
|
||||
|
||||
assertEquals(1, amClient.ask.size());
|
||||
assertEquals(0, amClient.release.size());
|
||||
|
||||
// RM should allocate container within 2 calls to allocate()
|
||||
int allocatedContainerCount = 0;
|
||||
Map<ContainerId, Container> allocatedGuarContainers = new HashMap<>();
|
||||
int iterationsLeft = 50;
|
||||
|
||||
amClient.getNMTokenCache().clearCache();
|
||||
Assert.assertEquals(0,
|
||||
amClient.getNMTokenCache().numberOfTokensInCache());
|
||||
HashMap<String, Token> receivedNMTokens = new HashMap<>();
|
||||
|
||||
updateMetrics("Before Guar Allocation");
|
||||
|
||||
while (allocatedContainerCount < guarContainersRequestedAny
|
||||
&& iterationsLeft-- > 0) {
|
||||
AllocateResponse allocResponse = amClient.allocate(0.1f);
|
||||
assertEquals(0, amClient.ask.size());
|
||||
assertEquals(0, amClient.release.size());
|
||||
|
||||
allocatedContainerCount +=
|
||||
allocResponse.getAllocatedContainers().size();
|
||||
for (Container container : allocResponse.getAllocatedContainers()) {
|
||||
if (container.getExecutionType() == ExecutionType.GUARANTEED) {
|
||||
allocatedGuarContainers.put(container.getId(), container);
|
||||
removeCR(container);
|
||||
}
|
||||
}
|
||||
|
||||
for (NMToken token : allocResponse.getNMTokens()) {
|
||||
String nodeID = token.getNodeId().toString();
|
||||
receivedNMTokens.put(nodeID, token.getToken());
|
||||
}
|
||||
|
||||
if (allocatedContainerCount < guarContainersRequestedAny) {
|
||||
// sleep to let NM's heartbeat to RM and trigger allocations
|
||||
sleep(100);
|
||||
}
|
||||
}
|
||||
|
||||
assertEquals(guarContainersRequestedAny, allocatedContainerCount);
|
||||
assertEquals(guarContainersRequestedAny, allocatedGuarContainers.size());
|
||||
|
||||
updateMetrics("After Guar Allocation / Before Demotion");
|
||||
|
||||
try {
|
||||
Container c = allocatedGuarContainers.values().iterator().next();
|
||||
amClient.requestContainerUpdate(
|
||||
c, UpdateContainerRequest.newInstance(c.getVersion(),
|
||||
c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
|
||||
null, ExecutionType.GUARANTEED));
|
||||
Assert.fail("Should throw Exception..");
|
||||
} catch (IllegalArgumentException e) {
|
||||
System.out.println("## " + e.getMessage());
|
||||
Assert.assertTrue(e.getMessage().contains(
|
||||
"target should be OPPORTUNISTIC and original should be GUARANTEED"));
|
||||
}
|
||||
|
||||
Container c = allocatedGuarContainers.values().iterator().next();
|
||||
amClient.requestContainerUpdate(
|
||||
c, UpdateContainerRequest.newInstance(c.getVersion(),
|
||||
c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
|
||||
null, ExecutionType.OPPORTUNISTIC));
|
||||
iterationsLeft = 120;
|
||||
Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
|
||||
// do a few iterations to ensure RM is not going to send new containers
|
||||
while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
|
||||
// inform RM of rejection
|
||||
AllocateResponse allocResponse = amClient.allocate(0.1f);
|
||||
// RM did not send new containers because AM does not need any
|
||||
if (allocResponse.getUpdatedContainers() != null) {
|
||||
for (UpdatedContainer updatedContainer : allocResponse
|
||||
.getUpdatedContainers()) {
|
||||
System.out.println("Got update..");
|
||||
updatedContainers.put(updatedContainer.getContainer().getId(),
|
||||
updatedContainer);
|
||||
}
|
||||
}
|
||||
if (iterationsLeft > 0) {
|
||||
// sleep to make sure NM's heartbeat
|
||||
sleep(100);
|
||||
}
|
||||
}
|
||||
|
||||
updateMetrics("After Demotion");
|
||||
|
||||
assertEquals(1, updatedContainers.size());
|
||||
for (ContainerId cId : allocatedGuarContainers.keySet()) {
|
||||
Container orig = allocatedGuarContainers.get(cId);
|
||||
UpdatedContainer updatedContainer = updatedContainers.get(cId);
|
||||
assertNotNull(updatedContainer);
|
||||
assertEquals(ExecutionType.OPPORTUNISTIC,
|
||||
updatedContainer.getContainer().getExecutionType());
|
||||
assertEquals(orig.getResource(),
|
||||
updatedContainer.getContainer().getResource());
|
||||
assertEquals(orig.getNodeId(),
|
||||
updatedContainer.getContainer().getNodeId());
|
||||
assertEquals(orig.getVersion() + 1,
|
||||
updatedContainer.getContainer().getVersion());
|
||||
}
|
||||
assertEquals(0, amClient.ask.size());
|
||||
assertEquals(0, amClient.release.size());
|
||||
amClient.ask.clear();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testMixedAllocationAndRelease() throws YarnException,
|
||||
IOException {
|
||||
// setup container request
|
||||
assertEquals(0, amClient.ask.size());
|
||||
assertEquals(0, amClient.release.size());
|
||||
|
@ -274,16 +531,6 @@ public class TestOpportunisticContainerAllocation {
|
|||
ExecutionTypeRequest.newInstance(
|
||||
ExecutionType.OPPORTUNISTIC, true)));
|
||||
|
||||
amClient.removeContainerRequest(
|
||||
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
|
||||
amClient.removeContainerRequest(
|
||||
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
|
||||
amClient.removeContainerRequest(
|
||||
new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
|
||||
true, null,
|
||||
ExecutionTypeRequest.newInstance(
|
||||
ExecutionType.OPPORTUNISTIC, true)));
|
||||
|
||||
int containersRequestedNode = amClient.getTable(0).get(priority,
|
||||
node, ExecutionType.GUARANTEED, capability).remoteRequest
|
||||
.getNumContainers();
|
||||
|
@ -298,6 +545,38 @@ public class TestOpportunisticContainerAllocation {
|
|||
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
|
||||
.getNumContainers();
|
||||
|
||||
assertEquals(4, containersRequestedNode);
|
||||
assertEquals(4, containersRequestedRack);
|
||||
assertEquals(4, containersRequestedAny);
|
||||
assertEquals(2, oppContainersRequestedAny);
|
||||
|
||||
assertEquals(4, amClient.ask.size());
|
||||
assertEquals(0, amClient.release.size());
|
||||
|
||||
amClient.removeContainerRequest(
|
||||
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
|
||||
amClient.removeContainerRequest(
|
||||
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
|
||||
amClient.removeContainerRequest(
|
||||
new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
|
||||
true, null,
|
||||
ExecutionTypeRequest.newInstance(
|
||||
ExecutionType.OPPORTUNISTIC, true)));
|
||||
|
||||
containersRequestedNode = amClient.getTable(0).get(priority,
|
||||
node, ExecutionType.GUARANTEED, capability).remoteRequest
|
||||
.getNumContainers();
|
||||
containersRequestedRack = amClient.getTable(0).get(priority,
|
||||
rack, ExecutionType.GUARANTEED, capability).remoteRequest
|
||||
.getNumContainers();
|
||||
containersRequestedAny = amClient.getTable(0).get(priority,
|
||||
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
|
||||
.remoteRequest.getNumContainers();
|
||||
oppContainersRequestedAny =
|
||||
amClient.getTable(0).get(priority2, ResourceRequest.ANY,
|
||||
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
|
||||
.getNumContainers();
|
||||
|
||||
assertEquals(2, containersRequestedNode);
|
||||
assertEquals(2, containersRequestedRack);
|
||||
assertEquals(2, containersRequestedAny);
|
||||
|
@ -309,7 +588,7 @@ public class TestOpportunisticContainerAllocation {
|
|||
// RM should allocate container within 2 calls to allocate()
|
||||
int allocatedContainerCount = 0;
|
||||
int allocatedOpportContainerCount = 0;
|
||||
int iterationsLeft = 10;
|
||||
int iterationsLeft = 50;
|
||||
Set<ContainerId> releases = new TreeSet<>();
|
||||
|
||||
amClient.getNMTokenCache().clearCache();
|
||||
|
@ -324,8 +603,8 @@ public class TestOpportunisticContainerAllocation {
|
|||
assertEquals(0, amClient.ask.size());
|
||||
assertEquals(0, amClient.release.size());
|
||||
|
||||
allocatedContainerCount += allocResponse.getAllocatedContainers()
|
||||
.size();
|
||||
allocatedContainerCount +=
|
||||
allocResponse.getAllocatedContainers().size();
|
||||
for (Container container : allocResponse.getAllocatedContainers()) {
|
||||
if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
|
||||
allocatedOpportContainerCount++;
|
||||
|
@ -345,9 +624,9 @@ public class TestOpportunisticContainerAllocation {
|
|||
}
|
||||
}
|
||||
|
||||
assertEquals(allocatedContainerCount,
|
||||
containersRequestedAny + oppContainersRequestedAny);
|
||||
assertEquals(allocatedOpportContainerCount, oppContainersRequestedAny);
|
||||
assertEquals(containersRequestedAny + oppContainersRequestedAny,
|
||||
allocatedContainerCount);
|
||||
assertEquals(oppContainersRequestedAny, allocatedOpportContainerCount);
|
||||
for (ContainerId rejectContainerId : releases) {
|
||||
amClient.releaseAssignedContainer(rejectContainerId);
|
||||
}
|
||||
|
@ -395,26 +674,25 @@ public class TestOpportunisticContainerAllocation {
|
|||
/**
|
||||
* Tests allocation with requests comprising only opportunistic containers.
|
||||
*/
|
||||
private void testOpportunisticAllocation(
|
||||
final AMRMClientImpl<AMRMClient.ContainerRequest> amClient)
|
||||
throws YarnException, IOException {
|
||||
@Test(timeout = 60000)
|
||||
public void testOpportunisticAllocation() throws YarnException, IOException {
|
||||
// setup container request
|
||||
assertEquals(0, amClient.ask.size());
|
||||
assertEquals(0, amClient.release.size());
|
||||
|
||||
amClient.addContainerRequest(
|
||||
new AMRMClient.ContainerRequest(capability, null, null, priority, 0,
|
||||
new AMRMClient.ContainerRequest(capability, null, null, priority3, 0,
|
||||
true, null,
|
||||
ExecutionTypeRequest.newInstance(
|
||||
ExecutionType.OPPORTUNISTIC, true)));
|
||||
amClient.addContainerRequest(
|
||||
new AMRMClient.ContainerRequest(capability, null, null, priority, 0,
|
||||
new AMRMClient.ContainerRequest(capability, null, null, priority3, 0,
|
||||
true, null,
|
||||
ExecutionTypeRequest.newInstance(
|
||||
ExecutionType.OPPORTUNISTIC, true)));
|
||||
|
||||
int oppContainersRequestedAny =
|
||||
amClient.getTable(0).get(priority, ResourceRequest.ANY,
|
||||
amClient.getTable(0).get(priority3, ResourceRequest.ANY,
|
||||
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
|
||||
.getNumContainers();
|
||||
|
||||
|
@ -456,9 +734,43 @@ public class TestOpportunisticContainerAllocation {
|
|||
}
|
||||
}
|
||||
|
||||
assertEquals(oppContainersRequestedAny, allocatedContainerCount);
|
||||
assertEquals(1, receivedNMTokens.values().size());
|
||||
}
|
||||
|
||||
private void removeCR(Container container) {
|
||||
List<? extends Collection<AMRMClient.ContainerRequest>>
|
||||
matchingRequests = amClient.getMatchingRequests(container
|
||||
.getPriority(),
|
||||
ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC,
|
||||
container.getResource());
|
||||
Set<AMRMClient.ContainerRequest> toRemove = new HashSet<>();
|
||||
for (Collection<AMRMClient.ContainerRequest> rc : matchingRequests) {
|
||||
for (AMRMClient.ContainerRequest cr : rc) {
|
||||
toRemove.add(cr);
|
||||
}
|
||||
}
|
||||
for (AMRMClient.ContainerRequest cr : toRemove) {
|
||||
amClient.removeContainerRequest(cr);
|
||||
}
|
||||
}
|
||||
|
||||
private void updateMetrics(String msg) {
|
||||
AbstractYarnScheduler scheduler =
|
||||
(AbstractYarnScheduler)yarnCluster.getResourceManager()
|
||||
.getResourceScheduler();
|
||||
availMB = scheduler.getRootQueueMetrics().getAvailableMB();
|
||||
availVCores = scheduler.getRootQueueMetrics().getAvailableVirtualCores();
|
||||
allocMB = scheduler.getRootQueueMetrics().getAllocatedMB();
|
||||
allocVCores = scheduler.getRootQueueMetrics().getAllocatedVirtualCores();
|
||||
System.out.println("## METRICS (" + msg + ")==>");
|
||||
System.out.println(" : availMB=" + availMB + ", " +
|
||||
"availVCores=" +availVCores + ", " +
|
||||
"allocMB=" + allocMB + ", " +
|
||||
"allocVCores=" + allocVCores + ", ");
|
||||
System.out.println("<== ##");
|
||||
}
|
||||
|
||||
private void sleep(int sleepTime) {
|
||||
try {
|
||||
Thread.sleep(sleepTime);
|
||||
|
|
|
@ -73,6 +73,22 @@ public class UpdateContainerErrorPBImpl extends UpdateContainerError {
|
|||
this.reason = reason;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCurrentContainerVersion() {
|
||||
YarnServiceProtos.UpdateContainerErrorProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
if (!p.hasCurrentContainerVersion()) {
|
||||
return 0;
|
||||
}
|
||||
return p.getCurrentContainerVersion();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setCurrentContainerVersion(int containerVersion) {
|
||||
maybeInitBuilder();
|
||||
builder.setCurrentContainerVersion(containerVersion);
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateContainerRequest getUpdateContainerRequest() {
|
||||
YarnServiceProtos.UpdateContainerErrorProtoOrBuilder p = viaProto ? proto
|
||||
|
|
|
@ -188,19 +188,25 @@ public class RMServerUtils {
|
|||
}
|
||||
}
|
||||
}
|
||||
checkAndcreateUpdateError(updateErrors, updateReq, msg);
|
||||
checkAndcreateUpdateError(updateErrors, updateReq, rmContainer, msg);
|
||||
}
|
||||
return updateRequests;
|
||||
}
|
||||
|
||||
private static void checkAndcreateUpdateError(
|
||||
List<UpdateContainerError> errors, UpdateContainerRequest updateReq,
|
||||
String msg) {
|
||||
RMContainer rmContainer, String msg) {
|
||||
if (msg != null) {
|
||||
UpdateContainerError updateError = RECORD_FACTORY
|
||||
.newRecordInstance(UpdateContainerError.class);
|
||||
updateError.setReason(msg);
|
||||
updateError.setUpdateContainerRequest(updateReq);
|
||||
if (rmContainer != null) {
|
||||
updateError.setCurrentContainerVersion(
|
||||
rmContainer.getContainer().getVersion());
|
||||
} else {
|
||||
updateError.setCurrentContainerVersion(-1);
|
||||
}
|
||||
errors.add(updateError);
|
||||
}
|
||||
}
|
||||
|
@ -216,9 +222,7 @@ public class RMServerUtils {
|
|||
// version
|
||||
if (msg == null && updateReq.getContainerVersion() !=
|
||||
rmContainer.getContainer().getVersion()) {
|
||||
msg = INCORRECT_CONTAINER_VERSION_ERROR + "|"
|
||||
+ updateReq.getContainerVersion() + "|"
|
||||
+ rmContainer.getContainer().getVersion();
|
||||
msg = INCORRECT_CONTAINER_VERSION_ERROR;
|
||||
}
|
||||
// No more than 1 container update per request.
|
||||
if (msg == null &&
|
||||
|
|
|
@ -251,8 +251,11 @@ public class TestOpportunisticContainerAllocatorAMService {
|
|||
|
||||
Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
|
||||
Assert.assertEquals(1, allocateResponse.getUpdateErrors().size());
|
||||
Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR|1|0",
|
||||
Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR",
|
||||
allocateResponse.getUpdateErrors().get(0).getReason());
|
||||
Assert.assertEquals(0,
|
||||
allocateResponse.getUpdateErrors().get(0)
|
||||
.getCurrentContainerVersion());
|
||||
Assert.assertEquals(container.getId(),
|
||||
allocateResponse.getUpdateErrors().get(0)
|
||||
.getUpdateContainerRequest().getContainerId());
|
||||
|
|
|
@ -275,8 +275,10 @@ public class TestIncreaseAllocationExpirer {
|
|||
Resources.createResource(5 * GB), null)));
|
||||
List<UpdateContainerError> updateErrors = response.getUpdateErrors();
|
||||
Assert.assertEquals(1, updateErrors.size());
|
||||
Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR|0|1",
|
||||
Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR",
|
||||
updateErrors.get(0).getReason());
|
||||
Assert.assertEquals(1,
|
||||
updateErrors.get(0).getCurrentContainerVersion());
|
||||
|
||||
// am1 asks to change containerId2 from 3GB to 5GB
|
||||
am1.sendContainerResizingRequest(Collections.singletonList(
|
||||
|
|
Loading…
Reference in New Issue