YARN-918. Remove ApplicationAttemptId from RegisterApplicationMasterRequestProto. Contributed by Vinod K V.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1504735 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2013-07-19 00:57:40 +00:00
parent 7a29bccd7a
commit ac914f79bc
29 changed files with 170 additions and 464 deletions

View File

@ -93,9 +93,9 @@ protected void serviceInit(Configuration conf) throws Exception {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
protected synchronized void heartbeat() throws Exception { protected synchronized void heartbeat() throws Exception {
AllocateRequest allocateRequest = AllocateRequest.newInstance( AllocateRequest allocateRequest =
this.applicationAttemptId, this.lastResponseID, super AllocateRequest.newInstance(this.lastResponseID,
.getApplicationProgress(), new ArrayList<ResourceRequest>(), super.getApplicationProgress(), new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>(), null); new ArrayList<ContainerId>(), null);
AllocateResponse allocateResponse; AllocateResponse allocateResponse;
try { try {
@ -143,7 +143,7 @@ public void handle(ContainerAllocatorEvent event) {
LOG.info("Processing the event " + event.toString()); LOG.info("Processing the event " + event.toString());
// Assign the same container ID as the AM // Assign the same container ID as the AM
ContainerId cID = ContainerId cID =
ContainerId.newInstance(applicationAttemptId, ContainerId.newInstance(getContext().getApplicationAttemptId(),
this.containerId.getId()); this.containerId.getId());
Container container = recordFactory.newRecordInstance(Container.class); Container container = recordFactory.newRecordInstance(Container.class);
container.setId(cID); container.setId(cID);

View File

@ -45,7 +45,6 @@
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -63,7 +62,6 @@ public abstract class RMCommunicator extends AbstractService
private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class); private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
private int rmPollInterval;//millis private int rmPollInterval;//millis
protected ApplicationId applicationId; protected ApplicationId applicationId;
protected ApplicationAttemptId applicationAttemptId;
private final AtomicBoolean stopped; private final AtomicBoolean stopped;
protected Thread allocatorThread; protected Thread allocatorThread;
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
@ -91,7 +89,6 @@ public RMCommunicator(ClientService clientService, AppContext context) {
this.context = context; this.context = context;
this.eventHandler = context.getEventHandler(); this.eventHandler = context.getEventHandler();
this.applicationId = context.getApplicationID(); this.applicationId = context.getApplicationID();
this.applicationAttemptId = context.getApplicationAttemptId();
this.stopped = new AtomicBoolean(false); this.stopped = new AtomicBoolean(false);
this.heartbeatCallbacks = new ConcurrentLinkedQueue<Runnable>(); this.heartbeatCallbacks = new ConcurrentLinkedQueue<Runnable>();
} }
@ -142,7 +139,6 @@ protected void register() {
try { try {
RegisterApplicationMasterRequest request = RegisterApplicationMasterRequest request =
recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class); recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
request.setApplicationAttemptId(applicationAttemptId);
if (serviceAddr != null) { if (serviceAddr != null) {
request.setHost(serviceAddr.getHostName()); request.setHost(serviceAddr.getHostName());
request.setRpcPort(serviceAddr.getPort()); request.setRpcPort(serviceAddr.getPort());
@ -193,11 +189,8 @@ protected void unregister() {
LOG.info("History url is " + historyUrl); LOG.info("History url is " + historyUrl);
FinishApplicationMasterRequest request = FinishApplicationMasterRequest request =
recordFactory.newRecordInstance(FinishApplicationMasterRequest.class); FinishApplicationMasterRequest.newInstance(finishState,
request.setAppAttemptId(this.applicationAttemptId); sb.toString(), historyUrl);
request.setFinalApplicationStatus(finishState);
request.setDiagnostics(sb.toString());
request.setTrackingUrl(historyUrl);
scheduler.finishApplicationMaster(request); scheduler.finishApplicationMaster(request);
} catch(Exception are) { } catch(Exception are) {
LOG.error("Exception while unregistering ", are); LOG.error("Exception while unregistering ", are);

View File

@ -1161,14 +1161,6 @@ TaskAttemptId get(ContainerId cId) {
return containerToAttemptMap.get(cId); return containerToAttemptMap.get(cId);
} }
NodeId getNodeId(TaskAttemptId tId) {
if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
return maps.get(tId).getNodeId();
} else {
return reduces.get(tId).getNodeId();
}
}
ContainerId get(TaskAttemptId tId) { ContainerId get(TaskAttemptId tId) {
Container taskContainer; Container taskContainer;
if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) { if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {

View File

@ -145,10 +145,10 @@ protected void serviceInit(Configuration conf) throws Exception {
} }
protected AllocateResponse makeRemoteRequest() throws IOException { protected AllocateResponse makeRemoteRequest() throws IOException {
AllocateRequest allocateRequest = AllocateRequest.newInstance( AllocateRequest allocateRequest =
applicationAttemptId, lastResponseID, super.getApplicationProgress(), AllocateRequest.newInstance(lastResponseID,
new ArrayList<ResourceRequest>(ask), new ArrayList<ContainerId>( super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
release), null); new ArrayList<ContainerId>(release), null);
AllocateResponse allocateResponse; AllocateResponse allocateResponse;
try { try {
allocateResponse = scheduler.allocate(allocateRequest); allocateResponse = scheduler.allocate(allocateRequest);

View File

@ -234,7 +234,7 @@ public AllocateResponse allocate(AllocateRequest request)
for (int i = 0; i < numContainers; i++) { for (int i = 0; i < numContainers; i++) {
ContainerId containerId = ContainerId containerId =
ContainerId.newInstance( ContainerId.newInstance(
request.getApplicationAttemptId(), getContext().getApplicationAttemptId(),
request.getResponseId() + i); request.getResponseId() + i);
containers.add(Container.newInstance(containerId, containers.add(Container.newInstance(containerId,
NodeId.newInstance("host" + containerId.getId(), 2345), NodeId.newInstance("host" + containerId.getId(), 2345),

View File

@ -256,6 +256,9 @@ Release 2.1.0-beta - 2013-07-02
YARN-701. Use application tokens irrespective of secure or non-secure YARN-701. Use application tokens irrespective of secure or non-secure
mode. (vinodkv via acmurthy) mode. (vinodkv via acmurthy)
YARN-918. Remove ApplicationAttemptId from
RegisterApplicationMasterRequestProto. (vinodkv via acmurthy)
NEW FEATURES NEW FEATURES
YARN-482. FS: Extend SchedulingMode to intermediate queues. YARN-482. FS: Extend SchedulingMode to intermediate queues.

View File

@ -23,10 +23,9 @@
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@ -36,10 +35,6 @@
* *
* <p>The request includes: * <p>The request includes:
* <ul> * <ul>
* <li>
* {@link ApplicationAttemptId} being managed by the
* <code>ApplicationMaster</code>
* </li>
* <li>A response id to track duplicate responses.</li> * <li>A response id to track duplicate responses.</li>
* <li>Progress information.</li> * <li>Progress information.</li>
* <li> * <li>
@ -61,13 +56,11 @@ public abstract class AllocateRequest {
@Public @Public
@Stable @Stable
public static AllocateRequest newInstance( public static AllocateRequest newInstance(int responseID, float appProgress,
ApplicationAttemptId applicationAttemptId, int responseID, List<ResourceRequest> resourceAsk,
float appProgress, List<ResourceRequest> resourceAsk,
List<ContainerId> containersToBeReleased, List<ContainerId> containersToBeReleased,
ResourceBlacklistRequest resourceBlacklistRequest) { ResourceBlacklistRequest resourceBlacklistRequest) {
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
allocateRequest.setApplicationAttemptId(applicationAttemptId);
allocateRequest.setResponseId(responseID); allocateRequest.setResponseId(responseID);
allocateRequest.setProgress(appProgress); allocateRequest.setProgress(appProgress);
allocateRequest.setAskList(resourceAsk); allocateRequest.setAskList(resourceAsk);
@ -76,26 +69,6 @@ public static AllocateRequest newInstance(
return allocateRequest; return allocateRequest;
} }
/**
* Get the <code>ApplicationAttemptId</code> being managed by the
* <code>ApplicationMaster</code>.
* @return <code>ApplicationAttemptId</code> being managed by the
* <code>ApplicationMaster</code>
*/
@Public
@Stable
public abstract ApplicationAttemptId getApplicationAttemptId();
/**
* Set the <code>ApplicationAttemptId</code> being managed by the
* <code>ApplicationMaster</code>.
* @param applicationAttemptId <code>ApplicationAttemptId</code> being managed
* by the <code>ApplicationMaster</code>
*/
@Public
@Stable
public abstract void setApplicationAttemptId(ApplicationAttemptId applicationAttemptId);
/** /**
* Get the <em>response id</em> used to track duplicate responses. * Get the <em>response id</em> used to track duplicate responses.
* @return <em>response id</em> * @return <em>response id</em>

View File

@ -21,7 +21,6 @@
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@ -31,10 +30,6 @@
* *
* <p>The final request includes details such: * <p>The final request includes details such:
* <ul> * <ul>
* <li>
* {@link ApplicationAttemptId} being managed by the
* <code>ApplicationMaster</code>
* </li>
* <li>Final state of the <code>ApplicationMaster</code></li> * <li>Final state of the <code>ApplicationMaster</code></li>
* <li> * <li>
* Diagnostic information in case of failure of the * Diagnostic information in case of failure of the
@ -53,37 +48,15 @@ public abstract class FinishApplicationMasterRequest {
@Public @Public
@Stable @Stable
public static FinishApplicationMasterRequest newInstance( public static FinishApplicationMasterRequest newInstance(
ApplicationAttemptId appAttemptId, FinalApplicationStatus finalAppStatus, FinalApplicationStatus finalAppStatus, String diagnostics, String url) {
String diagnostics, String url) {
FinishApplicationMasterRequest request = FinishApplicationMasterRequest request =
Records.newRecord(FinishApplicationMasterRequest.class); Records.newRecord(FinishApplicationMasterRequest.class);
request.setAppAttemptId(appAttemptId);
request.setFinalApplicationStatus(finalAppStatus); request.setFinalApplicationStatus(finalAppStatus);
request.setDiagnostics(diagnostics); request.setDiagnostics(diagnostics);
request.setTrackingUrl(url); request.setTrackingUrl(url);
return request; return request;
} }
/**
* Get the <code>ApplicationAttemptId</code> being managed by the
* <code>ApplicationMaster</code>.
* @return <code>ApplicationAttemptId</code> being managed by the
* <code>ApplicationMaster</code>
*/
@Public
@Stable
public abstract ApplicationAttemptId getApplicationAttemptId();
/**
* Set the <code>ApplicationAttemptId</code> being managed by the
* <code>ApplicationMaster</code>.
* @param applicationAttemptId <code>ApplicationAttemptId</code> being managed
* by the <code>ApplicationMaster</code>
*/
@Public
@Stable
public abstract void setAppAttemptId(ApplicationAttemptId applicationAttemptId);
/** /**
* Get <em>final state</em> of the <code>ApplicationMaster</code>. * Get <em>final state</em> of the <code>ApplicationMaster</code>.
* @return <em>final state</em> of the <code>ApplicationMaster</code> * @return <em>final state</em> of the <code>ApplicationMaster</code>

View File

@ -21,7 +21,6 @@
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
/** /**
@ -30,10 +29,6 @@
* *
* <p>The registration includes details such as: * <p>The registration includes details such as:
* <ul> * <ul>
* <li>
* {@link ApplicationAttemptId} being managed by the
* <code>ApplicationMaster</code>
* </li>
* <li>Hostname on which the AM is running.</li> * <li>Hostname on which the AM is running.</li>
* <li>RPC Port</li> * <li>RPC Port</li>
* <li>Tracking URL</li> * <li>Tracking URL</li>
@ -57,38 +52,16 @@ public abstract class RegisterApplicationMasterRequest {
*/ */
@Public @Public
@Stable @Stable
public static RegisterApplicationMasterRequest newInstance( public static RegisterApplicationMasterRequest newInstance(String host,
ApplicationAttemptId applicationAttemptId, String host, int port, int port, String trackingUrl) {
String trackingUrl) {
RegisterApplicationMasterRequest request = RegisterApplicationMasterRequest request =
Records.newRecord(RegisterApplicationMasterRequest.class); Records.newRecord(RegisterApplicationMasterRequest.class);
request.setApplicationAttemptId(applicationAttemptId);
request.setHost(host); request.setHost(host);
request.setRpcPort(port); request.setRpcPort(port);
request.setTrackingUrl(trackingUrl); request.setTrackingUrl(trackingUrl);
return request; return request;
} }
/**
* Get the <code>ApplicationAttemptId</code> being managed by the
* <code>ApplicationMaster</code>.
* @return <code>ApplicationAttemptId</code> being managed by the
* <code>ApplicationMaster</code>
*/
@Public
@Stable
public abstract ApplicationAttemptId getApplicationAttemptId();
/**
* Set the <code>ApplicationAttemptId</code> being managed by the
* <code>ApplicationMaster</code>.
* @param applicationAttemptId <code>ApplicationAttemptId</code> being managed
* by the <code>ApplicationMaster</code>
*/
@Public
@Stable
public abstract void setApplicationAttemptId(ApplicationAttemptId applicationAttemptId);
/** /**
* Get the <em>host</em> on which the <code>ApplicationMaster</code> is * Get the <em>host</em> on which the <code>ApplicationMaster</code> is
* running. * running.

View File

@ -34,10 +34,9 @@ import "yarn_protos.proto";
/////// AM_RM_Protocol /////////////////////////////// /////// AM_RM_Protocol ///////////////////////////////
////////////////////////////////////////////////////// //////////////////////////////////////////////////////
message RegisterApplicationMasterRequestProto { message RegisterApplicationMasterRequestProto {
optional ApplicationAttemptIdProto application_attempt_id = 1; optional string host = 1;
optional string host = 2; optional int32 rpc_port = 2;
optional int32 rpc_port = 3; optional string tracking_url = 3;
optional string tracking_url = 4;
} }
message RegisterApplicationMasterResponseProto { message RegisterApplicationMasterResponseProto {
@ -47,22 +46,20 @@ message RegisterApplicationMasterResponseProto {
} }
message FinishApplicationMasterRequestProto { message FinishApplicationMasterRequestProto {
optional ApplicationAttemptIdProto application_attempt_id = 1; optional string diagnostics = 1;
optional string diagnostics = 2; optional string tracking_url = 2;
optional string tracking_url = 3; optional FinalApplicationStatusProto final_application_status = 3;
optional FinalApplicationStatusProto final_application_status = 4;
} }
message FinishApplicationMasterResponseProto { message FinishApplicationMasterResponseProto {
} }
message AllocateRequestProto { message AllocateRequestProto {
optional ApplicationAttemptIdProto application_attempt_id = 1; repeated ResourceRequestProto ask = 1;
repeated ResourceRequestProto ask = 2; repeated ContainerIdProto release = 2;
repeated ContainerIdProto release = 3; optional ResourceBlacklistRequestProto blacklist_request = 3;
optional ResourceBlacklistRequestProto blacklist_request = 4; optional int32 response_id = 4;
optional int32 response_id = 5; optional float progress = 5;
optional float progress = 6;
} }
message NMTokenProto { message NMTokenProto {

View File

@ -443,7 +443,7 @@ public boolean run() throws YarnException, IOException {
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
resourceManager = resourceManager =
AMRMClientAsync.createAMRMClientAsync(appAttemptID, 1000, allocListener); AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
resourceManager.init(conf); resourceManager.init(conf);
resourceManager.start(); resourceManager.start();

View File

@ -29,15 +29,15 @@
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import com.google.common.collect.ImmutableList;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Stable @InterfaceStability.Stable
@ -49,17 +49,13 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
* For usage: * For usage:
* <pre> * <pre>
* {@code * {@code
* AMRMClient.<T>createAMRMClientContainerRequest(appAttemptId) * AMRMClient.<T>createAMRMClientContainerRequest()
* }</pre> * }</pre>
* @param appAttemptId the appAttemptId associated with the AMRMClient
* @return the newly create AMRMClient instance. * @return the newly create AMRMClient instance.
*/ */
@Public @Public
public static <T extends ContainerRequest> AMRMClient<T> createAMRMClient( public static <T extends ContainerRequest> AMRMClient<T> createAMRMClient() {
ApplicationAttemptId appAttemptId) { AMRMClient<T> client = new AMRMClientImpl<T>();
Preconditions.checkArgument(appAttemptId != null,
"ApplicationAttempId should not be null");
AMRMClient<T> client = new AMRMClientImpl<T>(appAttemptId);
return client; return client;
} }

View File

@ -28,7 +28,6 @@
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -97,24 +96,18 @@ public abstract class AMRMClientAsync<T extends ContainerRequest>
protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger(); protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger();
public static <T extends ContainerRequest> AMRMClientAsync<T> public static <T extends ContainerRequest> AMRMClientAsync<T>
createAMRMClientAsync( createAMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
ApplicationAttemptId id, return new AMRMClientAsyncImpl<T>(intervalMs, callbackHandler);
int intervalMs,
CallbackHandler callbackHandler) {
return new AMRMClientAsyncImpl<T>(id, intervalMs, callbackHandler);
} }
public static <T extends ContainerRequest> AMRMClientAsync<T> public static <T extends ContainerRequest> AMRMClientAsync<T>
createAMRMClientAsync( createAMRMClientAsync(AMRMClient<T> client, int intervalMs,
AMRMClient<T> client,
int intervalMs,
CallbackHandler callbackHandler) { CallbackHandler callbackHandler) {
return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler); return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler);
} }
protected AMRMClientAsync(ApplicationAttemptId id, int intervalMs, protected AMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
CallbackHandler callbackHandler) { this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
this(new AMRMClientImpl<T>(id), intervalMs, callbackHandler);
} }
@Private @Private

View File

@ -32,7 +32,6 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -68,9 +67,8 @@ public class AMRMClientAsyncImpl<T extends ContainerRequest>
private volatile Exception savedException; private volatile Exception savedException;
public AMRMClientAsyncImpl(ApplicationAttemptId id, int intervalMs, public AMRMClientAsyncImpl(int intervalMs, CallbackHandler callbackHandler) {
CallbackHandler callbackHandler) { this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
this(new AMRMClientImpl<T>(id), intervalMs, callbackHandler);
} }
@Private @Private

View File

@ -46,7 +46,6 @@
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NMToken;
@ -61,8 +60,6 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.RackResolver; import org.apache.hadoop.yarn.util.RackResolver;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -77,13 +74,9 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
private static final List<String> ANY_LIST = private static final List<String> ANY_LIST =
Collections.singletonList(ResourceRequest.ANY); Collections.singletonList(ResourceRequest.ANY);
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private int lastResponseId = 0; private int lastResponseId = 0;
protected ApplicationMasterProtocol rmClient; protected ApplicationMasterProtocol rmClient;
protected final ApplicationAttemptId appAttemptId;
protected Resource clusterAvailableResources; protected Resource clusterAvailableResources;
protected int clusterNodeCount; protected int clusterNodeCount;
@ -154,9 +147,8 @@ static boolean canFit(Resource arg0, Resource arg1) {
new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator()); new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
protected final Set<ContainerId> release = new TreeSet<ContainerId>(); protected final Set<ContainerId> release = new TreeSet<ContainerId>();
public AMRMClientImpl(ApplicationAttemptId appAttemptId) { public AMRMClientImpl() {
super(AMRMClientImpl.class.getName()); super(AMRMClientImpl.class.getName());
this.appAttemptId = appAttemptId;
} }
@Override @Override
@ -193,18 +185,11 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
Preconditions.checkArgument(appHostPort >= 0, Preconditions.checkArgument(appHostPort >= 0,
"Port number of the host should not be negative"); "Port number of the host should not be negative");
// do this only once ??? // do this only once ???
RegisterApplicationMasterRequest request = recordFactory RegisterApplicationMasterRequest request =
.newRecordInstance(RegisterApplicationMasterRequest.class); RegisterApplicationMasterRequest.newInstance(appHostName, appHostPort,
synchronized (this) { appTrackingUrl);
request.setApplicationAttemptId(appAttemptId); RegisterApplicationMasterResponse response =
} rmClient.registerApplicationMaster(request);
request.setHost(appHostName);
request.setRpcPort(appHostPort);
if(appTrackingUrl != null) {
request.setTrackingUrl(appTrackingUrl);
}
RegisterApplicationMasterResponse response = rmClient
.registerApplicationMaster(request);
return response; return response;
} }
@ -233,8 +218,8 @@ public AllocateResponse allocate(float progressIndicator)
ask.clear(); ask.clear();
release.clear(); release.clear();
allocateRequest = allocateRequest =
AllocateRequest.newInstance(appAttemptId, lastResponseId, AllocateRequest.newInstance(lastResponseId, progressIndicator,
progressIndicator, askList, releaseList, null); askList, releaseList, null);
} }
allocateResponse = rmClient.allocate(allocateRequest); allocateResponse = rmClient.allocate(allocateRequest);
@ -292,18 +277,9 @@ protected void populateNMTokens(AllocateResponse allocateResponse) {
public void unregisterApplicationMaster(FinalApplicationStatus appStatus, public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
String appMessage, String appTrackingUrl) throws YarnException, String appMessage, String appTrackingUrl) throws YarnException,
IOException { IOException {
Preconditions.checkArgument(appStatus != null, FinishApplicationMasterRequest request =
"AppStatus should not be null."); FinishApplicationMasterRequest.newInstance(appStatus, appMessage,
FinishApplicationMasterRequest request = recordFactory appTrackingUrl);
.newRecordInstance(FinishApplicationMasterRequest.class);
request.setAppAttemptId(appAttemptId);
request.setFinalApplicationStatus(appStatus);
if(appMessage != null) {
request.setDiagnostics(appMessage);
}
if(appTrackingUrl != null) {
request.setTrackingUrl(appTrackingUrl);
}
rmClient.finishApplicationMaster(request); rmClient.finishApplicationMaster(request);
} }
@ -553,7 +529,7 @@ private void addResourceRequest(Priority priority, String resourceName,
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("addResourceRequest:" + " applicationId=" LOG.debug("addResourceRequest:" + " applicationId="
+ appAttemptId + " priority=" + priority.getPriority() + " priority=" + priority.getPriority()
+ " resourceName=" + resourceName + " numContainers=" + " resourceName=" + resourceName + " numContainers="
+ resourceRequestInfo.remoteRequest.getNumContainers() + resourceRequestInfo.remoteRequest.getNumContainers()
+ " #asks=" + ask.size()); + " #asks=" + ask.size());
@ -587,7 +563,7 @@ private void decResourceRequest(Priority priority,
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("BEFORE decResourceRequest:" + " applicationId=" LOG.debug("BEFORE decResourceRequest:" + " applicationId="
+ appAttemptId + " priority=" + priority.getPriority() + " priority=" + priority.getPriority()
+ " resourceName=" + resourceName + " numContainers=" + " resourceName=" + resourceName + " numContainers="
+ resourceRequestInfo.remoteRequest.getNumContainers() + resourceRequestInfo.remoteRequest.getNumContainers()
+ " #asks=" + ask.size()); + " #asks=" + ask.size());
@ -620,7 +596,7 @@ private void decResourceRequest(Priority priority,
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.info("AFTER decResourceRequest:" + " applicationId=" LOG.info("AFTER decResourceRequest:" + " applicationId="
+ appAttemptId + " priority=" + priority.getPriority() + " priority=" + priority.getPriority()
+ " resourceName=" + resourceName + " numContainers=" + " resourceName=" + resourceName + " numContainers="
+ resourceRequestInfo.remoteRequest.getNumContainers() + resourceRequestInfo.remoteRequest.getNumContainers()
+ " #asks=" + ask.size()); + " #asks=" + ask.size());

View File

@ -200,7 +200,7 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException {
AMRMClient<ContainerRequest> amClient = null; AMRMClient<ContainerRequest> amClient = null;
try { try {
// start am rm client // start am rm client
amClient = AMRMClient.<ContainerRequest>createAMRMClient(attemptId); amClient = AMRMClient.<ContainerRequest>createAMRMClient();
amClient.init(conf); amClient.init(conf);
amClient.start(); amClient.start();
amClient.registerApplicationMaster("Host", 10000, ""); amClient.registerApplicationMaster("Host", 10000, "");
@ -314,7 +314,7 @@ public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOExce
AMRMClientImpl<ContainerRequest> amClient = null; AMRMClientImpl<ContainerRequest> amClient = null;
try { try {
// start am rm client // start am rm client
amClient = new AMRMClientImpl<ContainerRequest>(attemptId); amClient = new AMRMClientImpl<ContainerRequest>();
amClient.init(conf); amClient.init(conf);
amClient.start(); amClient.start();
amClient.registerApplicationMaster("Host", 10000, ""); amClient.registerApplicationMaster("Host", 10000, "");
@ -361,7 +361,7 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException {
// start am rm client // start am rm client
amClient = amClient =
(AMRMClientImpl<ContainerRequest>) AMRMClient (AMRMClientImpl<ContainerRequest>) AMRMClient
.<ContainerRequest> createAMRMClient(attemptId); .<ContainerRequest> createAMRMClient();
amClient.init(conf); amClient.init(conf);
amClient.start(); amClient.start();
amClient.registerApplicationMaster("Host", 10000, ""); amClient.registerApplicationMaster("Host", 10000, "");
@ -482,7 +482,7 @@ public void testAMRMClient() throws YarnException, IOException {
AMRMClient<ContainerRequest> amClient = null; AMRMClient<ContainerRequest> amClient = null;
try { try {
// start am rm client // start am rm client
amClient = AMRMClient.<ContainerRequest>createAMRMClient(attemptId); amClient = AMRMClient.<ContainerRequest>createAMRMClient();
amClient.init(conf); amClient.init(conf);
amClient.start(); amClient.start();

View File

@ -18,30 +18,27 @@
package org.apache.hadoop.yarn.client.api.impl; package org.apache.hadoop.yarn.client.api.impl;
import static org.junit.Assert.assertEquals;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.net.DNSToSwitchMapping; import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException; import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class TestAMRMClientContainerRequest { public class TestAMRMClientContainerRequest {
@Test @Test
public void testFillInRacks() { public void testFillInRacks() {
AMRMClientImpl<ContainerRequest> client = new AMRMClientImpl<ContainerRequest>( AMRMClientImpl<ContainerRequest> client =
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0)); new AMRMClientImpl<ContainerRequest>();
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setClass( conf.setClass(
@ -63,8 +60,8 @@ public void testFillInRacks() {
@Test @Test
public void testDisableLocalityRelaxation() { public void testDisableLocalityRelaxation() {
AMRMClientImpl<ContainerRequest> client = new AMRMClientImpl<ContainerRequest>( AMRMClientImpl<ContainerRequest> client =
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0)); new AMRMClientImpl<ContainerRequest>();
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setClass( conf.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
@ -130,8 +127,8 @@ public void testDisableLocalityRelaxation() {
@Test (expected = InvalidContainerRequestException.class) @Test (expected = InvalidContainerRequestException.class)
public void testDifferentLocalityRelaxationSamePriority() { public void testDifferentLocalityRelaxationSamePriority() {
AMRMClientImpl<ContainerRequest> client = new AMRMClientImpl<ContainerRequest>( AMRMClientImpl<ContainerRequest> client =
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0)); new AMRMClientImpl<ContainerRequest>();
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setClass( conf.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
@ -151,8 +148,8 @@ public void testDifferentLocalityRelaxationSamePriority() {
@Test @Test
public void testInvalidValidWhenOldRemoved() { public void testInvalidValidWhenOldRemoved() {
AMRMClientImpl<ContainerRequest> client = new AMRMClientImpl<ContainerRequest>( AMRMClientImpl<ContainerRequest> client =
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0)); new AMRMClientImpl<ContainerRequest>();
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setClass( conf.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
@ -190,8 +187,8 @@ public void testInvalidValidWhenOldRemoved() {
@Test (expected = InvalidContainerRequestException.class) @Test (expected = InvalidContainerRequestException.class)
public void testLocalityRelaxationDifferentLevels() { public void testLocalityRelaxationDifferentLevels() {
AMRMClientImpl<ContainerRequest> client = new AMRMClientImpl<ContainerRequest>( AMRMClientImpl<ContainerRequest> client =
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0)); new AMRMClientImpl<ContainerRequest>();
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setClass( conf.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,

View File

@ -157,7 +157,7 @@ public void setup() throws YarnException, IOException {
// start am rm client // start am rm client
rmClient = rmClient =
(AMRMClientImpl<ContainerRequest>) AMRMClient (AMRMClientImpl<ContainerRequest>) AMRMClient
.<ContainerRequest> createAMRMClient(attemptId); .<ContainerRequest> createAMRMClient();
rmClient.init(conf); rmClient.init(conf);
rmClient.start(); rmClient.start();
assertNotNull(rmClient); assertNotNull(rmClient);

View File

@ -26,15 +26,12 @@
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceBlacklistRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceBlacklistRequestPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceBlacklistRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceBlacklistRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto;
@ -48,7 +45,6 @@ public class AllocateRequestPBImpl extends AllocateRequest {
AllocateRequestProto.Builder builder = null; AllocateRequestProto.Builder builder = null;
boolean viaProto = false; boolean viaProto = false;
private ApplicationAttemptId applicationAttemptID = null;
private List<ResourceRequest> ask = null; private List<ResourceRequest> ask = null;
private List<ContainerId> release = null; private List<ContainerId> release = null;
private ResourceBlacklistRequest blacklistRequest = null; private ResourceBlacklistRequest blacklistRequest = null;
@ -91,9 +87,6 @@ public String toString() {
} }
private void mergeLocalToBuilder() { private void mergeLocalToBuilder() {
if (this.applicationAttemptID != null) {
builder.setApplicationAttemptId(convertToProtoFormat(this.applicationAttemptID));
}
if (this.ask != null) { if (this.ask != null) {
addAsksToProto(); addAsksToProto();
} }
@ -120,27 +113,6 @@ private void maybeInitBuilder() {
viaProto = false; viaProto = false;
} }
@Override
public ApplicationAttemptId getApplicationAttemptId() {
AllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.applicationAttemptID != null) {
return this.applicationAttemptID;
}
if (!p.hasApplicationAttemptId()) {
return null;
}
this.applicationAttemptID = convertFromProtoFormat(p.getApplicationAttemptId());
return this.applicationAttemptID;
}
@Override
public void setApplicationAttemptId(ApplicationAttemptId appAttemptId) {
maybeInitBuilder();
if (appAttemptId == null)
builder.clearApplicationAttemptId();
this.applicationAttemptID = appAttemptId;
}
@Override @Override
public int getResponseId() { public int getResponseId() {
AllocateRequestProtoOrBuilder p = viaProto ? proto : builder; AllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
@ -311,14 +283,6 @@ public void remove() {
builder.addAllRelease(iterable); builder.addAllRelease(iterable);
} }
private ApplicationAttemptIdPBImpl convertFromProtoFormat(ApplicationAttemptIdProto p) {
return new ApplicationAttemptIdPBImpl(p);
}
private ApplicationAttemptIdProto convertToProtoFormat(ApplicationAttemptId t) {
return ((ApplicationAttemptIdPBImpl)t).getProto();
}
private ResourceRequestPBImpl convertFromProtoFormat(ResourceRequestProto p) { private ResourceRequestPBImpl convertFromProtoFormat(ResourceRequestProto p) {
return new ResourceRequestPBImpl(p); return new ResourceRequestPBImpl(p);
} }

View File

@ -22,11 +22,8 @@
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterRequestProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterRequestProtoOrBuilder;
@ -38,9 +35,6 @@ public class FinishApplicationMasterRequestPBImpl extends FinishApplicationMaste
FinishApplicationMasterRequestProto.Builder builder = null; FinishApplicationMasterRequestProto.Builder builder = null;
boolean viaProto = false; boolean viaProto = false;
private ApplicationAttemptId appAttemptId = null;
public FinishApplicationMasterRequestPBImpl() { public FinishApplicationMasterRequestPBImpl() {
builder = FinishApplicationMasterRequestProto.newBuilder(); builder = FinishApplicationMasterRequestProto.newBuilder();
} }
@ -78,9 +72,6 @@ public String toString() {
} }
private void mergeLocalToBuilder() { private void mergeLocalToBuilder() {
if (this.appAttemptId != null) {
builder.setApplicationAttemptId(convertToProtoFormat(this.appAttemptId));
}
} }
private void mergeLocalToProto() { private void mergeLocalToProto() {
@ -98,27 +89,6 @@ private void maybeInitBuilder() {
viaProto = false; viaProto = false;
} }
@Override
public ApplicationAttemptId getApplicationAttemptId() {
FinishApplicationMasterRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.appAttemptId != null) {
return this.appAttemptId;
}
if (!p.hasApplicationAttemptId()) {
return null;
}
this.appAttemptId = convertFromProtoFormat(p.getApplicationAttemptId());
return this.appAttemptId;
}
@Override
public void setAppAttemptId(ApplicationAttemptId applicationAttemptId) {
maybeInitBuilder();
if (applicationAttemptId == null)
builder.clearApplicationAttemptId();
this.appAttemptId = applicationAttemptId;
}
@Override @Override
public String getDiagnostics() { public String getDiagnostics() {
FinishApplicationMasterRequestProtoOrBuilder p = viaProto ? proto : builder; FinishApplicationMasterRequestProtoOrBuilder p = viaProto ? proto : builder;
@ -128,6 +98,10 @@ public String getDiagnostics() {
@Override @Override
public void setDiagnostics(String diagnostics) { public void setDiagnostics(String diagnostics) {
maybeInitBuilder(); maybeInitBuilder();
if (diagnostics == null) {
builder.clearDiagnostics();
return;
}
builder.setDiagnostics(diagnostics); builder.setDiagnostics(diagnostics);
} }
@ -140,6 +114,10 @@ public String getTrackingUrl() {
@Override @Override
public void setTrackingUrl(String url) { public void setTrackingUrl(String url) {
maybeInitBuilder(); maybeInitBuilder();
if (url == null) {
builder.clearTrackingUrl();
return;
}
builder.setTrackingUrl(url); builder.setTrackingUrl(url);
} }
@ -162,14 +140,6 @@ public void setFinalApplicationStatus(FinalApplicationStatus finalState) {
builder.setFinalApplicationStatus(convertToProtoFormat(finalState)); builder.setFinalApplicationStatus(convertToProtoFormat(finalState));
} }
private ApplicationAttemptIdPBImpl convertFromProtoFormat(ApplicationAttemptIdProto p) {
return new ApplicationAttemptIdPBImpl(p);
}
private ApplicationAttemptIdProto convertToProtoFormat(ApplicationAttemptId t) {
return ((ApplicationAttemptIdPBImpl)t).getProto();
}
private FinalApplicationStatus convertFromProtoFormat(FinalApplicationStatusProto s) { private FinalApplicationStatus convertFromProtoFormat(FinalApplicationStatusProto s) {
return ProtoUtils.convertFromProtoFormat(s); return ProtoUtils.convertFromProtoFormat(s);
} }

View File

@ -22,9 +22,6 @@
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProtoOrBuilder;
@ -35,9 +32,6 @@ public class RegisterApplicationMasterRequestPBImpl extends RegisterApplicationM
RegisterApplicationMasterRequestProto.Builder builder = null; RegisterApplicationMasterRequestProto.Builder builder = null;
boolean viaProto = false; boolean viaProto = false;
private ApplicationAttemptId applicationAttemptId = null;
public RegisterApplicationMasterRequestPBImpl() { public RegisterApplicationMasterRequestPBImpl() {
builder = RegisterApplicationMasterRequestProto.newBuilder(); builder = RegisterApplicationMasterRequestProto.newBuilder();
} }
@ -75,9 +69,6 @@ public String toString() {
} }
private void mergeLocalToBuilder() { private void mergeLocalToBuilder() {
if (this.applicationAttemptId != null && !((ApplicationAttemptIdPBImpl)this.applicationAttemptId).getProto().equals(builder.getApplicationAttemptId())) {
builder.setApplicationAttemptId(convertToProtoFormat(this.applicationAttemptId));
}
} }
private void mergeLocalToProto() { private void mergeLocalToProto() {
@ -95,28 +86,6 @@ private void maybeInitBuilder() {
viaProto = false; viaProto = false;
} }
@Override
public ApplicationAttemptId getApplicationAttemptId() {
RegisterApplicationMasterRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.applicationAttemptId != null) {
return this.applicationAttemptId;
}
if (!p.hasApplicationAttemptId()) {
return null;
}
this.applicationAttemptId = convertFromProtoFormat(p.getApplicationAttemptId());
return this.applicationAttemptId;
}
@Override
public void setApplicationAttemptId(ApplicationAttemptId applicationMaster) {
maybeInitBuilder();
if (applicationMaster == null)
builder.clearApplicationAttemptId();
this.applicationAttemptId = applicationMaster;
}
@Override @Override
public String getHost() { public String getHost() {
RegisterApplicationMasterRequestProtoOrBuilder p = viaProto ? proto : builder; RegisterApplicationMasterRequestProtoOrBuilder p = viaProto ? proto : builder;
@ -126,6 +95,10 @@ public String getHost() {
@Override @Override
public void setHost(String host) { public void setHost(String host) {
maybeInitBuilder(); maybeInitBuilder();
if (host == null) {
builder.clearHost();
return;
}
builder.setHost(host); builder.setHost(host);
} }
@ -150,15 +123,10 @@ public String getTrackingUrl() {
@Override @Override
public void setTrackingUrl(String url) { public void setTrackingUrl(String url) {
maybeInitBuilder(); maybeInitBuilder();
if (url == null) {
builder.clearTrackingUrl();
return;
}
builder.setTrackingUrl(url); builder.setTrackingUrl(url);
} }
private ApplicationAttemptIdPBImpl convertFromProtoFormat(ApplicationAttemptIdProto p) {
return new ApplicationAttemptIdPBImpl(p);
}
private ApplicationAttemptIdProto convertToProtoFormat(ApplicationAttemptId t) {
return ((ApplicationAttemptIdPBImpl)t).getProto();
}
} }

View File

@ -31,7 +31,6 @@
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@ -391,20 +390,6 @@ public static URL newURL(String scheme, String host, int port, String file) {
return url; return url;
} }
public static AllocateRequest newAllocateRequest(
ApplicationAttemptId applicationAttemptId, int responseID,
float appProgress, List<ResourceRequest> resourceAsk,
List<ContainerId> containersToBeReleased) {
AllocateRequest allocateRequest = recordFactory
.newRecordInstance(AllocateRequest.class);
allocateRequest.setApplicationAttemptId(applicationAttemptId);
allocateRequest.setResponseId(responseID);
allocateRequest.setProgress(appProgress);
allocateRequest.setAskList(resourceAsk);
allocateRequest.setReleaseList(containersToBeReleased);
return allocateRequest;
}
public static AllocateResponse newAllocateResponse(int responseId, public static AllocateResponse newAllocateResponse(int responseId,
List<ContainerStatus> completedContainers, List<ContainerStatus> completedContainers,
List<Container> allocatedContainers, List<NodeReport> updatedNodes, List<Container> allocatedContainers, List<NodeReport> updatedNodes,

View File

@ -167,18 +167,16 @@ private AMRMTokenIdentifier selectAMRMTokenIdentifier(
return result; return result;
} }
private void authorizeRequest(ApplicationAttemptId appAttemptID) private ApplicationAttemptId authorizeRequest()
throws YarnException { throws YarnException {
String appAttemptIDStr = appAttemptID.toString();
UserGroupInformation remoteUgi; UserGroupInformation remoteUgi;
try { try {
remoteUgi = UserGroupInformation.getCurrentUser(); remoteUgi = UserGroupInformation.getCurrentUser();
} catch (IOException e) { } catch (IOException e) {
String msg = "Cannot obtain the user-name for ApplicationAttemptID: " String msg =
+ appAttemptIDStr + ". Got exception: " "Cannot obtain the user-name for authorizing ApplicationMaster. "
+ StringUtils.stringifyException(e); + "Got exception: " + StringUtils.stringifyException(e);
LOG.warn(msg); LOG.warn(msg);
throw RPCUtil.getRemoteException(msg); throw RPCUtil.getRemoteException(msg);
} }
@ -190,14 +188,15 @@ private void authorizeRequest(ApplicationAttemptId appAttemptID)
appTokenIdentifier = selectAMRMTokenIdentifier(remoteUgi); appTokenIdentifier = selectAMRMTokenIdentifier(remoteUgi);
if (appTokenIdentifier == null) { if (appTokenIdentifier == null) {
tokenFound = false; tokenFound = false;
message = "No AMRMToken found for " + appAttemptIDStr; message = "No AMRMToken found for user " + remoteUgi.getUserName();
} else { } else {
tokenFound = true; tokenFound = true;
} }
} catch (IOException e) { } catch (IOException e) {
tokenFound = false; tokenFound = false;
message = message =
"Got exception while looking for AMRMToken for " + appAttemptIDStr; "Got exception while looking for AMRMToken for user "
+ remoteUgi.getUserName();
} }
if (!tokenFound) { if (!tokenFound) {
@ -205,15 +204,7 @@ private void authorizeRequest(ApplicationAttemptId appAttemptID)
throw RPCUtil.getRemoteException(message); throw RPCUtil.getRemoteException(message);
} }
ApplicationAttemptId remoteApplicationAttemptId = return appTokenIdentifier.getApplicationAttemptId();
appTokenIdentifier.getApplicationAttemptId();
if (!remoteApplicationAttemptId.equals(appAttemptID)) {
String msg = "Unauthorized request from ApplicationMaster. "
+ "Expected ApplicationAttemptID: " + remoteApplicationAttemptId
+ " Found: " + appAttemptIDStr;
LOG.warn(msg);
throw RPCUtil.getRemoteException(msg);
}
} }
@Override @Override
@ -221,9 +212,7 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request) throws YarnException, RegisterApplicationMasterRequest request) throws YarnException,
IOException { IOException {
ApplicationAttemptId applicationAttemptId = request ApplicationAttemptId applicationAttemptId = authorizeRequest();
.getApplicationAttemptId();
authorizeRequest(applicationAttemptId);
ApplicationId appID = applicationAttemptId.getApplicationId(); ApplicationId appID = applicationAttemptId.getApplicationId();
AllocateResponse lastResponse = responseMap.get(applicationAttemptId); AllocateResponse lastResponse = responseMap.get(applicationAttemptId);
@ -293,9 +282,7 @@ public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request) throws YarnException, FinishApplicationMasterRequest request) throws YarnException,
IOException { IOException {
ApplicationAttemptId applicationAttemptId = request ApplicationAttemptId applicationAttemptId = authorizeRequest();
.getApplicationAttemptId();
authorizeRequest(applicationAttemptId);
AllocateResponse lastResponse = responseMap.get(applicationAttemptId); AllocateResponse lastResponse = responseMap.get(applicationAttemptId);
if (lastResponse == null) { if (lastResponse == null) {
@ -343,8 +330,7 @@ public boolean hasApplicationMasterRegistered(
public AllocateResponse allocate(AllocateRequest request) public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException { throws YarnException, IOException {
ApplicationAttemptId appAttemptId = request.getApplicationAttemptId(); ApplicationAttemptId appAttemptId = authorizeRequest();
authorizeRequest(appAttemptId);
this.amLivelinessMonitor.receivedPing(appAttemptId); this.amLivelinessMonitor.receivedPing(appAttemptId);

View File

@ -96,7 +96,6 @@ public RegisterApplicationMasterResponse registerAppAttempt(boolean wait)
responseId = 0; responseId = 0;
final RegisterApplicationMasterRequest req = final RegisterApplicationMasterRequest req =
Records.newRecord(RegisterApplicationMasterRequest.class); Records.newRecord(RegisterApplicationMasterRequest.class);
req.setApplicationAttemptId(attemptId);
req.setHost(""); req.setHost("");
req.setRpcPort(1); req.setRpcPort(1);
req.setTrackingUrl(""); req.setTrackingUrl("");
@ -174,8 +173,9 @@ public ResourceRequest createResourceReq(String resource, int memory, int priori
public AllocateResponse allocate( public AllocateResponse allocate(
List<ResourceRequest> resourceRequest, List<ContainerId> releases) List<ResourceRequest> resourceRequest, List<ContainerId> releases)
throws Exception { throws Exception {
final AllocateRequest req = AllocateRequest.newInstance(attemptId, final AllocateRequest req =
++responseId, 0F, resourceRequest, releases, null); AllocateRequest.newInstance(++responseId, 0F, resourceRequest,
releases, null);
UserGroupInformation ugi = UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(attemptId.toString()); UserGroupInformation.createRemoteUser(attemptId.toString());
Token<AMRMTokenIdentifier> token = Token<AMRMTokenIdentifier> token =
@ -197,11 +197,8 @@ public AllocateResponse run() throws Exception {
public void unregisterAppAttempt() throws Exception { public void unregisterAppAttempt() throws Exception {
waitForState(RMAppAttemptState.RUNNING); waitForState(RMAppAttemptState.RUNNING);
final FinishApplicationMasterRequest req = final FinishApplicationMasterRequest req =
Records.newRecord(FinishApplicationMasterRequest.class); FinishApplicationMasterRequest.newInstance(
req.setAppAttemptId(attemptId); FinalApplicationStatus.SUCCEEDED, "", "");
req.setDiagnostics("");
req.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
req.setTrackingUrl("");
UserGroupInformation ugi = UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(attemptId.toString()); UserGroupInformation.createRemoteUser(attemptId.toString());
Token<AMRMTokenIdentifier> token = Token<AMRMTokenIdentifier> token =

View File

@ -52,8 +52,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -66,6 +66,7 @@ public class TestAMAuthorization {
private static final Log LOG = LogFactory.getLog(TestAMAuthorization.class); private static final Log LOG = LogFactory.getLog(TestAMAuthorization.class);
private final Configuration conf; private final Configuration conf;
private MockRM rm;
@Parameters @Parameters
public static Collection<Object[]> configs() { public static Collection<Object[]> configs() {
@ -82,6 +83,13 @@ public TestAMAuthorization(Configuration conf) {
UserGroupInformation.setConfiguration(conf); UserGroupInformation.setConfiguration(conf);
} }
@After
public void tearDown() {
if (rm != null) {
rm.stop();
}
}
public static final class MyContainerManager implements ContainerManagementProtocol { public static final class MyContainerManager implements ContainerManagementProtocol {
public ByteBuffer containerTokens; public ByteBuffer containerTokens;
@ -139,7 +147,7 @@ protected ApplicationMasterService createApplicationMasterService() {
@Test @Test
public void testAuthorizedAccess() throws Exception { public void testAuthorizedAccess() throws Exception {
MyContainerManager containerManager = new MyContainerManager(); MyContainerManager containerManager = new MyContainerManager();
final MockRM rm = rm =
new MockRMWithAMS(conf, containerManager); new MockRMWithAMS(conf, containerManager);
rm.start(); rm.start();
@ -183,7 +191,6 @@ public ApplicationMasterProtocol run() {
RegisterApplicationMasterRequest request = Records RegisterApplicationMasterRequest request = Records
.newRecord(RegisterApplicationMasterRequest.class); .newRecord(RegisterApplicationMasterRequest.class);
request.setApplicationAttemptId(applicationAttemptId);
RegisterApplicationMasterResponse response = RegisterApplicationMasterResponse response =
client.registerApplicationMaster(request); client.registerApplicationMaster(request);
Assert.assertNotNull(response.getClientToAMTokenMasterKey()); Assert.assertNotNull(response.getClientToAMTokenMasterKey());
@ -193,14 +200,12 @@ public ApplicationMasterProtocol run() {
} }
Assert.assertEquals("Register response has bad ACLs", "*", Assert.assertEquals("Register response has bad ACLs", "*",
response.getApplicationACLs().get(ApplicationAccessType.VIEW_APP)); response.getApplicationACLs().get(ApplicationAccessType.VIEW_APP));
rm.stop();
} }
@Test @Test
public void testUnauthorizedAccess() throws Exception { public void testUnauthorizedAccess() throws Exception {
MyContainerManager containerManager = new MyContainerManager(); MyContainerManager containerManager = new MyContainerManager();
MockRM rm = new MockRMWithAMS(conf, containerManager); rm = new MockRMWithAMS(conf, containerManager);
rm.start(); rm.start();
MockNM nm1 = rm.registerNode("localhost:1234", 5120); MockNM nm1 = rm.registerNode("localhost:1234", 5120);
@ -242,7 +247,6 @@ public ApplicationMasterProtocol run() {
RegisterApplicationMasterRequest request = Records RegisterApplicationMasterRequest request = Records
.newRecord(RegisterApplicationMasterRequest.class); .newRecord(RegisterApplicationMasterRequest.class);
request.setApplicationAttemptId(applicationAttemptId);
try { try {
client.registerApplicationMaster(request); client.registerApplicationMaster(request);
Assert.fail("Should fail with authorization error"); Assert.fail("Should fail with authorization error");
@ -260,37 +264,8 @@ public ApplicationMasterProtocol run() {
+ "Available:" + availableAuthMethods)); + "Available:" + availableAuthMethods));
} }
// Now try to validate invalid authorization. // TODO: Add validation of invalid authorization when there's more data in
Credentials credentials = containerManager.getContainerCredentials(); // the AMRMToken
currentUser.addCredentials(credentials);
// Create a client to the RM.
client = currentUser
.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
@Override
public ApplicationMasterProtocol run() {
return (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol.class,
serviceAddr, conf);
}
});
request =
Records.newRecord(RegisterApplicationMasterRequest.class);
ApplicationAttemptId otherAppAttemptId = BuilderUtils
.newApplicationAttemptId(applicationAttemptId.getApplicationId(), 42);
request.setApplicationAttemptId(otherAppAttemptId);
try {
client.registerApplicationMaster(request);
Assert.fail("Should fail with authorization error");
} catch (YarnException e) {
Assert.assertTrue(e.getMessage().contains(
"Unauthorized request from ApplicationMaster. "
+ "Expected ApplicationAttemptID: "
+ applicationAttemptId.toString() + " Found: "
+ otherAppAttemptId.toString()));
} finally {
rm.stop();
}
} }
private void waitForLaunchedState(RMAppAttempt attempt) private void waitForLaunchedState(RMAppAttempt attempt)

View File

@ -92,8 +92,8 @@ private void syncNodeLost(MockNM nm) throws Exception {
dispatcher.await(); dispatcher.await();
} }
private AllocateResponse allocate(final AllocateRequest req) throws Exception { private AllocateResponse allocate(final ApplicationAttemptId attemptId,
ApplicationAttemptId attemptId = req.getApplicationAttemptId(); final AllocateRequest req) throws Exception {
UserGroupInformation ugi = UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(attemptId.toString()); UserGroupInformation.createRemoteUser(attemptId.toString());
Token<AMRMTokenIdentifier> token = Token<AMRMTokenIdentifier> token =
@ -128,18 +128,20 @@ public void testAMRMUnusableNodes() throws Exception {
am1.registerAppAttempt(); am1.registerAppAttempt();
// allocate request returns no updated node // allocate request returns no updated node
AllocateRequest allocateRequest1 = AllocateRequest.newInstance(attempt1 AllocateRequest allocateRequest1 =
.getAppAttemptId(), 0, 0F, null, null, null); AllocateRequest.newInstance(0, 0F, null, null, null);
AllocateResponse response1 = allocate(allocateRequest1); AllocateResponse response1 =
allocate(attempt1.getAppAttemptId(), allocateRequest1);
List<NodeReport> updatedNodes = response1.getUpdatedNodes(); List<NodeReport> updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(0, updatedNodes.size()); Assert.assertEquals(0, updatedNodes.size());
syncNodeHeartbeat(nm4, false); syncNodeHeartbeat(nm4, false);
// allocate request returns updated node // allocate request returns updated node
allocateRequest1 = AllocateRequest.newInstance(attempt1 allocateRequest1 =
.getAppAttemptId(), response1.getResponseId(), 0F, null, null, null); AllocateRequest.newInstance(response1.getResponseId(), 0F, null, null,
response1 = allocate(allocateRequest1); null);
response1 = allocate(attempt1.getAppAttemptId(), allocateRequest1);
updatedNodes = response1.getUpdatedNodes(); updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size()); Assert.assertEquals(1, updatedNodes.size());
NodeReport nr = updatedNodes.iterator().next(); NodeReport nr = updatedNodes.iterator().next();
@ -147,7 +149,7 @@ public void testAMRMUnusableNodes() throws Exception {
Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState()); Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState());
// resending the allocate request returns the same result // resending the allocate request returns the same result
response1 = allocate(allocateRequest1); response1 = allocate(attempt1.getAppAttemptId(), allocateRequest1);
updatedNodes = response1.getUpdatedNodes(); updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size()); Assert.assertEquals(1, updatedNodes.size());
nr = updatedNodes.iterator().next(); nr = updatedNodes.iterator().next();
@ -157,9 +159,10 @@ public void testAMRMUnusableNodes() throws Exception {
syncNodeLost(nm3); syncNodeLost(nm3);
// subsequent allocate request returns delta // subsequent allocate request returns delta
allocateRequest1 = AllocateRequest.newInstance(attempt1 allocateRequest1 =
.getAppAttemptId(), response1.getResponseId(), 0F, null, null, null); AllocateRequest.newInstance(response1.getResponseId(), 0F, null, null,
response1 = allocate(allocateRequest1); null);
response1 = allocate(attempt1.getAppAttemptId(), allocateRequest1);
updatedNodes = response1.getUpdatedNodes(); updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size()); Assert.assertEquals(1, updatedNodes.size());
nr = updatedNodes.iterator().next(); nr = updatedNodes.iterator().next();
@ -177,27 +180,30 @@ public void testAMRMUnusableNodes() throws Exception {
am2.registerAppAttempt(); am2.registerAppAttempt();
// allocate request returns no updated node // allocate request returns no updated node
AllocateRequest allocateRequest2 = AllocateRequest.newInstance(attempt2 AllocateRequest allocateRequest2 =
.getAppAttemptId(), 0, 0F, null, null, null); AllocateRequest.newInstance(0, 0F, null, null, null);
AllocateResponse response2 = allocate(allocateRequest2); AllocateResponse response2 =
allocate(attempt2.getAppAttemptId(), allocateRequest2);
updatedNodes = response2.getUpdatedNodes(); updatedNodes = response2.getUpdatedNodes();
Assert.assertEquals(0, updatedNodes.size()); Assert.assertEquals(0, updatedNodes.size());
syncNodeHeartbeat(nm4, true); syncNodeHeartbeat(nm4, true);
// both AM's should get delta updated nodes // both AM's should get delta updated nodes
allocateRequest1 = AllocateRequest.newInstance(attempt1 allocateRequest1 =
.getAppAttemptId(), response1.getResponseId(), 0F, null, null, null); AllocateRequest.newInstance(response1.getResponseId(), 0F, null, null,
response1 = allocate(allocateRequest1); null);
response1 = allocate(attempt1.getAppAttemptId(), allocateRequest1);
updatedNodes = response1.getUpdatedNodes(); updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size()); Assert.assertEquals(1, updatedNodes.size());
nr = updatedNodes.iterator().next(); nr = updatedNodes.iterator().next();
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId()); Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
Assert.assertEquals(NodeState.RUNNING, nr.getNodeState()); Assert.assertEquals(NodeState.RUNNING, nr.getNodeState());
allocateRequest2 = AllocateRequest.newInstance(attempt2 allocateRequest2 =
.getAppAttemptId(), response2.getResponseId(), 0F, null, null, null); AllocateRequest.newInstance(response2.getResponseId(), 0F, null, null,
response2 = allocate(allocateRequest2); null);
response2 = allocate(attempt2.getAppAttemptId(), allocateRequest2);
updatedNodes = response2.getUpdatedNodes(); updatedNodes = response2.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size()); Assert.assertEquals(1, updatedNodes.size());
nr = updatedNodes.iterator().next(); nr = updatedNodes.iterator().next();
@ -205,9 +211,10 @@ public void testAMRMUnusableNodes() throws Exception {
Assert.assertEquals(NodeState.RUNNING, nr.getNodeState()); Assert.assertEquals(NodeState.RUNNING, nr.getNodeState());
// subsequent allocate calls should return no updated nodes // subsequent allocate calls should return no updated nodes
allocateRequest2 = AllocateRequest.newInstance(attempt2 allocateRequest2 =
.getAppAttemptId(), response2.getResponseId(), 0F, null, null, null); AllocateRequest.newInstance(response2.getResponseId(), 0F, null, null,
response2 = allocate(allocateRequest2); null);
response2 = allocate(attempt2.getAppAttemptId(), allocateRequest2);
updatedNodes = response2.getUpdatedNodes(); updatedNodes = response2.getUpdatedNodes();
Assert.assertEquals(0, updatedNodes.size()); Assert.assertEquals(0, updatedNodes.size());

View File

@ -57,8 +57,8 @@ public void tearDown() {
} }
} }
private AllocateResponse allocate(final AllocateRequest req) throws Exception { private AllocateResponse allocate(ApplicationAttemptId attemptId,
ApplicationAttemptId attemptId = req.getApplicationAttemptId(); final AllocateRequest req) throws Exception {
UserGroupInformation ugi = UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(attemptId.toString()); UserGroupInformation.createRemoteUser(attemptId.toString());
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token = org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
@ -88,25 +88,26 @@ public void testARRMResponseId() throws Exception {
am.registerAppAttempt(); am.registerAppAttempt();
AllocateRequest allocateRequest = AllocateRequest.newInstance(attempt AllocateRequest allocateRequest =
.getAppAttemptId(), 0, 0F, null, null, null); AllocateRequest.newInstance(0, 0F, null, null, null);
AllocateResponse response = allocate(allocateRequest); AllocateResponse response =
allocate(attempt.getAppAttemptId(), allocateRequest);
Assert.assertEquals(1, response.getResponseId()); Assert.assertEquals(1, response.getResponseId());
Assert.assertTrue(response.getAMCommand() == null); Assert.assertTrue(response.getAMCommand() == null);
allocateRequest = AllocateRequest.newInstance(attempt allocateRequest =
.getAppAttemptId(), response.getResponseId(), 0F, null, null, null); AllocateRequest.newInstance(response.getResponseId(), 0F, null, null,
null);
response = allocate(allocateRequest); response = allocate(attempt.getAppAttemptId(), allocateRequest);
Assert.assertEquals(2, response.getResponseId()); Assert.assertEquals(2, response.getResponseId());
/* try resending */ /* try resending */
response = allocate(allocateRequest); response = allocate(attempt.getAppAttemptId(), allocateRequest);
Assert.assertEquals(2, response.getResponseId()); Assert.assertEquals(2, response.getResponseId());
/** try sending old request again **/ /** try sending old request again **/
allocateRequest = AllocateRequest.newInstance(attempt allocateRequest = AllocateRequest.newInstance(0, 0F, null, null, null);
.getAppAttemptId(), 0, 0F, null, null, null); response = allocate(attempt.getAppAttemptId(), allocateRequest);
response = allocate(allocateRequest);
Assert.assertTrue(response.getAMCommand() == AMCommand.AM_RESYNC); Assert.assertTrue(response.getAMCommand() == AMCommand.AM_RESYNC);
} }
} }

View File

@ -310,7 +310,6 @@ public ApplicationMasterProtocol run() {
RegisterApplicationMasterRequest request = Records RegisterApplicationMasterRequest request = Records
.newRecord(RegisterApplicationMasterRequest.class); .newRecord(RegisterApplicationMasterRequest.class);
request.setApplicationAttemptId(applicationAttemptId);
client.registerApplicationMaster(request); client.registerApplicationMaster(request);
ResourceBlacklistRequest blacklistRequest = ResourceBlacklistRequest blacklistRequest =
@ -318,8 +317,7 @@ public ApplicationMasterProtocol run() {
Collections.singletonList(ResourceRequest.ANY), null); Collections.singletonList(ResourceRequest.ANY), null);
AllocateRequest allocateRequest = AllocateRequest allocateRequest =
AllocateRequest.newInstance(applicationAttemptId, 0, 0.0f, null, null, AllocateRequest.newInstance(0, 0.0f, null, null, blacklistRequest);
blacklistRequest);
boolean error = false; boolean error = false;
try { try {
client.allocate(allocateRequest); client.allocate(allocateRequest);

View File

@ -43,7 +43,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -118,12 +117,10 @@ public void testTokenExpiry() throws Exception {
RegisterApplicationMasterRequest request = RegisterApplicationMasterRequest request =
Records.newRecord(RegisterApplicationMasterRequest.class); Records.newRecord(RegisterApplicationMasterRequest.class);
request.setApplicationAttemptId(applicationAttemptId);
rmClient.registerApplicationMaster(request); rmClient.registerApplicationMaster(request);
FinishApplicationMasterRequest finishAMRequest = FinishApplicationMasterRequest finishAMRequest =
Records.newRecord(FinishApplicationMasterRequest.class); Records.newRecord(FinishApplicationMasterRequest.class);
finishAMRequest.setAppAttemptId(applicationAttemptId);
finishAMRequest finishAMRequest
.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); .setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
finishAMRequest.setDiagnostics("diagnostics"); finishAMRequest.setDiagnostics("diagnostics");
@ -134,11 +131,8 @@ public void testTokenExpiry() throws Exception {
// exception. // exception.
rpc.stopProxy(rmClient, conf); // To avoid using cached client rpc.stopProxy(rmClient, conf); // To avoid using cached client
rmClient = createRMClient(rm, conf, rpc, currentUser); rmClient = createRMClient(rm, conf, rpc, currentUser);
request.setApplicationAttemptId(BuilderUtils.newApplicationAttemptId(
BuilderUtils.newApplicationId(12345, 78), 987));
AllocateRequest allocateRequest = AllocateRequest allocateRequest =
Records.newRecord(AllocateRequest.class); Records.newRecord(AllocateRequest.class);
allocateRequest.setApplicationAttemptId(applicationAttemptId);
try { try {
rmClient.allocate(allocateRequest); rmClient.allocate(allocateRequest);
Assert.fail("You got to be kidding me! " Assert.fail("You got to be kidding me! "
@ -206,13 +200,11 @@ public void testMasterKeyRollOver() throws Exception {
RegisterApplicationMasterRequest request = RegisterApplicationMasterRequest request =
Records.newRecord(RegisterApplicationMasterRequest.class); Records.newRecord(RegisterApplicationMasterRequest.class);
request.setApplicationAttemptId(applicationAttemptId);
rmClient.registerApplicationMaster(request); rmClient.registerApplicationMaster(request);
// One allocate call. // One allocate call.
AllocateRequest allocateRequest = AllocateRequest allocateRequest =
Records.newRecord(AllocateRequest.class); Records.newRecord(AllocateRequest.class);
allocateRequest.setApplicationAttemptId(applicationAttemptId);
Assert.assertTrue( Assert.assertTrue(
rmClient.allocate(allocateRequest).getAMCommand() == null); rmClient.allocate(allocateRequest).getAMCommand() == null);
@ -229,7 +221,6 @@ public void testMasterKeyRollOver() throws Exception {
rpc.stopProxy(rmClient, conf); // To avoid using cached client rpc.stopProxy(rmClient, conf); // To avoid using cached client
rmClient = createRMClient(rm, conf, rpc, currentUser); rmClient = createRMClient(rm, conf, rpc, currentUser);
allocateRequest = Records.newRecord(AllocateRequest.class); allocateRequest = Records.newRecord(AllocateRequest.class);
allocateRequest.setApplicationAttemptId(applicationAttemptId);
Assert.assertTrue( Assert.assertTrue(
rmClient.allocate(allocateRequest).getAMCommand() == null); rmClient.allocate(allocateRequest).getAMCommand() == null);
} finally { } finally {