YARN-1509. Make AMRMClient support send increase container request and get increased/decreased containers. (Meng Ding via wangda)
This commit is contained in:
parent
7b00c8e20e
commit
7ff280fca9
|
@ -247,6 +247,9 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-1510. Make NMClient support change container resources.
|
||||
(Meng Ding via wangda)
|
||||
|
||||
YARN-1509. Make AMRMClient support send increase container request and
|
||||
get increased/decreased containers. (Meng Ding via wangda)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-644. Basic null check is not performed on passed in arguments before
|
||||
|
|
|
@ -556,7 +556,8 @@ public class ApplicationMaster {
|
|||
appSubmitterUgi.addCredentials(credentials);
|
||||
|
||||
|
||||
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
|
||||
AMRMClientAsync.AbstractCallbackHandler allocListener =
|
||||
new RMCallbackHandler();
|
||||
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
|
||||
amRMClient.init(conf);
|
||||
amRMClient.start();
|
||||
|
@ -731,7 +732,7 @@ public class ApplicationMaster {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
|
||||
class RMCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler {
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void onContainersCompleted(List<ContainerStatus> completedContainers) {
|
||||
|
@ -834,6 +835,9 @@ public class ApplicationMaster {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onContainersResourceChanged(List<Container> containers) {}
|
||||
|
||||
@Override
|
||||
public void onShutdownRequest() {
|
||||
done = true;
|
||||
|
|
|
@ -31,6 +31,8 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
|
@ -285,7 +287,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
|
|||
* @param req Resource request
|
||||
*/
|
||||
public abstract void addContainerRequest(T req);
|
||||
|
||||
|
||||
/**
|
||||
* Remove previous container request. The previous container request may have
|
||||
* already been sent to the ResourceManager. So even after the remove request
|
||||
|
@ -294,7 +296,26 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
|
|||
* @param req Resource request
|
||||
*/
|
||||
public abstract void removeContainerRequest(T req);
|
||||
|
||||
|
||||
/**
|
||||
* Request container resource change before calling <code>allocate</code>.
|
||||
* Any previous pending resource change request of the same container will be
|
||||
* removed.
|
||||
*
|
||||
* Application that calls this method is expected to maintain the
|
||||
* <code>Container</code>s that are returned from previous successful
|
||||
* allocations or resource changes. By passing in the existing container and a
|
||||
* target resource capability to this method, the application requests the
|
||||
* ResourceManager to change the existing resource allocation to the target
|
||||
* resource allocation.
|
||||
*
|
||||
* @param container The container returned from the last successful resource
|
||||
* allocation or resource change
|
||||
* @param capability The target resource capability of the container
|
||||
*/
|
||||
public abstract void requestContainerResourceChange(
|
||||
Container container, Resource capability);
|
||||
|
||||
/**
|
||||
* Release containers assigned by the Resource Manager. If the app cannot use
|
||||
* the container or wants to give up the container then it can release them.
|
||||
|
|
|
@ -56,11 +56,17 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
* It should be used by implementing a CallbackHandler:
|
||||
* <pre>
|
||||
* {@code
|
||||
* class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
|
||||
* class MyCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler {
|
||||
* public void onContainersAllocated(List<Container> containers) {
|
||||
* [run tasks on the containers]
|
||||
* }
|
||||
*
|
||||
*
|
||||
* public void onContainersResourceChanged(List<Container> containers) {
|
||||
* [determine if resource allocation of containers have been increased in
|
||||
* the ResourceManager, and if so, inform the NodeManagers to increase the
|
||||
* resource monitor/enforcement on the containers]
|
||||
* }
|
||||
*
|
||||
* public void onContainersCompleted(List<ContainerStatus> statuses) {
|
||||
* [update progress, check whether app is done]
|
||||
* }
|
||||
|
@ -100,23 +106,80 @@ extends AbstractService {
|
|||
protected final CallbackHandler handler;
|
||||
protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger();
|
||||
|
||||
/**
|
||||
* <p>Create a new instance of AMRMClientAsync.</p>
|
||||
*
|
||||
* @param intervalMs heartbeat interval in milliseconds between AM and RM
|
||||
* @param callbackHandler callback handler that processes responses from
|
||||
* the <code>ResourceManager</code>
|
||||
*/
|
||||
public static <T extends ContainerRequest> AMRMClientAsync<T>
|
||||
createAMRMClientAsync(
|
||||
int intervalMs, AbstractCallbackHandler callbackHandler) {
|
||||
return new AMRMClientAsyncImpl<T>(intervalMs, callbackHandler);
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Create a new instance of AMRMClientAsync.</p>
|
||||
*
|
||||
* @param client the AMRMClient instance
|
||||
* @param intervalMs heartbeat interval in milliseconds between AM and RM
|
||||
* @param callbackHandler callback handler that processes responses from
|
||||
* the <code>ResourceManager</code>
|
||||
*/
|
||||
public static <T extends ContainerRequest> AMRMClientAsync<T>
|
||||
createAMRMClientAsync(
|
||||
AMRMClient<T> client, int intervalMs,
|
||||
AbstractCallbackHandler callbackHandler) {
|
||||
return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler);
|
||||
}
|
||||
|
||||
protected AMRMClientAsync(
|
||||
int intervalMs, AbstractCallbackHandler callbackHandler) {
|
||||
this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
protected AMRMClientAsync(AMRMClient<T> client, int intervalMs,
|
||||
AbstractCallbackHandler callbackHandler) {
|
||||
super(AMRMClientAsync.class.getName());
|
||||
this.client = client;
|
||||
this.heartbeatIntervalMs.set(intervalMs);
|
||||
this.handler = callbackHandler;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @deprecated Use {@link #createAMRMClientAsync(int,
|
||||
* AMRMClientAsync.AbstractCallbackHandler)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public static <T extends ContainerRequest> AMRMClientAsync<T>
|
||||
createAMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
|
||||
return new AMRMClientAsyncImpl<T>(intervalMs, callbackHandler);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* @deprecated Use {@link #createAMRMClientAsync(AMRMClient,
|
||||
* int, AMRMClientAsync.AbstractCallbackHandler)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public static <T extends ContainerRequest> AMRMClientAsync<T>
|
||||
createAMRMClientAsync(AMRMClient<T> client, int intervalMs,
|
||||
CallbackHandler callbackHandler) {
|
||||
return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler);
|
||||
}
|
||||
|
||||
|
||||
@Deprecated
|
||||
protected AMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
|
||||
this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
@Deprecated
|
||||
protected AMRMClientAsync(AMRMClient<T> client, int intervalMs,
|
||||
CallbackHandler callbackHandler) {
|
||||
super(AMRMClientAsync.class.getName());
|
||||
|
@ -171,6 +234,25 @@ extends AbstractService {
|
|||
*/
|
||||
public abstract void removeContainerRequest(T req);
|
||||
|
||||
/**
|
||||
* Request container resource change before calling <code>allocate</code>.
|
||||
* Any previous pending resource change request of the same container will be
|
||||
* removed.
|
||||
*
|
||||
* Application that calls this method is expected to maintain the
|
||||
* <code>Container</code>s that are returned from previous successful
|
||||
* allocations or resource changes. By passing in the existing container and a
|
||||
* target resource capability to this method, the application requests the
|
||||
* ResourceManager to change the existing resource allocation to the target
|
||||
* resource allocation.
|
||||
*
|
||||
* @param container The container returned from the last successful resource
|
||||
* allocation or resource change
|
||||
* @param capability The target resource capability of the container
|
||||
*/
|
||||
public abstract void requestContainerResourceChange(
|
||||
Container container, Resource capability);
|
||||
|
||||
/**
|
||||
* Release containers assigned by the Resource Manager. If the app cannot use
|
||||
* the container or wants to give up the container then it can release them.
|
||||
|
@ -264,37 +346,95 @@ extends AbstractService {
|
|||
} while (true);
|
||||
}
|
||||
|
||||
public interface CallbackHandler {
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* The callback abstract class. The callback functions need to be implemented
|
||||
* by {@link AMRMClientAsync} users. The APIs are called when responses from
|
||||
* the <code>ResourceManager</code> are available.
|
||||
* </p>
|
||||
*/
|
||||
public abstract static class AbstractCallbackHandler
|
||||
implements CallbackHandler {
|
||||
|
||||
/**
|
||||
* Called when the ResourceManager responds to a heartbeat with completed
|
||||
* containers. If the response contains both completed containers and
|
||||
* allocated containers, this will be called before containersAllocated.
|
||||
*/
|
||||
public void onContainersCompleted(List<ContainerStatus> statuses);
|
||||
|
||||
public abstract void onContainersCompleted(List<ContainerStatus> statuses);
|
||||
|
||||
/**
|
||||
* Called when the ResourceManager responds to a heartbeat with allocated
|
||||
* containers. If the response containers both completed containers and
|
||||
* allocated containers, this will be called after containersCompleted.
|
||||
*/
|
||||
public void onContainersAllocated(List<Container> containers);
|
||||
|
||||
public abstract void onContainersAllocated(List<Container> containers);
|
||||
|
||||
/**
|
||||
* Called when the ResourceManager responds to a heartbeat with containers
|
||||
* whose resource allocation has been changed.
|
||||
*/
|
||||
public abstract void onContainersResourceChanged(
|
||||
List<Container> containers);
|
||||
|
||||
/**
|
||||
* Called when the ResourceManager wants the ApplicationMaster to shutdown
|
||||
* for being out of sync etc. The ApplicationMaster should not unregister
|
||||
* with the RM unless the ApplicationMaster wants to be the last attempt.
|
||||
*/
|
||||
public void onShutdownRequest();
|
||||
|
||||
public abstract void onShutdownRequest();
|
||||
|
||||
/**
|
||||
* Called when nodes tracked by the ResourceManager have changed in health,
|
||||
* availability etc.
|
||||
*/
|
||||
public void onNodesUpdated(List<NodeReport> updatedNodes);
|
||||
|
||||
public float getProgress();
|
||||
|
||||
public abstract void onNodesUpdated(List<NodeReport> updatedNodes);
|
||||
|
||||
public abstract float getProgress();
|
||||
|
||||
/**
|
||||
* Called when error comes from RM communications as well as from errors in
|
||||
* the callback itself from the app. Calling
|
||||
* stop() is the recommended action.
|
||||
*/
|
||||
public abstract void onError(Throwable e);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Use {@link AMRMClientAsync.AbstractCallbackHandler} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public interface CallbackHandler {
|
||||
|
||||
/**
|
||||
* Called when the ResourceManager responds to a heartbeat with completed
|
||||
* containers. If the response contains both completed containers and
|
||||
* allocated containers, this will be called before containersAllocated.
|
||||
*/
|
||||
void onContainersCompleted(List<ContainerStatus> statuses);
|
||||
|
||||
/**
|
||||
* Called when the ResourceManager responds to a heartbeat with allocated
|
||||
* containers. If the response containers both completed containers and
|
||||
* allocated containers, this will be called after containersCompleted.
|
||||
*/
|
||||
void onContainersAllocated(List<Container> containers);
|
||||
|
||||
/**
|
||||
* Called when the ResourceManager wants the ApplicationMaster to shutdown
|
||||
* for being out of sync etc. The ApplicationMaster should not unregister
|
||||
* with the RM unless the ApplicationMaster wants to be the last attempt.
|
||||
*/
|
||||
void onShutdownRequest();
|
||||
|
||||
/**
|
||||
* Called when nodes tracked by the ResourceManager have changed in health,
|
||||
* availability etc.
|
||||
*/
|
||||
void onNodesUpdated(List<NodeReport> updatedNodes);
|
||||
|
||||
float getProgress();
|
||||
|
||||
/**
|
||||
* Called when error comes from RM communications as well as from errors in
|
||||
* the callback itself from the app. Calling
|
||||
|
@ -302,6 +442,6 @@ extends AbstractService {
|
|||
*
|
||||
* @param e
|
||||
*/
|
||||
public void onError(Throwable e);
|
||||
void onError(Throwable e);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.yarn.client.api.async.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
@ -66,13 +67,41 @@ extends AMRMClientAsync<T> {
|
|||
private volatile float progress;
|
||||
|
||||
private volatile Throwable savedException;
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* @param intervalMs heartbeat interval in milliseconds between AM and RM
|
||||
* @param callbackHandler callback handler that processes responses from
|
||||
* the <code>ResourceManager</code>
|
||||
*/
|
||||
public AMRMClientAsyncImpl(
|
||||
int intervalMs, AbstractCallbackHandler callbackHandler) {
|
||||
this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
|
||||
}
|
||||
|
||||
public AMRMClientAsyncImpl(AMRMClient<T> client, int intervalMs,
|
||||
AbstractCallbackHandler callbackHandler) {
|
||||
super(client, intervalMs, callbackHandler);
|
||||
heartbeatThread = new HeartbeatThread();
|
||||
handlerThread = new CallbackHandlerThread();
|
||||
responseQueue = new LinkedBlockingQueue<>();
|
||||
keepRunning = true;
|
||||
savedException = null;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @deprecated Use {@link #AMRMClientAsyncImpl(int,
|
||||
* AMRMClientAsync.AbstractCallbackHandler)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public AMRMClientAsyncImpl(int intervalMs, CallbackHandler callbackHandler) {
|
||||
this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
|
||||
}
|
||||
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
@Deprecated
|
||||
public AMRMClientAsyncImpl(AMRMClient<T> client, int intervalMs,
|
||||
CallbackHandler callbackHandler) {
|
||||
super(client, intervalMs, callbackHandler);
|
||||
|
@ -82,7 +111,7 @@ extends AMRMClientAsync<T> {
|
|||
keepRunning = true;
|
||||
savedException = null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
super.serviceInit(conf);
|
||||
|
@ -177,6 +206,12 @@ extends AMRMClientAsync<T> {
|
|||
client.removeContainerRequest(req);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void requestContainerResourceChange(
|
||||
Container container, Resource capability) {
|
||||
client.requestContainerResourceChange(container, capability);
|
||||
}
|
||||
|
||||
/**
|
||||
* Release containers assigned by the Resource Manager. If the app cannot use
|
||||
* the container or wants to give up the container then it can release them.
|
||||
|
@ -300,6 +335,18 @@ extends AMRMClientAsync<T> {
|
|||
handler.onContainersCompleted(completed);
|
||||
}
|
||||
|
||||
if (handler instanceof AMRMClientAsync.AbstractCallbackHandler) {
|
||||
// RM side of the implementation guarantees that there are
|
||||
// no duplications between increased and decreased containers
|
||||
List<Container> changed = new ArrayList<>();
|
||||
changed.addAll(response.getIncreasedContainers());
|
||||
changed.addAll(response.getDecreasedContainers());
|
||||
if (!changed.isEmpty()) {
|
||||
((AMRMClientAsync.AbstractCallbackHandler) handler)
|
||||
.onContainersResourceChanged(changed);
|
||||
}
|
||||
}
|
||||
|
||||
List<Container> allocated = response.getAllocatedContainers();
|
||||
if (!allocated.isEmpty()) {
|
||||
handler.onContainersAllocated(allocated);
|
||||
|
|
|
@ -33,6 +33,7 @@ import java.util.Set;
|
|||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.TreeSet;
|
||||
import java.util.AbstractMap.SimpleEntry;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -49,7 +50,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NMToken;
|
||||
|
@ -72,6 +75,7 @@ import org.apache.hadoop.yarn.util.RackResolver;
|
|||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
|
@ -110,8 +114,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
containerRequests = new LinkedHashSet<T>();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Class compares Resource by memory then cpu in reverse order
|
||||
*/
|
||||
|
@ -144,10 +148,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
int cpu0 = arg0.getVirtualCores();
|
||||
int cpu1 = arg1.getVirtualCores();
|
||||
|
||||
if(mem0 <= mem1 && cpu0 <= cpu1) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
return (mem0 <= mem1 && cpu0 <= cpu1);
|
||||
}
|
||||
|
||||
//Key -> Priority
|
||||
|
@ -164,11 +165,22 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
|
||||
new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
|
||||
protected final Set<ContainerId> release = new TreeSet<ContainerId>();
|
||||
// pendingRelease holds history or release requests.request is removed only if
|
||||
// RM sends completedContainer.
|
||||
// pendingRelease holds history of release requests.
|
||||
// request is removed only if RM sends completedContainer.
|
||||
// How it different from release? --> release is for per allocate() request.
|
||||
protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
|
||||
|
||||
// change map holds container resource change requests between two allocate()
|
||||
// calls, and are cleared after each successful allocate() call.
|
||||
protected final Map<ContainerId, SimpleEntry<Container, Resource>> change =
|
||||
new HashMap<>();
|
||||
// pendingChange map holds history of container resource change requests in
|
||||
// case AM needs to reregister with the ResourceManager.
|
||||
// Change requests are removed from this map if RM confirms the change
|
||||
// through allocate response, or if RM confirms that the container has been
|
||||
// completed.
|
||||
protected final Map<ContainerId, SimpleEntry<Container, Resource>>
|
||||
pendingChange = new HashMap<>();
|
||||
|
||||
public AMRMClientImpl() {
|
||||
super(AMRMClientImpl.class.getName());
|
||||
}
|
||||
|
@ -241,7 +253,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
AllocateRequest allocateRequest = null;
|
||||
List<String> blacklistToAdd = new ArrayList<String>();
|
||||
List<String> blacklistToRemove = new ArrayList<String>();
|
||||
|
||||
Map<ContainerId, SimpleEntry<Container, Resource>> oldChange =
|
||||
new HashMap<>();
|
||||
try {
|
||||
synchronized (this) {
|
||||
askList = new ArrayList<ResourceRequest>(ask.size());
|
||||
|
@ -252,10 +265,30 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
r.getResourceName(), r.getCapability(), r.getNumContainers(),
|
||||
r.getRelaxLocality(), r.getNodeLabelExpression()));
|
||||
}
|
||||
List<ContainerResourceChangeRequest> increaseList = new ArrayList<>();
|
||||
List<ContainerResourceChangeRequest> decreaseList = new ArrayList<>();
|
||||
// Save the current change for recovery
|
||||
oldChange.putAll(change);
|
||||
for (Map.Entry<ContainerId, SimpleEntry<Container, Resource>> entry :
|
||||
change.entrySet()) {
|
||||
Container container = entry.getValue().getKey();
|
||||
Resource original = container.getResource();
|
||||
Resource target = entry.getValue().getValue();
|
||||
if (Resources.fitsIn(target, original)) {
|
||||
// This is a decrease request
|
||||
decreaseList.add(ContainerResourceChangeRequest.newInstance(
|
||||
container.getId(), target));
|
||||
} else {
|
||||
// This is an increase request
|
||||
increaseList.add(ContainerResourceChangeRequest.newInstance(
|
||||
container.getId(), target));
|
||||
}
|
||||
}
|
||||
releaseList = new ArrayList<ContainerId>(release);
|
||||
// optimistically clear this collection assuming no RPC failure
|
||||
ask.clear();
|
||||
release.clear();
|
||||
change.clear();
|
||||
|
||||
blacklistToAdd.addAll(blacklistAdditions);
|
||||
blacklistToRemove.addAll(blacklistRemovals);
|
||||
|
@ -266,8 +299,9 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
|
||||
allocateRequest =
|
||||
AllocateRequest.newInstance(lastResponseId, progressIndicator,
|
||||
askList, releaseList, blacklistRequest);
|
||||
// clear blacklistAdditions and blacklistRemovals before
|
||||
askList, releaseList, blacklistRequest,
|
||||
increaseList, decreaseList);
|
||||
// clear blacklistAdditions and blacklistRemovals before
|
||||
// unsynchronized part
|
||||
blacklistAdditions.clear();
|
||||
blacklistRemovals.clear();
|
||||
|
@ -289,6 +323,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
}
|
||||
}
|
||||
}
|
||||
change.putAll(this.pendingChange);
|
||||
}
|
||||
// re register with RM
|
||||
registerApplicationMaster();
|
||||
|
@ -312,6 +347,23 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
removePendingReleaseRequests(allocateResponse
|
||||
.getCompletedContainersStatuses());
|
||||
}
|
||||
if (!pendingChange.isEmpty()) {
|
||||
List<ContainerStatus> completed =
|
||||
allocateResponse.getCompletedContainersStatuses();
|
||||
List<Container> changed = new ArrayList<>();
|
||||
changed.addAll(allocateResponse.getIncreasedContainers());
|
||||
changed.addAll(allocateResponse.getDecreasedContainers());
|
||||
// remove all pending change requests that belong to the completed
|
||||
// containers
|
||||
for (ContainerStatus status : completed) {
|
||||
ContainerId containerId = status.getContainerId();
|
||||
pendingChange.remove(containerId);
|
||||
}
|
||||
// remove all pending change requests that have been satisfied
|
||||
if (!changed.isEmpty()) {
|
||||
removePendingChangeRequests(changed);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
// TODO how to differentiate remote yarn exception vs error in rpc
|
||||
|
@ -333,7 +385,22 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
ask.add(oldAsk);
|
||||
}
|
||||
}
|
||||
|
||||
// change requests could have been added during the allocate call.
|
||||
// Those are the newest requests which take precedence
|
||||
// over requests cached in the oldChange map.
|
||||
//
|
||||
// Only insert entries from the cached oldChange map
|
||||
// that do not exist in the current change map:
|
||||
for (Map.Entry<ContainerId, SimpleEntry<Container, Resource>> entry :
|
||||
oldChange.entrySet()) {
|
||||
ContainerId oldContainerId = entry.getKey();
|
||||
Container oldContainer = entry.getValue().getKey();
|
||||
Resource oldResource = entry.getValue().getValue();
|
||||
if (change.get(oldContainerId) == null) {
|
||||
change.put(
|
||||
oldContainerId, new SimpleEntry<>(oldContainer, oldResource));
|
||||
}
|
||||
}
|
||||
blacklistAdditions.addAll(blacklistToAdd);
|
||||
blacklistRemovals.addAll(blacklistToRemove);
|
||||
}
|
||||
|
@ -349,6 +416,24 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
}
|
||||
}
|
||||
|
||||
protected void removePendingChangeRequests(
|
||||
List<Container> changedContainers) {
|
||||
for (Container changedContainer : changedContainers) {
|
||||
ContainerId containerId = changedContainer.getId();
|
||||
if (pendingChange.get(containerId) == null) {
|
||||
continue;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("RM has confirmed changed resource allocation for "
|
||||
+ "container " + containerId + ". Current resource allocation:"
|
||||
+ changedContainer.getResource()
|
||||
+ ". Remove pending change request:"
|
||||
+ pendingChange.get(containerId).getValue());
|
||||
}
|
||||
pendingChange.remove(containerId);
|
||||
}
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
protected void populateNMTokens(List<NMToken> nmTokens) {
|
||||
|
@ -479,12 +564,32 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
req.getCapability(), req);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void requestContainerResourceChange(
|
||||
Container container, Resource capability) {
|
||||
validateContainerResourceChangeRequest(
|
||||
container.getId(), container.getResource(), capability);
|
||||
if (change.get(container.getId()) == null) {
|
||||
change.put(container.getId(),
|
||||
new SimpleEntry<>(container, capability));
|
||||
} else {
|
||||
change.get(container.getId()).setValue(capability);
|
||||
}
|
||||
if (pendingChange.get(container.getId()) == null) {
|
||||
pendingChange.put(container.getId(),
|
||||
new SimpleEntry<>(container, capability));
|
||||
} else {
|
||||
pendingChange.get(container.getId()).setValue(capability);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void releaseAssignedContainer(ContainerId containerId) {
|
||||
Preconditions.checkArgument(containerId != null,
|
||||
"ContainerId can not be null.");
|
||||
pendingRelease.add(containerId);
|
||||
release.add(containerId);
|
||||
pendingChange.remove(containerId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -618,7 +723,23 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
"Cannot specify node label with rack and node");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void validateContainerResourceChangeRequest(
|
||||
ContainerId containerId, Resource original, Resource target) {
|
||||
Preconditions.checkArgument(containerId != null,
|
||||
"ContainerId cannot be null");
|
||||
Preconditions.checkArgument(original != null,
|
||||
"Original resource capability cannot be null");
|
||||
Preconditions.checkArgument(!Resources.equals(Resources.none(), original)
|
||||
&& Resources.fitsIn(Resources.none(), original),
|
||||
"Original resource capability must be greater than 0");
|
||||
Preconditions.checkArgument(target != null,
|
||||
"Target resource capability cannot be null");
|
||||
Preconditions.checkArgument(!Resources.equals(Resources.none(), target)
|
||||
&& Resources.fitsIn(Resources.none(), target),
|
||||
"Target resource capability must be greater than 0");
|
||||
}
|
||||
|
||||
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
|
||||
// This code looks weird but is needed because of the following scenario.
|
||||
// A ResourceRequest is removed from the remoteRequestTable. A 0 container
|
||||
|
|
|
@ -74,12 +74,15 @@ public class TestAMRMClientAsync {
|
|||
List<ContainerStatus> completed1 = Arrays.asList(
|
||||
ContainerStatus.newInstance(newContainerId(0, 0, 0, 0),
|
||||
ContainerState.COMPLETE, "", 0));
|
||||
List<Container> allocated1 = Arrays.asList(
|
||||
List<Container> containers = Arrays.asList(
|
||||
Container.newInstance(null, null, null, null, null, null));
|
||||
final AllocateResponse response1 = createAllocateResponse(
|
||||
new ArrayList<ContainerStatus>(), allocated1, null);
|
||||
new ArrayList<ContainerStatus>(), containers, null);
|
||||
final AllocateResponse response2 = createAllocateResponse(completed1,
|
||||
new ArrayList<Container>(), null);
|
||||
final AllocateResponse response3 = createAllocateResponse(
|
||||
new ArrayList<ContainerStatus>(), new ArrayList<Container>(),
|
||||
containers, containers, null);
|
||||
final AllocateResponse emptyResponse = createAllocateResponse(
|
||||
new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
|
||||
|
||||
|
@ -91,15 +94,15 @@ public class TestAMRMClientAsync {
|
|||
public AllocateResponse answer(InvocationOnMock invocation)
|
||||
throws Throwable {
|
||||
secondHeartbeatSync.incrementAndGet();
|
||||
while(heartbeatBlock.get()) {
|
||||
synchronized(heartbeatBlock) {
|
||||
while (heartbeatBlock.get()) {
|
||||
synchronized (heartbeatBlock) {
|
||||
heartbeatBlock.wait();
|
||||
}
|
||||
}
|
||||
secondHeartbeatSync.incrementAndGet();
|
||||
return response2;
|
||||
}
|
||||
}).thenReturn(emptyResponse);
|
||||
}).thenReturn(response3).thenReturn(emptyResponse);
|
||||
when(client.registerApplicationMaster(anyString(), anyInt(), anyString()))
|
||||
.thenReturn(null);
|
||||
when(client.getAvailableResources()).thenAnswer(new Answer<Resource>() {
|
||||
|
@ -146,16 +149,22 @@ public class TestAMRMClientAsync {
|
|||
Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
|
||||
Thread.sleep(10);
|
||||
}
|
||||
|
||||
|
||||
// wait for the completed containers from the second heartbeat's response
|
||||
while (callbackHandler.takeCompletedContainers() == null) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
|
||||
|
||||
// wait for the changed containers from the thrid heartbeat's response
|
||||
while (callbackHandler.takeChangedContainers() == null) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
|
||||
asyncClient.stop();
|
||||
|
||||
Assert.assertEquals(null, callbackHandler.takeAllocatedContainers());
|
||||
Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
|
||||
Assert.assertEquals(null, callbackHandler.takeChangedContainers());
|
||||
}
|
||||
|
||||
@Test(timeout=10000)
|
||||
|
@ -397,6 +406,17 @@ public class TestAMRMClientAsync {
|
|||
return response;
|
||||
}
|
||||
|
||||
private AllocateResponse createAllocateResponse(
|
||||
List<ContainerStatus> completed, List<Container> allocated,
|
||||
List<Container> increased, List<Container> decreased,
|
||||
List<NMToken> nmTokens) {
|
||||
AllocateResponse response =
|
||||
AllocateResponse.newInstance(0, completed, allocated,
|
||||
new ArrayList<NodeReport>(), null, null, 1, null, nmTokens,
|
||||
increased, decreased);
|
||||
return response;
|
||||
}
|
||||
|
||||
public static ContainerId newContainerId(int appId, int appAttemptId,
|
||||
long timestamp, int containerId) {
|
||||
ApplicationId applicationId = ApplicationId.newInstance(timestamp, appId);
|
||||
|
@ -405,9 +425,11 @@ public class TestAMRMClientAsync {
|
|||
return ContainerId.newContainerId(applicationAttemptId, containerId);
|
||||
}
|
||||
|
||||
private class TestCallbackHandler implements AMRMClientAsync.CallbackHandler {
|
||||
private class TestCallbackHandler
|
||||
extends AMRMClientAsync.AbstractCallbackHandler {
|
||||
private volatile List<ContainerStatus> completedContainers;
|
||||
private volatile List<Container> allocatedContainers;
|
||||
private final List<Container> changedContainers = new ArrayList<>();
|
||||
Exception savedException = null;
|
||||
volatile boolean reboot = false;
|
||||
Object notifier = new Object();
|
||||
|
@ -425,7 +447,19 @@ public class TestAMRMClientAsync {
|
|||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
public List<Container> takeChangedContainers() {
|
||||
List<Container> ret = null;
|
||||
synchronized (changedContainers) {
|
||||
if (!changedContainers.isEmpty()) {
|
||||
ret = new ArrayList<>(changedContainers);
|
||||
changedContainers.clear();
|
||||
changedContainers.notify();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
public List<Container> takeAllocatedContainers() {
|
||||
List<Container> ret = allocatedContainers;
|
||||
if (ret == null) {
|
||||
|
@ -453,6 +487,22 @@ public class TestAMRMClientAsync {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onContainersResourceChanged(
|
||||
List<Container> changed) {
|
||||
synchronized (changedContainers) {
|
||||
changedContainers.clear();
|
||||
changedContainers.addAll(changed);
|
||||
while (!changedContainers.isEmpty()) {
|
||||
try {
|
||||
changedContainers.wait();
|
||||
} catch (InterruptedException ex) {
|
||||
LOG.error("Interrupted during wait", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onContainersAllocated(List<Container> containers) {
|
||||
allocatedContainers = containers;
|
||||
|
@ -494,7 +544,8 @@ public class TestAMRMClientAsync {
|
|||
}
|
||||
}
|
||||
|
||||
private class TestCallbackHandler2 implements AMRMClientAsync.CallbackHandler {
|
||||
private class TestCallbackHandler2
|
||||
extends AMRMClientAsync.AbstractCallbackHandler {
|
||||
Object notifier = new Object();
|
||||
@SuppressWarnings("rawtypes")
|
||||
AMRMClientAsync asynClient;
|
||||
|
@ -512,6 +563,9 @@ public class TestAMRMClientAsync {
|
|||
@Override
|
||||
public void onContainersAllocated(List<Container> containers) {}
|
||||
|
||||
@Override
|
||||
public void onContainersResourceChanged(List<Container> containers) {}
|
||||
|
||||
@Override
|
||||
public void onShutdownRequest() {}
|
||||
|
||||
|
|
|
@ -35,10 +35,12 @@ import java.util.Collections;
|
|||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
|
@ -73,6 +75,7 @@ import org.apache.hadoop.yarn.client.ClientRMProxy;
|
|||
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
||||
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
||||
import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
|
||||
import org.apache.hadoop.yarn.client.api.NMClient;
|
||||
import org.apache.hadoop.yarn.client.api.NMTokenCache;
|
||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
@ -126,6 +129,8 @@ public class TestAMRMClient {
|
|||
rolling_interval_sec);
|
||||
conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, am_expire_ms);
|
||||
conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
|
||||
// set the minimum allocation so that resource decrease can go under 1024
|
||||
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
|
||||
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
|
||||
yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
|
||||
yarnCluster.init(conf);
|
||||
|
@ -730,8 +735,160 @@ public class TestAMRMClient {
|
|||
new ContainerRequest(Resource.newInstance(1024, 1), null, null,
|
||||
Priority.UNDEFINED, true, "x && y"));
|
||||
}
|
||||
|
||||
private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testAMRMClientWithContainerResourceChange()
|
||||
throws YarnException, IOException {
|
||||
AMRMClient<ContainerRequest> amClient = null;
|
||||
try {
|
||||
// start am rm client
|
||||
amClient = AMRMClient.createAMRMClient();
|
||||
Assert.assertNotNull(amClient);
|
||||
// asserting we are using the singleton instance cache
|
||||
Assert.assertSame(
|
||||
NMTokenCache.getSingleton(), amClient.getNMTokenCache());
|
||||
amClient.init(conf);
|
||||
amClient.start();
|
||||
assertEquals(STATE.STARTED, amClient.getServiceState());
|
||||
// start am nm client
|
||||
NMClientImpl nmClient = (NMClientImpl) NMClient.createNMClient();
|
||||
Assert.assertNotNull(nmClient);
|
||||
// asserting we are using the singleton instance cache
|
||||
Assert.assertSame(
|
||||
NMTokenCache.getSingleton(), nmClient.getNMTokenCache());
|
||||
nmClient.init(conf);
|
||||
nmClient.start();
|
||||
assertEquals(STATE.STARTED, nmClient.getServiceState());
|
||||
// am rm client register the application master with RM
|
||||
amClient.registerApplicationMaster("Host", 10000, "");
|
||||
// allocate three containers and make sure they are in RUNNING state
|
||||
List<Container> containers =
|
||||
allocateAndStartContainers(amClient, nmClient, 3);
|
||||
// perform container resource increase and decrease tests
|
||||
doContainerResourceChange(amClient, containers);
|
||||
// unregister and finish up the test
|
||||
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
|
||||
null, null);
|
||||
} finally {
|
||||
if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
|
||||
amClient.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private List<Container> allocateAndStartContainers(
|
||||
final AMRMClient<ContainerRequest> amClient, final NMClient nmClient,
|
||||
int num) throws YarnException, IOException {
|
||||
// set up allocation requests
|
||||
for (int i = 0; i < num; ++i) {
|
||||
amClient.addContainerRequest(
|
||||
new ContainerRequest(capability, nodes, racks, priority));
|
||||
}
|
||||
// send allocation requests
|
||||
amClient.allocate(0.1f);
|
||||
// sleep to let NM's heartbeat to RM and trigger allocations
|
||||
sleep(150);
|
||||
// get allocations
|
||||
AllocateResponse allocResponse = amClient.allocate(0.1f);
|
||||
List<Container> containers = allocResponse.getAllocatedContainers();
|
||||
Assert.assertEquals(num, containers.size());
|
||||
// build container launch context
|
||||
Credentials ts = new Credentials();
|
||||
DataOutputBuffer dob = new DataOutputBuffer();
|
||||
ts.writeTokenStorageToStream(dob);
|
||||
ByteBuffer securityTokens =
|
||||
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||
// start a process long enough for increase/decrease action to take effect
|
||||
ContainerLaunchContext clc = BuilderUtils.newContainerLaunchContext(
|
||||
Collections.<String, LocalResource>emptyMap(),
|
||||
new HashMap<String, String>(), Arrays.asList("sleep", "100"),
|
||||
new HashMap<String, ByteBuffer>(), securityTokens,
|
||||
new HashMap<ApplicationAccessType, String>());
|
||||
// start the containers and make sure they are in RUNNING state
|
||||
try {
|
||||
for (int i = 0; i < num; i++) {
|
||||
Container container = containers.get(i);
|
||||
nmClient.startContainer(container, clc);
|
||||
// NodeManager may still need some time to get the stable
|
||||
// container status
|
||||
while (true) {
|
||||
ContainerStatus status = nmClient.getContainerStatus(
|
||||
container.getId(), container.getNodeId());
|
||||
if (status.getState() == ContainerState.RUNNING) {
|
||||
break;
|
||||
}
|
||||
sleep(100);
|
||||
}
|
||||
}
|
||||
} catch (YarnException e) {
|
||||
throw new AssertionError("Exception is not expected: " + e);
|
||||
}
|
||||
// sleep to let NM's heartbeat to RM to confirm container launch
|
||||
sleep(200);
|
||||
return containers;
|
||||
}
|
||||
|
||||
|
||||
private void doContainerResourceChange(
|
||||
final AMRMClient<ContainerRequest> amClient, List<Container> containers)
|
||||
throws YarnException, IOException {
|
||||
Assert.assertEquals(3, containers.size());
|
||||
// remember the container IDs
|
||||
Container container1 = containers.get(0);
|
||||
Container container2 = containers.get(1);
|
||||
Container container3 = containers.get(2);
|
||||
AMRMClientImpl<ContainerRequest> amClientImpl =
|
||||
(AMRMClientImpl<ContainerRequest>) amClient;
|
||||
Assert.assertEquals(0, amClientImpl.change.size());
|
||||
// verify newer request overwrites older request for the container1
|
||||
amClientImpl.requestContainerResourceChange(
|
||||
container1, Resource.newInstance(2048, 1));
|
||||
amClientImpl.requestContainerResourceChange(
|
||||
container1, Resource.newInstance(4096, 1));
|
||||
Assert.assertEquals(Resource.newInstance(4096, 1),
|
||||
amClientImpl.change.get(container1.getId()).getValue());
|
||||
// verify new decrease request cancels old increase request for container1
|
||||
amClientImpl.requestContainerResourceChange(
|
||||
container1, Resource.newInstance(512, 1));
|
||||
Assert.assertEquals(Resource.newInstance(512, 1),
|
||||
amClientImpl.change.get(container1.getId()).getValue());
|
||||
// request resource increase for container2
|
||||
amClientImpl.requestContainerResourceChange(
|
||||
container2, Resource.newInstance(2048, 1));
|
||||
Assert.assertEquals(Resource.newInstance(2048, 1),
|
||||
amClientImpl.change.get(container2.getId()).getValue());
|
||||
// verify release request will cancel pending change requests for the same
|
||||
// container
|
||||
amClientImpl.requestContainerResourceChange(
|
||||
container3, Resource.newInstance(2048, 1));
|
||||
Assert.assertEquals(3, amClientImpl.pendingChange.size());
|
||||
amClientImpl.releaseAssignedContainer(container3.getId());
|
||||
Assert.assertEquals(2, amClientImpl.pendingChange.size());
|
||||
// as of now: container1 asks to decrease to (512, 1)
|
||||
// container2 asks to increase to (2048, 1)
|
||||
// send allocation requests
|
||||
AllocateResponse allocResponse = amClient.allocate(0.1f);
|
||||
Assert.assertEquals(0, amClientImpl.change.size());
|
||||
// we should get decrease confirmation right away
|
||||
List<Container> decreasedContainers =
|
||||
allocResponse.getDecreasedContainers();
|
||||
List<Container> increasedContainers =
|
||||
allocResponse.getIncreasedContainers();
|
||||
Assert.assertEquals(1, decreasedContainers.size());
|
||||
Assert.assertEquals(0, increasedContainers.size());
|
||||
// we should get increase allocation after the next NM's heartbeat to RM
|
||||
sleep(150);
|
||||
// get allocations
|
||||
allocResponse = amClient.allocate(0.1f);
|
||||
decreasedContainers =
|
||||
allocResponse.getDecreasedContainers();
|
||||
increasedContainers =
|
||||
allocResponse.getIncreasedContainers();
|
||||
Assert.assertEquals(1, increasedContainers.size());
|
||||
Assert.assertEquals(0, decreasedContainers.size());
|
||||
}
|
||||
|
||||
private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
|
||||
throws YarnException, IOException {
|
||||
// setup container request
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
|||
import java.security.PrivilegedAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -90,15 +91,17 @@ public class TestAMRMClientOnRMRestart {
|
|||
}
|
||||
|
||||
// Test does major 6 steps verification.
|
||||
// Step-1 : AMRMClient send allocate request for 2 container requests
|
||||
// Step-2 : 2 containers are allocated by RM.
|
||||
// Step-3 : AM Send 1 containerRequest(cRequest3) and 1 releaseRequests to
|
||||
// Step-1 : AMRMClient send allocate request for 3 container requests
|
||||
// Step-2 : 3 containers are allocated by RM.
|
||||
// Step-3 : AM Send 1 containerRequest(cRequest4) and 1 releaseRequests to
|
||||
// RM
|
||||
// Step-3.5 : AM Send 1 container resource increase request to RM
|
||||
// Step-4 : On RM restart, AM(does not know RM is restarted) sends additional
|
||||
// containerRequest(cRequest4) and blacklisted nodes.
|
||||
// containerRequest(cRequest5) and blacklisted nodes.
|
||||
// Intern RM send resync command
|
||||
// Step-5 : Allocater after resync command & new containerRequest(cRequest5)
|
||||
// Step-6 : RM allocates containers i.e cRequest3,cRequest4 and cRequest5
|
||||
// Verify AM can recover increase request after resync
|
||||
// Step-5 : Allocater after resync command & new containerRequest(cRequest6)
|
||||
// Step-6 : RM allocates containers i.e cRequest4,cRequest5 and cRequest6
|
||||
@Test(timeout = 60000)
|
||||
public void testAMRMClientResendsRequestsOnRMRestart() throws Exception {
|
||||
|
||||
|
@ -132,8 +135,8 @@ public class TestAMRMClientOnRMRestart {
|
|||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||
ugi.addTokenIdentifier(token.decodeIdentifier());
|
||||
|
||||
// Step-1 : AMRMClient send allocate request for 2 ContainerRequest
|
||||
// cRequest1 = h1 and cRequest2 = h1,h2
|
||||
// Step-1 : AMRMClient send allocate request for 3 ContainerRequest
|
||||
// cRequest1 = h1, cRequest2 = h1,h2 and cRequest3 = h1
|
||||
// blacklisted nodes = h2
|
||||
AMRMClient<ContainerRequest> amClient = new MyAMRMClientImpl(rm1);
|
||||
amClient.init(conf);
|
||||
|
@ -148,6 +151,9 @@ public class TestAMRMClientOnRMRestart {
|
|||
createReq(1, 1024, new String[] { "h1", "h2" });
|
||||
amClient.addContainerRequest(cRequest2);
|
||||
|
||||
ContainerRequest cRequest3 = createReq(1, 1024, new String[] { "h1" });
|
||||
amClient.addContainerRequest(cRequest3);
|
||||
|
||||
List<String> blacklistAdditions = new ArrayList<String>();
|
||||
List<String> blacklistRemoval = new ArrayList<String>();
|
||||
blacklistAdditions.add("h2");
|
||||
|
@ -167,14 +173,14 @@ public class TestAMRMClientOnRMRestart {
|
|||
assertBlacklistAdditionsAndRemovals(1, 1, rm1);
|
||||
|
||||
// Step-2 : NM heart beat is sent.
|
||||
// On 2nd AM allocate request, RM allocates 2 containers to AM
|
||||
// On 2nd AM allocate request, RM allocates 3 containers to AM
|
||||
nm1.nodeHeartbeat(true); // Node heartbeat
|
||||
dispatcher.await();
|
||||
|
||||
allocateResponse = amClient.allocate(0.2f);
|
||||
dispatcher.await();
|
||||
// 2 containers are allocated i.e for cRequest1 and cRequest2.
|
||||
Assert.assertEquals("No of assignments must be 0", 2, allocateResponse
|
||||
// 3 containers are allocated i.e for cRequest1, cRequest2 and cRequest3.
|
||||
Assert.assertEquals("No of assignments must be 0", 3, allocateResponse
|
||||
.getAllocatedContainers().size());
|
||||
assertAsksAndReleases(0, 0, rm1);
|
||||
assertBlacklistAdditionsAndRemovals(0, 0, rm1);
|
||||
|
@ -184,6 +190,7 @@ public class TestAMRMClientOnRMRestart {
|
|||
// removed allocated container requests
|
||||
amClient.removeContainerRequest(cRequest1);
|
||||
amClient.removeContainerRequest(cRequest2);
|
||||
amClient.removeContainerRequest(cRequest3);
|
||||
|
||||
allocateResponse = amClient.allocate(0.2f);
|
||||
dispatcher.await();
|
||||
|
@ -193,8 +200,8 @@ public class TestAMRMClientOnRMRestart {
|
|||
assertBlacklistAdditionsAndRemovals(0, 0, rm1);
|
||||
|
||||
// Step-3 : Send 1 containerRequest and 1 releaseRequests to RM
|
||||
ContainerRequest cRequest3 = createReq(1, 1024, new String[] { "h1" });
|
||||
amClient.addContainerRequest(cRequest3);
|
||||
ContainerRequest cRequest4 = createReq(1, 1024, new String[] { "h1" });
|
||||
amClient.addContainerRequest(cRequest4);
|
||||
|
||||
int pendingRelease = 0;
|
||||
Iterator<Container> it = allocatedContainers.iterator();
|
||||
|
@ -205,11 +212,24 @@ public class TestAMRMClientOnRMRestart {
|
|||
break;// remove one container
|
||||
}
|
||||
|
||||
// Step-3.5 : Send 1 container resource increase request to RM
|
||||
Container container = it.next();
|
||||
ContainerId containerId = container.getId();
|
||||
// Make sure that container is in RUNNING state before sending increase
|
||||
// request
|
||||
nm1.nodeHeartbeat(containerId.getApplicationAttemptId(),
|
||||
containerId.getContainerId(), ContainerState.RUNNING);
|
||||
amClient.requestContainerResourceChange(
|
||||
container, Resource.newInstance(2048, 1));
|
||||
it.remove();
|
||||
|
||||
allocateResponse = amClient.allocate(0.3f);
|
||||
dispatcher.await();
|
||||
Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
|
||||
.getAllocatedContainers().size());
|
||||
assertAsksAndReleases(3, pendingRelease, rm1);
|
||||
// Verify there is one increase and zero decrease
|
||||
assertChanges(1, 0, rm1);
|
||||
assertBlacklistAdditionsAndRemovals(0, 0, rm1);
|
||||
int completedContainer =
|
||||
allocateResponse.getCompletedContainersStatuses().size();
|
||||
|
@ -228,7 +248,13 @@ public class TestAMRMClientOnRMRestart {
|
|||
|
||||
// new NM to represent NM re-register
|
||||
nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
NMContainerStatus containerReport =
|
||||
NMContainerStatus.newInstance(containerId, ContainerState.RUNNING,
|
||||
Resource.newInstance(1024, 1), "recover container", 0,
|
||||
Priority.newInstance(0), 0);
|
||||
nm1.registerNode(Collections.singletonList(containerReport),
|
||||
Collections.singletonList(
|
||||
containerId.getApplicationAttemptId().getApplicationId()));
|
||||
nm1.nodeHeartbeat(true);
|
||||
dispatcher.await();
|
||||
|
||||
|
@ -243,9 +269,9 @@ public class TestAMRMClientOnRMRestart {
|
|||
it.remove();
|
||||
}
|
||||
|
||||
ContainerRequest cRequest4 =
|
||||
ContainerRequest cRequest5 =
|
||||
createReq(1, 1024, new String[] { "h1", "h2" });
|
||||
amClient.addContainerRequest(cRequest4);
|
||||
amClient.addContainerRequest(cRequest5);
|
||||
|
||||
// Step-4 : On RM restart, AM(does not know RM is restarted) sends
|
||||
// additional
|
||||
|
@ -259,11 +285,13 @@ public class TestAMRMClientOnRMRestart {
|
|||
pendingRelease -= completedContainer;
|
||||
|
||||
assertAsksAndReleases(4, pendingRelease, rm2);
|
||||
// Verify there is one increase and zero decrease
|
||||
assertChanges(1, 0, rm2);
|
||||
assertBlacklistAdditionsAndRemovals(2, 0, rm2);
|
||||
|
||||
ContainerRequest cRequest5 =
|
||||
ContainerRequest cRequest6 =
|
||||
createReq(1, 1024, new String[] { "h1", "h2", "h3" });
|
||||
amClient.addContainerRequest(cRequest5);
|
||||
amClient.addContainerRequest(cRequest6);
|
||||
|
||||
// Step-5 : Allocater after resync command
|
||||
allocateResponse = amClient.allocate(0.5f);
|
||||
|
@ -272,6 +300,8 @@ public class TestAMRMClientOnRMRestart {
|
|||
.getAllocatedContainers().size());
|
||||
|
||||
assertAsksAndReleases(5, 0, rm2);
|
||||
// Verify there is no increase or decrease requests any more
|
||||
assertChanges(0, 0, rm2);
|
||||
assertBlacklistAdditionsAndRemovals(0, 0, rm2);
|
||||
|
||||
int noAssignedContainer = 0;
|
||||
|
@ -289,7 +319,7 @@ public class TestAMRMClientOnRMRestart {
|
|||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
// Step-6 : RM allocates containers i.e cRequest3,cRequest4 and cRequest5
|
||||
// Step-6 : RM allocates containers i.e cRequest4,cRequest5 and cRequest6
|
||||
Assert.assertEquals("Number of container should be 3", 3,
|
||||
noAssignedContainer);
|
||||
|
||||
|
@ -519,6 +549,8 @@ public class TestAMRMClientOnRMRestart {
|
|||
|
||||
List<ResourceRequest> lastAsk = null;
|
||||
List<ContainerId> lastRelease = null;
|
||||
List<ContainerResourceChangeRequest> lastIncrease = null;
|
||||
List<ContainerResourceChangeRequest> lastDecrease = null;
|
||||
List<String> lastBlacklistAdditions;
|
||||
List<String> lastBlacklistRemovals;
|
||||
|
||||
|
@ -541,6 +573,8 @@ public class TestAMRMClientOnRMRestart {
|
|||
}
|
||||
lastAsk = ask;
|
||||
lastRelease = release;
|
||||
lastIncrease = increaseRequests;
|
||||
lastDecrease = decreaseRequests;
|
||||
lastBlacklistAdditions = blacklistAdditions;
|
||||
lastBlacklistRemovals = blacklistRemovals;
|
||||
return super.allocate(applicationAttemptId, askCopy, release,
|
||||
|
@ -647,6 +681,14 @@ public class TestAMRMClientOnRMRestart {
|
|||
rm.getMyFifoScheduler().lastRelease.size());
|
||||
}
|
||||
|
||||
private static void assertChanges(
|
||||
int expectedIncrease, int expectedDecrease, MyResourceManager rm) {
|
||||
Assert.assertEquals(
|
||||
expectedIncrease, rm.getMyFifoScheduler().lastIncrease.size());
|
||||
Assert.assertEquals(
|
||||
expectedDecrease, rm.getMyFifoScheduler().lastDecrease.size());
|
||||
}
|
||||
|
||||
private ContainerRequest createReq(int priority, int memory, String[] hosts) {
|
||||
Resource capability = Resource.newInstance(memory, 1);
|
||||
Priority priorityOfContainer = Priority.newInstance(priority);
|
||||
|
|
Loading…
Reference in New Issue