YARN-660. Improve AMRMClient with matching requests (bikas)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1488485 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a77030abdc
commit
3492f5eff1
|
@ -239,6 +239,8 @@ Release 2.0.5-beta - UNRELEASED
|
||||||
YARN-638. Modified ResourceManager to restore RMDelegationTokens after
|
YARN-638. Modified ResourceManager to restore RMDelegationTokens after
|
||||||
restarting. (Jian He via vinodkv)
|
restarting. (Jian He via vinodkv)
|
||||||
|
|
||||||
|
YARN-660. Improve AMRMClient with matching requests (bikas)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
YARN-512. Log aggregation root directory check is more expensive than it
|
YARN-512. Log aggregation root directory check is more expensive than it
|
||||||
|
|
|
@ -151,7 +151,7 @@ public class ApplicationMaster {
|
||||||
private YarnRPC rpc;
|
private YarnRPC rpc;
|
||||||
|
|
||||||
// Handle to communicate with the Resource Manager
|
// Handle to communicate with the Resource Manager
|
||||||
private AMRMClientAsync resourceManager;
|
private AMRMClientAsync<ContainerRequest> resourceManager;
|
||||||
|
|
||||||
// Application Attempt Id ( combination of attemptId and fail count )
|
// Application Attempt Id ( combination of attemptId and fail count )
|
||||||
private ApplicationAttemptId appAttemptID;
|
private ApplicationAttemptId appAttemptID;
|
||||||
|
@ -442,7 +442,9 @@ public class ApplicationMaster {
|
||||||
|
|
||||||
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
|
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
|
||||||
|
|
||||||
resourceManager = new AMRMClientAsync(appAttemptID, 1000, allocListener);
|
resourceManager = new AMRMClientAsync<ContainerRequest>(appAttemptID,
|
||||||
|
1000,
|
||||||
|
allocListener);
|
||||||
resourceManager.init(conf);
|
resourceManager.init(conf);
|
||||||
resourceManager.start();
|
resourceManager.start();
|
||||||
|
|
||||||
|
@ -522,7 +524,8 @@ public class ApplicationMaster {
|
||||||
FinalApplicationStatus appStatus;
|
FinalApplicationStatus appStatus;
|
||||||
String appMessage = null;
|
String appMessage = null;
|
||||||
success = true;
|
success = true;
|
||||||
if (numFailedContainers.get() == 0) {
|
if (numFailedContainers.get() == 0 &&
|
||||||
|
numCompletedContainers.get() == numTotalContainers) {
|
||||||
appStatus = FinalApplicationStatus.SUCCEEDED;
|
appStatus = FinalApplicationStatus.SUCCEEDED;
|
||||||
} else {
|
} else {
|
||||||
appStatus = FinalApplicationStatus.FAILED;
|
appStatus = FinalApplicationStatus.FAILED;
|
||||||
|
@ -594,11 +597,6 @@ public class ApplicationMaster {
|
||||||
resourceManager.addContainerRequest(containerAsk);
|
resourceManager.addContainerRequest(containerAsk);
|
||||||
}
|
}
|
||||||
|
|
||||||
// set progress to deliver to RM on next heartbeat
|
|
||||||
float progress = (float) numCompletedContainers.get()
|
|
||||||
/ numTotalContainers;
|
|
||||||
resourceManager.setProgress(progress);
|
|
||||||
|
|
||||||
if (numCompletedContainers.get() == numTotalContainers) {
|
if (numCompletedContainers.get() == numTotalContainers) {
|
||||||
done = true;
|
done = true;
|
||||||
}
|
}
|
||||||
|
@ -637,6 +635,19 @@ public class ApplicationMaster {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onNodesUpdated(List<NodeReport> updatedNodes) {}
|
public void onNodesUpdated(List<NodeReport> updatedNodes) {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public float getProgress() {
|
||||||
|
// set progress to deliver to RM on next heartbeat
|
||||||
|
float progress = (float) numCompletedContainers.get()
|
||||||
|
/ numTotalContainers;
|
||||||
|
return progress;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Exception e) {
|
||||||
|
done = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -18,8 +18,9 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.client;
|
package org.apache.hadoop.yarn.client;
|
||||||
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
@ -32,9 +33,11 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
import org.apache.hadoop.yarn.service.Service;
|
import org.apache.hadoop.yarn.service.Service;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
public interface AMRMClient extends Service {
|
public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Service {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Object to represent container request for resources.
|
* Object to represent container request for resources.
|
||||||
|
@ -43,20 +46,41 @@ public interface AMRMClient extends Service {
|
||||||
* Can ask for multiple containers of a given type.
|
* Can ask for multiple containers of a given type.
|
||||||
*/
|
*/
|
||||||
public static class ContainerRequest {
|
public static class ContainerRequest {
|
||||||
Resource capability;
|
final Resource capability;
|
||||||
String[] hosts;
|
final ImmutableList<String> hosts;
|
||||||
String[] racks;
|
final ImmutableList<String> racks;
|
||||||
Priority priority;
|
final Priority priority;
|
||||||
int containerCount;
|
final int containerCount;
|
||||||
|
|
||||||
public ContainerRequest(Resource capability, String[] hosts,
|
public ContainerRequest(Resource capability, String[] hosts,
|
||||||
String[] racks, Priority priority, int containerCount) {
|
String[] racks, Priority priority, int containerCount) {
|
||||||
this.capability = capability;
|
this.capability = capability;
|
||||||
this.hosts = (hosts != null ? hosts.clone() : null);
|
this.hosts = (hosts != null ? ImmutableList.copyOf(hosts) : null);
|
||||||
this.racks = (racks != null ? racks.clone() : null);
|
this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
|
||||||
this.priority = priority;
|
this.priority = priority;
|
||||||
this.containerCount = containerCount;
|
this.containerCount = containerCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Resource getCapability() {
|
||||||
|
return capability;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ImmutableList<String> getHosts() {
|
||||||
|
return hosts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ImmutableList<String> getRacks() {
|
||||||
|
return racks;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Priority getPriority() {
|
||||||
|
return priority;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getContainerCount() {
|
||||||
|
return containerCount;
|
||||||
|
}
|
||||||
|
|
||||||
public String toString() {
|
public String toString() {
|
||||||
StringBuilder sb = new StringBuilder();
|
StringBuilder sb = new StringBuilder();
|
||||||
sb.append("Capability[").append(capability).append("]");
|
sb.append("Capability[").append(capability).append("]");
|
||||||
|
@ -65,6 +89,22 @@ public interface AMRMClient extends Service {
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This creates a <code>ContainerRequest</code> for 1 container and the
|
||||||
|
* AMRMClient stores this request internally. <code>getMatchingRequests</code>
|
||||||
|
* can be used to retrieve these requests from AMRMClient. These requests may
|
||||||
|
* be matched with an allocated container to determine which request to assign
|
||||||
|
* the container to. <code>removeContainerRequest</code> must be called using
|
||||||
|
* the same assigned <code>StoredContainerRequest</code> object so that
|
||||||
|
* AMRMClient can remove it from its internal store.
|
||||||
|
*/
|
||||||
|
public static class StoredContainerRequest extends ContainerRequest {
|
||||||
|
public StoredContainerRequest(Resource capability, String[] hosts,
|
||||||
|
String[] racks, Priority priority) {
|
||||||
|
super(capability, hosts, racks, priority, 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register the application master. This must be called before any
|
* Register the application master. This must be called before any
|
||||||
|
@ -117,7 +157,7 @@ public interface AMRMClient extends Service {
|
||||||
* Request containers for resources before calling <code>allocate</code>
|
* Request containers for resources before calling <code>allocate</code>
|
||||||
* @param req Resource request
|
* @param req Resource request
|
||||||
*/
|
*/
|
||||||
public void addContainerRequest(ContainerRequest req);
|
public void addContainerRequest(T req);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove previous container request. The previous container request may have
|
* Remove previous container request. The previous container request may have
|
||||||
|
@ -126,7 +166,7 @@ public interface AMRMClient extends Service {
|
||||||
* even after the remove request
|
* even after the remove request
|
||||||
* @param req Resource request
|
* @param req Resource request
|
||||||
*/
|
*/
|
||||||
public void removeContainerRequest(ContainerRequest req);
|
public void removeContainerRequest(T req);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Release containers assigned by the Resource Manager. If the app cannot use
|
* Release containers assigned by the Resource Manager. If the app cannot use
|
||||||
|
@ -150,4 +190,21 @@ public interface AMRMClient extends Service {
|
||||||
* @return Current number of nodes in the cluster
|
* @return Current number of nodes in the cluster
|
||||||
*/
|
*/
|
||||||
public int getClusterNodeCount();
|
public int getClusterNodeCount();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get outstanding <code>StoredContainerRequest</code>s matching the given
|
||||||
|
* parameters. These StoredContainerRequests should have been added via
|
||||||
|
* <code>addContainerRequest</code> earlier in the lifecycle. For performance,
|
||||||
|
* the AMRMClient may return its internal collection directly without creating
|
||||||
|
* a copy. Users should not perform mutable operations on the return value.
|
||||||
|
* Each collection in the list contains requests with identical
|
||||||
|
* <code>Resource</code> size that fit in the given capability. In a
|
||||||
|
* collection, requests will be returned in the same order as they were added.
|
||||||
|
* @return Collection of request matching the parameters
|
||||||
|
*/
|
||||||
|
public List<? extends Collection<T>> getMatchingRequests(
|
||||||
|
Priority priority,
|
||||||
|
String resourceName,
|
||||||
|
Resource capability);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,9 +19,11 @@
|
||||||
package org.apache.hadoop.yarn.client;
|
package org.apache.hadoop.yarn.client;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -38,7 +40,9 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
import org.apache.hadoop.yarn.service.AbstractService;
|
import org.apache.hadoop.yarn.service.AbstractService;
|
||||||
|
|
||||||
|
@ -88,55 +92,50 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
*/
|
*/
|
||||||
@Unstable
|
@Unstable
|
||||||
@Evolving
|
@Evolving
|
||||||
public class AMRMClientAsync extends AbstractService {
|
public class AMRMClientAsync<T extends ContainerRequest> extends AbstractService {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
|
private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
|
||||||
|
|
||||||
private final AMRMClient client;
|
private final AMRMClient<T> client;
|
||||||
private final int intervalMs;
|
private final AtomicInteger heartbeatIntervalMs = new AtomicInteger();
|
||||||
private final HeartbeatThread heartbeatThread;
|
private final HeartbeatThread heartbeatThread;
|
||||||
private final CallbackHandlerThread handlerThread;
|
private final CallbackHandlerThread handlerThread;
|
||||||
private final CallbackHandler handler;
|
private final CallbackHandler handler;
|
||||||
|
|
||||||
private final BlockingQueue<AllocateResponse> responseQueue;
|
private final BlockingQueue<AllocateResponse> responseQueue;
|
||||||
|
|
||||||
|
private final Object unregisterHeartbeatLock = new Object();
|
||||||
|
|
||||||
private volatile boolean keepRunning;
|
private volatile boolean keepRunning;
|
||||||
private volatile float progress;
|
private volatile float progress;
|
||||||
|
|
||||||
|
private volatile Exception savedException;
|
||||||
|
|
||||||
public AMRMClientAsync(ApplicationAttemptId id, int intervalMs,
|
public AMRMClientAsync(ApplicationAttemptId id, int intervalMs,
|
||||||
CallbackHandler callbackHandler) {
|
CallbackHandler callbackHandler) {
|
||||||
this(new AMRMClientImpl(id), intervalMs, callbackHandler);
|
this(new AMRMClientImpl<T>(id), intervalMs, callbackHandler);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
AMRMClientAsync(AMRMClient client, int intervalMs,
|
public AMRMClientAsync(AMRMClient<T> client, int intervalMs,
|
||||||
CallbackHandler callbackHandler) {
|
CallbackHandler callbackHandler) {
|
||||||
super(AMRMClientAsync.class.getName());
|
super(AMRMClientAsync.class.getName());
|
||||||
this.client = client;
|
this.client = client;
|
||||||
this.intervalMs = intervalMs;
|
this.heartbeatIntervalMs.set(intervalMs);
|
||||||
handler = callbackHandler;
|
handler = callbackHandler;
|
||||||
heartbeatThread = new HeartbeatThread();
|
heartbeatThread = new HeartbeatThread();
|
||||||
handlerThread = new CallbackHandlerThread();
|
handlerThread = new CallbackHandlerThread();
|
||||||
responseQueue = new LinkedBlockingQueue<AllocateResponse>();
|
responseQueue = new LinkedBlockingQueue<AllocateResponse>();
|
||||||
keepRunning = true;
|
keepRunning = true;
|
||||||
|
savedException = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Sets the application's current progress. It will be transmitted to the
|
|
||||||
* resource manager on the next heartbeat.
|
|
||||||
* @param progress
|
|
||||||
* the application's progress so far
|
|
||||||
*/
|
|
||||||
public void setProgress(float progress) {
|
|
||||||
this.progress = progress;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(Configuration conf) {
|
public void init(Configuration conf) {
|
||||||
super.init(conf);
|
super.init(conf);
|
||||||
client.init(conf);
|
client.init(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start() {
|
public void start() {
|
||||||
|
@ -171,6 +170,17 @@ public class AMRMClientAsync extends AbstractService {
|
||||||
super.stop();
|
super.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setHeartbeatInterval(int interval) {
|
||||||
|
heartbeatIntervalMs.set(interval);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<? extends Collection<T>> getMatchingRequests(
|
||||||
|
Priority priority,
|
||||||
|
String resourceName,
|
||||||
|
Resource capability) {
|
||||||
|
return client.getMatchingRequests(priority, resourceName, capability);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Registers this application master with the resource manager. On successful
|
* Registers this application master with the resource manager. On successful
|
||||||
* registration, starts the heartbeating thread.
|
* registration, starts the heartbeating thread.
|
||||||
|
@ -180,8 +190,8 @@ public class AMRMClientAsync extends AbstractService {
|
||||||
public RegisterApplicationMasterResponse registerApplicationMaster(
|
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||||
String appHostName, int appHostPort, String appTrackingUrl)
|
String appHostName, int appHostPort, String appTrackingUrl)
|
||||||
throws YarnRemoteException, IOException {
|
throws YarnRemoteException, IOException {
|
||||||
RegisterApplicationMasterResponse response =
|
RegisterApplicationMasterResponse response = client
|
||||||
client.registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
|
.registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
|
||||||
heartbeatThread.start();
|
heartbeatThread.start();
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
@ -195,8 +205,9 @@ public class AMRMClientAsync extends AbstractService {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
|
public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
|
||||||
String appMessage, String appTrackingUrl) throws YarnRemoteException, IOException {
|
String appMessage, String appTrackingUrl) throws YarnRemoteException,
|
||||||
synchronized (client) {
|
IOException {
|
||||||
|
synchronized (unregisterHeartbeatLock) {
|
||||||
keepRunning = false;
|
keepRunning = false;
|
||||||
client.unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl);
|
client.unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl);
|
||||||
}
|
}
|
||||||
|
@ -206,7 +217,7 @@ public class AMRMClientAsync extends AbstractService {
|
||||||
* Request containers for resources before calling <code>allocate</code>
|
* Request containers for resources before calling <code>allocate</code>
|
||||||
* @param req Resource request
|
* @param req Resource request
|
||||||
*/
|
*/
|
||||||
public void addContainerRequest(AMRMClient.ContainerRequest req) {
|
public void addContainerRequest(T req) {
|
||||||
client.addContainerRequest(req);
|
client.addContainerRequest(req);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -217,7 +228,7 @@ public class AMRMClientAsync extends AbstractService {
|
||||||
* even after the remove request
|
* even after the remove request
|
||||||
* @param req Resource request
|
* @param req Resource request
|
||||||
*/
|
*/
|
||||||
public void removeContainerRequest(AMRMClient.ContainerRequest req) {
|
public void removeContainerRequest(T req) {
|
||||||
client.removeContainerRequest(req);
|
client.removeContainerRequest(req);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -259,7 +270,7 @@ public class AMRMClientAsync extends AbstractService {
|
||||||
while (true) {
|
while (true) {
|
||||||
AllocateResponse response = null;
|
AllocateResponse response = null;
|
||||||
// synchronization ensures we don't send heartbeats after unregistering
|
// synchronization ensures we don't send heartbeats after unregistering
|
||||||
synchronized (client) {
|
synchronized (unregisterHeartbeatLock) {
|
||||||
if (!keepRunning) {
|
if (!keepRunning) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -267,9 +278,17 @@ public class AMRMClientAsync extends AbstractService {
|
||||||
try {
|
try {
|
||||||
response = client.allocate(progress);
|
response = client.allocate(progress);
|
||||||
} catch (YarnRemoteException ex) {
|
} catch (YarnRemoteException ex) {
|
||||||
LOG.error("Failed to heartbeat", ex);
|
LOG.error("Yarn exception on heartbeat", ex);
|
||||||
|
savedException = ex;
|
||||||
|
// interrupt handler thread in case it waiting on the queue
|
||||||
|
handlerThread.interrupt();
|
||||||
|
break;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Failed to heartbeat", e);
|
LOG.error("IO exception on heartbeat", e);
|
||||||
|
savedException = e;
|
||||||
|
// interrupt handler thread in case it waiting on the queue
|
||||||
|
handlerThread.interrupt();
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (response != null) {
|
if (response != null) {
|
||||||
|
@ -278,15 +297,15 @@ public class AMRMClientAsync extends AbstractService {
|
||||||
responseQueue.put(response);
|
responseQueue.put(response);
|
||||||
break;
|
break;
|
||||||
} catch (InterruptedException ex) {
|
} catch (InterruptedException ex) {
|
||||||
LOG.warn("Interrupted while waiting to put on response queue", ex);
|
LOG.info("Interrupted while waiting to put on response queue", ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Thread.sleep(intervalMs);
|
Thread.sleep(heartbeatIntervalMs.get());
|
||||||
} catch (InterruptedException ex) {
|
} catch (InterruptedException ex) {
|
||||||
LOG.warn("Heartbeater interrupted", ex);
|
LOG.info("Heartbeater interrupted", ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -301,14 +320,21 @@ public class AMRMClientAsync extends AbstractService {
|
||||||
while (keepRunning) {
|
while (keepRunning) {
|
||||||
AllocateResponse response;
|
AllocateResponse response;
|
||||||
try {
|
try {
|
||||||
|
if(savedException != null) {
|
||||||
|
LOG.error("Stopping callback due to: ", savedException);
|
||||||
|
handler.onError(savedException);
|
||||||
|
break;
|
||||||
|
}
|
||||||
response = responseQueue.take();
|
response = responseQueue.take();
|
||||||
} catch (InterruptedException ex) {
|
} catch (InterruptedException ex) {
|
||||||
LOG.info("Interrupted while waiting for queue");
|
LOG.info("Interrupted while waiting for queue", ex);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (response.getReboot()) {
|
if (response.getReboot()) {
|
||||||
handler.onRebootRequest();
|
handler.onRebootRequest();
|
||||||
|
LOG.info("Reboot requested. Stopping callback.");
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
List<NodeReport> updatedNodes = response.getUpdatedNodes();
|
List<NodeReport> updatedNodes = response.getUpdatedNodes();
|
||||||
if (!updatedNodes.isEmpty()) {
|
if (!updatedNodes.isEmpty()) {
|
||||||
|
@ -325,6 +351,8 @@ public class AMRMClientAsync extends AbstractService {
|
||||||
if (!allocated.isEmpty()) {
|
if (!allocated.isEmpty()) {
|
||||||
handler.onContainersAllocated(allocated);
|
handler.onContainersAllocated(allocated);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
progress = handler.getProgress();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -347,14 +375,19 @@ public class AMRMClientAsync extends AbstractService {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called when the ResourceManager wants the ApplicationMaster to reboot
|
* Called when the ResourceManager wants the ApplicationMaster to reboot
|
||||||
* for being out of sync.
|
* for being out of sync. The ApplicationMaster should not unregister with
|
||||||
|
* the RM unless the ApplicationMaster wants to be the last attempt.
|
||||||
*/
|
*/
|
||||||
public void onRebootRequest();
|
public void onRebootRequest();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called when nodes tracked by the ResourceManager have changed in in health,
|
* Called when nodes tracked by the ResourceManager have changed in health,
|
||||||
* availability etc.
|
* availability etc.
|
||||||
*/
|
*/
|
||||||
public void onNodesUpdated(List<NodeReport> updatedNodes);
|
public void onNodesUpdated(List<NodeReport> updatedNodes);
|
||||||
|
|
||||||
|
public float getProgress();
|
||||||
|
|
||||||
|
public void onError(Exception e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,9 +22,15 @@ import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.SortedMap;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
|
|
||||||
|
@ -47,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
|
@ -55,8 +62,11 @@ import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
import org.apache.hadoop.yarn.service.AbstractService;
|
import org.apache.hadoop.yarn.service.AbstractService;
|
||||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
|
|
||||||
|
// TODO check inputs for null etc. YARN-654
|
||||||
|
|
||||||
@Unstable
|
@Unstable
|
||||||
public class AMRMClientImpl extends AbstractService implements AMRMClient {
|
public class AMRMClientImpl<T extends ContainerRequest>
|
||||||
|
extends AbstractService implements AMRMClient<T> {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);
|
private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);
|
||||||
|
|
||||||
|
@ -70,6 +80,57 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient {
|
||||||
protected Resource clusterAvailableResources;
|
protected Resource clusterAvailableResources;
|
||||||
protected int clusterNodeCount;
|
protected int clusterNodeCount;
|
||||||
|
|
||||||
|
class ResourceRequestInfo {
|
||||||
|
ResourceRequest remoteRequest;
|
||||||
|
LinkedHashSet<T> containerRequests;
|
||||||
|
|
||||||
|
ResourceRequestInfo(Priority priority, String resourceName,
|
||||||
|
Resource capability) {
|
||||||
|
remoteRequest = BuilderUtils.newResourceRequest(priority, resourceName,
|
||||||
|
capability, 0);
|
||||||
|
containerRequests = new LinkedHashSet<T>();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class compares Resource by memory then cpu in reverse order
|
||||||
|
*/
|
||||||
|
class ResourceReverseMemoryThenCpuComparator implements Comparator<Resource> {
|
||||||
|
@Override
|
||||||
|
public int compare(Resource arg0, Resource arg1) {
|
||||||
|
int mem0 = arg0.getMemory();
|
||||||
|
int mem1 = arg1.getMemory();
|
||||||
|
int cpu0 = arg0.getVirtualCores();
|
||||||
|
int cpu1 = arg1.getVirtualCores();
|
||||||
|
if(mem0 == mem1) {
|
||||||
|
if(cpu0 == cpu1) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
if(cpu0 < cpu1) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if(mem0 < mem1) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static boolean canFit(Resource arg0, Resource arg1) {
|
||||||
|
int mem0 = arg0.getMemory();
|
||||||
|
int mem1 = arg1.getMemory();
|
||||||
|
int cpu0 = arg0.getVirtualCores();
|
||||||
|
int cpu1 = arg1.getVirtualCores();
|
||||||
|
|
||||||
|
if(mem0 <= mem1 && cpu0 <= cpu1) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
//Key -> Priority
|
//Key -> Priority
|
||||||
//Value -> Map
|
//Value -> Map
|
||||||
//Key->ResourceName (e.g., hostname, rackname, *)
|
//Key->ResourceName (e.g., hostname, rackname, *)
|
||||||
|
@ -77,9 +138,9 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient {
|
||||||
//Key->Resource Capability
|
//Key->Resource Capability
|
||||||
//Value->ResourceRequest
|
//Value->ResourceRequest
|
||||||
protected final
|
protected final
|
||||||
Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
|
Map<Priority, Map<String, TreeMap<Resource, ResourceRequestInfo>>>
|
||||||
remoteRequestsTable =
|
remoteRequestsTable =
|
||||||
new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
|
new TreeMap<Priority, Map<String, TreeMap<Resource, ResourceRequestInfo>>>();
|
||||||
|
|
||||||
protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
|
protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
|
||||||
new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator());
|
new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator());
|
||||||
|
@ -223,42 +284,47 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void addContainerRequest(ContainerRequest req) {
|
public synchronized void addContainerRequest(T req) {
|
||||||
// Create resource requests
|
// Create resource requests
|
||||||
if(req.hosts != null) {
|
// add check for dup locations
|
||||||
|
if (req.hosts != null) {
|
||||||
for (String host : req.hosts) {
|
for (String host : req.hosts) {
|
||||||
addResourceRequest(req.priority, host, req.capability, req.containerCount);
|
addResourceRequest(req.priority, host, req.capability,
|
||||||
|
req.containerCount, req);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if(req.racks != null) {
|
if (req.racks != null) {
|
||||||
for (String rack : req.racks) {
|
for (String rack : req.racks) {
|
||||||
addResourceRequest(req.priority, rack, req.capability, req.containerCount);
|
addResourceRequest(req.priority, rack, req.capability,
|
||||||
|
req.containerCount, req);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Off-switch
|
// Off-switch
|
||||||
addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
|
addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
|
||||||
req.containerCount);
|
req.containerCount, req);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void removeContainerRequest(ContainerRequest req) {
|
public synchronized void removeContainerRequest(T req) {
|
||||||
// Update resource requests
|
// Update resource requests
|
||||||
if(req.hosts != null) {
|
if (req.hosts != null) {
|
||||||
for (String hostName : req.hosts) {
|
for (String hostName : req.hosts) {
|
||||||
decResourceRequest(req.priority, hostName, req.capability, req.containerCount);
|
decResourceRequest(req.priority, hostName, req.capability,
|
||||||
|
req.containerCount, req);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if(req.racks != null) {
|
if (req.racks != null) {
|
||||||
for (String rack : req.racks) {
|
for (String rack : req.racks) {
|
||||||
decResourceRequest(req.priority, rack, req.capability, req.containerCount);
|
decResourceRequest(req.priority, rack, req.capability,
|
||||||
|
req.containerCount, req);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
decResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
|
decResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
|
||||||
req.containerCount);
|
req.containerCount, req);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -276,6 +342,44 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient {
|
||||||
return clusterNodeCount;
|
return clusterNodeCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized List<? extends Collection<T>> getMatchingRequests(
|
||||||
|
Priority priority,
|
||||||
|
String resourceName,
|
||||||
|
Resource capability) {
|
||||||
|
List<LinkedHashSet<T>> list = new LinkedList<LinkedHashSet<T>>();
|
||||||
|
Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
|
||||||
|
this.remoteRequestsTable.get(priority);
|
||||||
|
if (remoteRequests == null) {
|
||||||
|
return list;
|
||||||
|
}
|
||||||
|
TreeMap<Resource, ResourceRequestInfo> reqMap = remoteRequests
|
||||||
|
.get(resourceName);
|
||||||
|
if (reqMap == null) {
|
||||||
|
return list;
|
||||||
|
}
|
||||||
|
|
||||||
|
ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
|
||||||
|
if (resourceRequestInfo != null) {
|
||||||
|
list.add(resourceRequestInfo.containerRequests);
|
||||||
|
return list;
|
||||||
|
}
|
||||||
|
|
||||||
|
// no exact match. Container may be larger than what was requested.
|
||||||
|
// get all resources <= capability. map is reverse sorted.
|
||||||
|
SortedMap<Resource, ResourceRequestInfo> tailMap =
|
||||||
|
reqMap.tailMap(capability);
|
||||||
|
for(Map.Entry<Resource, ResourceRequestInfo> entry : tailMap.entrySet()) {
|
||||||
|
if(canFit(entry.getKey(), capability)) {
|
||||||
|
// match found that fits in the larger resource
|
||||||
|
list.add(entry.getValue().containerRequests);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// no match found
|
||||||
|
return list;
|
||||||
|
}
|
||||||
|
|
||||||
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
|
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
|
||||||
// This code looks weird but is needed because of the following scenario.
|
// This code looks weird but is needed because of the following scenario.
|
||||||
// A ResourceRequest is removed from the remoteRequestTable. A 0 container
|
// A ResourceRequest is removed from the remoteRequestTable. A 0 container
|
||||||
|
@ -294,44 +398,57 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addResourceRequest(Priority priority, String resourceName,
|
private void addResourceRequest(Priority priority, String resourceName,
|
||||||
Resource capability, int containerCount) {
|
Resource capability, int containerCount, T req) {
|
||||||
Map<String, Map<Resource, ResourceRequest>> remoteRequests =
|
Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
|
||||||
this.remoteRequestsTable.get(priority);
|
this.remoteRequestsTable.get(priority);
|
||||||
if (remoteRequests == null) {
|
if (remoteRequests == null) {
|
||||||
remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
|
remoteRequests =
|
||||||
|
new HashMap<String, TreeMap<Resource, ResourceRequestInfo>>();
|
||||||
this.remoteRequestsTable.put(priority, remoteRequests);
|
this.remoteRequestsTable.put(priority, remoteRequests);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Added priority=" + priority);
|
LOG.debug("Added priority=" + priority);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
|
TreeMap<Resource, ResourceRequestInfo> reqMap =
|
||||||
|
remoteRequests.get(resourceName);
|
||||||
if (reqMap == null) {
|
if (reqMap == null) {
|
||||||
reqMap = new HashMap<Resource, ResourceRequest>();
|
// capabilities are stored in reverse sorted order. smallest last.
|
||||||
|
reqMap = new TreeMap<Resource, ResourceRequestInfo>(
|
||||||
|
new ResourceReverseMemoryThenCpuComparator());
|
||||||
remoteRequests.put(resourceName, reqMap);
|
remoteRequests.put(resourceName, reqMap);
|
||||||
}
|
}
|
||||||
ResourceRequest remoteRequest = reqMap.get(capability);
|
ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
|
||||||
if (remoteRequest == null) {
|
if (resourceRequestInfo == null) {
|
||||||
remoteRequest = BuilderUtils.
|
resourceRequestInfo =
|
||||||
newResourceRequest(priority, resourceName, capability, 0);
|
new ResourceRequestInfo(priority, resourceName, capability);
|
||||||
reqMap.put(capability, remoteRequest);
|
reqMap.put(capability, resourceRequestInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
remoteRequest.setNumContainers(remoteRequest.getNumContainers() + containerCount);
|
resourceRequestInfo.remoteRequest.setNumContainers(
|
||||||
|
resourceRequestInfo.remoteRequest.getNumContainers() + containerCount);
|
||||||
|
|
||||||
|
if(req instanceof StoredContainerRequest) {
|
||||||
|
resourceRequestInfo.containerRequests.add(req);
|
||||||
|
}
|
||||||
|
|
||||||
// Note this down for next interaction with ResourceManager
|
// Note this down for next interaction with ResourceManager
|
||||||
addResourceRequestToAsk(remoteRequest);
|
addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("addResourceRequest:" + " applicationId="
|
LOG.debug("addResourceRequest:" + " applicationId="
|
||||||
+ appAttemptId + " priority=" + priority.getPriority()
|
+ appAttemptId + " priority=" + priority.getPriority()
|
||||||
+ " resourceName=" + resourceName + " numContainers="
|
+ " resourceName=" + resourceName + " numContainers="
|
||||||
+ remoteRequest.getNumContainers() + " #asks=" + ask.size());
|
+ resourceRequestInfo.remoteRequest.getNumContainers()
|
||||||
|
+ " #asks=" + ask.size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void decResourceRequest(Priority priority, String resourceName,
|
private void decResourceRequest(Priority priority,
|
||||||
Resource capability, int containerCount) {
|
String resourceName,
|
||||||
Map<String, Map<Resource, ResourceRequest>> remoteRequests =
|
Resource capability,
|
||||||
|
int containerCount,
|
||||||
|
T req) {
|
||||||
|
Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
|
||||||
this.remoteRequestsTable.get(priority);
|
this.remoteRequestsTable.get(priority);
|
||||||
|
|
||||||
if(remoteRequests == null) {
|
if(remoteRequests == null) {
|
||||||
|
@ -342,7 +459,7 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
|
Map<Resource, ResourceRequestInfo> reqMap = remoteRequests.get(resourceName);
|
||||||
if (reqMap == null) {
|
if (reqMap == null) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Not decrementing resource as " + resourceName
|
LOG.debug("Not decrementing resource as " + resourceName
|
||||||
|
@ -350,28 +467,34 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient {
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
ResourceRequest remoteRequest = reqMap.get(capability);
|
ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("BEFORE decResourceRequest:" + " applicationId="
|
LOG.debug("BEFORE decResourceRequest:" + " applicationId="
|
||||||
+ appAttemptId + " priority=" + priority.getPriority()
|
+ appAttemptId + " priority=" + priority.getPriority()
|
||||||
+ " resourceName=" + resourceName + " numContainers="
|
+ " resourceName=" + resourceName + " numContainers="
|
||||||
+ remoteRequest.getNumContainers() + " #asks=" + ask.size());
|
+ resourceRequestInfo.remoteRequest.getNumContainers()
|
||||||
|
+ " #asks=" + ask.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
remoteRequest.
|
resourceRequestInfo.remoteRequest.setNumContainers(
|
||||||
setNumContainers(remoteRequest.getNumContainers() - containerCount);
|
resourceRequestInfo.remoteRequest.getNumContainers() - containerCount);
|
||||||
if(remoteRequest.getNumContainers() < 0) {
|
|
||||||
|
if(req instanceof StoredContainerRequest) {
|
||||||
|
resourceRequestInfo.containerRequests.remove(req);
|
||||||
|
}
|
||||||
|
|
||||||
|
if(resourceRequestInfo.remoteRequest.getNumContainers() < 0) {
|
||||||
// guard against spurious removals
|
// guard against spurious removals
|
||||||
remoteRequest.setNumContainers(0);
|
resourceRequestInfo.remoteRequest.setNumContainers(0);
|
||||||
}
|
}
|
||||||
// send the ResourceRequest to RM even if is 0 because it needs to override
|
// send the ResourceRequest to RM even if is 0 because it needs to override
|
||||||
// a previously sent value. If ResourceRequest was not sent previously then
|
// a previously sent value. If ResourceRequest was not sent previously then
|
||||||
// sending 0 aught to be a no-op on RM
|
// sending 0 aught to be a no-op on RM
|
||||||
addResourceRequestToAsk(remoteRequest);
|
addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
|
||||||
|
|
||||||
// delete entries from map if no longer needed
|
// delete entries from map if no longer needed
|
||||||
if (remoteRequest.getNumContainers() == 0) {
|
if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) {
|
||||||
reqMap.remove(capability);
|
reqMap.remove(capability);
|
||||||
if (reqMap.size() == 0) {
|
if (reqMap.size() == 0) {
|
||||||
remoteRequests.remove(resourceName);
|
remoteRequests.remove(resourceName);
|
||||||
|
@ -385,7 +508,8 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient {
|
||||||
LOG.info("AFTER decResourceRequest:" + " applicationId="
|
LOG.info("AFTER decResourceRequest:" + " applicationId="
|
||||||
+ appAttemptId + " priority=" + priority.getPriority()
|
+ appAttemptId + " priority=" + priority.getPriority()
|
||||||
+ " resourceName=" + resourceName + " numContainers="
|
+ " resourceName=" + resourceName + " numContainers="
|
||||||
+ remoteRequest.getNumContainers() + " #asks=" + ask.size());
|
+ resourceRequestInfo.remoteRequest.getNumContainers()
|
||||||
|
+ " #asks=" + ask.size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
|
@ -50,27 +51,38 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
|
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
|
||||||
|
import org.apache.hadoop.yarn.client.AMRMClient.StoredContainerRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||||
import org.apache.hadoop.yarn.service.Service.STATE;
|
import org.apache.hadoop.yarn.service.Service.STATE;
|
||||||
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
import org.junit.AfterClass;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
public class TestAMRMClient {
|
public class TestAMRMClient {
|
||||||
Configuration conf = null;
|
static Configuration conf = null;
|
||||||
MiniYARNCluster yarnCluster = null;
|
static MiniYARNCluster yarnCluster = null;
|
||||||
YarnClientImpl yarnClient = null;
|
static YarnClientImpl yarnClient = null;
|
||||||
List<NodeReport> nodeReports = null;
|
static List<NodeReport> nodeReports = null;
|
||||||
ApplicationAttemptId attemptId = null;
|
static ApplicationAttemptId attemptId = null;
|
||||||
int nodeCount = 3;
|
static int nodeCount = 3;
|
||||||
|
|
||||||
@Before
|
static Resource capability;
|
||||||
public void setup() throws YarnRemoteException, IOException {
|
static Priority priority;
|
||||||
|
static String node;
|
||||||
|
static String rack;
|
||||||
|
static String[] nodes;
|
||||||
|
static String[] racks;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setup() throws Exception {
|
||||||
// start minicluster
|
// start minicluster
|
||||||
conf = new YarnConfiguration();
|
conf = new YarnConfiguration();
|
||||||
yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
|
yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
|
||||||
|
@ -84,7 +96,17 @@ public class TestAMRMClient {
|
||||||
|
|
||||||
// get node info
|
// get node info
|
||||||
nodeReports = yarnClient.getNodeReports();
|
nodeReports = yarnClient.getNodeReports();
|
||||||
|
|
||||||
|
priority = BuilderUtils.newPriority(1);
|
||||||
|
capability = BuilderUtils.newResource(1024, 1);
|
||||||
|
node = nodeReports.get(0).getNodeId().getHost();
|
||||||
|
rack = nodeReports.get(0).getRackName();
|
||||||
|
nodes = new String[]{ node };
|
||||||
|
racks = new String[]{ rack };
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void startApp() throws Exception {
|
||||||
// submit new app
|
// submit new app
|
||||||
GetNewApplicationResponse newApp = yarnClient.getNewApplication();
|
GetNewApplicationResponse newApp = yarnClient.getNewApplication();
|
||||||
ApplicationId appId = newApp.getApplicationId();
|
ApplicationId appId = newApp.getApplicationId();
|
||||||
|
@ -125,7 +147,12 @@ public class TestAMRMClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown() {
|
public void cancelApp() {
|
||||||
|
attemptId = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() {
|
||||||
if (yarnClient != null && yarnClient.getServiceState() == STATE.STARTED) {
|
if (yarnClient != null && yarnClient.getServiceState() == STATE.STARTED) {
|
||||||
yarnClient.stop();
|
yarnClient.stop();
|
||||||
}
|
}
|
||||||
|
@ -133,13 +160,235 @@ public class TestAMRMClient {
|
||||||
yarnCluster.stop();
|
yarnCluster.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout=60000)
|
||||||
|
public void testAMRMClientMatchingFit() throws YarnRemoteException, IOException {
|
||||||
|
AMRMClientImpl<StoredContainerRequest> amClient = null;
|
||||||
|
try {
|
||||||
|
// start am rm client
|
||||||
|
amClient = new AMRMClientImpl<StoredContainerRequest>(attemptId);
|
||||||
|
amClient.init(conf);
|
||||||
|
amClient.start();
|
||||||
|
amClient.registerApplicationMaster("Host", 10000, "");
|
||||||
|
|
||||||
|
Resource capability1 = BuilderUtils.newResource(1024, 2);
|
||||||
|
Resource capability2 = BuilderUtils.newResource(1024, 1);
|
||||||
|
Resource capability3 = BuilderUtils.newResource(1000, 2);
|
||||||
|
Resource capability4 = BuilderUtils.newResource(2000, 1);
|
||||||
|
Resource capability5 = BuilderUtils.newResource(1000, 3);
|
||||||
|
Resource capability6 = BuilderUtils.newResource(2000, 1);
|
||||||
|
|
||||||
|
StoredContainerRequest storedContainer1 =
|
||||||
|
new StoredContainerRequest(capability1, nodes, racks, priority);
|
||||||
|
StoredContainerRequest storedContainer2 =
|
||||||
|
new StoredContainerRequest(capability2, nodes, racks, priority);
|
||||||
|
StoredContainerRequest storedContainer3 =
|
||||||
|
new StoredContainerRequest(capability3, nodes, racks, priority);
|
||||||
|
StoredContainerRequest storedContainer4 =
|
||||||
|
new StoredContainerRequest(capability4, nodes, racks, priority);
|
||||||
|
StoredContainerRequest storedContainer5 =
|
||||||
|
new StoredContainerRequest(capability5, nodes, racks, priority);
|
||||||
|
StoredContainerRequest storedContainer6 =
|
||||||
|
new StoredContainerRequest(capability6, nodes, racks, priority);
|
||||||
|
amClient.addContainerRequest(storedContainer1);
|
||||||
|
amClient.addContainerRequest(storedContainer2);
|
||||||
|
amClient.addContainerRequest(storedContainer3);
|
||||||
|
amClient.addContainerRequest(storedContainer4);
|
||||||
|
amClient.addContainerRequest(storedContainer5);
|
||||||
|
amClient.addContainerRequest(storedContainer6);
|
||||||
|
|
||||||
|
// test matching of containers
|
||||||
|
List<? extends Collection<StoredContainerRequest>> matches;
|
||||||
|
StoredContainerRequest storedRequest;
|
||||||
|
// exact match
|
||||||
|
Resource testCapability1 = BuilderUtils.newResource(1024, 2);
|
||||||
|
matches = amClient.getMatchingRequests(priority, node, testCapability1);
|
||||||
|
verifyMatches(matches, 1);
|
||||||
|
storedRequest = matches.get(0).iterator().next();
|
||||||
|
assertTrue(storedContainer1 == storedRequest);
|
||||||
|
amClient.removeContainerRequest(storedContainer1);
|
||||||
|
|
||||||
|
// exact matching with order maintained
|
||||||
|
Resource testCapability2 = BuilderUtils.newResource(2000, 1);
|
||||||
|
matches = amClient.getMatchingRequests(priority, node, testCapability2);
|
||||||
|
verifyMatches(matches, 2);
|
||||||
|
// must be returned in the order they were made
|
||||||
|
int i = 0;
|
||||||
|
for(StoredContainerRequest storedRequest1 : matches.get(0)) {
|
||||||
|
if(i++ == 0) {
|
||||||
|
assertTrue(storedContainer4 == storedRequest1);
|
||||||
|
} else {
|
||||||
|
assertTrue(storedContainer6 == storedRequest1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
amClient.removeContainerRequest(storedContainer6);
|
||||||
|
|
||||||
|
// matching with larger container. all requests returned
|
||||||
|
Resource testCapability3 = BuilderUtils.newResource(4000, 4);
|
||||||
|
matches = amClient.getMatchingRequests(priority, node, testCapability3);
|
||||||
|
assert(matches.size() == 4);
|
||||||
|
|
||||||
|
Resource testCapability4 = BuilderUtils.newResource(1024, 2);
|
||||||
|
matches = amClient.getMatchingRequests(priority, node, testCapability4);
|
||||||
|
assert(matches.size() == 2);
|
||||||
|
// verify non-fitting containers are not returned and fitting ones are
|
||||||
|
for(Collection<StoredContainerRequest> testSet : matches) {
|
||||||
|
assertTrue(testSet.size() == 1);
|
||||||
|
StoredContainerRequest testRequest = testSet.iterator().next();
|
||||||
|
assertTrue(testRequest != storedContainer4);
|
||||||
|
assertTrue(testRequest != storedContainer5);
|
||||||
|
assert(testRequest == storedContainer2 ||
|
||||||
|
testRequest == storedContainer3);
|
||||||
|
}
|
||||||
|
|
||||||
|
Resource testCapability5 = BuilderUtils.newResource(512, 4);
|
||||||
|
matches = amClient.getMatchingRequests(priority, node, testCapability5);
|
||||||
|
assert(matches.size() == 0);
|
||||||
|
|
||||||
|
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
|
||||||
|
null, null);
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
|
||||||
|
amClient.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyMatches(
|
||||||
|
List<? extends Collection<StoredContainerRequest>> matches,
|
||||||
|
int matchSize) {
|
||||||
|
assertTrue(matches.size() == 1);
|
||||||
|
assertTrue(matches.get(0).size() == matchSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test (timeout=60000)
|
||||||
|
public void testAMRMClientMatchStorage() throws YarnRemoteException, IOException {
|
||||||
|
AMRMClientImpl<StoredContainerRequest> amClient = null;
|
||||||
|
try {
|
||||||
|
// start am rm client
|
||||||
|
amClient = new AMRMClientImpl<StoredContainerRequest>(attemptId);
|
||||||
|
amClient.init(conf);
|
||||||
|
amClient.start();
|
||||||
|
amClient.registerApplicationMaster("Host", 10000, "");
|
||||||
|
|
||||||
|
Priority priority1 = Records.newRecord(Priority.class);
|
||||||
|
priority1.setPriority(2);
|
||||||
|
|
||||||
|
StoredContainerRequest storedContainer1 =
|
||||||
|
new StoredContainerRequest(capability, nodes, racks, priority);
|
||||||
|
StoredContainerRequest storedContainer2 =
|
||||||
|
new StoredContainerRequest(capability, nodes, racks, priority);
|
||||||
|
StoredContainerRequest storedContainer3 =
|
||||||
|
new StoredContainerRequest(capability, null, null, priority1);
|
||||||
|
amClient.addContainerRequest(storedContainer1);
|
||||||
|
amClient.addContainerRequest(storedContainer2);
|
||||||
|
amClient.addContainerRequest(storedContainer3);
|
||||||
|
|
||||||
|
// test addition and storage
|
||||||
|
int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
|
||||||
|
.get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
|
||||||
|
assertTrue(containersRequestedAny == 2);
|
||||||
|
containersRequestedAny = amClient.remoteRequestsTable.get(priority1)
|
||||||
|
.get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
|
||||||
|
assertTrue(containersRequestedAny == 1);
|
||||||
|
List<? extends Collection<StoredContainerRequest>> matches =
|
||||||
|
amClient.getMatchingRequests(priority, node, capability);
|
||||||
|
verifyMatches(matches, 2);
|
||||||
|
matches = amClient.getMatchingRequests(priority, rack, capability);
|
||||||
|
verifyMatches(matches, 2);
|
||||||
|
matches =
|
||||||
|
amClient.getMatchingRequests(priority, ResourceRequest.ANY, capability);
|
||||||
|
verifyMatches(matches, 2);
|
||||||
|
matches = amClient.getMatchingRequests(priority1, rack, capability);
|
||||||
|
assertTrue(matches.isEmpty());
|
||||||
|
matches =
|
||||||
|
amClient.getMatchingRequests(priority1, ResourceRequest.ANY, capability);
|
||||||
|
verifyMatches(matches, 1);
|
||||||
|
|
||||||
|
// test removal
|
||||||
|
amClient.removeContainerRequest(storedContainer3);
|
||||||
|
matches = amClient.getMatchingRequests(priority, node, capability);
|
||||||
|
verifyMatches(matches, 2);
|
||||||
|
amClient.removeContainerRequest(storedContainer2);
|
||||||
|
matches = amClient.getMatchingRequests(priority, node, capability);
|
||||||
|
verifyMatches(matches, 1);
|
||||||
|
matches = amClient.getMatchingRequests(priority, rack, capability);
|
||||||
|
verifyMatches(matches, 1);
|
||||||
|
|
||||||
|
// test matching of containers
|
||||||
|
StoredContainerRequest storedRequest = matches.get(0).iterator().next();
|
||||||
|
assertTrue(storedContainer1 == storedRequest);
|
||||||
|
amClient.removeContainerRequest(storedContainer1);
|
||||||
|
matches =
|
||||||
|
amClient.getMatchingRequests(priority, ResourceRequest.ANY, capability);
|
||||||
|
assertTrue(matches.isEmpty());
|
||||||
|
matches =
|
||||||
|
amClient.getMatchingRequests(priority1, ResourceRequest.ANY, capability);
|
||||||
|
assertTrue(matches.isEmpty());
|
||||||
|
// 0 requests left. everything got cleaned up
|
||||||
|
assertTrue(amClient.remoteRequestsTable.isEmpty());
|
||||||
|
|
||||||
|
// go through an exemplary allocation, matching and release cycle
|
||||||
|
amClient.addContainerRequest(storedContainer1);
|
||||||
|
amClient.addContainerRequest(storedContainer3);
|
||||||
|
// RM should allocate container within 2 calls to allocate()
|
||||||
|
int allocatedContainerCount = 0;
|
||||||
|
int iterationsLeft = 2;
|
||||||
|
while (allocatedContainerCount < 2
|
||||||
|
&& iterationsLeft-- > 0) {
|
||||||
|
AllocateResponse allocResponse = amClient.allocate(0.1f);
|
||||||
|
assertTrue(amClient.ask.size() == 0);
|
||||||
|
assertTrue(amClient.release.size() == 0);
|
||||||
|
|
||||||
|
assertTrue(nodeCount == amClient.getClusterNodeCount());
|
||||||
|
allocatedContainerCount += allocResponse.getAllocatedContainers().size();
|
||||||
|
for(Container container : allocResponse.getAllocatedContainers()) {
|
||||||
|
ContainerRequest expectedRequest =
|
||||||
|
container.getPriority().equals(storedContainer1.getPriority()) ?
|
||||||
|
storedContainer1 : storedContainer3;
|
||||||
|
matches = amClient.getMatchingRequests(container.getPriority(),
|
||||||
|
ResourceRequest.ANY,
|
||||||
|
container.getResource());
|
||||||
|
// test correct matched container is returned
|
||||||
|
verifyMatches(matches, 1);
|
||||||
|
ContainerRequest matchedRequest = matches.get(0).iterator().next();
|
||||||
|
assertTrue(matchedRequest == expectedRequest);
|
||||||
|
|
||||||
|
// assign this container, use it and release it
|
||||||
|
amClient.releaseAssignedContainer(container.getId());
|
||||||
|
}
|
||||||
|
if(allocatedContainerCount < containersRequestedAny) {
|
||||||
|
// sleep to let NM's heartbeat to RM and trigger allocations
|
||||||
|
sleep(1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue(allocatedContainerCount == 2);
|
||||||
|
assertTrue(amClient.release.size() == 2);
|
||||||
|
assertTrue(amClient.ask.size() == 0);
|
||||||
|
AllocateResponse allocResponse = amClient.allocate(0.1f);
|
||||||
|
assertTrue(amClient.release.size() == 0);
|
||||||
|
assertTrue(amClient.ask.size() == 0);
|
||||||
|
assertTrue(allocResponse.getAllocatedContainers().size() == 0);
|
||||||
|
|
||||||
|
|
||||||
|
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
|
||||||
|
null, null);
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
|
||||||
|
amClient.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test (timeout=60000)
|
@Test (timeout=60000)
|
||||||
public void testAMRMClient() throws YarnRemoteException, IOException {
|
public void testAMRMClient() throws YarnRemoteException, IOException {
|
||||||
AMRMClientImpl amClient = null;
|
AMRMClientImpl<ContainerRequest> amClient = null;
|
||||||
try {
|
try {
|
||||||
// start am rm client
|
// start am rm client
|
||||||
amClient = new AMRMClientImpl(attemptId);
|
amClient = new AMRMClientImpl<ContainerRequest>(attemptId);
|
||||||
amClient.init(conf);
|
amClient.init(conf);
|
||||||
amClient.start();
|
amClient.start();
|
||||||
|
|
||||||
|
@ -156,36 +405,27 @@ public class TestAMRMClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
|
||||||
private void testAllocation(final AMRMClientImpl amClient)
|
|
||||||
throws YarnRemoteException, IOException {
|
throws YarnRemoteException, IOException {
|
||||||
// setup container request
|
// setup container request
|
||||||
final Resource capability = Records.newRecord(Resource.class);
|
|
||||||
final Priority priority = Records.newRecord(Priority.class);
|
|
||||||
priority.setPriority(0);
|
|
||||||
capability.setMemory(1024);
|
|
||||||
String node = nodeReports.get(0).getNodeId().getHost();
|
|
||||||
String rack = nodeReports.get(0).getRackName();
|
|
||||||
final String[] nodes = { node };
|
|
||||||
final String[] racks = { rack };
|
|
||||||
|
|
||||||
assertTrue(amClient.ask.size() == 0);
|
assertTrue(amClient.ask.size() == 0);
|
||||||
assertTrue(amClient.release.size() == 0);
|
assertTrue(amClient.release.size() == 0);
|
||||||
|
|
||||||
amClient.addContainerRequest(new ContainerRequest(capability, nodes,
|
amClient.addContainerRequest(
|
||||||
racks, priority, 1));
|
new ContainerRequest(capability, nodes, racks, priority, 1));
|
||||||
amClient.addContainerRequest(new ContainerRequest(capability, nodes,
|
amClient.addContainerRequest(
|
||||||
racks, priority, 3));
|
new ContainerRequest(capability, nodes, racks, priority, 3));
|
||||||
amClient.removeContainerRequest(new ContainerRequest(capability, nodes,
|
amClient.removeContainerRequest(
|
||||||
racks, priority, 2));
|
new ContainerRequest(capability, nodes, racks, priority, 2));
|
||||||
|
|
||||||
int containersRequestedNode = amClient.remoteRequestsTable.get(priority)
|
int containersRequestedNode = amClient.remoteRequestsTable.get(priority)
|
||||||
.get(node).get(capability).getNumContainers();
|
.get(node).get(capability).remoteRequest.getNumContainers();
|
||||||
int containersRequestedRack = amClient.remoteRequestsTable.get(priority)
|
int containersRequestedRack = amClient.remoteRequestsTable.get(priority)
|
||||||
.get(rack).get(capability).getNumContainers();
|
.get(rack).get(capability).remoteRequest.getNumContainers();
|
||||||
int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
|
int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
|
||||||
.get(ResourceRequest.ANY).get(capability).getNumContainers();
|
.get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
|
||||||
|
|
||||||
assertTrue(containersRequestedNode == 2);
|
assertTrue(containersRequestedNode == 2);
|
||||||
assertTrue(containersRequestedRack == 2);
|
assertTrue(containersRequestedRack == 2);
|
||||||
|
@ -221,8 +461,8 @@ public class TestAMRMClient {
|
||||||
assertTrue(amClient.ask.size() == 0);
|
assertTrue(amClient.ask.size() == 0);
|
||||||
|
|
||||||
// need to tell the AMRMClient that we dont need these resources anymore
|
// need to tell the AMRMClient that we dont need these resources anymore
|
||||||
amClient.removeContainerRequest(new ContainerRequest(capability, nodes,
|
amClient.removeContainerRequest(
|
||||||
racks, priority, 2));
|
new ContainerRequest(capability, nodes, racks, priority, 2));
|
||||||
assertTrue(amClient.ask.size() == 3);
|
assertTrue(amClient.ask.size() == 3);
|
||||||
// send 0 container count request for resources that are no longer needed
|
// send 0 container count request for resources that are no longer needed
|
||||||
ResourceRequest snoopRequest = amClient.ask.iterator().next();
|
ResourceRequest snoopRequest = amClient.ask.iterator().next();
|
||||||
|
@ -241,8 +481,9 @@ public class TestAMRMClient {
|
||||||
new Answer<AllocateResponse>() {
|
new Answer<AllocateResponse>() {
|
||||||
public AllocateResponse answer(InvocationOnMock invocation)
|
public AllocateResponse answer(InvocationOnMock invocation)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
amClient.removeContainerRequest(new ContainerRequest(capability,
|
amClient.removeContainerRequest(
|
||||||
nodes, racks, priority, 2));
|
new ContainerRequest(capability, nodes,
|
||||||
|
racks, priority, 2));
|
||||||
throw new Exception();
|
throw new Exception();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -28,6 +28,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
@ -39,6 +40,9 @@ import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
@ -48,9 +52,11 @@ public class TestAMRMClientAsync {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(TestAMRMClientAsync.class);
|
private static final Log LOG = LogFactory.getLog(TestAMRMClientAsync.class);
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
@Test(timeout=10000)
|
@Test(timeout=10000)
|
||||||
public void testAMRMClientAsync() throws Exception {
|
public void testAMRMClientAsync() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
final AtomicBoolean heartbeatBlock = new AtomicBoolean(true);
|
||||||
List<ContainerStatus> completed1 = Arrays.asList(
|
List<ContainerStatus> completed1 = Arrays.asList(
|
||||||
BuilderUtils.newContainerStatus(
|
BuilderUtils.newContainerStatus(
|
||||||
BuilderUtils.newContainerId(0, 0, 0, 0),
|
BuilderUtils.newContainerId(0, 0, 0, 0),
|
||||||
|
@ -65,20 +71,38 @@ public class TestAMRMClientAsync {
|
||||||
new ArrayList<ContainerStatus>(), new ArrayList<Container>());
|
new ArrayList<ContainerStatus>(), new ArrayList<Container>());
|
||||||
|
|
||||||
TestCallbackHandler callbackHandler = new TestCallbackHandler();
|
TestCallbackHandler callbackHandler = new TestCallbackHandler();
|
||||||
AMRMClient client = mock(AMRMClient.class);
|
final AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
|
||||||
final AtomicBoolean secondHeartbeatReceived = new AtomicBoolean(false);
|
final AtomicInteger secondHeartbeatSync = new AtomicInteger(0);
|
||||||
when(client.allocate(anyFloat())).thenReturn(response1).thenAnswer(new Answer<AllocateResponse>() {
|
when(client.allocate(anyFloat())).thenReturn(response1).thenAnswer(new Answer<AllocateResponse>() {
|
||||||
@Override
|
@Override
|
||||||
public AllocateResponse answer(InvocationOnMock invocation)
|
public AllocateResponse answer(InvocationOnMock invocation)
|
||||||
throws Throwable {
|
throws Throwable {
|
||||||
secondHeartbeatReceived.set(true);
|
secondHeartbeatSync.incrementAndGet();
|
||||||
|
while(heartbeatBlock.get()) {
|
||||||
|
synchronized(heartbeatBlock) {
|
||||||
|
heartbeatBlock.wait();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
secondHeartbeatSync.incrementAndGet();
|
||||||
return response2;
|
return response2;
|
||||||
}
|
}
|
||||||
}).thenReturn(emptyResponse);
|
}).thenReturn(emptyResponse);
|
||||||
when(client.registerApplicationMaster(anyString(), anyInt(), anyString()))
|
when(client.registerApplicationMaster(anyString(), anyInt(), anyString()))
|
||||||
.thenReturn(null);
|
.thenReturn(null);
|
||||||
|
when(client.getClusterAvailableResources()).thenAnswer(new Answer<Resource>() {
|
||||||
|
@Override
|
||||||
|
public Resource answer(InvocationOnMock invocation)
|
||||||
|
throws Throwable {
|
||||||
|
// take client lock to simulate behavior of real impl
|
||||||
|
synchronized (client) {
|
||||||
|
Thread.sleep(10);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
AMRMClientAsync asyncClient = new AMRMClientAsync(client, 20, callbackHandler);
|
AMRMClientAsync<ContainerRequest> asyncClient =
|
||||||
|
new AMRMClientAsync<ContainerRequest>(client, 20, callbackHandler);
|
||||||
asyncClient.init(conf);
|
asyncClient.init(conf);
|
||||||
asyncClient.start();
|
asyncClient.start();
|
||||||
asyncClient.registerApplicationMaster("localhost", 1234, null);
|
asyncClient.registerApplicationMaster("localhost", 1234, null);
|
||||||
|
@ -86,10 +110,21 @@ public class TestAMRMClientAsync {
|
||||||
// while the CallbackHandler will still only be processing the first response,
|
// while the CallbackHandler will still only be processing the first response,
|
||||||
// heartbeater thread should still be sending heartbeats.
|
// heartbeater thread should still be sending heartbeats.
|
||||||
// To test this, wait for the second heartbeat to be received.
|
// To test this, wait for the second heartbeat to be received.
|
||||||
while (!secondHeartbeatReceived.get()) {
|
while (secondHeartbeatSync.get() < 1) {
|
||||||
Thread.sleep(10);
|
Thread.sleep(10);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// heartbeat will be blocked. make sure we can call client methods at this
|
||||||
|
// time. Checks that heartbeat is not holding onto client lock
|
||||||
|
assert(secondHeartbeatSync.get() < 2);
|
||||||
|
asyncClient.getClusterAvailableResources();
|
||||||
|
// method returned. now unblock heartbeat
|
||||||
|
assert(secondHeartbeatSync.get() < 2);
|
||||||
|
synchronized (heartbeatBlock) {
|
||||||
|
heartbeatBlock.set(false);
|
||||||
|
heartbeatBlock.notifyAll();
|
||||||
|
}
|
||||||
|
|
||||||
// allocated containers should come before completed containers
|
// allocated containers should come before completed containers
|
||||||
Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
|
Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
|
||||||
|
|
||||||
|
@ -110,6 +145,73 @@ public class TestAMRMClientAsync {
|
||||||
Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
|
Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=10000)
|
||||||
|
public void testAMRMClientAsyncException() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
TestCallbackHandler callbackHandler = new TestCallbackHandler();
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
|
||||||
|
String exStr = "TestException";
|
||||||
|
YarnRemoteException mockException = mock(YarnRemoteException.class);
|
||||||
|
when(mockException.getMessage()).thenReturn(exStr);
|
||||||
|
when(client.allocate(anyFloat())).thenThrow(mockException);
|
||||||
|
|
||||||
|
AMRMClientAsync<ContainerRequest> asyncClient =
|
||||||
|
new AMRMClientAsync<ContainerRequest>(client, 20, callbackHandler);
|
||||||
|
asyncClient.init(conf);
|
||||||
|
asyncClient.start();
|
||||||
|
|
||||||
|
synchronized (callbackHandler.notifier) {
|
||||||
|
asyncClient.registerApplicationMaster("localhost", 1234, null);
|
||||||
|
while(callbackHandler.savedException == null) {
|
||||||
|
try {
|
||||||
|
callbackHandler.notifier.wait();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertTrue(callbackHandler.savedException.getMessage().contains(exStr));
|
||||||
|
|
||||||
|
asyncClient.stop();
|
||||||
|
// stopping should have joined all threads and completed all callbacks
|
||||||
|
Assert.assertTrue(callbackHandler.callbackCount == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=10000)
|
||||||
|
public void testAMRMClientAsyncReboot() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
TestCallbackHandler callbackHandler = new TestCallbackHandler();
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
|
||||||
|
|
||||||
|
final AllocateResponse rebootResponse = createAllocateResponse(
|
||||||
|
new ArrayList<ContainerStatus>(), new ArrayList<Container>());
|
||||||
|
rebootResponse.setReboot(true);
|
||||||
|
when(client.allocate(anyFloat())).thenReturn(rebootResponse);
|
||||||
|
|
||||||
|
AMRMClientAsync<ContainerRequest> asyncClient =
|
||||||
|
new AMRMClientAsync<ContainerRequest>(client, 20, callbackHandler);
|
||||||
|
asyncClient.init(conf);
|
||||||
|
asyncClient.start();
|
||||||
|
|
||||||
|
synchronized (callbackHandler.notifier) {
|
||||||
|
asyncClient.registerApplicationMaster("localhost", 1234, null);
|
||||||
|
while(callbackHandler.reboot == false) {
|
||||||
|
try {
|
||||||
|
callbackHandler.notifier.wait();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
asyncClient.stop();
|
||||||
|
// stopping should have joined all threads and completed all callbacks
|
||||||
|
Assert.assertTrue(callbackHandler.callbackCount == 0);
|
||||||
|
}
|
||||||
|
|
||||||
private AllocateResponse createAllocateResponse(
|
private AllocateResponse createAllocateResponse(
|
||||||
List<ContainerStatus> completed, List<Container> allocated) {
|
List<ContainerStatus> completed, List<Container> allocated) {
|
||||||
AllocateResponse response = BuilderUtils.newAllocateResponse(0, completed, allocated,
|
AllocateResponse response = BuilderUtils.newAllocateResponse(0, completed, allocated,
|
||||||
|
@ -120,6 +222,11 @@ public class TestAMRMClientAsync {
|
||||||
private class TestCallbackHandler implements AMRMClientAsync.CallbackHandler {
|
private class TestCallbackHandler implements AMRMClientAsync.CallbackHandler {
|
||||||
private volatile List<ContainerStatus> completedContainers;
|
private volatile List<ContainerStatus> completedContainers;
|
||||||
private volatile List<Container> allocatedContainers;
|
private volatile List<Container> allocatedContainers;
|
||||||
|
Exception savedException = null;
|
||||||
|
boolean reboot = false;
|
||||||
|
Object notifier = new Object();
|
||||||
|
|
||||||
|
int callbackCount = 0;
|
||||||
|
|
||||||
public List<ContainerStatus> takeCompletedContainers() {
|
public List<ContainerStatus> takeCompletedContainers() {
|
||||||
List<ContainerStatus> ret = completedContainers;
|
List<ContainerStatus> ret = completedContainers;
|
||||||
|
@ -176,9 +283,28 @@ public class TestAMRMClientAsync {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onRebootRequest() {}
|
public void onRebootRequest() {
|
||||||
|
reboot = true;
|
||||||
|
synchronized (notifier) {
|
||||||
|
notifier.notifyAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onNodesUpdated(List<NodeReport> updatedNodes) {}
|
public void onNodesUpdated(List<NodeReport> updatedNodes) {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public float getProgress() {
|
||||||
|
callbackCount++;
|
||||||
|
return 0.5f;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Exception e) {
|
||||||
|
savedException = e;
|
||||||
|
synchronized (notifier) {
|
||||||
|
notifier.notifyAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,7 +64,7 @@ public class TestNMClient {
|
||||||
Configuration conf = null;
|
Configuration conf = null;
|
||||||
MiniYARNCluster yarnCluster = null;
|
MiniYARNCluster yarnCluster = null;
|
||||||
YarnClientImpl yarnClient = null;
|
YarnClientImpl yarnClient = null;
|
||||||
AMRMClientImpl rmClient = null;
|
AMRMClientImpl<ContainerRequest> rmClient = null;
|
||||||
NMClientImpl nmClient = null;
|
NMClientImpl nmClient = null;
|
||||||
List<NodeReport> nodeReports = null;
|
List<NodeReport> nodeReports = null;
|
||||||
ApplicationAttemptId attemptId = null;
|
ApplicationAttemptId attemptId = null;
|
||||||
|
@ -136,7 +136,7 @@ public class TestNMClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
// start am rm client
|
// start am rm client
|
||||||
rmClient = new AMRMClientImpl(attemptId);
|
rmClient = new AMRMClientImpl<ContainerRequest>(attemptId);
|
||||||
rmClient.init(conf);
|
rmClient.init(conf);
|
||||||
rmClient.start();
|
rmClient.start();
|
||||||
assertNotNull(rmClient);
|
assertNotNull(rmClient);
|
||||||
|
@ -185,7 +185,8 @@ public class TestNMClient {
|
||||||
null, null);
|
null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Set<Container> allocateContainers(AMRMClientImpl rmClient, int num)
|
private Set<Container> allocateContainers(
|
||||||
|
AMRMClientImpl<ContainerRequest> rmClient, int num)
|
||||||
throws YarnRemoteException, IOException {
|
throws YarnRemoteException, IOException {
|
||||||
// setup container request
|
// setup container request
|
||||||
Resource capability = Resource.newInstance(1024, 0);
|
Resource capability = Resource.newInstance(1024, 0);
|
||||||
|
@ -201,7 +202,8 @@ public class TestNMClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
int containersRequestedAny = rmClient.remoteRequestsTable.get(priority)
|
int containersRequestedAny = rmClient.remoteRequestsTable.get(priority)
|
||||||
.get(ResourceRequest.ANY).get(capability).getNumContainers();
|
.get(ResourceRequest.ANY).get(capability).remoteRequest
|
||||||
|
.getNumContainers();
|
||||||
|
|
||||||
// RM should allocate container within 2 calls to allocate()
|
// RM should allocate container within 2 calls to allocate()
|
||||||
int allocatedContainerCount = 0;
|
int allocatedContainerCount = 0;
|
||||||
|
|
Loading…
Reference in New Issue