merge -c 1459040 from trunk to branch-2 to fix YARN-396. Rationalize AllocateResponse in RM Scheduler API. Contributed by Zhijie Shen.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1459044 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hitesh Shah 2013-03-20 20:46:22 +00:00
parent 05fabc2013
commit 1de797794e
27 changed files with 518 additions and 689 deletions

View File

@ -38,7 +38,6 @@ import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -100,10 +99,9 @@ public class LocalContainerAllocator extends RMCommunicator
this.applicationAttemptId, this.lastResponseID, super
.getApplicationProgress(), new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>());
AMResponse response;
AllocateResponse allocateResponse;
try {
AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
response = allocateResponse.getAMResponse();
allocateResponse = scheduler.allocate(allocateRequest);
// Reset retry count if no exception occurred.
retrystartTime = System.currentTimeMillis();
} catch (Exception e) {
@ -120,7 +118,7 @@ public class LocalContainerAllocator extends RMCommunicator
// continue to attempt to contact the RM.
throw e;
}
if (response.getReboot()) {
if (allocateResponse.getReboot()) {
LOG.info("Event from RM: shutting down Application Master");
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.

View File

@ -59,7 +59,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -544,8 +544,9 @@ public class RMContainerAllocator extends RMContainerRequestor
@SuppressWarnings("unchecked")
private List<Container> getResources() throws Exception {
int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;//first time it would be null
AMResponse response;
int headRoom = getAvailableResources() != null
? getAvailableResources().getMemory() : 0;//first time it would be null
AllocateResponse response;
/*
* If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
* milliseconds before aborting. During this interval, AM will still try
@ -634,7 +635,7 @@ public class RMContainerAllocator extends RMContainerRequestor
}
@SuppressWarnings("unchecked")
private void handleUpdatedNodes(AMResponse response) {
private void handleUpdatedNodes(AllocateResponse response) {
// send event to the job about on updated nodes
List<NodeReport> updatedNodes = response.getUpdatedNodes();
if (!updatedNodes.isEmpty()) {

View File

@ -38,7 +38,6 @@ import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@ -146,30 +145,30 @@ public abstract class RMContainerRequestor extends RMCommunicator {
LOG.info("blacklistDisablePercent is " + blacklistDisablePercent);
}
protected AMResponse makeRemoteRequest() throws YarnRemoteException {
protected AllocateResponse makeRemoteRequest() throws YarnRemoteException {
AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
applicationAttemptId, lastResponseID, super.getApplicationProgress(),
new ArrayList<ResourceRequest>(ask), new ArrayList<ContainerId>(
release));
AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
AMResponse response = allocateResponse.getAMResponse();
lastResponseID = response.getResponseId();
availableResources = response.getAvailableResources();
lastResponseID = allocateResponse.getResponseId();
availableResources = allocateResponse.getAvailableResources();
lastClusterNmCount = clusterNmCount;
clusterNmCount = allocateResponse.getNumClusterNodes();
if (ask.size() > 0 || release.size() > 0) {
LOG.info("getResources() for " + applicationId + ":" + " ask="
+ ask.size() + " release= " + release.size() + " newContainers="
+ response.getAllocatedContainers().size() + " finishedContainers="
+ response.getCompletedContainersStatuses().size()
+ allocateResponse.getAllocatedContainers().size()
+ " finishedContainers="
+ allocateResponse.getCompletedContainersStatuses().size()
+ " resourcelimit=" + availableResources + " knownNMs="
+ clusterNmCount);
}
ask.clear();
release.clear();
return response;
return allocateResponse;
}
// May be incorrect if there's multiple NodeManagers running on a single host.

View File

@ -40,7 +40,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -248,10 +247,8 @@ public class MRAppBenchmark {
}
}
AMResponse amResponse = Records.newRecord(AMResponse.class);
amResponse.setAllocatedContainers(containers);
amResponse.setResponseId(request.getResponseId() + 1);
response.setAMResponse(amResponse);
response.setAllocatedContainers(containers);
response.setResponseId(request.getResponseId() + 1);
response.setNumClusterNodes(350);
return response;
}

View File

@ -4,6 +4,9 @@ Release 2.0.5-beta - UNRELEASED
INCOMPATIBLE CHANGES
YARN-396. Rationalize AllocateResponse in RM Scheduler API. (Zhijie Shen
via hitesh)
NEW FEATURES
IMPROVEMENTS

View File

@ -18,19 +18,23 @@
package org.apache.hadoop.yarn.api.protocolrecords;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
/**
* <p>The response sent by the <code>ResourceManager</code> the
* <code>ApplicationMaster</code> during resource negotiation.</p>
*
* <p>The response, via {@link AMResponse}, includes:
* <p>The response, includes:
* <ul>
* <li>Response ID to track duplicate responses.</li>
* <li>
@ -42,6 +46,8 @@ import org.apache.hadoop.yarn.api.records.Container;
* The available headroom for resources in the cluster for the
* application.
* </li>
* <li>A list of nodes whose status has been updated.</li>
* <li>The number of available nodes in a cluster.</li>
* </ul>
* </p>
*
@ -51,18 +57,90 @@ import org.apache.hadoop.yarn.api.records.Container;
@Stable
public interface AllocateResponse {
/**
* Get the {@link AMResponse} sent by the <code>ResourceManager</code>.
* @return <code>AMResponse</code> sent by the <code>ResourceManager</code>
* Should the <code>ApplicationMaster</code> reboot for being horribly
* out-of-sync with the <code>ResourceManager</code> as deigned by
* {@link #getResponseId()}?
*
* @return <code>true</code> if the <code>ApplicationMaster</code> should
* reboot, <code>false</code> otherwise
*/
@Public
@Stable
public abstract AMResponse getAMResponse();
public boolean getReboot();
@Private
@Unstable
public abstract void setAMResponse(AMResponse amResponse);
public void setReboot(boolean reboot);
/**
* Get the <em>last response id</em>.
* @return <em>last response id</em>
*/
@Public
@Stable
public int getResponseId();
@Private
@Unstable
public void setResponseId(int responseId);
/**
* Get the list of <em>newly allocated</em> <code>Container</code> by the
* <code>ResourceManager</code>.
* @return list of <em>newly allocated</em> <code>Container</code>
*/
@Public
@Stable
public List<Container> getAllocatedContainers();
/**
* Set the list of <em>newly allocated</em> <code>Container</code> by the
* <code>ResourceManager</code>.
* @param containers list of <em>newly allocated</em> <code>Container</code>
*/
@Public
@Stable
public void setAllocatedContainers(List<Container> containers);
/**
* Get the <em>available headroom</em> for resources in the cluster for the
* application.
* @return limit of available headroom for resources in the cluster for the
* application
*/
@Public
@Stable
public Resource getAvailableResources();
@Private
@Unstable
public void setAvailableResources(Resource limit);
/**
* Get the list of <em>completed containers' statuses</em>.
* @return the list of <em>completed containers' statuses</em>
*/
@Public
@Stable
public List<ContainerStatus> getCompletedContainersStatuses();
@Private
@Unstable
public void setCompletedContainersStatuses(List<ContainerStatus> containers);
/**
* Get the list of <em>updated <code>NodeReport</code>s</em>. Updates could
* be changes in health, availability etc of the nodes.
* @return The delta of updated nodes since the last response
*/
@Public
@Unstable
public List<NodeReport> getUpdatedNodes();
@Private
@Unstable
public void setUpdatedNodes(final List<NodeReport> updatedNodes);
/**
* Get the number of hosts available on the cluster.
* @return the available host count.

View File

@ -19,11 +19,24 @@
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.AMResponsePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.AMResponseProto;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder;
@ -35,7 +48,12 @@ public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
AllocateResponseProto.Builder builder = null;
boolean viaProto = false;
private AMResponse amResponse;
Resource limit;
private List<Container> allocatedContainers = null;
private List<ContainerStatus> completedContainersStatuses = null;
private List<NodeReport> updatedNodes = null;
public AllocateResponsePBImpl() {
@ -47,20 +65,38 @@ public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
viaProto = true;
}
public AllocateResponseProto getProto() {
public synchronized AllocateResponseProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToBuilder() {
if (this.amResponse != null) {
builder.setAMResponse(convertToProtoFormat(this.amResponse));
private synchronized void mergeLocalToBuilder() {
if (this.allocatedContainers != null) {
builder.clearAllocatedContainers();
Iterable<ContainerProto> iterable =
getProtoIterable(this.allocatedContainers);
builder.addAllAllocatedContainers(iterable);
}
if (this.completedContainersStatuses != null) {
builder.clearCompletedContainerStatuses();
Iterable<ContainerStatusProto> iterable =
getContainerStatusProtoIterable(this.completedContainersStatuses);
builder.addAllCompletedContainerStatuses(iterable);
}
if (this.updatedNodes != null) {
builder.clearUpdatedNodes();
Iterable<NodeReportProto> iterable =
getNodeReportProtoIterable(this.updatedNodes);
builder.addAllUpdatedNodes(iterable);
}
if (this.limit != null) {
builder.setLimit(convertToProtoFormat(this.limit));
}
}
private void mergeLocalToProto() {
private synchronized void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
@ -68,53 +104,293 @@ public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
viaProto = true;
}
private void maybeInitBuilder() {
private synchronized void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = AllocateResponseProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public AMResponse getAMResponse() {
public synchronized boolean getReboot() {
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
if (this.amResponse != null) {
return this.amResponse;
}
if (!p.hasAMResponse()) {
return null;
}
this.amResponse= convertFromProtoFormat(p.getAMResponse());
return this.amResponse;
return (p.getReboot());
}
@Override
public void setAMResponse(AMResponse aMResponse) {
public synchronized void setReboot(boolean reboot) {
maybeInitBuilder();
if (aMResponse == null)
builder.clearAMResponse();
this.amResponse = aMResponse;
builder.setReboot((reboot));
}
@Override
public int getNumClusterNodes() {
public synchronized int getResponseId() {
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
return (p.getResponseId());
}
@Override
public synchronized void setResponseId(int responseId) {
maybeInitBuilder();
builder.setResponseId((responseId));
}
@Override
public synchronized Resource getAvailableResources() {
if (this.limit != null) {
return this.limit;
}
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasLimit()) {
return null;
}
this.limit = convertFromProtoFormat(p.getLimit());
return this.limit;
}
@Override
public synchronized void setAvailableResources(Resource limit) {
maybeInitBuilder();
if (limit == null)
builder.clearLimit();
this.limit = limit;
}
@Override
public synchronized List<NodeReport> getUpdatedNodes() {
initLocalNewNodeReportList();
return this.updatedNodes;
}
@Override
public synchronized void setUpdatedNodes(
final List<NodeReport> updatedNodes) {
if (updatedNodes == null) {
this.updatedNodes.clear();
return;
}
this.updatedNodes = new ArrayList<NodeReport>(updatedNodes.size());
this.updatedNodes.addAll(updatedNodes);
}
@Override
public synchronized List<Container> getAllocatedContainers() {
initLocalNewContainerList();
return this.allocatedContainers;
}
@Override
public synchronized void setAllocatedContainers(
final List<Container> containers) {
if (containers == null)
return;
// this looks like a bug because it results in append and not set
initLocalNewContainerList();
allocatedContainers.addAll(containers);
}
//// Finished containers
@Override
public synchronized List<ContainerStatus> getCompletedContainersStatuses() {
initLocalFinishedContainerList();
return this.completedContainersStatuses;
}
@Override
public synchronized void setCompletedContainersStatuses(
final List<ContainerStatus> containers) {
if (containers == null)
return;
initLocalFinishedContainerList();
completedContainersStatuses.addAll(containers);
}
@Override
public synchronized int getNumClusterNodes() {
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
return p.getNumClusterNodes();
}
@Override
public void setNumClusterNodes(int numNodes) {
public synchronized void setNumClusterNodes(int numNodes) {
maybeInitBuilder();
builder.setNumClusterNodes(numNodes);
}
private AMResponsePBImpl convertFromProtoFormat(AMResponseProto p) {
return new AMResponsePBImpl(p);
// Once this is called. updatedNodes will never be null - until a getProto is
// called.
private synchronized void initLocalNewNodeReportList() {
if (this.updatedNodes != null) {
return;
}
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
List<NodeReportProto> list = p.getUpdatedNodesList();
updatedNodes = new ArrayList<NodeReport>(list.size());
for (NodeReportProto n : list) {
updatedNodes.add(convertFromProtoFormat(n));
}
}
private AMResponseProto convertToProtoFormat(AMResponse t) {
return ((AMResponsePBImpl)t).getProto();
// Once this is called. containerList will never be null - until a getProto
// is called.
private synchronized void initLocalNewContainerList() {
if (this.allocatedContainers != null) {
return;
}
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerProto> list = p.getAllocatedContainersList();
allocatedContainers = new ArrayList<Container>();
for (ContainerProto c : list) {
allocatedContainers.add(convertFromProtoFormat(c));
}
}
private synchronized Iterable<ContainerProto> getProtoIterable(
final List<Container> newContainersList) {
maybeInitBuilder();
return new Iterable<ContainerProto>() {
@Override
public synchronized Iterator<ContainerProto> iterator() {
return new Iterator<ContainerProto>() {
Iterator<Container> iter = newContainersList.iterator();
@Override
public synchronized boolean hasNext() {
return iter.hasNext();
}
@Override
public synchronized ContainerProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public synchronized void remove() {
throw new UnsupportedOperationException();
}
};
}
};
}
private synchronized Iterable<ContainerStatusProto>
getContainerStatusProtoIterable(
final List<ContainerStatus> newContainersList) {
maybeInitBuilder();
return new Iterable<ContainerStatusProto>() {
@Override
public synchronized Iterator<ContainerStatusProto> iterator() {
return new Iterator<ContainerStatusProto>() {
Iterator<ContainerStatus> iter = newContainersList.iterator();
@Override
public synchronized boolean hasNext() {
return iter.hasNext();
}
@Override
public synchronized ContainerStatusProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public synchronized void remove() {
throw new UnsupportedOperationException();
}
};
}
};
}
private synchronized Iterable<NodeReportProto>
getNodeReportProtoIterable(
final List<NodeReport> newNodeReportsList) {
maybeInitBuilder();
return new Iterable<NodeReportProto>() {
@Override
public synchronized Iterator<NodeReportProto> iterator() {
return new Iterator<NodeReportProto>() {
Iterator<NodeReport> iter = newNodeReportsList.iterator();
@Override
public synchronized boolean hasNext() {
return iter.hasNext();
}
@Override
public synchronized NodeReportProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public synchronized void remove() {
throw new UnsupportedOperationException();
}
};
}
};
}
// Once this is called. containerList will never be null - until a getProto
// is called.
private synchronized void initLocalFinishedContainerList() {
if (this.completedContainersStatuses != null) {
return;
}
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerStatusProto> list = p.getCompletedContainerStatusesList();
completedContainersStatuses = new ArrayList<ContainerStatus>();
for (ContainerStatusProto c : list) {
completedContainersStatuses.add(convertFromProtoFormat(c));
}
}
private synchronized NodeReportPBImpl convertFromProtoFormat(
NodeReportProto p) {
return new NodeReportPBImpl(p);
}
private synchronized NodeReportProto convertToProtoFormat(NodeReport t) {
return ((NodeReportPBImpl)t).getProto();
}
private synchronized ContainerPBImpl convertFromProtoFormat(
ContainerProto p) {
return new ContainerPBImpl(p);
}
private synchronized ContainerProto convertToProtoFormat(Container t) {
return ((ContainerPBImpl)t).getProto();
}
private synchronized ContainerStatusPBImpl convertFromProtoFormat(
ContainerStatusProto p) {
return new ContainerStatusPBImpl(p);
}
private synchronized ContainerStatusProto convertToProtoFormat(
ContainerStatus t) {
return ((ContainerStatusPBImpl)t).getProto();
}
private synchronized ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
return new ResourcePBImpl(p);
}
private synchronized ResourceProto convertToProtoFormat(Resource r) {
return ((ResourcePBImpl) r).getProto();
}
}

View File

@ -93,7 +93,8 @@ GetAllApplicationsResponse {
viaProto = false;
}
//Once this is called. containerList will never be null - untill a getProto is called.
// Once this is called. containerList will never be null - until a getProto
// is called.
private void initLocalApplicationsList() {
if (this.applicationList != null) {
return;

View File

@ -92,7 +92,8 @@ public class GetClusterNodesResponsePBImpl extends
viaProto = false;
}
//Once this is called. containerList will never be null - untill a getProto is called.
// Once this is called. containerList will never be null - until a getProto
// is called.
private void initLocalNodeManagerInfosList() {
if (this.nodeManagerInfoList != null) {
return;

View File

@ -94,7 +94,8 @@ implements GetQueueUserAclsInfoResponse {
viaProto = false;
}
//Once this is called. containerList will never be null - untill a getProto is called.
// Once this is called. containerList will never be null - until a getProto
// is called.
private void initLocalQueueUserAclsList() {
if (this.queueUserAclsInfoList != null) {
return;

View File

@ -1,138 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
/**
* <p>The response sent by the <code>ResourceManager</code> the
* <code>ApplicationMaster</code> during resource negotiation.</p>
*
* <p>The response includes:
* <ul>
* <li>Response ID to track duplicate responses.</li>
* <li>
* A reboot flag to let the <code>ApplicationMaster</code> know that its
* horribly out of sync and needs to reboot.</li>
* <li>A list of newly allocated {@link Container}.</li>
* <li>A list of completed {@link Container}.</li>
* <li>
* The available headroom for resources in the cluster for the
* application.
* </li>
* </ul>
* </p>
*
* @see AMRMProtocol#allocate(AllocateRequest)
*/
@Public
@Unstable
public interface AMResponse {
/**
* Should the <code>ApplicationMaster</code> reboot for being horribly
* out-of-sync with the <code>ResourceManager</code> as deigned by
* {@link #getResponseId()}?
*
* @return <code>true</code> if the <code>ApplicationMaster</code> should
* reboot, <code>false</code> otherwise
*/
@Public
@Stable
public boolean getReboot();
@Private
@Unstable
public void setReboot(boolean reboot);
/**
* Get the <em>last response id</em>.
* @return <em>last response id</em>
*/
@Public
@Stable
public int getResponseId();
@Private
@Unstable
public void setResponseId(int responseId);
/**
* Get the list of <em>newly allocated</em> <code>Container</code> by the
* <code>ResourceManager</code>.
* @return list of <em>newly allocated</em> <code>Container</code>
*/
@Public
@Stable
public List<Container> getAllocatedContainers();
/**
* Set the list of <em>newly allocated</em> <code>Container</code> by the
* <code>ResourceManager</code>.
* @param containers list of <em>newly allocated</em> <code>Container</code>
*/
@Public
@Stable
public void setAllocatedContainers(List<Container> containers);
/**
* Get the <em>available headroom</em> for resources in the cluster for the
* application.
* @return limit of available headroom for resources in the cluster for the
* application
*/
@Public
@Stable
public Resource getAvailableResources();
@Private
@Unstable
public void setAvailableResources(Resource limit);
/**
* Get the list of <em>completed containers' statuses</em>.
* @return the list of <em>completed containers' statuses</em>
*/
@Public
@Stable
public List<ContainerStatus> getCompletedContainersStatuses();
@Private
@Unstable
public void setCompletedContainersStatuses(List<ContainerStatus> containers);
/**
* Get the list of <em>updated <code>NodeReport</code>s</em>. Updates could be
* changes in health, availability etc of the nodes.
* @return The delta of updated nodes since the last response
*/
@Public
@Unstable
public List<NodeReport> getUpdatedNodes();
@Private
@Unstable
public void setUpdatedNodes(final List<NodeReport> updatedNodes);
}

View File

@ -1,373 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.impl.pb;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnProtos.AMResponseProto;
import org.apache.hadoop.yarn.proto.YarnProtos.AMResponseProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
public class AMResponsePBImpl extends ProtoBase<AMResponseProto> implements AMResponse {
AMResponseProto proto = AMResponseProto.getDefaultInstance();
AMResponseProto.Builder builder = null;
boolean viaProto = false;
Resource limit;
private List<Container> allocatedContainers = null;
private List<ContainerStatus> completedContainersStatuses = null;
// private boolean hasLocalContainerList = false;
private List<NodeReport> updatedNodes = null;
public AMResponsePBImpl() {
builder = AMResponseProto.newBuilder();
}
public AMResponsePBImpl(AMResponseProto proto) {
this.proto = proto;
viaProto = true;
}
public synchronized AMResponseProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private synchronized void mergeLocalToBuilder() {
if (this.allocatedContainers != null) {
builder.clearAllocatedContainers();
Iterable<ContainerProto> iterable =
getProtoIterable(this.allocatedContainers);
builder.addAllAllocatedContainers(iterable);
}
if (this.completedContainersStatuses != null) {
builder.clearCompletedContainerStatuses();
Iterable<ContainerStatusProto> iterable =
getContainerStatusProtoIterable(this.completedContainersStatuses);
builder.addAllCompletedContainerStatuses(iterable);
}
if (this.updatedNodes != null) {
builder.clearUpdatedNodes();
Iterable<NodeReportProto> iterable =
getNodeReportProtoIterable(this.updatedNodes);
builder.addAllUpdatedNodes(iterable);
}
if (this.limit != null) {
builder.setLimit(convertToProtoFormat(this.limit));
}
}
private synchronized void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private synchronized void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = AMResponseProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public synchronized boolean getReboot() {
AMResponseProtoOrBuilder p = viaProto ? proto : builder;
return (p.getReboot());
}
@Override
public synchronized void setReboot(boolean reboot) {
maybeInitBuilder();
builder.setReboot((reboot));
}
@Override
public synchronized int getResponseId() {
AMResponseProtoOrBuilder p = viaProto ? proto : builder;
return (p.getResponseId());
}
@Override
public synchronized void setResponseId(int responseId) {
maybeInitBuilder();
builder.setResponseId((responseId));
}
@Override
public synchronized Resource getAvailableResources() {
if (this.limit != null) {
return this.limit;
}
AMResponseProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasLimit()) {
return null;
}
this.limit = convertFromProtoFormat(p.getLimit());
return this.limit;
}
@Override
public synchronized void setAvailableResources(Resource limit) {
maybeInitBuilder();
if (limit == null)
builder.clearLimit();
this.limit = limit;
}
@Override
public synchronized List<NodeReport> getUpdatedNodes() {
initLocalNewNodeReportList();
return this.updatedNodes;
}
//Once this is called. updatedNodes will never be null - until a getProto is called.
private synchronized void initLocalNewNodeReportList() {
if (this.updatedNodes != null) {
return;
}
AMResponseProtoOrBuilder p = viaProto ? proto : builder;
List<NodeReportProto> list = p.getUpdatedNodesList();
updatedNodes = new ArrayList<NodeReport>(list.size());
for (NodeReportProto n : list) {
updatedNodes.add(convertFromProtoFormat(n));
}
}
@Override
public synchronized void setUpdatedNodes(final List<NodeReport> updatedNodes) {
if (updatedNodes == null) {
this.updatedNodes.clear();
return;
}
this.updatedNodes = new ArrayList<NodeReport>(updatedNodes.size());
this.updatedNodes.addAll(updatedNodes);
}
@Override
public synchronized List<Container> getAllocatedContainers() {
initLocalNewContainerList();
return this.allocatedContainers;
}
//Once this is called. containerList will never be null - until a getProto is called.
private synchronized void initLocalNewContainerList() {
if (this.allocatedContainers != null) {
return;
}
AMResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerProto> list = p.getAllocatedContainersList();
allocatedContainers = new ArrayList<Container>();
for (ContainerProto c : list) {
allocatedContainers.add(convertFromProtoFormat(c));
}
}
@Override
public synchronized void setAllocatedContainers(final List<Container> containers) {
if (containers == null)
return;
// this looks like a bug because it results in append and not set
initLocalNewContainerList();
allocatedContainers.addAll(containers);
}
private synchronized Iterable<ContainerProto> getProtoIterable(
final List<Container> newContainersList) {
maybeInitBuilder();
return new Iterable<ContainerProto>() {
@Override
public synchronized Iterator<ContainerProto> iterator() {
return new Iterator<ContainerProto>() {
Iterator<Container> iter = newContainersList.iterator();
@Override
public synchronized boolean hasNext() {
return iter.hasNext();
}
@Override
public synchronized ContainerProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public synchronized void remove() {
throw new UnsupportedOperationException();
}
};
}
};
}
private synchronized Iterable<ContainerStatusProto>
getContainerStatusProtoIterable(
final List<ContainerStatus> newContainersList) {
maybeInitBuilder();
return new Iterable<ContainerStatusProto>() {
@Override
public synchronized Iterator<ContainerStatusProto> iterator() {
return new Iterator<ContainerStatusProto>() {
Iterator<ContainerStatus> iter = newContainersList.iterator();
@Override
public synchronized boolean hasNext() {
return iter.hasNext();
}
@Override
public synchronized ContainerStatusProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public synchronized void remove() {
throw new UnsupportedOperationException();
}
};
}
};
}
private synchronized Iterable<NodeReportProto>
getNodeReportProtoIterable(
final List<NodeReport> newNodeReportsList) {
maybeInitBuilder();
return new Iterable<NodeReportProto>() {
@Override
public synchronized Iterator<NodeReportProto> iterator() {
return new Iterator<NodeReportProto>() {
Iterator<NodeReport> iter = newNodeReportsList.iterator();
@Override
public synchronized boolean hasNext() {
return iter.hasNext();
}
@Override
public synchronized NodeReportProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public synchronized void remove() {
throw new UnsupportedOperationException();
}
};
}
};
}
//// Finished containers
@Override
public synchronized List<ContainerStatus> getCompletedContainersStatuses() {
initLocalFinishedContainerList();
return this.completedContainersStatuses;
}
//Once this is called. containerList will never be null - untill a getProto is called.
private synchronized void initLocalFinishedContainerList() {
if (this.completedContainersStatuses != null) {
return;
}
AMResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerStatusProto> list = p.getCompletedContainerStatusesList();
completedContainersStatuses = new ArrayList<ContainerStatus>();
for (ContainerStatusProto c : list) {
completedContainersStatuses.add(convertFromProtoFormat(c));
}
}
@Override
public synchronized void setCompletedContainersStatuses(
final List<ContainerStatus> containers) {
if (containers == null)
return;
initLocalFinishedContainerList();
completedContainersStatuses.addAll(containers);
}
private synchronized NodeReportPBImpl convertFromProtoFormat(
NodeReportProto p) {
return new NodeReportPBImpl(p);
}
private synchronized NodeReportProto convertToProtoFormat(NodeReport t) {
return ((NodeReportPBImpl)t).getProto();
}
private synchronized ContainerPBImpl convertFromProtoFormat(
ContainerProto p) {
return new ContainerPBImpl(p);
}
private synchronized ContainerProto convertToProtoFormat(Container t) {
return ((ContainerPBImpl)t).getProto();
}
private synchronized ContainerStatusPBImpl convertFromProtoFormat(
ContainerStatusProto p) {
return new ContainerStatusPBImpl(p);
}
private synchronized ContainerStatusProto convertToProtoFormat(ContainerStatus t) {
return ((ContainerStatusPBImpl)t).getProto();
}
private synchronized ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
return new ResourcePBImpl(p);
}
private synchronized ResourceProto convertToProtoFormat(Resource r) {
return ((ResourcePBImpl) r).getProto();
}
}

View File

@ -207,16 +207,6 @@ message ResourceRequestProto {
optional int32 num_containers = 4;
}
message AMResponseProto {
optional bool reboot = 1;
optional int32 response_id = 2;
repeated ContainerProto allocated_containers = 3;
repeated ContainerStatusProto completed_container_statuses = 4;
optional ResourceProto limit = 5;
repeated NodeReportProto updated_nodes = 6;
}
////////////////////////////////////////////////////////////////////////
////// From client_RM_Protocol /////////////////////////////////////////
////////////////////////////////////////////////////////////////////////

View File

@ -59,8 +59,13 @@ message AllocateRequestProto {
}
message AllocateResponseProto {
optional AMResponseProto AM_response = 1;
optional int32 num_cluster_nodes = 2;
optional bool reboot = 1;
optional int32 response_id = 2;
repeated ContainerProto allocated_containers = 3;
repeated ContainerStatusProto completed_container_statuses = 4;
optional ResourceProto limit = 5;
repeated NodeReportProto updated_nodes = 6;
optional int32 num_cluster_nodes = 7;
}

View File

@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -510,10 +509,11 @@ public class ApplicationMaster {
// Send the request to RM
LOG.info("Asking RM for containers" + ", askCount=" + askCount);
AMResponse amResp = sendContainerAskToRM();
AllocateResponse allocResp = sendContainerAskToRM();
// Retrieve list of allocated containers from the response
List<Container> allocatedContainers = amResp.getAllocatedContainers();
List<Container> allocatedContainers =
allocResp.getAllocatedContainers();
LOG.info("Got response from RM for container ask, allocatedCnt="
+ allocatedContainers.size());
numAllocatedContainers.addAndGet(allocatedContainers.size());
@ -542,12 +542,12 @@ public class ApplicationMaster {
// Check what the current available resources in the cluster are
// TODO should we do anything if the available resources are not enough?
Resource availableResources = amResp.getAvailableResources();
Resource availableResources = allocResp.getAvailableResources();
LOG.info("Current available resources in the cluster "
+ availableResources);
// Check the completed containers
List<ContainerStatus> completedContainers = amResp
List<ContainerStatus> completedContainers = allocResp
.getCompletedContainersStatuses();
LOG.info("Got response from RM for container ask, completedCnt="
+ completedContainers.size());
@ -819,14 +819,13 @@ public class ApplicationMaster {
* @return Response from RM to AM with allocated containers
* @throws YarnRemoteException
*/
private AMResponse sendContainerAskToRM() throws YarnRemoteException {
private AllocateResponse sendContainerAskToRM() throws YarnRemoteException {
float progressIndicator = (float) numCompletedContainers.get()
/ numTotalContainers;
LOG.info("Sending request to RM for containers" + ", progress="
+ progressIndicator);
AllocateResponse resp = resourceManager.allocate(progressIndicator);
return resp.getAMResponse();
return resourceManager.allocate(progressIndicator);
}
}

View File

@ -45,7 +45,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@ -194,13 +193,12 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient {
}
allocateResponse = rmClient.allocate(allocateRequest);
AMResponse response = allocateResponse.getAMResponse();
synchronized (this) {
// update these on successful RPC
clusterNodeCount = allocateResponse.getNumClusterNodes();
lastResponseId = response.getResponseId();
clusterAvailableResources = response.getAvailableResources();
lastResponseId = allocateResponse.getResponseId();
clusterAvailableResources = allocateResponse.getAvailableResources();
}
} finally {
// TODO how to differentiate remote yarn exception vs error in rpc

View File

@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@ -202,9 +201,8 @@ public class TestAMRMClient {
assertTrue(amClient.release.size() == 0);
assertTrue(nodeCount == amClient.getClusterNodeCount());
AMResponse amResponse = allocResponse.getAMResponse();
allocatedContainerCount += amResponse.getAllocatedContainers().size();
for(Container container : amResponse.getAllocatedContainers()) {
allocatedContainerCount += allocResponse.getAllocatedContainers().size();
for(Container container : allocResponse.getAllocatedContainers()) {
ContainerId rejectContainerId = container.getId();
releases.add(rejectContainerId);
amClient.releaseAssignedContainer(rejectContainerId);
@ -264,11 +262,11 @@ public class TestAMRMClient {
while(!releases.isEmpty() || iterationsLeft-- > 0) {
// inform RM of rejection
AllocateResponse allocResponse = amClient.allocate(0.1f);
AMResponse amResponse = allocResponse.getAMResponse();
// RM did not send new containers because AM does not need any
assertTrue(amResponse.getAllocatedContainers().size() == 0);
if(amResponse.getCompletedContainersStatuses().size() > 0) {
for(ContainerStatus cStatus : amResponse.getCompletedContainersStatuses()) {
assertTrue(allocResponse.getAllocatedContainers().size() == 0);
if(allocResponse.getCompletedContainersStatuses().size() > 0) {
for(ContainerStatus cStatus :allocResponse
.getCompletedContainersStatuses()) {
if(releases.contains(cStatus.getContainerId())) {
assertTrue(cStatus.getState() == ContainerState.COMPLETE);
assertTrue(cStatus.getExitStatus() == -100);

View File

@ -23,9 +23,9 @@ import junit.framework.Assert;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.impl.pb.AMResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
import org.junit.Test;
public class TestRecordFactory {
@ -35,15 +35,17 @@ public class TestRecordFactory {
RecordFactory pbRecordFactory = RecordFactoryPBImpl.get();
try {
AMResponse response = pbRecordFactory.newRecordInstance(AMResponse.class);
Assert.assertEquals(AMResponsePBImpl.class, response.getClass());
AllocateResponse response =
pbRecordFactory.newRecordInstance(AllocateResponse.class);
Assert.assertEquals(AllocateResponsePBImpl.class, response.getClass());
} catch (YarnException e) {
e.printStackTrace();
Assert.fail("Failed to crete record");
}
try {
AllocateRequest response = pbRecordFactory.newRecordInstance(AllocateRequest.class);
AllocateRequest response =
pbRecordFactory.newRecordInstance(AllocateRequest.class);
Assert.assertEquals(AllocateRequestPBImpl.class, response.getClass());
} catch (YarnException e) {
e.printStackTrace();

View File

@ -41,7 +41,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -78,10 +77,12 @@ public class ApplicationMasterService extends AbstractService implements
private YarnScheduler rScheduler;
private InetSocketAddress bindAddress;
private Server server;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private final ConcurrentMap<ApplicationAttemptId, AMResponse> responseMap =
new ConcurrentHashMap<ApplicationAttemptId, AMResponse>();
private final AMResponse reboot = recordFactory.newRecordInstance(AMResponse.class);
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private final ConcurrentMap<ApplicationAttemptId, AllocateResponse> responseMap =
new ConcurrentHashMap<ApplicationAttemptId, AllocateResponse>();
private final AllocateResponse reboot =
recordFactory.newRecordInstance(AllocateResponse.class);
private final RMContext rmContext;
public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
@ -166,7 +167,7 @@ public class ApplicationMasterService extends AbstractService implements
authorizeRequest(applicationAttemptId);
ApplicationId appID = applicationAttemptId.getApplicationId();
AMResponse lastResponse = responseMap.get(applicationAttemptId);
AllocateResponse lastResponse = responseMap.get(applicationAttemptId);
if (lastResponse == null) {
String message = "Application doesn't exist in cache "
+ applicationAttemptId;
@ -214,7 +215,7 @@ public class ApplicationMasterService extends AbstractService implements
.getApplicationAttemptId();
authorizeRequest(applicationAttemptId);
AMResponse lastResponse = responseMap.get(applicationAttemptId);
AllocateResponse lastResponse = responseMap.get(applicationAttemptId);
if (lastResponse == null) {
String message = "Application doesn't exist in cache "
+ applicationAttemptId;
@ -248,25 +249,20 @@ public class ApplicationMasterService extends AbstractService implements
this.amLivelinessMonitor.receivedPing(appAttemptId);
/* check if its in cache */
AllocateResponse allocateResponse = recordFactory
.newRecordInstance(AllocateResponse.class);
AMResponse lastResponse = responseMap.get(appAttemptId);
AllocateResponse lastResponse = responseMap.get(appAttemptId);
if (lastResponse == null) {
LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
allocateResponse.setAMResponse(reboot);
return allocateResponse;
return reboot;
}
if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
/* old heartbeat */
allocateResponse.setAMResponse(lastResponse);
return allocateResponse;
return lastResponse;
} else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
LOG.error("Invalid responseid from appAttemptId " + appAttemptId);
// Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO:
// Reboot is not useful since after AM reboots, it will send register and
// get an exception. Might as well throw an exception here.
allocateResponse.setAMResponse(reboot);
return allocateResponse;
return reboot;
}
// Allow only one thread in AM to do heartbeat at a time.
@ -288,7 +284,8 @@ public class ApplicationMasterService extends AbstractService implements
appAttemptId.getApplicationId());
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
AllocateResponse allocateResponse =
recordFactory.newRecordInstance(AllocateResponse.class);
// update the response with the deltas of node status changes
List<RMNode> updatedNodes = new ArrayList<RMNode>();
@ -311,34 +308,34 @@ public class ApplicationMasterService extends AbstractService implements
updatedNodeReports.add(report);
}
response.setUpdatedNodes(updatedNodeReports);
allocateResponse.setUpdatedNodes(updatedNodeReports);
}
response.setAllocatedContainers(allocation.getContainers());
response.setCompletedContainersStatuses(appAttempt
allocateResponse.setAllocatedContainers(allocation.getContainers());
allocateResponse.setCompletedContainersStatuses(appAttempt
.pullJustFinishedContainers());
response.setResponseId(lastResponse.getResponseId() + 1);
response.setAvailableResources(allocation.getResourceLimit());
allocateResponse.setResponseId(lastResponse.getResponseId() + 1);
allocateResponse.setAvailableResources(allocation.getResourceLimit());
AMResponse oldResponse = responseMap.put(appAttemptId, response);
AllocateResponse oldResponse =
responseMap.put(appAttemptId, allocateResponse);
if (oldResponse == null) {
// appAttempt got unregistered, remove it back out
responseMap.remove(appAttemptId);
String message = "App Attempt removed from the cache during allocate"
+ appAttemptId;
LOG.error(message);
allocateResponse.setAMResponse(reboot);
return allocateResponse;
return reboot;
}
allocateResponse.setAMResponse(response);
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
return allocateResponse;
}
}
public void registerAppAttempt(ApplicationAttemptId attemptId) {
AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
AllocateResponse response =
recordFactory.newRecordInstance(AllocateResponse.class);
response.setResponseId(0);
LOG.info("Registering " + attemptId);
responseMap.put(attemptId, response);

View File

@ -28,8 +28,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -96,14 +94,14 @@ public class MockAM {
requests.addAll(createReq(hosts, memory, priority, containers));
}
public AMResponse schedule() throws Exception {
AMResponse response = allocate(requests, releases);
public AllocateResponse schedule() throws Exception {
AllocateResponse response = allocate(requests, releases);
requests.clear();
releases.clear();
return response;
}
public AMResponse allocate(
public AllocateResponse allocate(
String host, int memory, int numContainers,
List<ContainerId> releases) throws Exception {
List<ResourceRequest> reqs = createReq(new String[]{host}, memory, 1, numContainers);
@ -143,13 +141,12 @@ public class MockAM {
return req;
}
public AMResponse allocate(
public AllocateResponse allocate(
List<ResourceRequest> resourceRequest, List<ContainerId> releases)
throws Exception {
AllocateRequest req = BuilderUtils.newAllocateRequest(attemptId,
++responseId, 0F, resourceRequest, releases);
AllocateResponse resp = amRMProtocol.allocate(req);
return resp.getAMResponse();
return amRMProtocol.allocate(req);
}
public void unregisterAppAttempt() throws Exception {

View File

@ -26,7 +26,7 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@ -99,32 +99,32 @@ public class TestFifoScheduler {
// add request for containers
am1.addRequests(new String[] { "h1", "h2" }, GB, 1, 1);
AMResponse am1Response = am1.schedule(); // send the request
AllocateResponse alloc1Response = am1.schedule(); // send the request
// add request for containers
am2.addRequests(new String[] { "h1", "h2" }, 3 * GB, 0, 1);
AMResponse am2Response = am2.schedule(); // send the request
AllocateResponse alloc2Response = am2.schedule(); // send the request
// kick the scheduler, 1 GB and 3 GB given to AM1 and AM2, remaining 0
nm1.nodeHeartbeat(true);
while (am1Response.getAllocatedContainers().size() < 1) {
while (alloc1Response.getAllocatedContainers().size() < 1) {
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(1000);
am1Response = am1.schedule();
alloc1Response = am1.schedule();
}
while (am2Response.getAllocatedContainers().size() < 1) {
while (alloc2Response.getAllocatedContainers().size() < 1) {
LOG.info("Waiting for containers to be created for app 2...");
Thread.sleep(1000);
am2Response = am2.schedule();
alloc2Response = am2.schedule();
}
// kick the scheduler, nothing given remaining 2 GB.
nm2.nodeHeartbeat(true);
List<Container> allocated1 = am1Response.getAllocatedContainers();
List<Container> allocated1 = alloc1Response.getAllocatedContainers();
Assert.assertEquals(1, allocated1.size());
Assert.assertEquals(1 * GB, allocated1.get(0).getResource().getMemory());
Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId());
List<Container> allocated2 = am2Response.getAllocatedContainers();
List<Container> allocated2 = alloc2Response.getAllocatedContainers();
Assert.assertEquals(1, allocated2.size());
Assert.assertEquals(3 * GB, allocated2.get(0).getResource().getMemory());
Assert.assertEquals(nm1.getNodeId(), allocated2.get(0).getNodeId());

View File

@ -23,7 +23,7 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@ -213,9 +213,10 @@ public class TestRMRestart {
// verify old AM is not accepted
// change running AM to talk to new RM
am1.setAMRMProtocol(rm2.getApplicationMasterService());
AMResponse amResponse = am1.allocate(new ArrayList<ResourceRequest>(),
AllocateResponse allocResponse = am1.allocate(
new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>());
Assert.assertTrue(amResponse.getReboot());
Assert.assertTrue(allocResponse.getReboot());
// NM should be rebooted on heartbeat, even first heartbeat for nm2
HeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);

View File

@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
import junit.framework.Assert;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.event.Dispatcher;
@ -109,7 +109,7 @@ public class TestAMRMRPCNodeUpdates {
// allocate request returns no updated node
AllocateRequest allocateRequest1 = BuilderUtils.newAllocateRequest(attempt1
.getAppAttemptId(), 0, 0F, null, null);
AMResponse response1 = amService.allocate(allocateRequest1).getAMResponse();
AllocateResponse response1 = amService.allocate(allocateRequest1);
List<NodeReport> updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(0, updatedNodes.size());
@ -118,7 +118,7 @@ public class TestAMRMRPCNodeUpdates {
// allocate request returns updated node
allocateRequest1 = BuilderUtils.newAllocateRequest(attempt1
.getAppAttemptId(), response1.getResponseId(), 0F, null, null);
response1 = amService.allocate(allocateRequest1).getAMResponse();
response1 = amService.allocate(allocateRequest1);
updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
NodeReport nr = updatedNodes.iterator().next();
@ -126,7 +126,7 @@ public class TestAMRMRPCNodeUpdates {
Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState());
// resending the allocate request returns the same result
response1 = amService.allocate(allocateRequest1).getAMResponse();
response1 = amService.allocate(allocateRequest1);
updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
nr = updatedNodes.iterator().next();
@ -138,7 +138,7 @@ public class TestAMRMRPCNodeUpdates {
// subsequent allocate request returns delta
allocateRequest1 = BuilderUtils.newAllocateRequest(attempt1
.getAppAttemptId(), response1.getResponseId(), 0F, null, null);
response1 = amService.allocate(allocateRequest1).getAMResponse();
response1 = amService.allocate(allocateRequest1);
updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
nr = updatedNodes.iterator().next();
@ -158,7 +158,7 @@ public class TestAMRMRPCNodeUpdates {
// allocate request returns no updated node
AllocateRequest allocateRequest2 = BuilderUtils.newAllocateRequest(attempt2
.getAppAttemptId(), 0, 0F, null, null);
AMResponse response2 = amService.allocate(allocateRequest2).getAMResponse();
AllocateResponse response2 = amService.allocate(allocateRequest2);
updatedNodes = response2.getUpdatedNodes();
Assert.assertEquals(0, updatedNodes.size());
@ -167,7 +167,7 @@ public class TestAMRMRPCNodeUpdates {
// both AM's should get delta updated nodes
allocateRequest1 = BuilderUtils.newAllocateRequest(attempt1
.getAppAttemptId(), response1.getResponseId(), 0F, null, null);
response1 = amService.allocate(allocateRequest1).getAMResponse();
response1 = amService.allocate(allocateRequest1);
updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
nr = updatedNodes.iterator().next();
@ -176,7 +176,7 @@ public class TestAMRMRPCNodeUpdates {
allocateRequest2 = BuilderUtils.newAllocateRequest(attempt2
.getAppAttemptId(), response2.getResponseId(), 0F, null, null);
response2 = amService.allocate(allocateRequest2).getAMResponse();
response2 = amService.allocate(allocateRequest2);
updatedNodes = response2.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
nr = updatedNodes.iterator().next();
@ -186,7 +186,7 @@ public class TestAMRMRPCNodeUpdates {
// subsequent allocate calls should return no updated nodes
allocateRequest2 = BuilderUtils.newAllocateRequest(attempt2
.getAppAttemptId(), response2.getResponseId(), 0F, null, null);
response2 = amService.allocate(allocateRequest2).getAMResponse();
response2 = amService.allocate(allocateRequest2);
updatedNodes = response2.getUpdatedNodes();
Assert.assertEquals(0, updatedNodes.size());

View File

@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
import junit.framework.Assert;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
@ -81,22 +81,22 @@ public class TestAMRMRPCResponseId {
AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(attempt
.getAppAttemptId(), 0, 0F, null, null);
AMResponse response = amService.allocate(allocateRequest).getAMResponse();
AllocateResponse response = amService.allocate(allocateRequest);
Assert.assertEquals(1, response.getResponseId());
Assert.assertFalse(response.getReboot());
allocateRequest = BuilderUtils.newAllocateRequest(attempt
.getAppAttemptId(), response.getResponseId(), 0F, null, null);
response = amService.allocate(allocateRequest).getAMResponse();
response = amService.allocate(allocateRequest);
Assert.assertEquals(2, response.getResponseId());
/* try resending */
response = amService.allocate(allocateRequest).getAMResponse();
response = amService.allocate(allocateRequest);
Assert.assertEquals(2, response.getResponseId());
/** try sending old request again **/
allocateRequest = BuilderUtils.newAllocateRequest(attempt
.getAppAttemptId(), 0, 0F, null, null);
response = amService.allocate(allocateRequest).getAMResponse();
response = amService.allocate(allocateRequest);
Assert.assertTrue(response.getReboot());
}
}

View File

@ -201,8 +201,7 @@ public class TestApplicationTokens {
AllocateRequest allocateRequest =
Records.newRecord(AllocateRequest.class);
allocateRequest.setApplicationAttemptId(applicationAttemptId);
Assert.assertFalse(rmClient.allocate(allocateRequest).getAMResponse()
.getReboot());
Assert.assertFalse(rmClient.allocate(allocateRequest).getReboot());
// Simulate a master-key-roll-over
ApplicationTokenSecretManager appTokenSecretManager =
@ -218,8 +217,7 @@ public class TestApplicationTokens {
rmClient = createRMClient(rm, conf, rpc, currentUser);
allocateRequest = Records.newRecord(AllocateRequest.class);
allocateRequest.setApplicationAttemptId(applicationAttemptId);
Assert.assertFalse(rmClient.allocate(allocateRequest).getAMResponse()
.getReboot());
Assert.assertFalse(rmClient.allocate(allocateRequest).getReboot());
} finally {
rm.stop();
if (rmClient != null) {

View File

@ -487,7 +487,7 @@ public class TestContainerManagerSecurity {
BuilderUtils.newApplicationAttemptId(appID, 1), 0, 0F, ask,
new ArrayList<ContainerId>());
List<Container> allocatedContainers = scheduler.allocate(allocateRequest)
.getAMResponse().getAllocatedContainers();
.getAllocatedContainers();
// Modify ask to request no more.
allocateRequest.clearAsks();
@ -499,7 +499,7 @@ public class TestContainerManagerSecurity {
Thread.sleep(1000);
allocateRequest.setResponseId(allocateRequest.getResponseId() + 1);
allocatedContainers = scheduler.allocate(allocateRequest)
.getAMResponse().getAllocatedContainers();
.getAllocatedContainers();
}
Assert.assertNotNull("Container is not allocted!", allocatedContainers);

View File

@ -493,7 +493,7 @@ Hadoop MapReduce Next Generation - Writing YARN Applications
+---+
* The AllocateResponse sent back from the ResourceManager provides the
following information via the AMResponse object:
following information:
* Reboot flag: For scenarios when the ApplicationMaster may get out of sync
with the ResourceManager.
@ -511,7 +511,9 @@ Hadoop MapReduce Next Generation - Writing YARN Applications
allocated container, it will receive an update from the ResourceManager
when the container completes. The ApplicationMaster can look into the
status of the completed container and take appropriate actions such as
re-trying a particular sub-task in case of a failure.
re-trying a particular sub-task in case of a failure.
* Number of cluster nodes: The number of hosts available on the cluster.
[]
@ -525,13 +527,11 @@ Hadoop MapReduce Next Generation - Writing YARN Applications
containers.
+---+
// Get AMResponse from AllocateResponse
AMResponse amResp = allocateResponse.getAMResponse();
// Retrieve list of allocated containers from the response
// and on each allocated container, lets assume we are launching
// the same job.
List<Container> allocatedContainers = amResp.getAllocatedContainers();
List<Container> allocatedContainers = allocateResponse.getAllocatedContainers();
for (Container allocatedContainer : allocatedContainers) {
LOG.info("Launching shell command on a new container."
+ ", containerId=" + allocatedContainer.getId()
@ -553,7 +553,7 @@ Hadoop MapReduce Next Generation - Writing YARN Applications
}
// Check what the current available resources in the cluster are
Resource availableResources = amResp.getAvailableResources();
Resource availableResources = allocateResponse.getAvailableResources();
// Based on this information, an ApplicationMaster can make appropriate
// decisions
@ -561,7 +561,7 @@ Hadoop MapReduce Next Generation - Writing YARN Applications
// Let's assume we are keeping a count of total completed containers,
// containers that failed and ones that completed successfully.
List<ContainerStatus> completedContainers =
amResp.getCompletedContainersStatuses();
allocateResponse.getCompletedContainersStatuses();
for (ContainerStatus containerStatus : completedContainers) {
LOG.info("Got container status for containerID= "
+ containerStatus.getContainerId()
@ -611,7 +611,7 @@ Hadoop MapReduce Next Generation - Writing YARN Applications
+---+
//Assuming an allocated Container obtained from AMResponse
//Assuming an allocated Container obtained from AllocateResponse
Container container;
// Connect to ContainerManager on the allocated container
String cmIpPortStr = container.getNodeId().getHost() + ":"