merge -c r1459555 from trunk to branch-2 for YARN-417. Create AMRMClient wrapper that provides asynchronous callbacks. (Sandy Ryza via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1459557 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bikas Saha 2013-03-21 22:30:10 +00:00
parent cbc30537d1
commit 4b1556e846
5 changed files with 756 additions and 237 deletions

View File

@ -33,6 +33,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-297. Improve hashCode implementations for PB records. (Xuan Gong via YARN-297. Improve hashCode implementations for PB records. (Xuan Gong via
hitesh) hitesh)
YARN-417. Create AMRMClient wrapper that provides asynchronous callbacks.
(Sandy Ryza via bikas)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -63,12 +63,12 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.AMRMClient;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.AMRMClientImpl; import org.apache.hadoop.yarn.client.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
@ -147,8 +147,8 @@ public class ApplicationMaster {
private YarnRPC rpc; private YarnRPC rpc;
// Handle to communicate with the Resource Manager // Handle to communicate with the Resource Manager
private AMRMClient resourceManager; private AMRMClientAsync resourceManager;
// Application Attempt Id ( combination of attemptId and fail count ) // Application Attempt Id ( combination of attemptId and fail count )
private ApplicationAttemptId appAttemptID; private ApplicationAttemptId appAttemptID;
@ -169,8 +169,6 @@ public class ApplicationMaster {
// Priority of the request // Priority of the request
private int requestPriority; private int requestPriority;
// Simple flag to denote whether all works is done
private boolean appDone = false;
// Counter for completed containers ( complete denotes successful or failed ) // Counter for completed containers ( complete denotes successful or failed )
private AtomicInteger numCompletedContainers = new AtomicInteger(); private AtomicInteger numCompletedContainers = new AtomicInteger();
// Allocated container count so that we know how many containers has the RM // Allocated container count so that we know how many containers has the RM
@ -201,6 +199,9 @@ public class ApplicationMaster {
// Hardcoded path to shell script in launch container's local env // Hardcoded path to shell script in launch container's local env
private final String ExecShellStringPath = "ExecShellScript.sh"; private final String ExecShellStringPath = "ExecShellScript.sh";
private volatile boolean done;
private volatile boolean success;
// Launch threads // Launch threads
private List<Thread> launchThreads = new ArrayList<Thread>(); private List<Thread> launchThreads = new ArrayList<Thread>();
@ -416,226 +417,202 @@ public class ApplicationMaster {
public boolean run() throws YarnRemoteException { public boolean run() throws YarnRemoteException {
LOG.info("Starting ApplicationMaster"); LOG.info("Starting ApplicationMaster");
// Connect to ResourceManager AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
resourceManager = new AMRMClientImpl(appAttemptID);
resourceManager = new AMRMClientAsync(appAttemptID, 1000, allocListener);
resourceManager.init(conf); resourceManager.init(conf);
resourceManager.start(); resourceManager.start();
try { // Setup local RPC Server to accept status requests directly from clients
// Setup local RPC Server to accept status requests directly from clients // TODO need to setup a protocol for client to be able to communicate to
// TODO need to setup a protocol for client to be able to communicate to // the RPC server
// the RPC server // TODO use the rpc port info to register with the RM for the client to
// TODO use the rpc port info to register with the RM for the client to // send requests to this app master
// send requests to this app master
// Register self with ResourceManager // Register self with ResourceManager
RegisterApplicationMasterResponse response = resourceManager // This will start heartbeating to the RM
.registerApplicationMaster(appMasterHostname, appMasterRpcPort, RegisterApplicationMasterResponse response = resourceManager
appMasterTrackingUrl); .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
// Dump out information about cluster capability as seen by the appMasterTrackingUrl);
// resource manager // Dump out information about cluster capability as seen by the
int minMem = response.getMinimumResourceCapability().getMemory(); // resource manager
int maxMem = response.getMaximumResourceCapability().getMemory(); int minMem = response.getMinimumResourceCapability().getMemory();
LOG.info("Min mem capabililty of resources in this cluster " + minMem); int maxMem = response.getMaximumResourceCapability().getMemory();
LOG.info("Max mem capabililty of resources in this cluster " + maxMem); LOG.info("Min mem capabililty of resources in this cluster " + minMem);
LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
// A resource ask has to be atleast the minimum of the capability of the // A resource ask has to be atleast the minimum of the capability of the
// cluster, the value has to be a multiple of the min value and cannot // cluster, the value has to be a multiple of the min value and cannot
// exceed the max. // exceed the max.
// If it is not an exact multiple of min, the RM will allocate to the // If it is not an exact multiple of min, the RM will allocate to the
// nearest multiple of min // nearest multiple of min
if (containerMemory < minMem) { if (containerMemory < minMem) {
LOG.info("Container memory specified below min threshold of cluster." LOG.info("Container memory specified below min threshold of cluster."
+ " Using min value." + ", specified=" + containerMemory + ", min=" + " Using min value." + ", specified=" + containerMemory + ", min="
+ minMem); + minMem);
containerMemory = minMem; containerMemory = minMem;
} else if (containerMemory > maxMem) { } else if (containerMemory > maxMem) {
LOG.info("Container memory specified above max threshold of cluster." LOG.info("Container memory specified above max threshold of cluster."
+ " Using max value." + ", specified=" + containerMemory + ", max=" + " Using max value." + ", specified=" + containerMemory + ", max="
+ maxMem); + maxMem);
containerMemory = maxMem; containerMemory = maxMem;
}
// Setup heartbeat emitter
// TODO poll RM every now and then with an empty request to let RM know
// that we are alive
// The heartbeat interval after which an AM is timed out by the RM is
// defined by a config setting:
// RM_AM_EXPIRY_INTERVAL_MS with default defined by
// DEFAULT_RM_AM_EXPIRY_INTERVAL_MS
// The allocate calls to the RM count as heartbeats so, for now,
// this additional heartbeat emitter is not required.
// Setup ask for containers from RM
// Send request for containers to RM
// Until we get our fully allocated quota, we keep on polling RM for
// containers
// Keep looping until all the containers are launched and shell script
// executed on them ( regardless of success/failure).
int loopCounter = -1;
while (numCompletedContainers.get() < numTotalContainers && !appDone) {
loopCounter++;
// log current state
LOG.info("Current application state: loop=" + loopCounter
+ ", appDone=" + appDone + ", total=" + numTotalContainers
+ ", requested=" + numRequestedContainers + ", completed="
+ numCompletedContainers + ", failed=" + numFailedContainers
+ ", currentAllocated=" + numAllocatedContainers);
// Sleep before each loop when asking RM for containers
// to avoid flooding RM with spurious requests when it
// need not have any available containers
// Sleeping for 1000 ms.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
LOG.info("Sleep interrupted " + e.getMessage());
}
// No. of containers to request
// For the first loop, askCount will be equal to total containers needed
// From that point on, askCount will always be 0 as current
// implementation does not change its ask on container failures.
int askCount = numTotalContainers - numRequestedContainers.get();
numRequestedContainers.addAndGet(askCount);
if (askCount > 0) {
ContainerRequest containerAsk = setupContainerAskForRM(askCount);
resourceManager.addContainerRequest(containerAsk);
}
// Send the request to RM
LOG.info("Asking RM for containers" + ", askCount=" + askCount);
AllocateResponse allocResp = sendContainerAskToRM();
// Retrieve list of allocated containers from the response
List<Container> allocatedContainers =
allocResp.getAllocatedContainers();
LOG.info("Got response from RM for container ask, allocatedCnt="
+ allocatedContainers.size());
numAllocatedContainers.addAndGet(allocatedContainers.size());
for (Container allocatedContainer : allocatedContainers) {
LOG.info("Launching shell command on a new container."
+ ", containerId=" + allocatedContainer.getId()
+ ", containerNode=" + allocatedContainer.getNodeId().getHost()
+ ":" + allocatedContainer.getNodeId().getPort()
+ ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
+ ", containerState" + allocatedContainer.getState()
+ ", containerResourceMemory"
+ allocatedContainer.getResource().getMemory());
// + ", containerToken"
// +allocatedContainer.getContainerToken().getIdentifier().toString());
LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
allocatedContainer);
Thread launchThread = new Thread(runnableLaunchContainer);
// launch and start the container on a separate thread to keep
// the main thread unblocked
// as all containers may not be allocated at one go.
launchThreads.add(launchThread);
launchThread.start();
}
// Check what the current available resources in the cluster are
// TODO should we do anything if the available resources are not enough?
Resource availableResources = allocResp.getAvailableResources();
LOG.info("Current available resources in the cluster "
+ availableResources);
// Check the completed containers
List<ContainerStatus> completedContainers = allocResp
.getCompletedContainersStatuses();
LOG.info("Got response from RM for container ask, completedCnt="
+ completedContainers.size());
for (ContainerStatus containerStatus : completedContainers) {
LOG.info("Got container status for containerID="
+ containerStatus.getContainerId() + ", state="
+ containerStatus.getState() + ", exitStatus="
+ containerStatus.getExitStatus() + ", diagnostics="
+ containerStatus.getDiagnostics());
// non complete containers should not be here
assert (containerStatus.getState() == ContainerState.COMPLETE);
// increment counters for completed/failed containers
int exitStatus = containerStatus.getExitStatus();
if (0 != exitStatus) {
// container failed
if (-100 != exitStatus) {
// shell script failed
// counts as completed
numCompletedContainers.incrementAndGet();
numFailedContainers.incrementAndGet();
} else {
// something else bad happened
// app job did not complete for some reason
// we should re-try as the container was lost for some reason
numAllocatedContainers.decrementAndGet();
numRequestedContainers.decrementAndGet();
// we do not need to release the container as it would be done
// by the RM/CM.
}
} else {
// nothing to do
// container completed successfully
numCompletedContainers.incrementAndGet();
LOG.info("Container completed successfully." + ", containerId="
+ containerStatus.getContainerId());
}
}
if (numCompletedContainers.get() == numTotalContainers) {
appDone = true;
}
LOG.info("Current application state: loop=" + loopCounter
+ ", appDone=" + appDone + ", total=" + numTotalContainers
+ ", requested=" + numRequestedContainers + ", completed="
+ numCompletedContainers + ", failed=" + numFailedContainers
+ ", currentAllocated=" + numAllocatedContainers);
// TODO
// Add a timeout handling layer
// for misbehaving shell commands
}
// Join all launched threads
// needed for when we time out
// and we need to release containers
for (Thread launchThread : launchThreads) {
try {
launchThread.join(10000);
} catch (InterruptedException e) {
LOG.info("Exception thrown in thread join: " + e.getMessage());
e.printStackTrace();
}
}
// When the application completes, it should send a finish application
// signal to the RM
LOG.info("Application completed. Signalling finish to RM");
FinalApplicationStatus appStatus;
String appMessage = null;
boolean isSuccess = true;
if (numFailedContainers.get() == 0) {
appStatus = FinalApplicationStatus.SUCCEEDED;
} else {
appStatus = FinalApplicationStatus.FAILED;
appMessage = "Diagnostics." + ", total=" + numTotalContainers
+ ", completed=" + numCompletedContainers.get() + ", allocated="
+ numAllocatedContainers.get() + ", failed="
+ numFailedContainers.get();
isSuccess = false;
}
resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
return isSuccess;
} finally {
resourceManager.stop();
} }
// Setup ask for containers from RM
// Send request for containers to RM
// Until we get our fully allocated quota, we keep on polling RM for
// containers
// Keep looping until all the containers are launched and shell script
// executed on them ( regardless of success/failure).
ContainerRequest containerAsk = setupContainerAskForRM(numTotalContainers);
resourceManager.addContainerRequest(containerAsk);
numRequestedContainers.set(numTotalContainers);
while (!done) {
try {
Thread.sleep(200);
} catch (InterruptedException ex) {}
}
finish();
return success;
}
private void finish() {
// Join all launched threads
// needed for when we time out
// and we need to release containers
for (Thread launchThread : launchThreads) {
try {
launchThread.join(10000);
} catch (InterruptedException e) {
LOG.info("Exception thrown in thread join: " + e.getMessage());
e.printStackTrace();
}
}
// When the application completes, it should send a finish application
// signal to the RM
LOG.info("Application completed. Signalling finish to RM");
FinalApplicationStatus appStatus;
String appMessage = null;
success = true;
if (numFailedContainers.get() == 0) {
appStatus = FinalApplicationStatus.SUCCEEDED;
} else {
appStatus = FinalApplicationStatus.FAILED;
appMessage = "Diagnostics." + ", total=" + numTotalContainers
+ ", completed=" + numCompletedContainers.get() + ", allocated="
+ numAllocatedContainers.get() + ", failed="
+ numFailedContainers.get();
success = false;
}
try {
resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
} catch (YarnRemoteException ex) {
LOG.error("Failed to unregister application", ex);
}
done = true;
resourceManager.stop();
}
private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
@Override
public void onContainersCompleted(List<ContainerStatus> completedContainers) {
LOG.info("Got response from RM for container ask, completedCnt="
+ completedContainers.size());
for (ContainerStatus containerStatus : completedContainers) {
LOG.info("Got container status for containerID="
+ containerStatus.getContainerId() + ", state="
+ containerStatus.getState() + ", exitStatus="
+ containerStatus.getExitStatus() + ", diagnostics="
+ containerStatus.getDiagnostics());
// non complete containers should not be here
assert (containerStatus.getState() == ContainerState.COMPLETE);
// increment counters for completed/failed containers
int exitStatus = containerStatus.getExitStatus();
if (0 != exitStatus) {
// container failed
if (YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS != exitStatus) {
// shell script failed
// counts as completed
numCompletedContainers.incrementAndGet();
numFailedContainers.incrementAndGet();
} else {
// container was killed by framework, possibly preempted
// we should re-try as the container was lost for some reason
numAllocatedContainers.decrementAndGet();
numRequestedContainers.decrementAndGet();
// we do not need to release the container as it would be done
// by the RM
}
} else {
// nothing to do
// container completed successfully
numCompletedContainers.incrementAndGet();
LOG.info("Container completed successfully." + ", containerId="
+ containerStatus.getContainerId());
}
}
// ask for more containers if any failed
int askCount = numTotalContainers - numRequestedContainers.get();
numRequestedContainers.addAndGet(askCount);
if (askCount > 0) {
ContainerRequest containerAsk = setupContainerAskForRM(askCount);
resourceManager.addContainerRequest(containerAsk);
}
// set progress to deliver to RM on next heartbeat
float progress = (float) numCompletedContainers.get()
/ numTotalContainers;
resourceManager.setProgress(progress);
if (numCompletedContainers.get() == numTotalContainers) {
done = true;
}
}
@Override
public void onContainersAllocated(List<Container> allocatedContainers) {
LOG.info("Got response from RM for container ask, allocatedCnt="
+ allocatedContainers.size());
numAllocatedContainers.addAndGet(allocatedContainers.size());
for (Container allocatedContainer : allocatedContainers) {
LOG.info("Launching shell command on a new container."
+ ", containerId=" + allocatedContainer.getId()
+ ", containerNode=" + allocatedContainer.getNodeId().getHost()
+ ":" + allocatedContainer.getNodeId().getPort()
+ ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
+ ", containerState" + allocatedContainer.getState()
+ ", containerResourceMemory"
+ allocatedContainer.getResource().getMemory());
// + ", containerToken"
// +allocatedContainer.getContainerToken().getIdentifier().toString());
LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
allocatedContainer);
Thread launchThread = new Thread(runnableLaunchContainer);
// launch and start the container on a separate thread to keep
// the main thread unblocked
// as all containers may not be allocated at one go.
launchThreads.add(launchThread);
launchThread.start();
}
}
@Override
public void onRebootRequest() {}
@Override
public void onNodesUpdated(List<NodeReport> updatedNodes) {}
} }
/** /**
@ -811,21 +788,4 @@ public class ApplicationMaster {
LOG.info("Requested container ask: " + request.toString()); LOG.info("Requested container ask: " + request.toString());
return request; return request;
} }
/**
* Ask RM to allocate given no. of containers to this Application Master
*
* @param requestedContainers Containers to ask for from RM
* @return Response from RM to AM with allocated containers
* @throws YarnRemoteException
*/
private AllocateResponse sendContainerAskToRM() throws YarnRemoteException {
float progressIndicator = (float) numCompletedContainers.get()
/ numTotalContainers;
LOG.info("Sending request to RM for containers" + ", progress="
+ progressIndicator);
return resourceManager.allocate(progressIndicator);
}
} }

View File

@ -0,0 +1,354 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.service.AbstractService;
import com.google.common.annotations.VisibleForTesting;
/**
* <code>AMRMClientAsync</code> handles communication with the ResourceManager
* and provides asynchronous updates on events such as container allocations and
* completions. It contains a thread that sends periodic heartbeats to the
* ResourceManager.
*
* It should be used by implementing a CallbackHandler:
* <pre>
* {@code
* class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
* public void onContainersAllocated(List<Container> containers) {
* [run tasks on the containers]
* }
*
* public void onContainersCompleted(List<ContainerStatus> statuses) {
* [update progress, check whether app is done]
* }
*
* public void onNodesUpdated(List<NodeReport> updated) {}
*
* public void onReboot() {}
* }
* }
* </pre>
*
* The client's lifecycle should be managed similarly to the following:
*
* <pre>
* {@code
* AMRMClientAsync asyncClient = new AMRMClientAsync(appAttId, 1000, new MyCallbackhandler());
* asyncClient.init(conf);
* asyncClient.start();
* RegisterApplicationMasterResponse response = asyncClient
* .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
* appMasterTrackingUrl);
* asyncClient.addContainerRequest(containerRequest);
* [... wait for application to complete]
* asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
* asyncClient.stop();
* }
* </pre>
*/
@Unstable
@Evolving
public class AMRMClientAsync extends AbstractService {
private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
private final AMRMClient client;
private final int intervalMs;
private final HeartbeatThread heartbeatThread;
private final CallbackHandlerThread handlerThread;
private final CallbackHandler handler;
private final BlockingQueue<AllocateResponse> responseQueue;
private volatile boolean keepRunning;
private volatile float progress;
public AMRMClientAsync(ApplicationAttemptId id, int intervalMs,
CallbackHandler callbackHandler) {
this(new AMRMClientImpl(id), intervalMs, callbackHandler);
}
@Private
@VisibleForTesting
AMRMClientAsync(AMRMClient client, int intervalMs,
CallbackHandler callbackHandler) {
super(AMRMClientAsync.class.getName());
this.client = client;
this.intervalMs = intervalMs;
handler = callbackHandler;
heartbeatThread = new HeartbeatThread();
handlerThread = new CallbackHandlerThread();
responseQueue = new LinkedBlockingQueue<AllocateResponse>();
keepRunning = true;
}
/**
* Sets the application's current progress. It will be transmitted to the
* resource manager on the next heartbeat.
* @param progress
* the application's progress so far
*/
public void setProgress(float progress) {
this.progress = progress;
}
@Override
public void init(Configuration conf) {
super.init(conf);
client.init(conf);
}
@Override
public void start() {
handlerThread.start();
client.start();
super.start();
}
/**
* Tells the heartbeat and handler threads to stop and waits for them to
* terminate. Calling this method from the callback handler thread would cause
* deadlock, and thus should be avoided.
*/
@Override
public void stop() {
if (Thread.currentThread() == handlerThread) {
throw new YarnException("Cannot call stop from callback handler thread!");
}
keepRunning = false;
try {
heartbeatThread.join();
} catch (InterruptedException ex) {
LOG.error("Error joining with heartbeat thread", ex);
}
client.stop();
try {
handlerThread.interrupt();
handlerThread.join();
} catch (InterruptedException ex) {
LOG.error("Error joining with hander thread", ex);
}
super.stop();
}
/**
* Registers this application master with the resource manager. On successful
* registration, starts the heartbeating thread.
*/
public RegisterApplicationMasterResponse registerApplicationMaster(
String appHostName, int appHostPort, String appTrackingUrl)
throws YarnRemoteException {
RegisterApplicationMasterResponse response =
client.registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
heartbeatThread.start();
return response;
}
/**
* Unregister the application master. This must be called in the end.
* @param appStatus Success/Failure status of the master
* @param appMessage Diagnostics message on failure
* @param appTrackingUrl New URL to get master info
* @throws YarnRemoteException
*/
public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
String appMessage, String appTrackingUrl) throws YarnRemoteException {
synchronized (client) {
keepRunning = false;
client.unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl);
}
}
/**
* Request containers for resources before calling <code>allocate</code>
* @param req Resource request
*/
public void addContainerRequest(AMRMClient.ContainerRequest req) {
client.addContainerRequest(req);
}
/**
* Remove previous container request. The previous container request may have
* already been sent to the ResourceManager. So even after the remove request
* the app must be prepared to receive an allocation for the previous request
* even after the remove request
* @param req Resource request
*/
public void removeContainerRequest(AMRMClient.ContainerRequest req) {
client.removeContainerRequest(req);
}
/**
* Release containers assigned by the Resource Manager. If the app cannot use
* the container or wants to give up the container then it can release them.
* The app needs to make new requests for the released resource capability if
* it still needs it. eg. it released non-local resources
* @param containerId
*/
public void releaseAssignedContainer(ContainerId containerId) {
client.releaseAssignedContainer(containerId);
}
/**
* Get the currently available resources in the cluster.
* A valid value is available after a call to allocate has been made
* @return Currently available resources
*/
public Resource getClusterAvailableResources() {
return client.getClusterAvailableResources();
}
/**
* Get the current number of nodes in the cluster.
* A valid values is available after a call to allocate has been made
* @return Current number of nodes in the cluster
*/
public int getClusterNodeCount() {
return client.getClusterNodeCount();
}
private class HeartbeatThread extends Thread {
public HeartbeatThread() {
super("AMRM Heartbeater thread");
}
public void run() {
while (true) {
AllocateResponse response = null;
// synchronization ensures we don't send heartbeats after unregistering
synchronized (client) {
if (!keepRunning) {
break;
}
try {
response = client.allocate(progress);
} catch (YarnRemoteException ex) {
LOG.error("Failed to heartbeat", ex);
}
}
if (response != null) {
while (true) {
try {
responseQueue.put(response);
break;
} catch (InterruptedException ex) {
LOG.warn("Interrupted while waiting to put on response queue", ex);
}
}
}
try {
Thread.sleep(intervalMs);
} catch (InterruptedException ex) {
LOG.warn("Heartbeater interrupted", ex);
}
}
}
}
private class CallbackHandlerThread extends Thread {
public CallbackHandlerThread() {
super("AMRM Callback Handler Thread");
}
public void run() {
while (keepRunning) {
AllocateResponse response;
try {
response = responseQueue.take();
} catch (InterruptedException ex) {
LOG.info("Interrupted while waiting for queue");
continue;
}
if (response.getReboot()) {
handler.onRebootRequest();
}
List<NodeReport> updatedNodes = response.getUpdatedNodes();
if (!updatedNodes.isEmpty()) {
handler.onNodesUpdated(updatedNodes);
}
List<ContainerStatus> completed =
response.getCompletedContainersStatuses();
if (!completed.isEmpty()) {
handler.onContainersCompleted(completed);
}
List<Container> allocated = response.getAllocatedContainers();
if (!allocated.isEmpty()) {
handler.onContainersAllocated(allocated);
}
}
}
}
public interface CallbackHandler {
/**
* Called when the ResourceManager responds to a heartbeat with completed
* containers. If the response contains both completed containers and
* allocated containers, this will be called before containersAllocated.
*/
public void onContainersCompleted(List<ContainerStatus> statuses);
/**
* Called when the ResourceManager responds to a heartbeat with allocated
* containers. If the response containers both completed containers and
* allocated containers, this will be called after containersCompleted.
*/
public void onContainersAllocated(List<Container> containers);
/**
* Called when the ResourceManager wants the ApplicationMaster to reboot
* for being out of sync.
*/
public void onRebootRequest();
/**
* Called when nodes tracked by the ResourceManager have changed in in health,
* availability etc.
*/
public void onNodesUpdated(List<NodeReport> updatedNodes);
}
}

View File

@ -0,0 +1,184 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import static org.mockito.Mockito.anyFloat;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestAMRMClientAsync {
private static final Log LOG = LogFactory.getLog(TestAMRMClientAsync.class);
@Test(timeout=10000)
public void testAMRMClientAsync() throws Exception {
Configuration conf = new Configuration();
List<ContainerStatus> completed1 = Arrays.asList(
BuilderUtils.newContainerStatus(
BuilderUtils.newContainerId(0, 0, 0, 0),
ContainerState.COMPLETE, "", 0));
List<Container> allocated1 = Arrays.asList(
BuilderUtils.newContainer(null, null, null, null, null, null));
final AllocateResponse response1 = createAllocateResponse(
new ArrayList<ContainerStatus>(), allocated1);
final AllocateResponse response2 = createAllocateResponse(completed1,
new ArrayList<Container>());
final AllocateResponse emptyResponse = createAllocateResponse(
new ArrayList<ContainerStatus>(), new ArrayList<Container>());
TestCallbackHandler callbackHandler = new TestCallbackHandler();
AMRMClient client = mock(AMRMClient.class);
final AtomicBoolean secondHeartbeatReceived = new AtomicBoolean(false);
when(client.allocate(anyFloat())).thenReturn(response1).thenAnswer(new Answer<AllocateResponse>() {
@Override
public AllocateResponse answer(InvocationOnMock invocation)
throws Throwable {
secondHeartbeatReceived.set(true);
return response2;
}
}).thenReturn(emptyResponse);
when(client.registerApplicationMaster(anyString(), anyInt(), anyString()))
.thenReturn(null);
AMRMClientAsync asyncClient = new AMRMClientAsync(client, 20, callbackHandler);
asyncClient.init(conf);
asyncClient.start();
asyncClient.registerApplicationMaster("localhost", 1234, null);
// while the CallbackHandler will still only be processing the first response,
// heartbeater thread should still be sending heartbeats.
// To test this, wait for the second heartbeat to be received.
while (!secondHeartbeatReceived.get()) {
Thread.sleep(10);
}
// allocated containers should come before completed containers
Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
// wait for the allocated containers from the first heartbeat's response
while (callbackHandler.takeAllocatedContainers() == null) {
Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
Thread.sleep(10);
}
// wait for the completed containers from the second heartbeat's response
while (callbackHandler.takeCompletedContainers() == null) {
Thread.sleep(10);
}
asyncClient.stop();
Assert.assertEquals(null, callbackHandler.takeAllocatedContainers());
Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
}
private AllocateResponse createAllocateResponse(
List<ContainerStatus> completed, List<Container> allocated) {
AllocateResponse response = BuilderUtils.newAllocateResponse(0, completed, allocated,
new ArrayList<NodeReport>(), null, false, 1);
return response;
}
private class TestCallbackHandler implements AMRMClientAsync.CallbackHandler {
private volatile List<ContainerStatus> completedContainers;
private volatile List<Container> allocatedContainers;
public List<ContainerStatus> takeCompletedContainers() {
List<ContainerStatus> ret = completedContainers;
if (ret == null) {
return null;
}
completedContainers = null;
synchronized (ret) {
ret.notify();
}
return ret;
}
public List<Container> takeAllocatedContainers() {
List<Container> ret = allocatedContainers;
if (ret == null) {
return null;
}
allocatedContainers = null;
synchronized (ret) {
ret.notify();
}
return ret;
}
@Override
public void onContainersCompleted(List<ContainerStatus> statuses) {
completedContainers = statuses;
// wait for containers to be taken before returning
synchronized (completedContainers) {
while (completedContainers != null) {
try {
completedContainers.wait();
} catch (InterruptedException ex) {
LOG.error("Interrupted during wait", ex);
}
}
}
}
@Override
public void onContainersAllocated(List<Container> containers) {
allocatedContainers = containers;
// wait for containers to be taken before returning
synchronized (allocatedContainers) {
while (allocatedContainers != null) {
try {
allocatedContainers.wait();
} catch (InterruptedException ex) {
LOG.error("Interrupted during wait", ex);
}
}
}
}
@Override
public void onRebootRequest() {}
@Override
public void onNodesUpdated(List<NodeReport> updatedNodes) {}
}
}

View File

@ -28,6 +28,7 @@ import java.util.Map;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -404,4 +405,21 @@ public class BuilderUtils {
allocateRequest.addAllReleases(containersToBeReleased); allocateRequest.addAllReleases(containersToBeReleased);
return allocateRequest; return allocateRequest;
} }
public static AllocateResponse newAllocateResponse(int responseId,
List<ContainerStatus> completedContainers,
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
Resource availResources, boolean reboot, int numClusterNodes) {
AllocateResponse response = recordFactory
.newRecordInstance(AllocateResponse.class);
response.setNumClusterNodes(numClusterNodes);
response.setResponseId(responseId);
response.setCompletedContainersStatuses(completedContainers);
response.setAllocatedContainers(allocatedContainers);
response.setUpdatedNodes(updatedNodes);
response.setAvailableResources(availResources);
response.setReboot(reboot);
return response;
}
} }