YARN-5966. AMRMClient changes to support ExecutionType update. (asuresh)

(cherry picked from commit aaf106fde3)
This commit is contained in:
Arun Suresh 2017-02-14 06:08:27 -08:00
parent d87a92bf55
commit 2d62af6545
13 changed files with 594 additions and 131 deletions

View File

@ -58,6 +58,22 @@ public abstract class UpdateContainerError {
@InterfaceStability.Unstable
public abstract void setReason(String reason);
/**
* Get current container version.
* @return Current container Version.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public abstract int getCurrentContainerVersion();
/**
* Set current container version.
* @param currentVersion Current container version.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public abstract void setCurrentContainerVersion(int currentVersion);
/**
* Get the {@code UpdateContainerRequest} that was not satisfiable.
* @return UpdateContainerRequest
@ -89,6 +105,7 @@ public abstract class UpdateContainerError {
@Override
public String toString() {
return "UpdateContainerError{reason=" + getReason() + ", "
+ "currentVersion=" + getCurrentContainerVersion() + ", "
+ "req=" + getUpdateContainerRequest() + "}";
}
@ -120,6 +137,6 @@ public abstract class UpdateContainerError {
} else if (!req.equals(other.getUpdateContainerRequest())) {
return false;
}
return true;
return getCurrentContainerVersion() == other.getCurrentContainerVersion();
}
}

View File

@ -78,6 +78,7 @@ message UpdateContainerRequestProto {
message UpdateContainerErrorProto {
optional string reason = 1;
optional UpdateContainerRequestProto update_request = 2;
optional int32 current_container_version = 3;
}
message AllocateRequestProto {

View File

@ -33,17 +33,20 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.yarn.util.resource.Resources;
@InterfaceAudience.Public
@InterfaceStability.Stable
@ -516,12 +519,38 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
* ResourceManager to change the existing resource allocation to the target
* resource allocation.
*
* @deprecated use
* {@link #requestContainerUpdate(Container, UpdateContainerRequest)}
*
* @param container The container returned from the last successful resource
* allocation or resource change
* @param capability The target resource capability of the container
*/
public abstract void requestContainerResourceChange(
Container container, Resource capability);
@Deprecated
public void requestContainerResourceChange(
Container container, Resource capability) {
Preconditions.checkNotNull(container, "Container cannot be null!!");
Preconditions.checkNotNull(capability,
"UpdateContainerRequest cannot be null!!");
requestContainerUpdate(container, UpdateContainerRequest.newInstance(
container.getVersion(), container.getId(),
Resources.fitsIn(capability, container.getResource()) ?
ContainerUpdateType.DECREASE_RESOURCE :
ContainerUpdateType.INCREASE_RESOURCE,
capability, null));
}
/**
* Request a container update before calling <code>allocate</code>.
* Any previous pending update request of the same container will be
* removed.
*
* @param container The container returned from the last successful resource
* allocation or update
* @param updateContainerRequest The <code>UpdateContainerRequest</code>.
*/
public abstract void requestContainerUpdate(
Container container, UpdateContainerRequest updateContainerRequest);
/**
* Release containers assigned by the Resource Manager. If the app cannot use

View File

@ -36,11 +36,13 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@ -49,6 +51,7 @@ import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* <code>AMRMClientAsync</code> handles communication with the ResourceManager
@ -283,12 +286,38 @@ extends AbstractService {
* ResourceManager to change the existing resource allocation to the target
* resource allocation.
*
* @deprecated use
* {@link #requestContainerUpdate(Container, UpdateContainerRequest)}
*
* @param container The container returned from the last successful resource
* allocation or resource change
* @param capability The target resource capability of the container
*/
public abstract void requestContainerResourceChange(
Container container, Resource capability);
@Deprecated
public void requestContainerResourceChange(
Container container, Resource capability) {
Preconditions.checkNotNull(container, "Container cannot be null!!");
Preconditions.checkNotNull(capability,
"UpdateContainerRequest cannot be null!!");
requestContainerUpdate(container, UpdateContainerRequest.newInstance(
container.getVersion(), container.getId(),
Resources.fitsIn(capability, container.getResource()) ?
ContainerUpdateType.DECREASE_RESOURCE :
ContainerUpdateType.INCREASE_RESOURCE,
capability, null));
}
/**
* Request a container update before calling <code>allocate</code>.
* Any previous pending update request of the same container will be
* removed.
*
* @param container The container returned from the last successful resource
* allocation or update
* @param updateContainerRequest The <code>UpdateContainerRequest</code>.
*/
public abstract void requestContainerUpdate(
Container container, UpdateContainerRequest updateContainerRequest);
/**
* Release containers assigned by the Resource Manager. If the app cannot use

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@ -204,9 +205,9 @@ extends AMRMClientAsync<T> {
}
@Override
public void requestContainerResourceChange(
Container container, Resource capability) {
client.requestContainerResourceChange(container, capability);
public void requestContainerUpdate(Container container,
UpdateContainerRequest updateContainerRequest) {
client.requestContainerUpdate(container, updateContainerRequest);
}
/**

View File

@ -169,15 +169,16 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
// change map holds container resource change requests between two allocate()
// calls, and are cleared after each successful allocate() call.
protected final Map<ContainerId, SimpleEntry<Container, Resource>> change =
new HashMap<>();
protected final Map<ContainerId,
SimpleEntry<Container, UpdateContainerRequest>> change = new HashMap<>();
// pendingChange map holds history of container resource change requests in
// case AM needs to reregister with the ResourceManager.
// Change requests are removed from this map if RM confirms the change
// through allocate response, or if RM confirms that the container has been
// completed.
protected final Map<ContainerId, SimpleEntry<Container, Resource>>
pendingChange = new HashMap<>();
protected final Map<ContainerId,
SimpleEntry<Container, UpdateContainerRequest>> pendingChange =
new HashMap<>();
public AMRMClientImpl() {
super(AMRMClientImpl.class.getName());
@ -259,7 +260,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
AllocateRequest allocateRequest = null;
List<String> blacklistToAdd = new ArrayList<String>();
List<String> blacklistToRemove = new ArrayList<String>();
Map<ContainerId, SimpleEntry<Container, Resource>> oldChange =
Map<ContainerId, SimpleEntry<Container, UpdateContainerRequest>> oldChange =
new HashMap<>();
try {
synchronized (this) {
@ -375,14 +376,15 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
//
// Only insert entries from the cached oldChange map
// that do not exist in the current change map:
for (Map.Entry<ContainerId, SimpleEntry<Container, Resource>> entry :
for (Map.Entry<ContainerId,
SimpleEntry<Container, UpdateContainerRequest>> entry :
oldChange.entrySet()) {
ContainerId oldContainerId = entry.getKey();
Container oldContainer = entry.getValue().getKey();
Resource oldResource = entry.getValue().getValue();
UpdateContainerRequest oldupdate = entry.getValue().getValue();
if (change.get(oldContainerId) == null) {
change.put(
oldContainerId, new SimpleEntry<>(oldContainer, oldResource));
oldContainerId, new SimpleEntry<>(oldContainer, oldupdate));
}
}
blacklistAdditions.addAll(blacklistToAdd);
@ -395,19 +397,17 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
private List<UpdateContainerRequest> createUpdateList() {
List<UpdateContainerRequest> updateList = new ArrayList<>();
for (Map.Entry<ContainerId, SimpleEntry<Container, Resource>> entry :
change.entrySet()) {
Resource targetCapability = entry.getValue().getValue();
Resource currCapability = entry.getValue().getKey().getResource();
int version = entry.getValue().getKey().getVersion();
for (Map.Entry<ContainerId, SimpleEntry<Container,
UpdateContainerRequest>> entry : change.entrySet()) {
Resource targetCapability = entry.getValue().getValue().getCapability();
ExecutionType targetExecType =
entry.getValue().getValue().getExecutionType();
ContainerUpdateType updateType =
ContainerUpdateType.INCREASE_RESOURCE;
if (Resources.fitsIn(targetCapability, currCapability)) {
updateType = ContainerUpdateType.DECREASE_RESOURCE;
}
entry.getValue().getValue().getContainerUpdateType();
int version = entry.getValue().getKey().getVersion();
updateList.add(
UpdateContainerRequest.newInstance(version, entry.getKey(),
updateType, targetCapability, null));
updateType, targetCapability, targetExecType));
}
return updateList;
}
@ -592,21 +592,47 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
}
@Override
public synchronized void requestContainerResourceChange(
Container container, Resource capability) {
validateContainerResourceChangeRequest(
container.getId(), container.getResource(), capability);
public synchronized void requestContainerUpdate(
Container container, UpdateContainerRequest updateContainerRequest) {
Preconditions.checkNotNull(container, "Container cannot be null!!");
Preconditions.checkNotNull(updateContainerRequest,
"UpdateContainerRequest cannot be null!!");
LOG.info("Requesting Container update : " +
"container=" + container + ", " +
"updateType=" + updateContainerRequest.getContainerUpdateType() + ", " +
"targetCapability=" + updateContainerRequest.getCapability() + ", " +
"targetExecType=" + updateContainerRequest.getExecutionType());
if (updateContainerRequest.getCapability() != null &&
updateContainerRequest.getExecutionType() == null) {
validateContainerResourceChangeRequest(
updateContainerRequest.getContainerUpdateType(),
container.getId(), container.getResource(),
updateContainerRequest.getCapability());
} else if (updateContainerRequest.getExecutionType() != null &&
updateContainerRequest.getCapability() == null) {
validateContainerExecTypeChangeRequest(
updateContainerRequest.getContainerUpdateType(),
container.getId(), container.getExecutionType(),
updateContainerRequest.getExecutionType());
} else if (updateContainerRequest.getExecutionType() == null &&
updateContainerRequest.getCapability() == null) {
throw new IllegalArgumentException("Both target Capability and" +
"target Execution Type are null");
} else {
throw new IllegalArgumentException("Support currently exists only for" +
" EITHER update of Capability OR update of Execution Type NOT both");
}
if (change.get(container.getId()) == null) {
change.put(container.getId(),
new SimpleEntry<>(container, capability));
new SimpleEntry<>(container, updateContainerRequest));
} else {
change.get(container.getId()).setValue(capability);
change.get(container.getId()).setValue(updateContainerRequest);
}
if (pendingChange.get(container.getId()) == null) {
pendingChange.put(container.getId(),
new SimpleEntry<>(container, capability));
new SimpleEntry<>(container, updateContainerRequest));
} else {
pendingChange.get(container.getId()).setValue(capability);
pendingChange.get(container.getId()).setValue(updateContainerRequest);
}
}
@ -751,7 +777,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
}
private void validateContainerResourceChangeRequest(
ContainerId containerId, Resource original, Resource target) {
ContainerUpdateType updateType, ContainerId containerId,
Resource original, Resource target) {
Preconditions.checkArgument(containerId != null,
"ContainerId cannot be null");
Preconditions.checkArgument(original != null,
@ -764,6 +791,36 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
Preconditions.checkArgument(!Resources.equals(Resources.none(), target)
&& Resources.fitsIn(Resources.none(), target),
"Target resource capability must be greater than 0");
if (ContainerUpdateType.DECREASE_RESOURCE == updateType) {
Preconditions.checkArgument(Resources.fitsIn(target, original),
"Target resource capability must fit in Original capability");
} else {
Preconditions.checkArgument(Resources.fitsIn(original, target),
"Target resource capability must be more than Original capability");
}
}
private void validateContainerExecTypeChangeRequest(
ContainerUpdateType updateType, ContainerId containerId,
ExecutionType original, ExecutionType target) {
Preconditions.checkArgument(containerId != null,
"ContainerId cannot be null");
Preconditions.checkArgument(original != null,
"Original Execution Type cannot be null");
Preconditions.checkArgument(target != null,
"Target Execution Type cannot be null");
if (ContainerUpdateType.DEMOTE_EXECUTION_TYPE == updateType) {
Preconditions.checkArgument(target == ExecutionType.OPPORTUNISTIC
&& original == ExecutionType.GUARANTEED,
"Incorrect Container update request, target should be" +
" OPPORTUNISTIC and original should be GUARANTEED");
} else {
Preconditions.checkArgument(target == ExecutionType.GUARANTEED
&& original == ExecutionType.OPPORTUNISTIC,
"Incorrect Container update request, target should be" +
" GUARANTEED and original should be OPPORTUNISTIC");
}
}
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {

View File

@ -52,29 +52,7 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@ -1098,26 +1076,36 @@ public class TestAMRMClient {
(AMRMClientImpl<ContainerRequest>) amClient;
Assert.assertEquals(0, amClientImpl.change.size());
// verify newer request overwrites older request for the container1
amClientImpl.requestContainerResourceChange(
container1, Resource.newInstance(2048, 1));
amClientImpl.requestContainerResourceChange(
container1, Resource.newInstance(4096, 1));
amClientImpl.requestContainerUpdate(container1,
UpdateContainerRequest.newInstance(container1.getVersion(),
container1.getId(), ContainerUpdateType.INCREASE_RESOURCE,
Resource.newInstance(2048, 1), null));
amClientImpl.requestContainerUpdate(container1,
UpdateContainerRequest.newInstance(container1.getVersion(),
container1.getId(), ContainerUpdateType.INCREASE_RESOURCE,
Resource.newInstance(4096, 1), null));
Assert.assertEquals(Resource.newInstance(4096, 1),
amClientImpl.change.get(container1.getId()).getValue());
amClientImpl.change.get(container1.getId()).getValue().getCapability());
// verify new decrease request cancels old increase request for container1
amClientImpl.requestContainerResourceChange(
container1, Resource.newInstance(512, 1));
amClientImpl.requestContainerUpdate(container1,
UpdateContainerRequest.newInstance(container1.getVersion(),
container1.getId(), ContainerUpdateType.DECREASE_RESOURCE,
Resource.newInstance(512, 1), null));
Assert.assertEquals(Resource.newInstance(512, 1),
amClientImpl.change.get(container1.getId()).getValue());
amClientImpl.change.get(container1.getId()).getValue().getCapability());
// request resource increase for container2
amClientImpl.requestContainerResourceChange(
container2, Resource.newInstance(2048, 1));
amClientImpl.requestContainerUpdate(container2,
UpdateContainerRequest.newInstance(container2.getVersion(),
container2.getId(), ContainerUpdateType.INCREASE_RESOURCE,
Resource.newInstance(2048, 1), null));
Assert.assertEquals(Resource.newInstance(2048, 1),
amClientImpl.change.get(container2.getId()).getValue());
amClientImpl.change.get(container2.getId()).getValue().getCapability());
// verify release request will cancel pending change requests for the same
// container
amClientImpl.requestContainerResourceChange(
container3, Resource.newInstance(2048, 1));
amClientImpl.requestContainerUpdate(container3,
UpdateContainerRequest.newInstance(container3.getVersion(),
container3.getId(), ContainerUpdateType.INCREASE_RESOURCE,
Resource.newInstance(2048, 1), null));
Assert.assertEquals(3, amClientImpl.pendingChange.size());
amClientImpl.releaseAssignedContainer(container3.getId());
Assert.assertEquals(2, amClientImpl.pendingChange.size());

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@ -229,8 +230,11 @@ public class TestAMRMClientOnRMRestart {
nm1.nodeHeartbeat(containerId.getApplicationAttemptId(),
containerId.getContainerId(), ContainerState.RUNNING);
rm1.drainEvents();
amClient.requestContainerResourceChange(
container, Resource.newInstance(2048, 1));
amClient.requestContainerUpdate(
container, UpdateContainerRequest.newInstance(
container.getVersion(), container.getId(),
ContainerUpdateType.INCREASE_RESOURCE,
Resource.newInstance(2048, 1), null));
it.remove();
allocateResponse = amClient.allocate(0.3f);

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@ -44,6 +45,8 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient;
@ -54,6 +57,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
@ -66,13 +72,17 @@ import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
/**
* Class that tests the allocation of OPPORTUNISTIC containers through the
@ -83,7 +93,6 @@ public class TestOpportunisticContainerAllocation {
private static MiniYARNCluster yarnCluster = null;
private static YarnClient yarnClient = null;
private static List<NodeReport> nodeReports = null;
private static ApplicationAttemptId attemptId = null;
private static int nodeCount = 3;
private static final int ROLLING_INTERVAL_SEC = 13;
@ -92,12 +101,22 @@ public class TestOpportunisticContainerAllocation {
private static Resource capability;
private static Priority priority;
private static Priority priority2;
private static Priority priority3;
private static Priority priority4;
private static String node;
private static String rack;
private static String[] nodes;
private static String[] racks;
private final static int DEFAULT_ITERATION = 3;
// Per test..
private ApplicationAttemptId attemptId = null;
private AMRMClientImpl<AMRMClient.ContainerRequest> amClient = null;
private long availMB;
private int availVCores;
private long allocMB;
private int allocVCores;
@BeforeClass
public static void setup() throws Exception {
// start minicluster
@ -106,7 +125,7 @@ public class TestOpportunisticContainerAllocation {
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
ROLLING_INTERVAL_SEC);
conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, AM_EXPIRE_MS);
conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 1000);
// set the minimum allocation so that resource decrease can go under 1024
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
conf.setBoolean(
@ -129,7 +148,9 @@ public class TestOpportunisticContainerAllocation {
priority = Priority.newInstance(1);
priority2 = Priority.newInstance(2);
capability = Resource.newInstance(1024, 1);
priority3 = Priority.newInstance(3);
priority4 = Priority.newInstance(4);
capability = Resource.newInstance(512, 1);
node = nodeReports.get(0).getNodeId().getHost();
rack = nodeReports.get(0).getRackName();
@ -193,10 +214,35 @@ public class TestOpportunisticContainerAllocation {
UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
appAttempt.getAMRMToken()
.setService(ClientRMProxy.getAMRMTokenService(conf));
// start am rm client
amClient = (AMRMClientImpl<AMRMClient.ContainerRequest>)AMRMClient
.createAMRMClient();
//setting an instance NMTokenCache
amClient.setNMTokenCache(new NMTokenCache());
//asserting we are not using the singleton instance cache
Assert.assertNotSame(NMTokenCache.getSingleton(),
amClient.getNMTokenCache());
amClient.init(conf);
amClient.start();
amClient.registerApplicationMaster("Host", 10000, "");
}
@After
public void cancelApp() throws YarnException, IOException {
try {
amClient
.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
null);
} finally {
if (amClient != null &&
amClient.getServiceState() == Service.STATE.STARTED) {
amClient.stop();
}
}
yarnClient.killApplication(attemptId.getApplicationId());
attemptId = null;
}
@ -214,43 +260,254 @@ public class TestOpportunisticContainerAllocation {
}
@Test(timeout = 60000)
public void testAMRMClient() throws YarnException, IOException {
AMRMClient<AMRMClient.ContainerRequest> amClient = null;
try {
// start am rm client
amClient = AMRMClient.<AMRMClient.ContainerRequest>createAMRMClient();
public void testPromotionFromAcquired() throws YarnException, IOException {
// setup container request
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
//setting an instance NMTokenCache
amClient.setNMTokenCache(new NMTokenCache());
//asserting we are not using the singleton instance cache
Assert.assertNotSame(NMTokenCache.getSingleton(),
amClient.getNMTokenCache());
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
amClient.init(conf);
amClient.start();
int oppContainersRequestedAny =
amClient.getTable(0).get(priority2, ResourceRequest.ANY,
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
.getNumContainers();
amClient.registerApplicationMaster("Host", 10000, "");
assertEquals(1, oppContainersRequestedAny);
testOpportunisticAllocation(
(AMRMClientImpl<AMRMClient.ContainerRequest>) amClient);
assertEquals(1, amClient.ask.size());
assertEquals(0, amClient.release.size());
testAllocation((AMRMClientImpl<AMRMClient.ContainerRequest>)amClient);
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;
Map<ContainerId, Container> allocatedOpportContainers = new HashMap<>();
int iterationsLeft = 50;
amClient
.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
null);
amClient.getNMTokenCache().clearCache();
Assert.assertEquals(0,
amClient.getNMTokenCache().numberOfTokensInCache());
HashMap<String, Token> receivedNMTokens = new HashMap<>();
} finally {
if (amClient != null &&
amClient.getServiceState() == Service.STATE.STARTED) {
amClient.stop();
updateMetrics("Before Opp Allocation");
while (allocatedContainerCount < oppContainersRequestedAny
&& iterationsLeft-- > 0) {
AllocateResponse allocResponse = amClient.allocate(0.1f);
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
allocatedContainerCount +=
allocResponse.getAllocatedContainers().size();
for (Container container : allocResponse.getAllocatedContainers()) {
if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
allocatedOpportContainers.put(container.getId(), container);
removeCR(container);
}
}
for (NMToken token : allocResponse.getNMTokens()) {
String nodeID = token.getNodeId().toString();
receivedNMTokens.put(nodeID, token.getToken());
}
if (allocatedContainerCount < oppContainersRequestedAny) {
// sleep to let NM's heartbeat to RM and trigger allocations
sleep(100);
}
}
assertEquals(oppContainersRequestedAny, allocatedContainerCount);
assertEquals(oppContainersRequestedAny, allocatedOpportContainers.size());
updateMetrics("After Opp Allocation / Before Promotion");
try {
Container c = allocatedOpportContainers.values().iterator().next();
amClient.requestContainerUpdate(
c, UpdateContainerRequest.newInstance(c.getVersion(),
c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
null, ExecutionType.OPPORTUNISTIC));
Assert.fail("Should throw Exception..");
} catch (IllegalArgumentException e) {
System.out.println("## " + e.getMessage());
Assert.assertTrue(e.getMessage().contains(
"target should be GUARANTEED and original should be OPPORTUNISTIC"));
}
Container c = allocatedOpportContainers.values().iterator().next();
amClient.requestContainerUpdate(
c, UpdateContainerRequest.newInstance(c.getVersion(),
c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
null, ExecutionType.GUARANTEED));
iterationsLeft = 120;
Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
// do a few iterations to ensure RM is not going to send new containers
while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
// inform RM of rejection
AllocateResponse allocResponse = amClient.allocate(0.1f);
// RM did not send new containers because AM does not need any
if (allocResponse.getUpdatedContainers() != null) {
for (UpdatedContainer updatedContainer : allocResponse
.getUpdatedContainers()) {
System.out.println("Got update..");
updatedContainers.put(updatedContainer.getContainer().getId(),
updatedContainer);
}
}
if (iterationsLeft > 0) {
// sleep to make sure NM's heartbeat
sleep(100);
}
}
updateMetrics("After Promotion");
assertEquals(1, updatedContainers.size());
for (ContainerId cId : allocatedOpportContainers.keySet()) {
Container orig = allocatedOpportContainers.get(cId);
UpdatedContainer updatedContainer = updatedContainers.get(cId);
assertNotNull(updatedContainer);
assertEquals(ExecutionType.GUARANTEED,
updatedContainer.getContainer().getExecutionType());
assertEquals(orig.getResource(),
updatedContainer.getContainer().getResource());
assertEquals(orig.getNodeId(),
updatedContainer.getContainer().getNodeId());
assertEquals(orig.getVersion() + 1,
updatedContainer.getContainer().getVersion());
}
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
amClient.ask.clear();
}
private void testAllocation(
final AMRMClientImpl<AMRMClient.ContainerRequest> amClient)
throws YarnException, IOException {
@Test(timeout = 60000)
public void testDemotionFromAcquired() throws YarnException, IOException {
// setup container request
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority3));
int guarContainersRequestedAny = amClient.getTable(0).get(priority3,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
.remoteRequest.getNumContainers();
assertEquals(1, guarContainersRequestedAny);
assertEquals(1, amClient.ask.size());
assertEquals(0, amClient.release.size());
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;
Map<ContainerId, Container> allocatedGuarContainers = new HashMap<>();
int iterationsLeft = 50;
amClient.getNMTokenCache().clearCache();
Assert.assertEquals(0,
amClient.getNMTokenCache().numberOfTokensInCache());
HashMap<String, Token> receivedNMTokens = new HashMap<>();
updateMetrics("Before Guar Allocation");
while (allocatedContainerCount < guarContainersRequestedAny
&& iterationsLeft-- > 0) {
AllocateResponse allocResponse = amClient.allocate(0.1f);
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
allocatedContainerCount +=
allocResponse.getAllocatedContainers().size();
for (Container container : allocResponse.getAllocatedContainers()) {
if (container.getExecutionType() == ExecutionType.GUARANTEED) {
allocatedGuarContainers.put(container.getId(), container);
removeCR(container);
}
}
for (NMToken token : allocResponse.getNMTokens()) {
String nodeID = token.getNodeId().toString();
receivedNMTokens.put(nodeID, token.getToken());
}
if (allocatedContainerCount < guarContainersRequestedAny) {
// sleep to let NM's heartbeat to RM and trigger allocations
sleep(100);
}
}
assertEquals(guarContainersRequestedAny, allocatedContainerCount);
assertEquals(guarContainersRequestedAny, allocatedGuarContainers.size());
updateMetrics("After Guar Allocation / Before Demotion");
try {
Container c = allocatedGuarContainers.values().iterator().next();
amClient.requestContainerUpdate(
c, UpdateContainerRequest.newInstance(c.getVersion(),
c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
null, ExecutionType.GUARANTEED));
Assert.fail("Should throw Exception..");
} catch (IllegalArgumentException e) {
System.out.println("## " + e.getMessage());
Assert.assertTrue(e.getMessage().contains(
"target should be OPPORTUNISTIC and original should be GUARANTEED"));
}
Container c = allocatedGuarContainers.values().iterator().next();
amClient.requestContainerUpdate(
c, UpdateContainerRequest.newInstance(c.getVersion(),
c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
null, ExecutionType.OPPORTUNISTIC));
iterationsLeft = 120;
Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
// do a few iterations to ensure RM is not going to send new containers
while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
// inform RM of rejection
AllocateResponse allocResponse = amClient.allocate(0.1f);
// RM did not send new containers because AM does not need any
if (allocResponse.getUpdatedContainers() != null) {
for (UpdatedContainer updatedContainer : allocResponse
.getUpdatedContainers()) {
System.out.println("Got update..");
updatedContainers.put(updatedContainer.getContainer().getId(),
updatedContainer);
}
}
if (iterationsLeft > 0) {
// sleep to make sure NM's heartbeat
sleep(100);
}
}
updateMetrics("After Demotion");
assertEquals(1, updatedContainers.size());
for (ContainerId cId : allocatedGuarContainers.keySet()) {
Container orig = allocatedGuarContainers.get(cId);
UpdatedContainer updatedContainer = updatedContainers.get(cId);
assertNotNull(updatedContainer);
assertEquals(ExecutionType.OPPORTUNISTIC,
updatedContainer.getContainer().getExecutionType());
assertEquals(orig.getResource(),
updatedContainer.getContainer().getResource());
assertEquals(orig.getNodeId(),
updatedContainer.getContainer().getNodeId());
assertEquals(orig.getVersion() + 1,
updatedContainer.getContainer().getVersion());
}
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
amClient.ask.clear();
}
@Test(timeout = 60000)
public void testMixedAllocationAndRelease() throws YarnException,
IOException {
// setup container request
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
@ -274,16 +531,6 @@ public class TestOpportunisticContainerAllocation {
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
int containersRequestedNode = amClient.getTable(0).get(priority,
node, ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
@ -298,6 +545,38 @@ public class TestOpportunisticContainerAllocation {
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
.getNumContainers();
assertEquals(4, containersRequestedNode);
assertEquals(4, containersRequestedRack);
assertEquals(4, containersRequestedAny);
assertEquals(2, oppContainersRequestedAny);
assertEquals(4, amClient.ask.size());
assertEquals(0, amClient.release.size());
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
containersRequestedNode = amClient.getTable(0).get(priority,
node, ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
containersRequestedRack = amClient.getTable(0).get(priority,
rack, ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
containersRequestedAny = amClient.getTable(0).get(priority,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
.remoteRequest.getNumContainers();
oppContainersRequestedAny =
amClient.getTable(0).get(priority2, ResourceRequest.ANY,
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
.getNumContainers();
assertEquals(2, containersRequestedNode);
assertEquals(2, containersRequestedRack);
assertEquals(2, containersRequestedAny);
@ -309,7 +588,7 @@ public class TestOpportunisticContainerAllocation {
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;
int allocatedOpportContainerCount = 0;
int iterationsLeft = 10;
int iterationsLeft = 50;
Set<ContainerId> releases = new TreeSet<>();
amClient.getNMTokenCache().clearCache();
@ -324,8 +603,8 @@ public class TestOpportunisticContainerAllocation {
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
allocatedContainerCount += allocResponse.getAllocatedContainers()
.size();
allocatedContainerCount +=
allocResponse.getAllocatedContainers().size();
for (Container container : allocResponse.getAllocatedContainers()) {
if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
allocatedOpportContainerCount++;
@ -345,9 +624,9 @@ public class TestOpportunisticContainerAllocation {
}
}
assertEquals(allocatedContainerCount,
containersRequestedAny + oppContainersRequestedAny);
assertEquals(allocatedOpportContainerCount, oppContainersRequestedAny);
assertEquals(containersRequestedAny + oppContainersRequestedAny,
allocatedContainerCount);
assertEquals(oppContainersRequestedAny, allocatedOpportContainerCount);
for (ContainerId rejectContainerId : releases) {
amClient.releaseAssignedContainer(rejectContainerId);
}
@ -395,26 +674,25 @@ public class TestOpportunisticContainerAllocation {
/**
* Tests allocation with requests comprising only opportunistic containers.
*/
private void testOpportunisticAllocation(
final AMRMClientImpl<AMRMClient.ContainerRequest> amClient)
throws YarnException, IOException {
@Test(timeout = 60000)
public void testOpportunisticAllocation() throws YarnException, IOException {
// setup container request
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority, 0,
new AMRMClient.ContainerRequest(capability, null, null, priority3, 0,
true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority, 0,
new AMRMClient.ContainerRequest(capability, null, null, priority3, 0,
true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
int oppContainersRequestedAny =
amClient.getTable(0).get(priority, ResourceRequest.ANY,
amClient.getTable(0).get(priority3, ResourceRequest.ANY,
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
.getNumContainers();
@ -456,9 +734,43 @@ public class TestOpportunisticContainerAllocation {
}
}
assertEquals(oppContainersRequestedAny, allocatedContainerCount);
assertEquals(1, receivedNMTokens.values().size());
}
private void removeCR(Container container) {
List<? extends Collection<AMRMClient.ContainerRequest>>
matchingRequests = amClient.getMatchingRequests(container
.getPriority(),
ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC,
container.getResource());
Set<AMRMClient.ContainerRequest> toRemove = new HashSet<>();
for (Collection<AMRMClient.ContainerRequest> rc : matchingRequests) {
for (AMRMClient.ContainerRequest cr : rc) {
toRemove.add(cr);
}
}
for (AMRMClient.ContainerRequest cr : toRemove) {
amClient.removeContainerRequest(cr);
}
}
private void updateMetrics(String msg) {
AbstractYarnScheduler scheduler =
(AbstractYarnScheduler)yarnCluster.getResourceManager()
.getResourceScheduler();
availMB = scheduler.getRootQueueMetrics().getAvailableMB();
availVCores = scheduler.getRootQueueMetrics().getAvailableVirtualCores();
allocMB = scheduler.getRootQueueMetrics().getAllocatedMB();
allocVCores = scheduler.getRootQueueMetrics().getAllocatedVirtualCores();
System.out.println("## METRICS (" + msg + ")==>");
System.out.println(" : availMB=" + availMB + ", " +
"availVCores=" +availVCores + ", " +
"allocMB=" + allocMB + ", " +
"allocVCores=" + allocVCores + ", ");
System.out.println("<== ##");
}
private void sleep(int sleepTime) {
try {
Thread.sleep(sleepTime);

View File

@ -73,6 +73,22 @@ public class UpdateContainerErrorPBImpl extends UpdateContainerError {
this.reason = reason;
}
@Override
public int getCurrentContainerVersion() {
YarnServiceProtos.UpdateContainerErrorProtoOrBuilder p =
viaProto ? proto : builder;
if (!p.hasCurrentContainerVersion()) {
return 0;
}
return p.getCurrentContainerVersion();
}
@Override
public void setCurrentContainerVersion(int containerVersion) {
maybeInitBuilder();
builder.setCurrentContainerVersion(containerVersion);
}
@Override
public UpdateContainerRequest getUpdateContainerRequest() {
YarnServiceProtos.UpdateContainerErrorProtoOrBuilder p = viaProto ? proto

View File

@ -183,19 +183,25 @@ public class RMServerUtils {
}
}
}
checkAndcreateUpdateError(updateErrors, updateReq, msg);
checkAndcreateUpdateError(updateErrors, updateReq, rmContainer, msg);
}
return updateRequests;
}
private static void checkAndcreateUpdateError(
List<UpdateContainerError> errors, UpdateContainerRequest updateReq,
String msg) {
RMContainer rmContainer, String msg) {
if (msg != null) {
UpdateContainerError updateError = RECORD_FACTORY
.newRecordInstance(UpdateContainerError.class);
updateError.setReason(msg);
updateError.setUpdateContainerRequest(updateReq);
if (rmContainer != null) {
updateError.setCurrentContainerVersion(
rmContainer.getContainer().getVersion());
} else {
updateError.setCurrentContainerVersion(-1);
}
errors.add(updateError);
}
}
@ -211,9 +217,7 @@ public class RMServerUtils {
// version
if (msg == null && updateReq.getContainerVersion() !=
rmContainer.getContainer().getVersion()) {
msg = INCORRECT_CONTAINER_VERSION_ERROR + "|"
+ updateReq.getContainerVersion() + "|"
+ rmContainer.getContainer().getVersion();
msg = INCORRECT_CONTAINER_VERSION_ERROR;
}
// No more than 1 container update per request.
if (msg == null &&

View File

@ -251,8 +251,11 @@ public class TestOpportunisticContainerAllocatorAMService {
Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
Assert.assertEquals(1, allocateResponse.getUpdateErrors().size());
Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR|1|0",
Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR",
allocateResponse.getUpdateErrors().get(0).getReason());
Assert.assertEquals(0,
allocateResponse.getUpdateErrors().get(0)
.getCurrentContainerVersion());
Assert.assertEquals(container.getId(),
allocateResponse.getUpdateErrors().get(0)
.getUpdateContainerRequest().getContainerId());

View File

@ -276,8 +276,10 @@ public class TestIncreaseAllocationExpirer {
Resources.createResource(5 * GB), null)));
List<UpdateContainerError> updateErrors = response.getUpdateErrors();
Assert.assertEquals(1, updateErrors.size());
Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR|0|1",
Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR",
updateErrors.get(0).getReason());
Assert.assertEquals(1,
updateErrors.get(0).getCurrentContainerVersion());
// am1 asks to change containerId2 from 3GB to 5GB
am1.sendContainerResizingRequest(Collections.singletonList(