YARN-396. Rationalize AllocateResponse in RM Scheduler API. Contributed by Zhijie Shen.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1459040 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hitesh Shah 2013-03-20 20:44:35 +00:00
parent f7df0cb7df
commit 1bd345d6e3
27 changed files with 518 additions and 689 deletions

View File

@ -38,7 +38,6 @@ import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -100,10 +99,9 @@ public class LocalContainerAllocator extends RMCommunicator
this.applicationAttemptId, this.lastResponseID, super
.getApplicationProgress(), new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>());
AMResponse response;
AllocateResponse allocateResponse;
try {
AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
response = allocateResponse.getAMResponse();
allocateResponse = scheduler.allocate(allocateRequest);
// Reset retry count if no exception occurred.
retrystartTime = System.currentTimeMillis();
} catch (Exception e) {
@ -120,7 +118,7 @@ public class LocalContainerAllocator extends RMCommunicator
// continue to attempt to contact the RM.
throw e;
}
if (response.getReboot()) {
if (allocateResponse.getReboot()) {
LOG.info("Event from RM: shutting down Application Master");
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.

View File

@ -59,7 +59,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -544,8 +544,9 @@ public class RMContainerAllocator extends RMContainerRequestor
@SuppressWarnings("unchecked")
private List<Container> getResources() throws Exception {
int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;//first time it would be null
AMResponse response;
int headRoom = getAvailableResources() != null
? getAvailableResources().getMemory() : 0;//first time it would be null
AllocateResponse response;
/*
* If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
* milliseconds before aborting. During this interval, AM will still try
@ -634,7 +635,7 @@ public class RMContainerAllocator extends RMContainerRequestor
}
@SuppressWarnings("unchecked")
private void handleUpdatedNodes(AMResponse response) {
private void handleUpdatedNodes(AllocateResponse response) {
// send event to the job about on updated nodes
List<NodeReport> updatedNodes = response.getUpdatedNodes();
if (!updatedNodes.isEmpty()) {

View File

@ -38,7 +38,6 @@ import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@ -146,30 +145,30 @@ public abstract class RMContainerRequestor extends RMCommunicator {
LOG.info("blacklistDisablePercent is " + blacklistDisablePercent);
}
protected AMResponse makeRemoteRequest() throws YarnRemoteException {
protected AllocateResponse makeRemoteRequest() throws YarnRemoteException {
AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
applicationAttemptId, lastResponseID, super.getApplicationProgress(),
new ArrayList<ResourceRequest>(ask), new ArrayList<ContainerId>(
release));
AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
AMResponse response = allocateResponse.getAMResponse();
lastResponseID = response.getResponseId();
availableResources = response.getAvailableResources();
lastResponseID = allocateResponse.getResponseId();
availableResources = allocateResponse.getAvailableResources();
lastClusterNmCount = clusterNmCount;
clusterNmCount = allocateResponse.getNumClusterNodes();
if (ask.size() > 0 || release.size() > 0) {
LOG.info("getResources() for " + applicationId + ":" + " ask="
+ ask.size() + " release= " + release.size() + " newContainers="
+ response.getAllocatedContainers().size() + " finishedContainers="
+ response.getCompletedContainersStatuses().size()
+ allocateResponse.getAllocatedContainers().size()
+ " finishedContainers="
+ allocateResponse.getCompletedContainersStatuses().size()
+ " resourcelimit=" + availableResources + " knownNMs="
+ clusterNmCount);
}
ask.clear();
release.clear();
return response;
return allocateResponse;
}
// May be incorrect if there's multiple NodeManagers running on a single host.

View File

@ -40,7 +40,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -248,10 +247,8 @@ public class MRAppBenchmark {
}
}
AMResponse amResponse = Records.newRecord(AMResponse.class);
amResponse.setAllocatedContainers(containers);
amResponse.setResponseId(request.getResponseId() + 1);
response.setAMResponse(amResponse);
response.setAllocatedContainers(containers);
response.setResponseId(request.getResponseId() + 1);
response.setNumClusterNodes(350);
return response;
}

View File

@ -52,6 +52,9 @@ Release 2.0.5-beta - UNRELEASED
INCOMPATIBLE CHANGES
YARN-396. Rationalize AllocateResponse in RM Scheduler API. (Zhijie Shen
via hitesh)
NEW FEATURES
IMPROVEMENTS

View File

@ -18,19 +18,23 @@
package org.apache.hadoop.yarn.api.protocolrecords;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
/**
* <p>The response sent by the <code>ResourceManager</code> the
* <code>ApplicationMaster</code> during resource negotiation.</p>
*
* <p>The response, via {@link AMResponse}, includes:
* <p>The response, includes:
* <ul>
* <li>Response ID to track duplicate responses.</li>
* <li>
@ -42,6 +46,8 @@ import org.apache.hadoop.yarn.api.records.Container;
* The available headroom for resources in the cluster for the
* application.
* </li>
* <li>A list of nodes whose status has been updated.</li>
* <li>The number of available nodes in a cluster.</li>
* </ul>
* </p>
*
@ -51,17 +57,89 @@ import org.apache.hadoop.yarn.api.records.Container;
@Stable
public interface AllocateResponse {
/**
* Get the {@link AMResponse} sent by the <code>ResourceManager</code>.
* @return <code>AMResponse</code> sent by the <code>ResourceManager</code>
* Should the <code>ApplicationMaster</code> reboot for being horribly
* out-of-sync with the <code>ResourceManager</code> as deigned by
* {@link #getResponseId()}?
*
* @return <code>true</code> if the <code>ApplicationMaster</code> should
* reboot, <code>false</code> otherwise
*/
@Public
@Stable
public abstract AMResponse getAMResponse();
public boolean getReboot();
@Private
@Unstable
public abstract void setAMResponse(AMResponse amResponse);
public void setReboot(boolean reboot);
/**
* Get the <em>last response id</em>.
* @return <em>last response id</em>
*/
@Public
@Stable
public int getResponseId();
@Private
@Unstable
public void setResponseId(int responseId);
/**
* Get the list of <em>newly allocated</em> <code>Container</code> by the
* <code>ResourceManager</code>.
* @return list of <em>newly allocated</em> <code>Container</code>
*/
@Public
@Stable
public List<Container> getAllocatedContainers();
/**
* Set the list of <em>newly allocated</em> <code>Container</code> by the
* <code>ResourceManager</code>.
* @param containers list of <em>newly allocated</em> <code>Container</code>
*/
@Public
@Stable
public void setAllocatedContainers(List<Container> containers);
/**
* Get the <em>available headroom</em> for resources in the cluster for the
* application.
* @return limit of available headroom for resources in the cluster for the
* application
*/
@Public
@Stable
public Resource getAvailableResources();
@Private
@Unstable
public void setAvailableResources(Resource limit);
/**
* Get the list of <em>completed containers' statuses</em>.
* @return the list of <em>completed containers' statuses</em>
*/
@Public
@Stable
public List<ContainerStatus> getCompletedContainersStatuses();
@Private
@Unstable
public void setCompletedContainersStatuses(List<ContainerStatus> containers);
/**
* Get the list of <em>updated <code>NodeReport</code>s</em>. Updates could
* be changes in health, availability etc of the nodes.
* @return The delta of updated nodes since the last response
*/
@Public
@Unstable
public List<NodeReport> getUpdatedNodes();
@Private
@Unstable
public void setUpdatedNodes(final List<NodeReport> updatedNodes);
/**
* Get the number of hosts available on the cluster.

View File

@ -19,11 +19,24 @@
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.AMResponsePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.AMResponseProto;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder;
@ -35,7 +48,12 @@ public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
AllocateResponseProto.Builder builder = null;
boolean viaProto = false;
private AMResponse amResponse;
Resource limit;
private List<Container> allocatedContainers = null;
private List<ContainerStatus> completedContainersStatuses = null;
private List<NodeReport> updatedNodes = null;
public AllocateResponsePBImpl() {
@ -47,20 +65,38 @@ public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
viaProto = true;
}
public AllocateResponseProto getProto() {
public synchronized AllocateResponseProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToBuilder() {
if (this.amResponse != null) {
builder.setAMResponse(convertToProtoFormat(this.amResponse));
private synchronized void mergeLocalToBuilder() {
if (this.allocatedContainers != null) {
builder.clearAllocatedContainers();
Iterable<ContainerProto> iterable =
getProtoIterable(this.allocatedContainers);
builder.addAllAllocatedContainers(iterable);
}
if (this.completedContainersStatuses != null) {
builder.clearCompletedContainerStatuses();
Iterable<ContainerStatusProto> iterable =
getContainerStatusProtoIterable(this.completedContainersStatuses);
builder.addAllCompletedContainerStatuses(iterable);
}
if (this.updatedNodes != null) {
builder.clearUpdatedNodes();
Iterable<NodeReportProto> iterable =
getNodeReportProtoIterable(this.updatedNodes);
builder.addAllUpdatedNodes(iterable);
}
if (this.limit != null) {
builder.setLimit(convertToProtoFormat(this.limit));
}
}
private void mergeLocalToProto() {
private synchronized void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
@ -68,53 +104,293 @@ public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
viaProto = true;
}
private void maybeInitBuilder() {
private synchronized void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = AllocateResponseProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public synchronized boolean getReboot() {
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
return (p.getReboot());
}
@Override
public AMResponse getAMResponse() {
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
if (this.amResponse != null) {
return this.amResponse;
public synchronized void setReboot(boolean reboot) {
maybeInitBuilder();
builder.setReboot((reboot));
}
if (!p.hasAMResponse()) {
@Override
public synchronized int getResponseId() {
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
return (p.getResponseId());
}
@Override
public synchronized void setResponseId(int responseId) {
maybeInitBuilder();
builder.setResponseId((responseId));
}
@Override
public synchronized Resource getAvailableResources() {
if (this.limit != null) {
return this.limit;
}
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasLimit()) {
return null;
}
this.amResponse= convertFromProtoFormat(p.getAMResponse());
return this.amResponse;
this.limit = convertFromProtoFormat(p.getLimit());
return this.limit;
}
@Override
public void setAMResponse(AMResponse aMResponse) {
public synchronized void setAvailableResources(Resource limit) {
maybeInitBuilder();
if (aMResponse == null)
builder.clearAMResponse();
this.amResponse = aMResponse;
if (limit == null)
builder.clearLimit();
this.limit = limit;
}
@Override
public int getNumClusterNodes() {
public synchronized List<NodeReport> getUpdatedNodes() {
initLocalNewNodeReportList();
return this.updatedNodes;
}
@Override
public synchronized void setUpdatedNodes(
final List<NodeReport> updatedNodes) {
if (updatedNodes == null) {
this.updatedNodes.clear();
return;
}
this.updatedNodes = new ArrayList<NodeReport>(updatedNodes.size());
this.updatedNodes.addAll(updatedNodes);
}
@Override
public synchronized List<Container> getAllocatedContainers() {
initLocalNewContainerList();
return this.allocatedContainers;
}
@Override
public synchronized void setAllocatedContainers(
final List<Container> containers) {
if (containers == null)
return;
// this looks like a bug because it results in append and not set
initLocalNewContainerList();
allocatedContainers.addAll(containers);
}
//// Finished containers
@Override
public synchronized List<ContainerStatus> getCompletedContainersStatuses() {
initLocalFinishedContainerList();
return this.completedContainersStatuses;
}
@Override
public synchronized void setCompletedContainersStatuses(
final List<ContainerStatus> containers) {
if (containers == null)
return;
initLocalFinishedContainerList();
completedContainersStatuses.addAll(containers);
}
@Override
public synchronized int getNumClusterNodes() {
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
return p.getNumClusterNodes();
}
@Override
public void setNumClusterNodes(int numNodes) {
public synchronized void setNumClusterNodes(int numNodes) {
maybeInitBuilder();
builder.setNumClusterNodes(numNodes);
}
// Once this is called. updatedNodes will never be null - until a getProto is
// called.
private synchronized void initLocalNewNodeReportList() {
if (this.updatedNodes != null) {
return;
}
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
List<NodeReportProto> list = p.getUpdatedNodesList();
updatedNodes = new ArrayList<NodeReport>(list.size());
private AMResponsePBImpl convertFromProtoFormat(AMResponseProto p) {
return new AMResponsePBImpl(p);
for (NodeReportProto n : list) {
updatedNodes.add(convertFromProtoFormat(n));
}
}
private AMResponseProto convertToProtoFormat(AMResponse t) {
return ((AMResponsePBImpl)t).getProto();
// Once this is called. containerList will never be null - until a getProto
// is called.
private synchronized void initLocalNewContainerList() {
if (this.allocatedContainers != null) {
return;
}
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerProto> list = p.getAllocatedContainersList();
allocatedContainers = new ArrayList<Container>();
for (ContainerProto c : list) {
allocatedContainers.add(convertFromProtoFormat(c));
}
}
private synchronized Iterable<ContainerProto> getProtoIterable(
final List<Container> newContainersList) {
maybeInitBuilder();
return new Iterable<ContainerProto>() {
@Override
public synchronized Iterator<ContainerProto> iterator() {
return new Iterator<ContainerProto>() {
Iterator<Container> iter = newContainersList.iterator();
@Override
public synchronized boolean hasNext() {
return iter.hasNext();
}
@Override
public synchronized ContainerProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public synchronized void remove() {
throw new UnsupportedOperationException();
}
};
}
};
}
private synchronized Iterable<ContainerStatusProto>
getContainerStatusProtoIterable(
final List<ContainerStatus> newContainersList) {
maybeInitBuilder();
return new Iterable<ContainerStatusProto>() {
@Override
public synchronized Iterator<ContainerStatusProto> iterator() {
return new Iterator<ContainerStatusProto>() {
Iterator<ContainerStatus> iter = newContainersList.iterator();
@Override
public synchronized boolean hasNext() {
return iter.hasNext();
}
@Override
public synchronized ContainerStatusProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public synchronized void remove() {
throw new UnsupportedOperationException();
}
};
}
};
}
private synchronized Iterable<NodeReportProto>
getNodeReportProtoIterable(
final List<NodeReport> newNodeReportsList) {
maybeInitBuilder();
return new Iterable<NodeReportProto>() {
@Override
public synchronized Iterator<NodeReportProto> iterator() {
return new Iterator<NodeReportProto>() {
Iterator<NodeReport> iter = newNodeReportsList.iterator();
@Override
public synchronized boolean hasNext() {
return iter.hasNext();
}
@Override
public synchronized NodeReportProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public synchronized void remove() {
throw new UnsupportedOperationException();
}
};
}
};
}
// Once this is called. containerList will never be null - until a getProto
// is called.
private synchronized void initLocalFinishedContainerList() {
if (this.completedContainersStatuses != null) {
return;
}
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerStatusProto> list = p.getCompletedContainerStatusesList();
completedContainersStatuses = new ArrayList<ContainerStatus>();
for (ContainerStatusProto c : list) {
completedContainersStatuses.add(convertFromProtoFormat(c));
}
}
private synchronized NodeReportPBImpl convertFromProtoFormat(
NodeReportProto p) {
return new NodeReportPBImpl(p);
}
private synchronized NodeReportProto convertToProtoFormat(NodeReport t) {
return ((NodeReportPBImpl)t).getProto();
}
private synchronized ContainerPBImpl convertFromProtoFormat(
ContainerProto p) {
return new ContainerPBImpl(p);
}
private synchronized ContainerProto convertToProtoFormat(Container t) {
return ((ContainerPBImpl)t).getProto();
}
private synchronized ContainerStatusPBImpl convertFromProtoFormat(
ContainerStatusProto p) {
return new ContainerStatusPBImpl(p);
}
private synchronized ContainerStatusProto convertToProtoFormat(
ContainerStatus t) {
return ((ContainerStatusPBImpl)t).getProto();
}
private synchronized ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
return new ResourcePBImpl(p);
}
private synchronized ResourceProto convertToProtoFormat(Resource r) {
return ((ResourcePBImpl) r).getProto();
}
}

View File

@ -93,7 +93,8 @@ GetAllApplicationsResponse {
viaProto = false;
}
//Once this is called. containerList will never be null - untill a getProto is called.
// Once this is called. containerList will never be null - until a getProto
// is called.
private void initLocalApplicationsList() {
if (this.applicationList != null) {
return;

View File

@ -92,7 +92,8 @@ public class GetClusterNodesResponsePBImpl extends
viaProto = false;
}
//Once this is called. containerList will never be null - untill a getProto is called.
// Once this is called. containerList will never be null - until a getProto
// is called.
private void initLocalNodeManagerInfosList() {
if (this.nodeManagerInfoList != null) {
return;

View File

@ -94,7 +94,8 @@ implements GetQueueUserAclsInfoResponse {
viaProto = false;
}
//Once this is called. containerList will never be null - untill a getProto is called.
// Once this is called. containerList will never be null - until a getProto
// is called.
private void initLocalQueueUserAclsList() {
if (this.queueUserAclsInfoList != null) {
return;

View File

@ -1,138 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
/**
* <p>The response sent by the <code>ResourceManager</code> the
* <code>ApplicationMaster</code> during resource negotiation.</p>
*
* <p>The response includes:
* <ul>
* <li>Response ID to track duplicate responses.</li>
* <li>
* A reboot flag to let the <code>ApplicationMaster</code> know that its
* horribly out of sync and needs to reboot.</li>
* <li>A list of newly allocated {@link Container}.</li>
* <li>A list of completed {@link Container}.</li>
* <li>
* The available headroom for resources in the cluster for the
* application.
* </li>
* </ul>
* </p>
*
* @see AMRMProtocol#allocate(AllocateRequest)
*/
@Public
@Unstable
public interface AMResponse {
/**
* Should the <code>ApplicationMaster</code> reboot for being horribly
* out-of-sync with the <code>ResourceManager</code> as deigned by
* {@link #getResponseId()}?
*
* @return <code>true</code> if the <code>ApplicationMaster</code> should
* reboot, <code>false</code> otherwise
*/
@Public
@Stable
public boolean getReboot();
@Private
@Unstable
public void setReboot(boolean reboot);
/**
* Get the <em>last response id</em>.
* @return <em>last response id</em>
*/
@Public
@Stable
public int getResponseId();
@Private
@Unstable
public void setResponseId(int responseId);
/**
* Get the list of <em>newly allocated</em> <code>Container</code> by the
* <code>ResourceManager</code>.
* @return list of <em>newly allocated</em> <code>Container</code>
*/
@Public
@Stable
public List<Container> getAllocatedContainers();
/**
* Set the list of <em>newly allocated</em> <code>Container</code> by the
* <code>ResourceManager</code>.
* @param containers list of <em>newly allocated</em> <code>Container</code>
*/
@Public
@Stable
public void setAllocatedContainers(List<Container> containers);
/**
* Get the <em>available headroom</em> for resources in the cluster for the
* application.
* @return limit of available headroom for resources in the cluster for the
* application
*/
@Public
@Stable
public Resource getAvailableResources();
@Private
@Unstable
public void setAvailableResources(Resource limit);
/**
* Get the list of <em>completed containers' statuses</em>.
* @return the list of <em>completed containers' statuses</em>
*/
@Public
@Stable
public List<ContainerStatus> getCompletedContainersStatuses();
@Private
@Unstable
public void setCompletedContainersStatuses(List<ContainerStatus> containers);
/**
* Get the list of <em>updated <code>NodeReport</code>s</em>. Updates could be
* changes in health, availability etc of the nodes.
* @return The delta of updated nodes since the last response
*/
@Public
@Unstable
public List<NodeReport> getUpdatedNodes();
@Private
@Unstable
public void setUpdatedNodes(final List<NodeReport> updatedNodes);
}

View File

@ -1,373 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.impl.pb;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnProtos.AMResponseProto;
import org.apache.hadoop.yarn.proto.YarnProtos.AMResponseProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
public class AMResponsePBImpl extends ProtoBase<AMResponseProto> implements AMResponse {
AMResponseProto proto = AMResponseProto.getDefaultInstance();
AMResponseProto.Builder builder = null;
boolean viaProto = false;
Resource limit;
private List<Container> allocatedContainers = null;
private List<ContainerStatus> completedContainersStatuses = null;
// private boolean hasLocalContainerList = false;
private List<NodeReport> updatedNodes = null;
public AMResponsePBImpl() {
builder = AMResponseProto.newBuilder();
}
public AMResponsePBImpl(AMResponseProto proto) {
this.proto = proto;
viaProto = true;
}
public synchronized AMResponseProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private synchronized void mergeLocalToBuilder() {
if (this.allocatedContainers != null) {
builder.clearAllocatedContainers();
Iterable<ContainerProto> iterable =
getProtoIterable(this.allocatedContainers);
builder.addAllAllocatedContainers(iterable);
}
if (this.completedContainersStatuses != null) {
builder.clearCompletedContainerStatuses();
Iterable<ContainerStatusProto> iterable =
getContainerStatusProtoIterable(this.completedContainersStatuses);
builder.addAllCompletedContainerStatuses(iterable);
}
if (this.updatedNodes != null) {
builder.clearUpdatedNodes();
Iterable<NodeReportProto> iterable =
getNodeReportProtoIterable(this.updatedNodes);
builder.addAllUpdatedNodes(iterable);
}
if (this.limit != null) {
builder.setLimit(convertToProtoFormat(this.limit));
}
}
private synchronized void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private synchronized void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = AMResponseProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public synchronized boolean getReboot() {
AMResponseProtoOrBuilder p = viaProto ? proto : builder;
return (p.getReboot());
}
@Override
public synchronized void setReboot(boolean reboot) {
maybeInitBuilder();
builder.setReboot((reboot));
}
@Override
public synchronized int getResponseId() {
AMResponseProtoOrBuilder p = viaProto ? proto : builder;
return (p.getResponseId());
}
@Override
public synchronized void setResponseId(int responseId) {
maybeInitBuilder();
builder.setResponseId((responseId));
}
@Override
public synchronized Resource getAvailableResources() {
if (this.limit != null) {
return this.limit;
}
AMResponseProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasLimit()) {
return null;
}
this.limit = convertFromProtoFormat(p.getLimit());
return this.limit;
}
@Override
public synchronized void setAvailableResources(Resource limit) {
maybeInitBuilder();
if (limit == null)
builder.clearLimit();
this.limit = limit;
}
@Override
public synchronized List<NodeReport> getUpdatedNodes() {
initLocalNewNodeReportList();
return this.updatedNodes;
}
//Once this is called. updatedNodes will never be null - until a getProto is called.
private synchronized void initLocalNewNodeReportList() {
if (this.updatedNodes != null) {
return;
}
AMResponseProtoOrBuilder p = viaProto ? proto : builder;
List<NodeReportProto> list = p.getUpdatedNodesList();
updatedNodes = new ArrayList<NodeReport>(list.size());
for (NodeReportProto n : list) {
updatedNodes.add(convertFromProtoFormat(n));
}
}
@Override
public synchronized void setUpdatedNodes(final List<NodeReport> updatedNodes) {
if (updatedNodes == null) {
this.updatedNodes.clear();
return;
}
this.updatedNodes = new ArrayList<NodeReport>(updatedNodes.size());
this.updatedNodes.addAll(updatedNodes);
}
@Override
public synchronized List<Container> getAllocatedContainers() {
initLocalNewContainerList();
return this.allocatedContainers;
}
//Once this is called. containerList will never be null - until a getProto is called.
private synchronized void initLocalNewContainerList() {
if (this.allocatedContainers != null) {
return;
}
AMResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerProto> list = p.getAllocatedContainersList();
allocatedContainers = new ArrayList<Container>();
for (ContainerProto c : list) {
allocatedContainers.add(convertFromProtoFormat(c));
}
}
@Override
public synchronized void setAllocatedContainers(final List<Container> containers) {
if (containers == null)
return;
// this looks like a bug because it results in append and not set
initLocalNewContainerList();
allocatedContainers.addAll(containers);
}
private synchronized Iterable<ContainerProto> getProtoIterable(
final List<Container> newContainersList) {
maybeInitBuilder();
return new Iterable<ContainerProto>() {
@Override
public synchronized Iterator<ContainerProto> iterator() {
return new Iterator<ContainerProto>() {
Iterator<Container> iter = newContainersList.iterator();
@Override
public synchronized boolean hasNext() {
return iter.hasNext();
}
@Override
public synchronized ContainerProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public synchronized void remove() {
throw new UnsupportedOperationException();
}
};
}
};
}
private synchronized Iterable<ContainerStatusProto>
getContainerStatusProtoIterable(
final List<ContainerStatus> newContainersList) {
maybeInitBuilder();
return new Iterable<ContainerStatusProto>() {
@Override
public synchronized Iterator<ContainerStatusProto> iterator() {
return new Iterator<ContainerStatusProto>() {
Iterator<ContainerStatus> iter = newContainersList.iterator();
@Override
public synchronized boolean hasNext() {
return iter.hasNext();
}
@Override
public synchronized ContainerStatusProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public synchronized void remove() {
throw new UnsupportedOperationException();
}
};
}
};
}
private synchronized Iterable<NodeReportProto>
getNodeReportProtoIterable(
final List<NodeReport> newNodeReportsList) {
maybeInitBuilder();
return new Iterable<NodeReportProto>() {
@Override
public synchronized Iterator<NodeReportProto> iterator() {
return new Iterator<NodeReportProto>() {
Iterator<NodeReport> iter = newNodeReportsList.iterator();
@Override
public synchronized boolean hasNext() {
return iter.hasNext();
}
@Override
public synchronized NodeReportProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public synchronized void remove() {
throw new UnsupportedOperationException();
}
};
}
};
}
//// Finished containers
@Override
public synchronized List<ContainerStatus> getCompletedContainersStatuses() {
initLocalFinishedContainerList();
return this.completedContainersStatuses;
}
//Once this is called. containerList will never be null - untill a getProto is called.
private synchronized void initLocalFinishedContainerList() {
if (this.completedContainersStatuses != null) {
return;
}
AMResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerStatusProto> list = p.getCompletedContainerStatusesList();
completedContainersStatuses = new ArrayList<ContainerStatus>();
for (ContainerStatusProto c : list) {
completedContainersStatuses.add(convertFromProtoFormat(c));
}
}
@Override
public synchronized void setCompletedContainersStatuses(
final List<ContainerStatus> containers) {
if (containers == null)
return;
initLocalFinishedContainerList();
completedContainersStatuses.addAll(containers);
}
private synchronized NodeReportPBImpl convertFromProtoFormat(
NodeReportProto p) {
return new NodeReportPBImpl(p);
}
private synchronized NodeReportProto convertToProtoFormat(NodeReport t) {
return ((NodeReportPBImpl)t).getProto();
}
private synchronized ContainerPBImpl convertFromProtoFormat(
ContainerProto p) {
return new ContainerPBImpl(p);
}
private synchronized ContainerProto convertToProtoFormat(Container t) {
return ((ContainerPBImpl)t).getProto();
}
private synchronized ContainerStatusPBImpl convertFromProtoFormat(
ContainerStatusProto p) {
return new ContainerStatusPBImpl(p);
}
private synchronized ContainerStatusProto convertToProtoFormat(ContainerStatus t) {
return ((ContainerStatusPBImpl)t).getProto();
}
private synchronized ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
return new ResourcePBImpl(p);
}
private synchronized ResourceProto convertToProtoFormat(Resource r) {
return ((ResourcePBImpl) r).getProto();
}
}

View File

@ -207,16 +207,6 @@ message ResourceRequestProto {
optional int32 num_containers = 4;
}
message AMResponseProto {
optional bool reboot = 1;
optional int32 response_id = 2;
repeated ContainerProto allocated_containers = 3;
repeated ContainerStatusProto completed_container_statuses = 4;
optional ResourceProto limit = 5;
repeated NodeReportProto updated_nodes = 6;
}
////////////////////////////////////////////////////////////////////////
////// From client_RM_Protocol /////////////////////////////////////////
////////////////////////////////////////////////////////////////////////

View File

@ -59,8 +59,13 @@ message AllocateRequestProto {
}
message AllocateResponseProto {
optional AMResponseProto AM_response = 1;
optional int32 num_cluster_nodes = 2;
optional bool reboot = 1;
optional int32 response_id = 2;
repeated ContainerProto allocated_containers = 3;
repeated ContainerStatusProto completed_container_statuses = 4;
optional ResourceProto limit = 5;
repeated NodeReportProto updated_nodes = 6;
optional int32 num_cluster_nodes = 7;
}

View File

@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -510,10 +509,11 @@ public class ApplicationMaster {
// Send the request to RM
LOG.info("Asking RM for containers" + ", askCount=" + askCount);
AMResponse amResp = sendContainerAskToRM();
AllocateResponse allocResp = sendContainerAskToRM();
// Retrieve list of allocated containers from the response
List<Container> allocatedContainers = amResp.getAllocatedContainers();
List<Container> allocatedContainers =
allocResp.getAllocatedContainers();
LOG.info("Got response from RM for container ask, allocatedCnt="
+ allocatedContainers.size());
numAllocatedContainers.addAndGet(allocatedContainers.size());
@ -542,12 +542,12 @@ public class ApplicationMaster {
// Check what the current available resources in the cluster are
// TODO should we do anything if the available resources are not enough?
Resource availableResources = amResp.getAvailableResources();
Resource availableResources = allocResp.getAvailableResources();
LOG.info("Current available resources in the cluster "
+ availableResources);
// Check the completed containers
List<ContainerStatus> completedContainers = amResp
List<ContainerStatus> completedContainers = allocResp
.getCompletedContainersStatuses();
LOG.info("Got response from RM for container ask, completedCnt="
+ completedContainers.size());
@ -819,14 +819,13 @@ public class ApplicationMaster {
* @return Response from RM to AM with allocated containers
* @throws YarnRemoteException
*/
private AMResponse sendContainerAskToRM() throws YarnRemoteException {
private AllocateResponse sendContainerAskToRM() throws YarnRemoteException {
float progressIndicator = (float) numCompletedContainers.get()
/ numTotalContainers;
LOG.info("Sending request to RM for containers" + ", progress="
+ progressIndicator);
AllocateResponse resp = resourceManager.allocate(progressIndicator);
return resp.getAMResponse();
return resourceManager.allocate(progressIndicator);
}
}

View File

@ -45,7 +45,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@ -194,13 +193,12 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient {
}
allocateResponse = rmClient.allocate(allocateRequest);
AMResponse response = allocateResponse.getAMResponse();
synchronized (this) {
// update these on successful RPC
clusterNodeCount = allocateResponse.getNumClusterNodes();
lastResponseId = response.getResponseId();
clusterAvailableResources = response.getAvailableResources();
lastResponseId = allocateResponse.getResponseId();
clusterAvailableResources = allocateResponse.getAvailableResources();
}
} finally {
// TODO how to differentiate remote yarn exception vs error in rpc

View File

@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@ -202,9 +201,8 @@ public class TestAMRMClient {
assertTrue(amClient.release.size() == 0);
assertTrue(nodeCount == amClient.getClusterNodeCount());
AMResponse amResponse = allocResponse.getAMResponse();
allocatedContainerCount += amResponse.getAllocatedContainers().size();
for(Container container : amResponse.getAllocatedContainers()) {
allocatedContainerCount += allocResponse.getAllocatedContainers().size();
for(Container container : allocResponse.getAllocatedContainers()) {
ContainerId rejectContainerId = container.getId();
releases.add(rejectContainerId);
amClient.releaseAssignedContainer(rejectContainerId);
@ -264,11 +262,11 @@ public class TestAMRMClient {
while(!releases.isEmpty() || iterationsLeft-- > 0) {
// inform RM of rejection
AllocateResponse allocResponse = amClient.allocate(0.1f);
AMResponse amResponse = allocResponse.getAMResponse();
// RM did not send new containers because AM does not need any
assertTrue(amResponse.getAllocatedContainers().size() == 0);
if(amResponse.getCompletedContainersStatuses().size() > 0) {
for(ContainerStatus cStatus : amResponse.getCompletedContainersStatuses()) {
assertTrue(allocResponse.getAllocatedContainers().size() == 0);
if(allocResponse.getCompletedContainersStatuses().size() > 0) {
for(ContainerStatus cStatus :allocResponse
.getCompletedContainersStatuses()) {
if(releases.contains(cStatus.getContainerId())) {
assertTrue(cStatus.getState() == ContainerState.COMPLETE);
assertTrue(cStatus.getExitStatus() == -100);

View File

@ -23,9 +23,9 @@ import junit.framework.Assert;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.impl.pb.AMResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
import org.junit.Test;
public class TestRecordFactory {
@ -35,15 +35,17 @@ public class TestRecordFactory {
RecordFactory pbRecordFactory = RecordFactoryPBImpl.get();
try {
AMResponse response = pbRecordFactory.newRecordInstance(AMResponse.class);
Assert.assertEquals(AMResponsePBImpl.class, response.getClass());
AllocateResponse response =
pbRecordFactory.newRecordInstance(AllocateResponse.class);
Assert.assertEquals(AllocateResponsePBImpl.class, response.getClass());
} catch (YarnException e) {
e.printStackTrace();
Assert.fail("Failed to crete record");
}
try {
AllocateRequest response = pbRecordFactory.newRecordInstance(AllocateRequest.class);
AllocateRequest response =
pbRecordFactory.newRecordInstance(AllocateRequest.class);
Assert.assertEquals(AllocateRequestPBImpl.class, response.getClass());
} catch (YarnException e) {
e.printStackTrace();

View File

@ -41,7 +41,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -78,10 +77,12 @@ public class ApplicationMasterService extends AbstractService implements
private YarnScheduler rScheduler;
private InetSocketAddress bindAddress;
private Server server;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private final ConcurrentMap<ApplicationAttemptId, AMResponse> responseMap =
new ConcurrentHashMap<ApplicationAttemptId, AMResponse>();
private final AMResponse reboot = recordFactory.newRecordInstance(AMResponse.class);
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private final ConcurrentMap<ApplicationAttemptId, AllocateResponse> responseMap =
new ConcurrentHashMap<ApplicationAttemptId, AllocateResponse>();
private final AllocateResponse reboot =
recordFactory.newRecordInstance(AllocateResponse.class);
private final RMContext rmContext;
public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
@ -166,7 +167,7 @@ public class ApplicationMasterService extends AbstractService implements
authorizeRequest(applicationAttemptId);
ApplicationId appID = applicationAttemptId.getApplicationId();
AMResponse lastResponse = responseMap.get(applicationAttemptId);
AllocateResponse lastResponse = responseMap.get(applicationAttemptId);
if (lastResponse == null) {
String message = "Application doesn't exist in cache "
+ applicationAttemptId;
@ -214,7 +215,7 @@ public class ApplicationMasterService extends AbstractService implements
.getApplicationAttemptId();
authorizeRequest(applicationAttemptId);
AMResponse lastResponse = responseMap.get(applicationAttemptId);
AllocateResponse lastResponse = responseMap.get(applicationAttemptId);
if (lastResponse == null) {
String message = "Application doesn't exist in cache "
+ applicationAttemptId;
@ -248,25 +249,20 @@ public class ApplicationMasterService extends AbstractService implements
this.amLivelinessMonitor.receivedPing(appAttemptId);
/* check if its in cache */
AllocateResponse allocateResponse = recordFactory
.newRecordInstance(AllocateResponse.class);
AMResponse lastResponse = responseMap.get(appAttemptId);
AllocateResponse lastResponse = responseMap.get(appAttemptId);
if (lastResponse == null) {
LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
allocateResponse.setAMResponse(reboot);
return allocateResponse;
return reboot;
}
if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
/* old heartbeat */
allocateResponse.setAMResponse(lastResponse);
return allocateResponse;
return lastResponse;
} else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
LOG.error("Invalid responseid from appAttemptId " + appAttemptId);
// Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO:
// Reboot is not useful since after AM reboots, it will send register and
// get an exception. Might as well throw an exception here.
allocateResponse.setAMResponse(reboot);
return allocateResponse;
return reboot;
}
// Allow only one thread in AM to do heartbeat at a time.
@ -288,7 +284,8 @@ public class ApplicationMasterService extends AbstractService implements
appAttemptId.getApplicationId());
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
AllocateResponse allocateResponse =
recordFactory.newRecordInstance(AllocateResponse.class);
// update the response with the deltas of node status changes
List<RMNode> updatedNodes = new ArrayList<RMNode>();
@ -311,34 +308,34 @@ public class ApplicationMasterService extends AbstractService implements
updatedNodeReports.add(report);
}
response.setUpdatedNodes(updatedNodeReports);
allocateResponse.setUpdatedNodes(updatedNodeReports);
}
response.setAllocatedContainers(allocation.getContainers());
response.setCompletedContainersStatuses(appAttempt
allocateResponse.setAllocatedContainers(allocation.getContainers());
allocateResponse.setCompletedContainersStatuses(appAttempt
.pullJustFinishedContainers());
response.setResponseId(lastResponse.getResponseId() + 1);
response.setAvailableResources(allocation.getResourceLimit());
allocateResponse.setResponseId(lastResponse.getResponseId() + 1);
allocateResponse.setAvailableResources(allocation.getResourceLimit());
AMResponse oldResponse = responseMap.put(appAttemptId, response);
AllocateResponse oldResponse =
responseMap.put(appAttemptId, allocateResponse);
if (oldResponse == null) {
// appAttempt got unregistered, remove it back out
responseMap.remove(appAttemptId);
String message = "App Attempt removed from the cache during allocate"
+ appAttemptId;
LOG.error(message);
allocateResponse.setAMResponse(reboot);
return allocateResponse;
return reboot;
}
allocateResponse.setAMResponse(response);
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
return allocateResponse;
}
}
public void registerAppAttempt(ApplicationAttemptId attemptId) {
AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
AllocateResponse response =
recordFactory.newRecordInstance(AllocateResponse.class);
response.setResponseId(0);
LOG.info("Registering " + attemptId);
responseMap.put(attemptId, response);

View File

@ -28,8 +28,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -96,14 +94,14 @@ public class MockAM {
requests.addAll(createReq(hosts, memory, priority, containers));
}
public AMResponse schedule() throws Exception {
AMResponse response = allocate(requests, releases);
public AllocateResponse schedule() throws Exception {
AllocateResponse response = allocate(requests, releases);
requests.clear();
releases.clear();
return response;
}
public AMResponse allocate(
public AllocateResponse allocate(
String host, int memory, int numContainers,
List<ContainerId> releases) throws Exception {
List<ResourceRequest> reqs = createReq(new String[]{host}, memory, 1, numContainers);
@ -143,13 +141,12 @@ public class MockAM {
return req;
}
public AMResponse allocate(
public AllocateResponse allocate(
List<ResourceRequest> resourceRequest, List<ContainerId> releases)
throws Exception {
AllocateRequest req = BuilderUtils.newAllocateRequest(attemptId,
++responseId, 0F, resourceRequest, releases);
AllocateResponse resp = amRMProtocol.allocate(req);
return resp.getAMResponse();
return amRMProtocol.allocate(req);
}
public void unregisterAppAttempt() throws Exception {

View File

@ -26,7 +26,7 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@ -99,32 +99,32 @@ public class TestFifoScheduler {
// add request for containers
am1.addRequests(new String[] { "h1", "h2" }, GB, 1, 1);
AMResponse am1Response = am1.schedule(); // send the request
AllocateResponse alloc1Response = am1.schedule(); // send the request
// add request for containers
am2.addRequests(new String[] { "h1", "h2" }, 3 * GB, 0, 1);
AMResponse am2Response = am2.schedule(); // send the request
AllocateResponse alloc2Response = am2.schedule(); // send the request
// kick the scheduler, 1 GB and 3 GB given to AM1 and AM2, remaining 0
nm1.nodeHeartbeat(true);
while (am1Response.getAllocatedContainers().size() < 1) {
while (alloc1Response.getAllocatedContainers().size() < 1) {
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(1000);
am1Response = am1.schedule();
alloc1Response = am1.schedule();
}
while (am2Response.getAllocatedContainers().size() < 1) {
while (alloc2Response.getAllocatedContainers().size() < 1) {
LOG.info("Waiting for containers to be created for app 2...");
Thread.sleep(1000);
am2Response = am2.schedule();
alloc2Response = am2.schedule();
}
// kick the scheduler, nothing given remaining 2 GB.
nm2.nodeHeartbeat(true);
List<Container> allocated1 = am1Response.getAllocatedContainers();
List<Container> allocated1 = alloc1Response.getAllocatedContainers();
Assert.assertEquals(1, allocated1.size());
Assert.assertEquals(1 * GB, allocated1.get(0).getResource().getMemory());
Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId());
List<Container> allocated2 = am2Response.getAllocatedContainers();
List<Container> allocated2 = alloc2Response.getAllocatedContainers();
Assert.assertEquals(1, allocated2.size());
Assert.assertEquals(3 * GB, allocated2.get(0).getResource().getMemory());
Assert.assertEquals(nm1.getNodeId(), allocated2.get(0).getNodeId());

View File

@ -23,7 +23,7 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@ -213,9 +213,10 @@ public class TestRMRestart {
// verify old AM is not accepted
// change running AM to talk to new RM
am1.setAMRMProtocol(rm2.getApplicationMasterService());
AMResponse amResponse = am1.allocate(new ArrayList<ResourceRequest>(),
AllocateResponse allocResponse = am1.allocate(
new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>());
Assert.assertTrue(amResponse.getReboot());
Assert.assertTrue(allocResponse.getReboot());
// NM should be rebooted on heartbeat, even first heartbeat for nm2
HeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);

View File

@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
import junit.framework.Assert;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.event.Dispatcher;
@ -109,7 +109,7 @@ public class TestAMRMRPCNodeUpdates {
// allocate request returns no updated node
AllocateRequest allocateRequest1 = BuilderUtils.newAllocateRequest(attempt1
.getAppAttemptId(), 0, 0F, null, null);
AMResponse response1 = amService.allocate(allocateRequest1).getAMResponse();
AllocateResponse response1 = amService.allocate(allocateRequest1);
List<NodeReport> updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(0, updatedNodes.size());
@ -118,7 +118,7 @@ public class TestAMRMRPCNodeUpdates {
// allocate request returns updated node
allocateRequest1 = BuilderUtils.newAllocateRequest(attempt1
.getAppAttemptId(), response1.getResponseId(), 0F, null, null);
response1 = amService.allocate(allocateRequest1).getAMResponse();
response1 = amService.allocate(allocateRequest1);
updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
NodeReport nr = updatedNodes.iterator().next();
@ -126,7 +126,7 @@ public class TestAMRMRPCNodeUpdates {
Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState());
// resending the allocate request returns the same result
response1 = amService.allocate(allocateRequest1).getAMResponse();
response1 = amService.allocate(allocateRequest1);
updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
nr = updatedNodes.iterator().next();
@ -138,7 +138,7 @@ public class TestAMRMRPCNodeUpdates {
// subsequent allocate request returns delta
allocateRequest1 = BuilderUtils.newAllocateRequest(attempt1
.getAppAttemptId(), response1.getResponseId(), 0F, null, null);
response1 = amService.allocate(allocateRequest1).getAMResponse();
response1 = amService.allocate(allocateRequest1);
updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
nr = updatedNodes.iterator().next();
@ -158,7 +158,7 @@ public class TestAMRMRPCNodeUpdates {
// allocate request returns no updated node
AllocateRequest allocateRequest2 = BuilderUtils.newAllocateRequest(attempt2
.getAppAttemptId(), 0, 0F, null, null);
AMResponse response2 = amService.allocate(allocateRequest2).getAMResponse();
AllocateResponse response2 = amService.allocate(allocateRequest2);
updatedNodes = response2.getUpdatedNodes();
Assert.assertEquals(0, updatedNodes.size());
@ -167,7 +167,7 @@ public class TestAMRMRPCNodeUpdates {
// both AM's should get delta updated nodes
allocateRequest1 = BuilderUtils.newAllocateRequest(attempt1
.getAppAttemptId(), response1.getResponseId(), 0F, null, null);
response1 = amService.allocate(allocateRequest1).getAMResponse();
response1 = amService.allocate(allocateRequest1);
updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
nr = updatedNodes.iterator().next();
@ -176,7 +176,7 @@ public class TestAMRMRPCNodeUpdates {
allocateRequest2 = BuilderUtils.newAllocateRequest(attempt2
.getAppAttemptId(), response2.getResponseId(), 0F, null, null);
response2 = amService.allocate(allocateRequest2).getAMResponse();
response2 = amService.allocate(allocateRequest2);
updatedNodes = response2.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
nr = updatedNodes.iterator().next();
@ -186,7 +186,7 @@ public class TestAMRMRPCNodeUpdates {
// subsequent allocate calls should return no updated nodes
allocateRequest2 = BuilderUtils.newAllocateRequest(attempt2
.getAppAttemptId(), response2.getResponseId(), 0F, null, null);
response2 = amService.allocate(allocateRequest2).getAMResponse();
response2 = amService.allocate(allocateRequest2);
updatedNodes = response2.getUpdatedNodes();
Assert.assertEquals(0, updatedNodes.size());

View File

@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
import junit.framework.Assert;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
@ -81,22 +81,22 @@ public class TestAMRMRPCResponseId {
AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(attempt
.getAppAttemptId(), 0, 0F, null, null);
AMResponse response = amService.allocate(allocateRequest).getAMResponse();
AllocateResponse response = amService.allocate(allocateRequest);
Assert.assertEquals(1, response.getResponseId());
Assert.assertFalse(response.getReboot());
allocateRequest = BuilderUtils.newAllocateRequest(attempt
.getAppAttemptId(), response.getResponseId(), 0F, null, null);
response = amService.allocate(allocateRequest).getAMResponse();
response = amService.allocate(allocateRequest);
Assert.assertEquals(2, response.getResponseId());
/* try resending */
response = amService.allocate(allocateRequest).getAMResponse();
response = amService.allocate(allocateRequest);
Assert.assertEquals(2, response.getResponseId());
/** try sending old request again **/
allocateRequest = BuilderUtils.newAllocateRequest(attempt
.getAppAttemptId(), 0, 0F, null, null);
response = amService.allocate(allocateRequest).getAMResponse();
response = amService.allocate(allocateRequest);
Assert.assertTrue(response.getReboot());
}
}

View File

@ -201,8 +201,7 @@ public class TestApplicationTokens {
AllocateRequest allocateRequest =
Records.newRecord(AllocateRequest.class);
allocateRequest.setApplicationAttemptId(applicationAttemptId);
Assert.assertFalse(rmClient.allocate(allocateRequest).getAMResponse()
.getReboot());
Assert.assertFalse(rmClient.allocate(allocateRequest).getReboot());
// Simulate a master-key-roll-over
ApplicationTokenSecretManager appTokenSecretManager =
@ -218,8 +217,7 @@ public class TestApplicationTokens {
rmClient = createRMClient(rm, conf, rpc, currentUser);
allocateRequest = Records.newRecord(AllocateRequest.class);
allocateRequest.setApplicationAttemptId(applicationAttemptId);
Assert.assertFalse(rmClient.allocate(allocateRequest).getAMResponse()
.getReboot());
Assert.assertFalse(rmClient.allocate(allocateRequest).getReboot());
} finally {
rm.stop();
if (rmClient != null) {

View File

@ -487,7 +487,7 @@ public class TestContainerManagerSecurity {
BuilderUtils.newApplicationAttemptId(appID, 1), 0, 0F, ask,
new ArrayList<ContainerId>());
List<Container> allocatedContainers = scheduler.allocate(allocateRequest)
.getAMResponse().getAllocatedContainers();
.getAllocatedContainers();
// Modify ask to request no more.
allocateRequest.clearAsks();
@ -499,7 +499,7 @@ public class TestContainerManagerSecurity {
Thread.sleep(1000);
allocateRequest.setResponseId(allocateRequest.getResponseId() + 1);
allocatedContainers = scheduler.allocate(allocateRequest)
.getAMResponse().getAllocatedContainers();
.getAllocatedContainers();
}
Assert.assertNotNull("Container is not allocted!", allocatedContainers);

View File

@ -493,7 +493,7 @@ Hadoop MapReduce Next Generation - Writing YARN Applications
+---+
* The AllocateResponse sent back from the ResourceManager provides the
following information via the AMResponse object:
following information:
* Reboot flag: For scenarios when the ApplicationMaster may get out of sync
with the ResourceManager.
@ -513,6 +513,8 @@ Hadoop MapReduce Next Generation - Writing YARN Applications
status of the completed container and take appropriate actions such as
re-trying a particular sub-task in case of a failure.
* Number of cluster nodes: The number of hosts available on the cluster.
[]
One thing to note is that containers will not be immediately allocated to
@ -525,13 +527,11 @@ Hadoop MapReduce Next Generation - Writing YARN Applications
containers.
+---+
// Get AMResponse from AllocateResponse
AMResponse amResp = allocateResponse.getAMResponse();
// Retrieve list of allocated containers from the response
// and on each allocated container, lets assume we are launching
// the same job.
List<Container> allocatedContainers = amResp.getAllocatedContainers();
List<Container> allocatedContainers = allocateResponse.getAllocatedContainers();
for (Container allocatedContainer : allocatedContainers) {
LOG.info("Launching shell command on a new container."
+ ", containerId=" + allocatedContainer.getId()
@ -553,7 +553,7 @@ Hadoop MapReduce Next Generation - Writing YARN Applications
}
// Check what the current available resources in the cluster are
Resource availableResources = amResp.getAvailableResources();
Resource availableResources = allocateResponse.getAvailableResources();
// Based on this information, an ApplicationMaster can make appropriate
// decisions
@ -561,7 +561,7 @@ Hadoop MapReduce Next Generation - Writing YARN Applications
// Let's assume we are keeping a count of total completed containers,
// containers that failed and ones that completed successfully.
List<ContainerStatus> completedContainers =
amResp.getCompletedContainersStatuses();
allocateResponse.getCompletedContainersStatuses();
for (ContainerStatus containerStatus : completedContainers) {
LOG.info("Got container status for containerID= "
+ containerStatus.getContainerId()
@ -611,7 +611,7 @@ Hadoop MapReduce Next Generation - Writing YARN Applications
+---+
//Assuming an allocated Container obtained from AMResponse
//Assuming an allocated Container obtained from AllocateResponse
Container container;
// Connect to ContainerManager on the allocated container
String cmIpPortStr = container.getNodeId().getHost() + ":"