diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index aaa222d6dc4..2f4311e4c7c 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -247,6 +247,9 @@ Release 2.8.0 - UNRELEASED YARN-1510. Make NMClient support change container resources. (Meng Ding via wangda) + YARN-1509. Make AMRMClient support send increase container request and + get increased/decreased containers. (Meng Ding via wangda) + IMPROVEMENTS YARN-644. Basic null check is not performed on passed in arguments before diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index c1b96435e97..f410c430377 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -556,7 +556,8 @@ public class ApplicationMaster { appSubmitterUgi.addCredentials(credentials); - AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); + AMRMClientAsync.AbstractCallbackHandler allocListener = + new RMCallbackHandler(); amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); amRMClient.init(conf); amRMClient.start(); @@ -731,7 +732,7 @@ public class ApplicationMaster { } @VisibleForTesting - class RMCallbackHandler implements AMRMClientAsync.CallbackHandler { + class RMCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler { @SuppressWarnings("unchecked") @Override public void onContainersCompleted(List completedContainers) { @@ -834,6 +835,9 @@ public class ApplicationMaster { } } + @Override + public void onContainersResourceChanged(List containers) {} + @Override public void onShutdownRequest() { done = true; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index bfe10d60fd0..e0bf2d312e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -31,6 +31,8 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; + +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; @@ -285,7 +287,7 @@ public abstract class AMRMClient extends * @param req Resource request */ public abstract void addContainerRequest(T req); - + /** * Remove previous container request. The previous container request may have * already been sent to the ResourceManager. So even after the remove request @@ -294,7 +296,26 @@ public abstract class AMRMClient extends * @param req Resource request */ public abstract void removeContainerRequest(T req); - + + /** + * Request container resource change before calling allocate. + * Any previous pending resource change request of the same container will be + * removed. + * + * Application that calls this method is expected to maintain the + * Containers that are returned from previous successful + * allocations or resource changes. By passing in the existing container and a + * target resource capability to this method, the application requests the + * ResourceManager to change the existing resource allocation to the target + * resource allocation. + * + * @param container The container returned from the last successful resource + * allocation or resource change + * @param capability The target resource capability of the container + */ + public abstract void requestContainerResourceChange( + Container container, Resource capability); + /** * Release containers assigned by the Resource Manager. If the app cannot use * the container or wants to give up the container then it can release them. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index f62e71b48f9..3c8f92333c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -56,11 +56,17 @@ import com.google.common.annotations.VisibleForTesting; * It should be used by implementing a CallbackHandler: *
  * {@code
- * class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
+ * class MyCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler {
  *   public void onContainersAllocated(List containers) {
  *     [run tasks on the containers]
  *   }
- *   
+ *
+ *   public void onContainersResourceChanged(List containers) {
+ *     [determine if resource allocation of containers have been increased in
+ *      the ResourceManager, and if so, inform the NodeManagers to increase the
+ *      resource monitor/enforcement on the containers]
+ *   }
+ *
  *   public void onContainersCompleted(List statuses) {
  *     [update progress, check whether app is done]
  *   }
@@ -100,23 +106,80 @@ extends AbstractService {
   protected final CallbackHandler handler;
   protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger();
 
+  /**
+   * 

Create a new instance of AMRMClientAsync.

+ * + * @param intervalMs heartbeat interval in milliseconds between AM and RM + * @param callbackHandler callback handler that processes responses from + * the ResourceManager + */ + public static AMRMClientAsync + createAMRMClientAsync( + int intervalMs, AbstractCallbackHandler callbackHandler) { + return new AMRMClientAsyncImpl(intervalMs, callbackHandler); + } + + /** + *

Create a new instance of AMRMClientAsync.

+ * + * @param client the AMRMClient instance + * @param intervalMs heartbeat interval in milliseconds between AM and RM + * @param callbackHandler callback handler that processes responses from + * the ResourceManager + */ + public static AMRMClientAsync + createAMRMClientAsync( + AMRMClient client, int intervalMs, + AbstractCallbackHandler callbackHandler) { + return new AMRMClientAsyncImpl(client, intervalMs, callbackHandler); + } + + protected AMRMClientAsync( + int intervalMs, AbstractCallbackHandler callbackHandler) { + this(new AMRMClientImpl(), intervalMs, callbackHandler); + } + + @Private + @VisibleForTesting + protected AMRMClientAsync(AMRMClient client, int intervalMs, + AbstractCallbackHandler callbackHandler) { + super(AMRMClientAsync.class.getName()); + this.client = client; + this.heartbeatIntervalMs.set(intervalMs); + this.handler = callbackHandler; + } + + /** + * + * @deprecated Use {@link #createAMRMClientAsync(int, + * AMRMClientAsync.AbstractCallbackHandler)} instead. + */ + @Deprecated public static AMRMClientAsync createAMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) { return new AMRMClientAsyncImpl(intervalMs, callbackHandler); } - + + /** + * + * @deprecated Use {@link #createAMRMClientAsync(AMRMClient, + * int, AMRMClientAsync.AbstractCallbackHandler)} instead. + */ + @Deprecated public static AMRMClientAsync createAMRMClientAsync(AMRMClient client, int intervalMs, CallbackHandler callbackHandler) { return new AMRMClientAsyncImpl(client, intervalMs, callbackHandler); } - + + @Deprecated protected AMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) { this(new AMRMClientImpl(), intervalMs, callbackHandler); } @Private @VisibleForTesting + @Deprecated protected AMRMClientAsync(AMRMClient client, int intervalMs, CallbackHandler callbackHandler) { super(AMRMClientAsync.class.getName()); @@ -171,6 +234,25 @@ extends AbstractService { */ public abstract void removeContainerRequest(T req); + /** + * Request container resource change before calling allocate. + * Any previous pending resource change request of the same container will be + * removed. + * + * Application that calls this method is expected to maintain the + * Containers that are returned from previous successful + * allocations or resource changes. By passing in the existing container and a + * target resource capability to this method, the application requests the + * ResourceManager to change the existing resource allocation to the target + * resource allocation. + * + * @param container The container returned from the last successful resource + * allocation or resource change + * @param capability The target resource capability of the container + */ + public abstract void requestContainerResourceChange( + Container container, Resource capability); + /** * Release containers assigned by the Resource Manager. If the app cannot use * the container or wants to give up the container then it can release them. @@ -264,37 +346,95 @@ extends AbstractService { } while (true); } - public interface CallbackHandler { - + /** + *

+ * The callback abstract class. The callback functions need to be implemented + * by {@link AMRMClientAsync} users. The APIs are called when responses from + * the ResourceManager are available. + *

+ */ + public abstract static class AbstractCallbackHandler + implements CallbackHandler { + /** * Called when the ResourceManager responds to a heartbeat with completed * containers. If the response contains both completed containers and * allocated containers, this will be called before containersAllocated. */ - public void onContainersCompleted(List statuses); - + public abstract void onContainersCompleted(List statuses); + /** * Called when the ResourceManager responds to a heartbeat with allocated * containers. If the response containers both completed containers and * allocated containers, this will be called after containersCompleted. */ - public void onContainersAllocated(List containers); - + public abstract void onContainersAllocated(List containers); + + /** + * Called when the ResourceManager responds to a heartbeat with containers + * whose resource allocation has been changed. + */ + public abstract void onContainersResourceChanged( + List containers); + /** * Called when the ResourceManager wants the ApplicationMaster to shutdown * for being out of sync etc. The ApplicationMaster should not unregister * with the RM unless the ApplicationMaster wants to be the last attempt. */ - public void onShutdownRequest(); - + public abstract void onShutdownRequest(); + /** * Called when nodes tracked by the ResourceManager have changed in health, * availability etc. */ - public void onNodesUpdated(List updatedNodes); - - public float getProgress(); - + public abstract void onNodesUpdated(List updatedNodes); + + public abstract float getProgress(); + + /** + * Called when error comes from RM communications as well as from errors in + * the callback itself from the app. Calling + * stop() is the recommended action. + */ + public abstract void onError(Throwable e); + } + + /** + * @deprecated Use {@link AMRMClientAsync.AbstractCallbackHandler} instead. + */ + @Deprecated + public interface CallbackHandler { + + /** + * Called when the ResourceManager responds to a heartbeat with completed + * containers. If the response contains both completed containers and + * allocated containers, this will be called before containersAllocated. + */ + void onContainersCompleted(List statuses); + + /** + * Called when the ResourceManager responds to a heartbeat with allocated + * containers. If the response containers both completed containers and + * allocated containers, this will be called after containersCompleted. + */ + void onContainersAllocated(List containers); + + /** + * Called when the ResourceManager wants the ApplicationMaster to shutdown + * for being out of sync etc. The ApplicationMaster should not unregister + * with the RM unless the ApplicationMaster wants to be the last attempt. + */ + void onShutdownRequest(); + + /** + * Called when nodes tracked by the ResourceManager have changed in health, + * availability etc. + */ + void onNodesUpdated(List updatedNodes); + + float getProgress(); + /** * Called when error comes from RM communications as well as from errors in * the callback itself from the app. Calling @@ -302,6 +442,6 @@ extends AbstractService { * * @param e */ - public void onError(Throwable e); + void onError(Throwable e); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index addc3b6daec..286ca28c379 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.client.api.async.impl; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -66,13 +67,41 @@ extends AMRMClientAsync { private volatile float progress; private volatile Throwable savedException; - + + /** + * + * @param intervalMs heartbeat interval in milliseconds between AM and RM + * @param callbackHandler callback handler that processes responses from + * the ResourceManager + */ + public AMRMClientAsyncImpl( + int intervalMs, AbstractCallbackHandler callbackHandler) { + this(new AMRMClientImpl(), intervalMs, callbackHandler); + } + + public AMRMClientAsyncImpl(AMRMClient client, int intervalMs, + AbstractCallbackHandler callbackHandler) { + super(client, intervalMs, callbackHandler); + heartbeatThread = new HeartbeatThread(); + handlerThread = new CallbackHandlerThread(); + responseQueue = new LinkedBlockingQueue<>(); + keepRunning = true; + savedException = null; + } + + /** + * + * @deprecated Use {@link #AMRMClientAsyncImpl(int, + * AMRMClientAsync.AbstractCallbackHandler)} instead. + */ + @Deprecated public AMRMClientAsyncImpl(int intervalMs, CallbackHandler callbackHandler) { this(new AMRMClientImpl(), intervalMs, callbackHandler); } - + @Private @VisibleForTesting + @Deprecated public AMRMClientAsyncImpl(AMRMClient client, int intervalMs, CallbackHandler callbackHandler) { super(client, intervalMs, callbackHandler); @@ -82,7 +111,7 @@ extends AMRMClientAsync { keepRunning = true; savedException = null; } - + @Override protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); @@ -177,6 +206,12 @@ extends AMRMClientAsync { client.removeContainerRequest(req); } + @Override + public void requestContainerResourceChange( + Container container, Resource capability) { + client.requestContainerResourceChange(container, capability); + } + /** * Release containers assigned by the Resource Manager. If the app cannot use * the container or wants to give up the container then it can release them. @@ -300,6 +335,18 @@ extends AMRMClientAsync { handler.onContainersCompleted(completed); } + if (handler instanceof AMRMClientAsync.AbstractCallbackHandler) { + // RM side of the implementation guarantees that there are + // no duplications between increased and decreased containers + List changed = new ArrayList<>(); + changed.addAll(response.getIncreasedContainers()); + changed.addAll(response.getDecreasedContainers()); + if (!changed.isEmpty()) { + ((AMRMClientAsync.AbstractCallbackHandler) handler) + .onContainersResourceChanged(changed); + } + } + List allocated = response.getAllocatedContainers(); if (!allocated.isEmpty()) { handler.onContainersAllocated(allocated); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 4cf9aa08906..d2e931e0f6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -33,6 +33,7 @@ import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; +import java.util.AbstractMap.SimpleEntry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,7 +50,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; @@ -72,6 +75,7 @@ import org.apache.hadoop.yarn.util.RackResolver; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.util.resource.Resources; @Private @Unstable @@ -110,8 +114,8 @@ public class AMRMClientImpl extends AMRMClient { containerRequests = new LinkedHashSet(); } } - - + + /** * Class compares Resource by memory then cpu in reverse order */ @@ -144,10 +148,7 @@ public class AMRMClientImpl extends AMRMClient { int cpu0 = arg0.getVirtualCores(); int cpu1 = arg1.getVirtualCores(); - if(mem0 <= mem1 && cpu0 <= cpu1) { - return true; - } - return false; + return (mem0 <= mem1 && cpu0 <= cpu1); } //Key -> Priority @@ -164,11 +165,22 @@ public class AMRMClientImpl extends AMRMClient { protected final Set ask = new TreeSet( new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator()); protected final Set release = new TreeSet(); - // pendingRelease holds history or release requests.request is removed only if - // RM sends completedContainer. + // pendingRelease holds history of release requests. + // request is removed only if RM sends completedContainer. // How it different from release? --> release is for per allocate() request. protected Set pendingRelease = new TreeSet(); - + // change map holds container resource change requests between two allocate() + // calls, and are cleared after each successful allocate() call. + protected final Map> change = + new HashMap<>(); + // pendingChange map holds history of container resource change requests in + // case AM needs to reregister with the ResourceManager. + // Change requests are removed from this map if RM confirms the change + // through allocate response, or if RM confirms that the container has been + // completed. + protected final Map> + pendingChange = new HashMap<>(); + public AMRMClientImpl() { super(AMRMClientImpl.class.getName()); } @@ -241,7 +253,8 @@ public class AMRMClientImpl extends AMRMClient { AllocateRequest allocateRequest = null; List blacklistToAdd = new ArrayList(); List blacklistToRemove = new ArrayList(); - + Map> oldChange = + new HashMap<>(); try { synchronized (this) { askList = new ArrayList(ask.size()); @@ -252,10 +265,30 @@ public class AMRMClientImpl extends AMRMClient { r.getResourceName(), r.getCapability(), r.getNumContainers(), r.getRelaxLocality(), r.getNodeLabelExpression())); } + List increaseList = new ArrayList<>(); + List decreaseList = new ArrayList<>(); + // Save the current change for recovery + oldChange.putAll(change); + for (Map.Entry> entry : + change.entrySet()) { + Container container = entry.getValue().getKey(); + Resource original = container.getResource(); + Resource target = entry.getValue().getValue(); + if (Resources.fitsIn(target, original)) { + // This is a decrease request + decreaseList.add(ContainerResourceChangeRequest.newInstance( + container.getId(), target)); + } else { + // This is an increase request + increaseList.add(ContainerResourceChangeRequest.newInstance( + container.getId(), target)); + } + } releaseList = new ArrayList(release); // optimistically clear this collection assuming no RPC failure ask.clear(); release.clear(); + change.clear(); blacklistToAdd.addAll(blacklistAdditions); blacklistToRemove.addAll(blacklistRemovals); @@ -266,8 +299,9 @@ public class AMRMClientImpl extends AMRMClient { allocateRequest = AllocateRequest.newInstance(lastResponseId, progressIndicator, - askList, releaseList, blacklistRequest); - // clear blacklistAdditions and blacklistRemovals before + askList, releaseList, blacklistRequest, + increaseList, decreaseList); + // clear blacklistAdditions and blacklistRemovals before // unsynchronized part blacklistAdditions.clear(); blacklistRemovals.clear(); @@ -289,6 +323,7 @@ public class AMRMClientImpl extends AMRMClient { } } } + change.putAll(this.pendingChange); } // re register with RM registerApplicationMaster(); @@ -312,6 +347,23 @@ public class AMRMClientImpl extends AMRMClient { removePendingReleaseRequests(allocateResponse .getCompletedContainersStatuses()); } + if (!pendingChange.isEmpty()) { + List completed = + allocateResponse.getCompletedContainersStatuses(); + List changed = new ArrayList<>(); + changed.addAll(allocateResponse.getIncreasedContainers()); + changed.addAll(allocateResponse.getDecreasedContainers()); + // remove all pending change requests that belong to the completed + // containers + for (ContainerStatus status : completed) { + ContainerId containerId = status.getContainerId(); + pendingChange.remove(containerId); + } + // remove all pending change requests that have been satisfied + if (!changed.isEmpty()) { + removePendingChangeRequests(changed); + } + } } } finally { // TODO how to differentiate remote yarn exception vs error in rpc @@ -333,7 +385,22 @@ public class AMRMClientImpl extends AMRMClient { ask.add(oldAsk); } } - + // change requests could have been added during the allocate call. + // Those are the newest requests which take precedence + // over requests cached in the oldChange map. + // + // Only insert entries from the cached oldChange map + // that do not exist in the current change map: + for (Map.Entry> entry : + oldChange.entrySet()) { + ContainerId oldContainerId = entry.getKey(); + Container oldContainer = entry.getValue().getKey(); + Resource oldResource = entry.getValue().getValue(); + if (change.get(oldContainerId) == null) { + change.put( + oldContainerId, new SimpleEntry<>(oldContainer, oldResource)); + } + } blacklistAdditions.addAll(blacklistToAdd); blacklistRemovals.addAll(blacklistToRemove); } @@ -349,6 +416,24 @@ public class AMRMClientImpl extends AMRMClient { } } + protected void removePendingChangeRequests( + List changedContainers) { + for (Container changedContainer : changedContainers) { + ContainerId containerId = changedContainer.getId(); + if (pendingChange.get(containerId) == null) { + continue; + } + if (LOG.isDebugEnabled()) { + LOG.debug("RM has confirmed changed resource allocation for " + + "container " + containerId + ". Current resource allocation:" + + changedContainer.getResource() + + ". Remove pending change request:" + + pendingChange.get(containerId).getValue()); + } + pendingChange.remove(containerId); + } + } + @Private @VisibleForTesting protected void populateNMTokens(List nmTokens) { @@ -479,12 +564,32 @@ public class AMRMClientImpl extends AMRMClient { req.getCapability(), req); } + @Override + public synchronized void requestContainerResourceChange( + Container container, Resource capability) { + validateContainerResourceChangeRequest( + container.getId(), container.getResource(), capability); + if (change.get(container.getId()) == null) { + change.put(container.getId(), + new SimpleEntry<>(container, capability)); + } else { + change.get(container.getId()).setValue(capability); + } + if (pendingChange.get(container.getId()) == null) { + pendingChange.put(container.getId(), + new SimpleEntry<>(container, capability)); + } else { + pendingChange.get(container.getId()).setValue(capability); + } + } + @Override public synchronized void releaseAssignedContainer(ContainerId containerId) { Preconditions.checkArgument(containerId != null, "ContainerId can not be null."); pendingRelease.add(containerId); release.add(containerId); + pendingChange.remove(containerId); } @Override @@ -618,7 +723,23 @@ public class AMRMClientImpl extends AMRMClient { "Cannot specify node label with rack and node"); } } - + + private void validateContainerResourceChangeRequest( + ContainerId containerId, Resource original, Resource target) { + Preconditions.checkArgument(containerId != null, + "ContainerId cannot be null"); + Preconditions.checkArgument(original != null, + "Original resource capability cannot be null"); + Preconditions.checkArgument(!Resources.equals(Resources.none(), original) + && Resources.fitsIn(Resources.none(), original), + "Original resource capability must be greater than 0"); + Preconditions.checkArgument(target != null, + "Target resource capability cannot be null"); + Preconditions.checkArgument(!Resources.equals(Resources.none(), target) + && Resources.fitsIn(Resources.none(), target), + "Target resource capability must be greater than 0"); + } + private void addResourceRequestToAsk(ResourceRequest remoteRequest) { // This code looks weird but is needed because of the following scenario. // A ResourceRequest is removed from the remoteRequestTable. A 0 container diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java index 74d4aa47cbc..c7b3a94a8e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java @@ -74,12 +74,15 @@ public class TestAMRMClientAsync { List completed1 = Arrays.asList( ContainerStatus.newInstance(newContainerId(0, 0, 0, 0), ContainerState.COMPLETE, "", 0)); - List allocated1 = Arrays.asList( + List containers = Arrays.asList( Container.newInstance(null, null, null, null, null, null)); final AllocateResponse response1 = createAllocateResponse( - new ArrayList(), allocated1, null); + new ArrayList(), containers, null); final AllocateResponse response2 = createAllocateResponse(completed1, new ArrayList(), null); + final AllocateResponse response3 = createAllocateResponse( + new ArrayList(), new ArrayList(), + containers, containers, null); final AllocateResponse emptyResponse = createAllocateResponse( new ArrayList(), new ArrayList(), null); @@ -91,15 +94,15 @@ public class TestAMRMClientAsync { public AllocateResponse answer(InvocationOnMock invocation) throws Throwable { secondHeartbeatSync.incrementAndGet(); - while(heartbeatBlock.get()) { - synchronized(heartbeatBlock) { + while (heartbeatBlock.get()) { + synchronized (heartbeatBlock) { heartbeatBlock.wait(); } } secondHeartbeatSync.incrementAndGet(); return response2; } - }).thenReturn(emptyResponse); + }).thenReturn(response3).thenReturn(emptyResponse); when(client.registerApplicationMaster(anyString(), anyInt(), anyString())) .thenReturn(null); when(client.getAvailableResources()).thenAnswer(new Answer() { @@ -146,16 +149,22 @@ public class TestAMRMClientAsync { Assert.assertEquals(null, callbackHandler.takeCompletedContainers()); Thread.sleep(10); } - + // wait for the completed containers from the second heartbeat's response while (callbackHandler.takeCompletedContainers() == null) { Thread.sleep(10); } - + + // wait for the changed containers from the thrid heartbeat's response + while (callbackHandler.takeChangedContainers() == null) { + Thread.sleep(10); + } + asyncClient.stop(); Assert.assertEquals(null, callbackHandler.takeAllocatedContainers()); Assert.assertEquals(null, callbackHandler.takeCompletedContainers()); + Assert.assertEquals(null, callbackHandler.takeChangedContainers()); } @Test(timeout=10000) @@ -397,6 +406,17 @@ public class TestAMRMClientAsync { return response; } + private AllocateResponse createAllocateResponse( + List completed, List allocated, + List increased, List decreased, + List nmTokens) { + AllocateResponse response = + AllocateResponse.newInstance(0, completed, allocated, + new ArrayList(), null, null, 1, null, nmTokens, + increased, decreased); + return response; + } + public static ContainerId newContainerId(int appId, int appAttemptId, long timestamp, int containerId) { ApplicationId applicationId = ApplicationId.newInstance(timestamp, appId); @@ -405,9 +425,11 @@ public class TestAMRMClientAsync { return ContainerId.newContainerId(applicationAttemptId, containerId); } - private class TestCallbackHandler implements AMRMClientAsync.CallbackHandler { + private class TestCallbackHandler + extends AMRMClientAsync.AbstractCallbackHandler { private volatile List completedContainers; private volatile List allocatedContainers; + private final List changedContainers = new ArrayList<>(); Exception savedException = null; volatile boolean reboot = false; Object notifier = new Object(); @@ -425,7 +447,19 @@ public class TestAMRMClientAsync { } return ret; } - + + public List takeChangedContainers() { + List ret = null; + synchronized (changedContainers) { + if (!changedContainers.isEmpty()) { + ret = new ArrayList<>(changedContainers); + changedContainers.clear(); + changedContainers.notify(); + } + } + return ret; + } + public List takeAllocatedContainers() { List ret = allocatedContainers; if (ret == null) { @@ -453,6 +487,22 @@ public class TestAMRMClientAsync { } } + @Override + public void onContainersResourceChanged( + List changed) { + synchronized (changedContainers) { + changedContainers.clear(); + changedContainers.addAll(changed); + while (!changedContainers.isEmpty()) { + try { + changedContainers.wait(); + } catch (InterruptedException ex) { + LOG.error("Interrupted during wait", ex); + } + } + } + } + @Override public void onContainersAllocated(List containers) { allocatedContainers = containers; @@ -494,7 +544,8 @@ public class TestAMRMClientAsync { } } - private class TestCallbackHandler2 implements AMRMClientAsync.CallbackHandler { + private class TestCallbackHandler2 + extends AMRMClientAsync.AbstractCallbackHandler { Object notifier = new Object(); @SuppressWarnings("rawtypes") AMRMClientAsync asynClient; @@ -512,6 +563,9 @@ public class TestAMRMClientAsync { @Override public void onContainersAllocated(List containers) {} + @Override + public void onContainersResourceChanged(List containers) {} + @Override public void onShutdownRequest() {} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 7d29d052927..3b3c0ed26da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -35,10 +35,12 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.TreeSet; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; @@ -73,6 +75,7 @@ import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException; +import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -126,6 +129,8 @@ public class TestAMRMClient { rolling_interval_sec); conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, am_expire_ms); conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100); + // set the minimum allocation so that resource decrease can go under 1024 + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1); yarnCluster.init(conf); @@ -730,8 +735,160 @@ public class TestAMRMClient { new ContainerRequest(Resource.newInstance(1024, 1), null, null, Priority.UNDEFINED, true, "x && y")); } - - private void testAllocation(final AMRMClientImpl amClient) + + @Test(timeout=60000) + public void testAMRMClientWithContainerResourceChange() + throws YarnException, IOException { + AMRMClient amClient = null; + try { + // start am rm client + amClient = AMRMClient.createAMRMClient(); + Assert.assertNotNull(amClient); + // asserting we are using the singleton instance cache + Assert.assertSame( + NMTokenCache.getSingleton(), amClient.getNMTokenCache()); + amClient.init(conf); + amClient.start(); + assertEquals(STATE.STARTED, amClient.getServiceState()); + // start am nm client + NMClientImpl nmClient = (NMClientImpl) NMClient.createNMClient(); + Assert.assertNotNull(nmClient); + // asserting we are using the singleton instance cache + Assert.assertSame( + NMTokenCache.getSingleton(), nmClient.getNMTokenCache()); + nmClient.init(conf); + nmClient.start(); + assertEquals(STATE.STARTED, nmClient.getServiceState()); + // am rm client register the application master with RM + amClient.registerApplicationMaster("Host", 10000, ""); + // allocate three containers and make sure they are in RUNNING state + List containers = + allocateAndStartContainers(amClient, nmClient, 3); + // perform container resource increase and decrease tests + doContainerResourceChange(amClient, containers); + // unregister and finish up the test + amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, + null, null); + } finally { + if (amClient != null && amClient.getServiceState() == STATE.STARTED) { + amClient.stop(); + } + } + } + + private List allocateAndStartContainers( + final AMRMClient amClient, final NMClient nmClient, + int num) throws YarnException, IOException { + // set up allocation requests + for (int i = 0; i < num; ++i) { + amClient.addContainerRequest( + new ContainerRequest(capability, nodes, racks, priority)); + } + // send allocation requests + amClient.allocate(0.1f); + // sleep to let NM's heartbeat to RM and trigger allocations + sleep(150); + // get allocations + AllocateResponse allocResponse = amClient.allocate(0.1f); + List containers = allocResponse.getAllocatedContainers(); + Assert.assertEquals(num, containers.size()); + // build container launch context + Credentials ts = new Credentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + ts.writeTokenStorageToStream(dob); + ByteBuffer securityTokens = + ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + // start a process long enough for increase/decrease action to take effect + ContainerLaunchContext clc = BuilderUtils.newContainerLaunchContext( + Collections.emptyMap(), + new HashMap(), Arrays.asList("sleep", "100"), + new HashMap(), securityTokens, + new HashMap()); + // start the containers and make sure they are in RUNNING state + try { + for (int i = 0; i < num; i++) { + Container container = containers.get(i); + nmClient.startContainer(container, clc); + // NodeManager may still need some time to get the stable + // container status + while (true) { + ContainerStatus status = nmClient.getContainerStatus( + container.getId(), container.getNodeId()); + if (status.getState() == ContainerState.RUNNING) { + break; + } + sleep(100); + } + } + } catch (YarnException e) { + throw new AssertionError("Exception is not expected: " + e); + } + // sleep to let NM's heartbeat to RM to confirm container launch + sleep(200); + return containers; + } + + + private void doContainerResourceChange( + final AMRMClient amClient, List containers) + throws YarnException, IOException { + Assert.assertEquals(3, containers.size()); + // remember the container IDs + Container container1 = containers.get(0); + Container container2 = containers.get(1); + Container container3 = containers.get(2); + AMRMClientImpl amClientImpl = + (AMRMClientImpl) amClient; + Assert.assertEquals(0, amClientImpl.change.size()); + // verify newer request overwrites older request for the container1 + amClientImpl.requestContainerResourceChange( + container1, Resource.newInstance(2048, 1)); + amClientImpl.requestContainerResourceChange( + container1, Resource.newInstance(4096, 1)); + Assert.assertEquals(Resource.newInstance(4096, 1), + amClientImpl.change.get(container1.getId()).getValue()); + // verify new decrease request cancels old increase request for container1 + amClientImpl.requestContainerResourceChange( + container1, Resource.newInstance(512, 1)); + Assert.assertEquals(Resource.newInstance(512, 1), + amClientImpl.change.get(container1.getId()).getValue()); + // request resource increase for container2 + amClientImpl.requestContainerResourceChange( + container2, Resource.newInstance(2048, 1)); + Assert.assertEquals(Resource.newInstance(2048, 1), + amClientImpl.change.get(container2.getId()).getValue()); + // verify release request will cancel pending change requests for the same + // container + amClientImpl.requestContainerResourceChange( + container3, Resource.newInstance(2048, 1)); + Assert.assertEquals(3, amClientImpl.pendingChange.size()); + amClientImpl.releaseAssignedContainer(container3.getId()); + Assert.assertEquals(2, amClientImpl.pendingChange.size()); + // as of now: container1 asks to decrease to (512, 1) + // container2 asks to increase to (2048, 1) + // send allocation requests + AllocateResponse allocResponse = amClient.allocate(0.1f); + Assert.assertEquals(0, amClientImpl.change.size()); + // we should get decrease confirmation right away + List decreasedContainers = + allocResponse.getDecreasedContainers(); + List increasedContainers = + allocResponse.getIncreasedContainers(); + Assert.assertEquals(1, decreasedContainers.size()); + Assert.assertEquals(0, increasedContainers.size()); + // we should get increase allocation after the next NM's heartbeat to RM + sleep(150); + // get allocations + allocResponse = amClient.allocate(0.1f); + decreasedContainers = + allocResponse.getDecreasedContainers(); + increasedContainers = + allocResponse.getIncreasedContainers(); + Assert.assertEquals(1, increasedContainers.size()); + Assert.assertEquals(0, decreasedContainers.size()); + } + + private void testAllocation(final AMRMClientImpl amClient) throws YarnException, IOException { // setup container request diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java index 65c85d9362a..0460f1ef71b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -90,15 +91,17 @@ public class TestAMRMClientOnRMRestart { } // Test does major 6 steps verification. - // Step-1 : AMRMClient send allocate request for 2 container requests - // Step-2 : 2 containers are allocated by RM. - // Step-3 : AM Send 1 containerRequest(cRequest3) and 1 releaseRequests to + // Step-1 : AMRMClient send allocate request for 3 container requests + // Step-2 : 3 containers are allocated by RM. + // Step-3 : AM Send 1 containerRequest(cRequest4) and 1 releaseRequests to // RM + // Step-3.5 : AM Send 1 container resource increase request to RM // Step-4 : On RM restart, AM(does not know RM is restarted) sends additional - // containerRequest(cRequest4) and blacklisted nodes. + // containerRequest(cRequest5) and blacklisted nodes. // Intern RM send resync command - // Step-5 : Allocater after resync command & new containerRequest(cRequest5) - // Step-6 : RM allocates containers i.e cRequest3,cRequest4 and cRequest5 + // Verify AM can recover increase request after resync + // Step-5 : Allocater after resync command & new containerRequest(cRequest6) + // Step-6 : RM allocates containers i.e cRequest4,cRequest5 and cRequest6 @Test(timeout = 60000) public void testAMRMClientResendsRequestsOnRMRestart() throws Exception { @@ -132,8 +135,8 @@ public class TestAMRMClientOnRMRestart { UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); ugi.addTokenIdentifier(token.decodeIdentifier()); - // Step-1 : AMRMClient send allocate request for 2 ContainerRequest - // cRequest1 = h1 and cRequest2 = h1,h2 + // Step-1 : AMRMClient send allocate request for 3 ContainerRequest + // cRequest1 = h1, cRequest2 = h1,h2 and cRequest3 = h1 // blacklisted nodes = h2 AMRMClient amClient = new MyAMRMClientImpl(rm1); amClient.init(conf); @@ -148,6 +151,9 @@ public class TestAMRMClientOnRMRestart { createReq(1, 1024, new String[] { "h1", "h2" }); amClient.addContainerRequest(cRequest2); + ContainerRequest cRequest3 = createReq(1, 1024, new String[] { "h1" }); + amClient.addContainerRequest(cRequest3); + List blacklistAdditions = new ArrayList(); List blacklistRemoval = new ArrayList(); blacklistAdditions.add("h2"); @@ -167,14 +173,14 @@ public class TestAMRMClientOnRMRestart { assertBlacklistAdditionsAndRemovals(1, 1, rm1); // Step-2 : NM heart beat is sent. - // On 2nd AM allocate request, RM allocates 2 containers to AM + // On 2nd AM allocate request, RM allocates 3 containers to AM nm1.nodeHeartbeat(true); // Node heartbeat dispatcher.await(); allocateResponse = amClient.allocate(0.2f); dispatcher.await(); - // 2 containers are allocated i.e for cRequest1 and cRequest2. - Assert.assertEquals("No of assignments must be 0", 2, allocateResponse + // 3 containers are allocated i.e for cRequest1, cRequest2 and cRequest3. + Assert.assertEquals("No of assignments must be 0", 3, allocateResponse .getAllocatedContainers().size()); assertAsksAndReleases(0, 0, rm1); assertBlacklistAdditionsAndRemovals(0, 0, rm1); @@ -184,6 +190,7 @@ public class TestAMRMClientOnRMRestart { // removed allocated container requests amClient.removeContainerRequest(cRequest1); amClient.removeContainerRequest(cRequest2); + amClient.removeContainerRequest(cRequest3); allocateResponse = amClient.allocate(0.2f); dispatcher.await(); @@ -193,8 +200,8 @@ public class TestAMRMClientOnRMRestart { assertBlacklistAdditionsAndRemovals(0, 0, rm1); // Step-3 : Send 1 containerRequest and 1 releaseRequests to RM - ContainerRequest cRequest3 = createReq(1, 1024, new String[] { "h1" }); - amClient.addContainerRequest(cRequest3); + ContainerRequest cRequest4 = createReq(1, 1024, new String[] { "h1" }); + amClient.addContainerRequest(cRequest4); int pendingRelease = 0; Iterator it = allocatedContainers.iterator(); @@ -205,11 +212,24 @@ public class TestAMRMClientOnRMRestart { break;// remove one container } + // Step-3.5 : Send 1 container resource increase request to RM + Container container = it.next(); + ContainerId containerId = container.getId(); + // Make sure that container is in RUNNING state before sending increase + // request + nm1.nodeHeartbeat(containerId.getApplicationAttemptId(), + containerId.getContainerId(), ContainerState.RUNNING); + amClient.requestContainerResourceChange( + container, Resource.newInstance(2048, 1)); + it.remove(); + allocateResponse = amClient.allocate(0.3f); dispatcher.await(); Assert.assertEquals("No of assignments must be 0", 0, allocateResponse .getAllocatedContainers().size()); assertAsksAndReleases(3, pendingRelease, rm1); + // Verify there is one increase and zero decrease + assertChanges(1, 0, rm1); assertBlacklistAdditionsAndRemovals(0, 0, rm1); int completedContainer = allocateResponse.getCompletedContainersStatuses().size(); @@ -228,7 +248,13 @@ public class TestAMRMClientOnRMRestart { // new NM to represent NM re-register nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService()); - nm1.registerNode(); + NMContainerStatus containerReport = + NMContainerStatus.newInstance(containerId, ContainerState.RUNNING, + Resource.newInstance(1024, 1), "recover container", 0, + Priority.newInstance(0), 0); + nm1.registerNode(Collections.singletonList(containerReport), + Collections.singletonList( + containerId.getApplicationAttemptId().getApplicationId())); nm1.nodeHeartbeat(true); dispatcher.await(); @@ -243,9 +269,9 @@ public class TestAMRMClientOnRMRestart { it.remove(); } - ContainerRequest cRequest4 = + ContainerRequest cRequest5 = createReq(1, 1024, new String[] { "h1", "h2" }); - amClient.addContainerRequest(cRequest4); + amClient.addContainerRequest(cRequest5); // Step-4 : On RM restart, AM(does not know RM is restarted) sends // additional @@ -259,11 +285,13 @@ public class TestAMRMClientOnRMRestart { pendingRelease -= completedContainer; assertAsksAndReleases(4, pendingRelease, rm2); + // Verify there is one increase and zero decrease + assertChanges(1, 0, rm2); assertBlacklistAdditionsAndRemovals(2, 0, rm2); - ContainerRequest cRequest5 = + ContainerRequest cRequest6 = createReq(1, 1024, new String[] { "h1", "h2", "h3" }); - amClient.addContainerRequest(cRequest5); + amClient.addContainerRequest(cRequest6); // Step-5 : Allocater after resync command allocateResponse = amClient.allocate(0.5f); @@ -272,6 +300,8 @@ public class TestAMRMClientOnRMRestart { .getAllocatedContainers().size()); assertAsksAndReleases(5, 0, rm2); + // Verify there is no increase or decrease requests any more + assertChanges(0, 0, rm2); assertBlacklistAdditionsAndRemovals(0, 0, rm2); int noAssignedContainer = 0; @@ -289,7 +319,7 @@ public class TestAMRMClientOnRMRestart { Thread.sleep(1000); } - // Step-6 : RM allocates containers i.e cRequest3,cRequest4 and cRequest5 + // Step-6 : RM allocates containers i.e cRequest4,cRequest5 and cRequest6 Assert.assertEquals("Number of container should be 3", 3, noAssignedContainer); @@ -519,6 +549,8 @@ public class TestAMRMClientOnRMRestart { List lastAsk = null; List lastRelease = null; + List lastIncrease = null; + List lastDecrease = null; List lastBlacklistAdditions; List lastBlacklistRemovals; @@ -541,6 +573,8 @@ public class TestAMRMClientOnRMRestart { } lastAsk = ask; lastRelease = release; + lastIncrease = increaseRequests; + lastDecrease = decreaseRequests; lastBlacklistAdditions = blacklistAdditions; lastBlacklistRemovals = blacklistRemovals; return super.allocate(applicationAttemptId, askCopy, release, @@ -647,6 +681,14 @@ public class TestAMRMClientOnRMRestart { rm.getMyFifoScheduler().lastRelease.size()); } + private static void assertChanges( + int expectedIncrease, int expectedDecrease, MyResourceManager rm) { + Assert.assertEquals( + expectedIncrease, rm.getMyFifoScheduler().lastIncrease.size()); + Assert.assertEquals( + expectedDecrease, rm.getMyFifoScheduler().lastDecrease.size()); + } + private ContainerRequest createReq(int priority, int memory, String[] hosts) { Resource capability = Resource.newInstance(memory, 1); Priority priorityOfContainer = Priority.newInstance(priority);