YARN-750. Allow for black-listing resources in YARN API and Impl in CS (acmurthy via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1490392 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bikas Saha 2013-06-06 18:46:59 +00:00
parent 98f55792ff
commit 2051fd5ee2
37 changed files with 745 additions and 97 deletions

View File

@ -96,7 +96,7 @@ public class LocalContainerAllocator extends RMCommunicator
AllocateRequest allocateRequest = AllocateRequest.newInstance(
this.applicationAttemptId, this.lastResponseID, super
.getApplicationProgress(), new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>());
new ArrayList<ContainerId>(), null);
AllocateResponse allocateResponse;
try {
allocateResponse = scheduler.allocate(allocateRequest);

View File

@ -148,7 +148,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
AllocateRequest allocateRequest = AllocateRequest.newInstance(
applicationAttemptId, lastResponseID, super.getApplicationProgress(),
new ArrayList<ResourceRequest>(ask), new ArrayList<ContainerId>(
release));
release), null);
AllocateResponse allocateResponse;
try {
allocateResponse = scheduler.allocate(allocateRequest);

View File

@ -1214,7 +1214,8 @@ public class TestRMContainerAllocator {
@Override
public synchronized Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
List<ContainerId> release) {
List<ContainerId> release,
List<String> blacklistAdditions, List<String> blacklistRemovals) {
List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
for (ResourceRequest req : ask) {
ResourceRequest reqCopy = ResourceRequest.newInstance(req
@ -1223,7 +1224,9 @@ public class TestRMContainerAllocator {
askCopy.add(reqCopy);
}
lastAsk = ask;
return super.allocate(applicationAttemptId, askCopy, release);
return super.allocate(
applicationAttemptId, askCopy, release,
blacklistAdditions, blacklistRemovals);
}
}

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
* <p>The protocol between a live instance of <code>ApplicationMaster</code>
@ -60,6 +61,8 @@ public interface AMRMProtocol {
* @return registration respose
* @throws YarnException
* @throws IOException
* @see RegisterApplicationMasterRequest
* @see RegisterApplicationMasterResponse
*/
public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request)
@ -80,6 +83,8 @@ public interface AMRMProtocol {
* @return completion response
* @throws YarnException
* @throws IOException
* @see FinishApplicationMasterRequest
* @see FinishApplicationMasterResponse
*/
public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request)
@ -91,12 +96,16 @@ public interface AMRMProtocol {
*
* <p>The <code>ApplicationMaster</code> uses this interface to provide a list
* of {@link ResourceRequest} and returns unused {@link Container} allocated
* to it via {@link AllocateRequest}.</p>
* to it via {@link AllocateRequest}. Optionally, the
* <code>ApplicationMaster</code> can also <em>blacklist</em> resources
* which it doesn't want to use.</p>
*
* <p>This also doubles up as a <em>heartbeat</em> to let the
* <code>ResourceManager</code> know that the <code>ApplicationMaster</code>
* is alive. Thus, applications should periodically make this call to be kept
* alive. The frequency depends on ??</p>
* alive. The frequency depends on
* {@link YarnConfiguration#RM_AM_EXPIRY_INTERVAL_MS} which defaults to
* {@link YarnConfiguration#DEFAULT_RM_AM_EXPIRY_INTERVAL_MS}.</p>
*
* <p>The <code>ResourceManager</code> responds with list of allocated
* {@link Container}, status of completed containers and headroom information
@ -110,6 +119,8 @@ public interface AMRMProtocol {
* @return allocation response
* @throws YarnException
* @throws IOException
* @see AllocateRequest
* @see AllocateResponse
*/
public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException;

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -61,13 +62,15 @@ public abstract class AllocateRequest {
public static AllocateRequest newInstance(
ApplicationAttemptId applicationAttemptId, int responseID,
float appProgress, List<ResourceRequest> resourceAsk,
List<ContainerId> containersToBeReleased) {
List<ContainerId> containersToBeReleased,
ResourceBlacklistRequest resourceBlacklistRequest) {
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
allocateRequest.setApplicationAttemptId(applicationAttemptId);
allocateRequest.setResponseId(responseID);
allocateRequest.setProgress(appProgress);
allocateRequest.setAskList(resourceAsk);
allocateRequest.setReleaseList(containersToBeReleased);
allocateRequest.setResourceBlacklistRequest(resourceBlacklistRequest);
return allocateRequest;
}
@ -127,6 +130,7 @@ public abstract class AllocateRequest {
* Get the list of <code>ResourceRequest</code> to update the
* <code>ResourceManager</code> about the application's resource requirements.
* @return the list of <code>ResourceRequest</code>
* @see ResourceRequest
*/
@Public
@Stable
@ -138,6 +142,7 @@ public abstract class AllocateRequest {
* @param resourceRequests list of <code>ResourceRequest</code> to update the
* <code>ResourceManager</code> about the application's
* resource requirements
* @see ResourceRequest
*/
@Public
@Stable
@ -157,10 +162,37 @@ public abstract class AllocateRequest {
* Set the list of <code>ContainerId</code> of containers being
* released by the <code>ApplicationMaster</code>
* @param releaseContainers list of <code>ContainerId</code> of
* containers being released by the <
* code>ApplicationMaster</code>
* containers being released by the
* <code>ApplicationMaster</code>
*/
@Public
@Stable
public abstract void setReleaseList(List<ContainerId> releaseContainers);
/**
* Get the <code>ResourceBlacklistRequest</code> being sent by the
* <code>ApplicationMaster</code>.
* @return the <code>ResourceBlacklistRequest</code> being sent by the
* <code>ApplicationMaster</code>
* @see ResourceBlacklistRequest
*/
@Public
@Stable
public abstract ResourceBlacklistRequest getResourceBlacklistRequest();
/**
* Set the <code>ResourceBlacklistRequest</code> to inform the
* <code>ResourceManager</code> about the blacklist additions and removals
* per the <code>ApplicationMaster</code>.
*
* @param resourceBlacklistRequest the <code>ResourceBlacklistRequest</code>
* to inform the <code>ResourceManager</code> about
* the blacklist additions and removals
* per the <code>ApplicationMaster</code>
* @see ResourceBlacklistRequest
*/
@Public
@Stable
public abstract void setResourceBlacklistRequest(
ResourceBlacklistRequest resourceBlacklistRequest);
}

View File

@ -25,12 +25,15 @@ import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceBlacklistRequestPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceBlacklistRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto;
@ -46,6 +49,7 @@ public class AllocateRequestPBImpl extends AllocateRequest {
private ApplicationAttemptId applicationAttemptID = null;
private List<ResourceRequest> ask = null;
private List<ContainerId> release = null;
private ResourceBlacklistRequest blacklistRequest = null;
public AllocateRequestPBImpl() {
@ -94,6 +98,9 @@ public class AllocateRequestPBImpl extends AllocateRequest {
if (this.release != null) {
addReleasesToProto();
}
if (this.blacklistRequest != null) {
builder.setBlacklistRequest(convertToProtoFormat(this.blacklistRequest));
}
}
private void mergeLocalToProto() {
@ -161,6 +168,7 @@ public class AllocateRequestPBImpl extends AllocateRequest {
initAsks();
return this.ask;
}
@Override
public void setAskList(final List<ResourceRequest> resourceRequests) {
if(resourceRequests == null) {
@ -171,6 +179,28 @@ public class AllocateRequestPBImpl extends AllocateRequest {
this.ask.addAll(resourceRequests);
}
@Override
public ResourceBlacklistRequest getResourceBlacklistRequest() {
AllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.blacklistRequest != null) {
return this.blacklistRequest;
}
if (!p.hasBlacklistRequest()) {
return null;
}
this.blacklistRequest = convertFromProtoFormat(p.getBlacklistRequest());
return this.blacklistRequest;
}
@Override
public void setResourceBlacklistRequest(ResourceBlacklistRequest blacklistRequest) {
maybeInitBuilder();
if (blacklistRequest == null) {
builder.clearBlacklistRequest();
}
this.blacklistRequest = blacklistRequest;
}
private void initAsks() {
if (this.ask != null) {
return;
@ -302,4 +332,14 @@ public class AllocateRequestPBImpl extends AllocateRequest {
private ContainerIdProto convertToProtoFormat(ContainerId t) {
return ((ContainerIdPBImpl)t).getProto();
}
private ResourceBlacklistRequestPBImpl convertFromProtoFormat(ResourceBlacklistRequestProto p) {
return new ResourceBlacklistRequestPBImpl(p);
}
private ResourceBlacklistRequestProto convertToProtoFormat(ResourceBlacklistRequest t) {
return ((ResourceBlacklistRequestPBImpl)t).getProto();
}
}

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.util.Records;
/**
* {@link ResourceBlacklistRequest} encapsulates the list of resource-names
* which should be added or removed from the <em>blacklist</em> of resources
* for the application.
*
* @see ResourceRequest
* @see AMRMProtocol#allocate(org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest)
*/
@Public
@Stable
public abstract class ResourceBlacklistRequest {
public static ResourceBlacklistRequest newInstance(
List<String> additions, List<String> removals) {
ResourceBlacklistRequest blacklistRequest =
Records.newRecord(ResourceBlacklistRequest.class);
blacklistRequest.setBlacklistAdditions(additions);
blacklistRequest.setBlacklistRemovals(removals);
return blacklistRequest;
}
/**
* Get the list of resources which should be added to the
* application blacklist.
*
* @return list of resources which should be added to the
* application blacklist
*/
public abstract List<String> getBlacklistAdditions();
/**
* Set list of resources which should be added to the application blacklist.
*
* @param resourceNames list of resources which should be added to the
* application blacklist
*/
public abstract void setBlacklistAdditions(List<String> resourceNames);
/**
* Get the list of resources which should be removed from the
* application blacklist.
*
* @return list of resources which should be removed from the
* application blacklist
*/
public abstract List<String> getBlacklistRemovals();
/**
* Set list of resources which should be removed from the
* application blacklist.
*
* @param resourceNames list of resources which should be removed from the
* application blacklist
*/
public abstract void setBlacklistRemovals(List<String> resourceNames);
}

View File

@ -40,9 +40,15 @@ import org.apache.hadoop.yarn.util.Records;
* </li>
* <li>{@link Resource} required for each request.</li>
* <li>
* Number of containers of such specifications which are required
* Number of containers, of above specifications, which are required
* by the application.
* </li>
* <li>
* A boolean <em>relaxLocality</em> flag, defaulting to <code>true</code>,
* which tells the <code>ResourceManager</code> if the application wants
* locality to be loose (i.e. allows fall-through to rack or <em>any</em>)
* or strict (i.e. specify hard constraint on resource allocation).
* </li>
* </ul>
* </p>
*

View File

@ -0,0 +1,159 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.impl.pb;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceBlacklistRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceBlacklistRequestProtoOrBuilder;
public class ResourceBlacklistRequestPBImpl extends ResourceBlacklistRequest {
ResourceBlacklistRequestProto proto = null;
ResourceBlacklistRequestProto.Builder builder = null;
boolean viaProto = false;
List<String> blacklistAdditions = null;
List<String> blacklistRemovals = null;
public ResourceBlacklistRequestPBImpl() {
builder = ResourceBlacklistRequestProto.newBuilder();
}
public ResourceBlacklistRequestPBImpl(ResourceBlacklistRequestProto proto) {
this.proto = proto;
viaProto = true;
}
public ResourceBlacklistRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = ResourceBlacklistRequestProto.newBuilder(proto);
}
viaProto = false;
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void mergeLocalToBuilder() {
if (this.blacklistAdditions != null) {
addBlacklistAdditionsToProto();
}
if (this.blacklistRemovals != null) {
addBlacklistRemovalsToProto();
}
}
private void addBlacklistAdditionsToProto() {
maybeInitBuilder();
builder.clearBlacklistAdditions();
if (this.blacklistAdditions == null) {
return;
}
builder.addAllBlacklistAdditions(this.blacklistAdditions);
}
private void addBlacklistRemovalsToProto() {
maybeInitBuilder();
builder.clearBlacklistAdditions();
if (this.blacklistRemovals == null) {
return;
}
builder.addAllBlacklistRemovals(this.blacklistRemovals);
}
private void initBlacklistAdditions() {
if (this.blacklistAdditions != null) {
return;
}
ResourceBlacklistRequestProtoOrBuilder p = viaProto ? proto : builder;
List<String> list = p.getBlacklistAdditionsList();
this.blacklistAdditions = new ArrayList<String>();
this.blacklistAdditions.addAll(list);
}
private void initBlacklistRemovals() {
if (this.blacklistRemovals != null) {
return;
}
ResourceBlacklistRequestProtoOrBuilder p = viaProto ? proto : builder;
List<String> list = p.getBlacklistRemovalsList();
this.blacklistRemovals = new ArrayList<String>();
this.blacklistRemovals.addAll(list);
}
@Override
public List<String> getBlacklistAdditions() {
initBlacklistAdditions();
return this.blacklistAdditions;
}
@Override
public void setBlacklistAdditions(List<String> resourceNames) {
if (resourceNames == null) {
if (this.blacklistAdditions != null) {
this.blacklistAdditions.clear();
}
return;
}
initBlacklistAdditions();
this.blacklistAdditions.clear();
this.blacklistAdditions.addAll(resourceNames);
}
@Override
public List<String> getBlacklistRemovals() {
initBlacklistRemovals();
return this.blacklistRemovals;
}
@Override
public void setBlacklistRemovals(List<String> resourceNames) {
if (resourceNames == null) {
if (this.blacklistRemovals != null) {
this.blacklistRemovals.clear();
}
return;
}
initBlacklistRemovals();
this.blacklistRemovals.clear();
this.blacklistRemovals.addAll(resourceNames);
}
@Override
public int hashCode() {
return getProto().hashCode();
}
}

View File

@ -229,6 +229,11 @@ message PreemptionResourceRequestProto {
optional ResourceRequestProto resource = 1;
}
message ResourceBlacklistRequestProto {
repeated string blacklist_additions = 1;
repeated string blacklist_removals = 2;
}
////////////////////////////////////////////////////////////////////////
////// From client_RM_Protocol /////////////////////////////////////////
////////////////////////////////////////////////////////////////////////

View File

@ -54,8 +54,9 @@ message AllocateRequestProto {
optional ApplicationAttemptIdProto application_attempt_id = 1;
repeated ResourceRequestProto ask = 2;
repeated ContainerIdProto release = 3;
optional int32 response_id = 4;
optional float progress = 5;
optional ResourceBlacklistRequestProto blacklist_request = 4;
optional int32 response_id = 5;
optional float progress = 6;
}
message AllocateResponseProto {

View File

@ -228,7 +228,7 @@ public class AMRMClientImpl<T extends ContainerRequest>
release.clear();
allocateRequest =
AllocateRequest.newInstance(appAttemptId, lastResponseId,
progressIndicator, askList, releaseList);
progressIndicator, askList, releaseList, null);
}
allocateResponse = rmClient.allocate(allocateRequest);

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.PreemptionContract;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -69,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceBlacklistRequestException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceRequestException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@ -287,17 +289,34 @@ public class ApplicationMasterService extends AbstractService implements
List<ResourceRequest> ask = request.getAskList();
List<ContainerId> release = request.getReleaseList();
ResourceBlacklistRequest blacklistRequest = request.getResourceBlacklistRequest();
List<String> blacklistAdditions =
(blacklistRequest != null) ?
blacklistRequest.getBlacklistAdditions() : null;
List<String> blacklistRemovals =
(blacklistRequest != null) ?
blacklistRequest.getBlacklistRemovals() : null;
// sanity check
try {
SchedulerUtils.validateResourceRequests(ask,
rScheduler.getMaximumResourceCapability());
} catch (InvalidResourceRequestException e) {
LOG.warn("Invalid resource ask by application " + appAttemptId, e);
throw RPCUtil.getRemoteException(e);
throw e;
}
try {
SchedulerUtils.validateBlacklistRequest(blacklistRequest);
} catch (InvalidResourceBlacklistRequestException e) {
LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
throw e;
}
// Send new requests to appAttempt.
Allocation allocation =
this.rScheduler.allocate(appAttemptId, ask, release);
this.rScheduler.allocate(appAttemptId, ask, release,
blacklistAdditions, blacklistRemovals);
RMApp app = this.rmContext.getRMApps().get(
appAttemptId.getApplicationId());

View File

@ -803,7 +803,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
// AM resource has been checked when submission
Allocation amContainerAllocation = appAttempt.scheduler.allocate(
appAttempt.applicationAttemptId,
Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST);
Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST, null, null);
if (amContainerAllocation != null
&& amContainerAllocation.getContainers() != null) {
assert (amContainerAllocation.getContainers().size() == 0);
@ -827,7 +827,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
// Acquire the AM container from the scheduler.
Allocation amContainerAllocation = appAttempt.scheduler.allocate(
appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST,
EMPTY_CONTAINER_RELEASE_LIST);
EMPTY_CONTAINER_RELEASE_LIST, null, null);
// Set the masterContainer
appAttempt.setMasterContainer(amContainerAllocation.getContainers().get(

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -59,6 +60,7 @@ public class AppSchedulingInfo {
new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
final Map<Priority, Map<String, ResourceRequest>> requests =
new HashMap<Priority, Map<String, ResourceRequest>>();
final Set<String> blacklist = new HashSet<String>();
//private final ApplicationStore store;
private final ActiveUsersManager activeUsersManager;
@ -114,12 +116,15 @@ public class AppSchedulingInfo {
* application, by asking for more resources and releasing resources acquired
* by the application.
*
* @param requests
* resources to be acquired
* @param requests resources to be acquired
* @param blacklistAdditions resources to be added to the blacklist
* @param blacklistRemovals resources to be removed from the blacklist
*/
synchronized public void updateResourceRequests(
List<ResourceRequest> requests) {
List<ResourceRequest> requests,
List<String> blacklistAdditions, List<String> blacklistRemovals) {
QueueMetrics metrics = queue.getMetrics();
// Update resource requests
for (ResourceRequest request : requests) {
Priority priority = request.getPriority();
@ -175,6 +180,20 @@ public class AppSchedulingInfo {
lastRequestContainers)));
}
}
//
// Update blacklist
//
// Add to blacklist
if (blacklistAdditions != null) {
blacklist.addAll(blacklistAdditions);
}
// Remove from blacklist
if (blacklistRemovals != null) {
blacklist.removeAll(blacklistRemovals);
}
}
synchronized public Collection<Priority> getPriorities() {
@ -197,6 +216,10 @@ public class AppSchedulingInfo {
return request.getCapability();
}
public synchronized boolean isBlacklisted(String resourceName) {
return blacklist.contains(resourceName);
}
/**
* Resources have been allocated to this application by the resource
* scheduler. Track them.

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* The exception is thrown when an application provides an invalid
* specification for the blacklist.
*
* As an e.g., currently this exceptions is thrown when an application
* tries to blacklist {@link ResourceRequest#ANY}.
*/
public class InvalidResourceBlacklistRequestException extends YarnException {
private static final long serialVersionUID = 384957911L;
public InvalidResourceBlacklistRequestException(Throwable cause) {
super(cause);
}
public InvalidResourceBlacklistRequestException(String message) {
super(message);
}
public InvalidResourceBlacklistRequestException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -18,14 +18,17 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.yarn.YarnRuntimeException;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* The exception is thrown when the requested resource is out of the range
* The exception is thrown when a {@link ResourceRequest} is out of the range
* of the configured lower and upper resource boundaries.
*
*/
public class InvalidResourceRequestException extends YarnRuntimeException {
public class InvalidResourceRequestException extends YarnException {
private static final long serialVersionUID = 13498237L;
public InvalidResourceRequestException(Throwable cause) {
super(cause);

View File

@ -22,6 +22,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -152,4 +153,14 @@ public class SchedulerUtils {
}
}
public static void validateBlacklistRequest(ResourceBlacklistRequest blacklistRequest)
throws InvalidResourceBlacklistRequestException {
if (blacklistRequest != null) {
List<String> plus = blacklistRequest.getBlacklistAdditions();
if (plus != null && plus.contains(ResourceRequest.ANY)) {
throw new InvalidResourceBlacklistRequestException(
"Cannot add " + ResourceRequest.ANY + " to the blacklist!");
}
}
}
}

View File

@ -95,6 +95,8 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
* @param appAttemptId
* @param ask
* @param release
* @param blacklistAdditions
* @param blacklistRemovals
* @return the {@link Allocation} for the application
*/
@Public
@ -102,7 +104,9 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
Allocation
allocate(ApplicationAttemptId appAttemptId,
List<ResourceRequest> ask,
List<ContainerId> release);
List<ContainerId> release,
List<String> blacklistAdditions,
List<String> blacklistRemovals);
/**
* Get node resource usage report.

View File

@ -472,7 +472,8 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable {
@Override
@Lock(Lock.NoLock.class)
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
List<ResourceRequest> ask, List<ContainerId> release) {
List<ResourceRequest> ask, List<ContainerId> release,
List<String> blacklistAdditions, List<String> blacklistRemovals) {
FiCaSchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
@ -523,7 +524,8 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable {
application.showRequests();
// Update application requests
application.updateResourceRequests(ask);
application.updateResourceRequests(ask,
blacklistAdditions, blacklistRemovals);
LOG.debug("allocate: post-update");
application.showRequests();

View File

@ -815,6 +815,11 @@ public class LeafQueue implements CSQueue {
}
synchronized (application) {
// Check if this resource is on the blacklist
if (isBlacklisted(application, node)) {
continue;
}
// Schedule in priority order
for (Priority priority : application.getPriorities()) {
// Required resource
@ -898,6 +903,28 @@ public class LeafQueue implements CSQueue {
}
boolean isBlacklisted(FiCaSchedulerApp application, FiCaSchedulerNode node) {
if (application.isBlacklisted(node.getHostName())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping 'host' " + node.getHostName() +
" for " + application.getApplicationId() +
" since it has been blacklisted");
}
return true;
}
if (application.isBlacklisted(node.getRackName())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping 'rack' " + node.getRackName() +
" for " + application.getApplicationId() +
" since it has been blacklisted");
}
return true;
}
return false;
}
private synchronized CSAssignment
assignReservedContainer(FiCaSchedulerApp application,
FiCaSchedulerNode node, RMContainer rmContainer, Resource clusterResource) {

View File

@ -134,9 +134,11 @@ public class FiCaSchedulerApp extends SchedulerApplication {
}
public synchronized void updateResourceRequests(
List<ResourceRequest> requests) {
List<ResourceRequest> requests,
List<String> blacklistAdditions, List<String> blacklistRemovals) {
if (!isStopped) {
this.appSchedulingInfo.updateResourceRequests(requests);
this.appSchedulingInfo.updateResourceRequests(requests,
blacklistAdditions, blacklistRemovals);
}
}
@ -164,6 +166,10 @@ public class FiCaSchedulerApp extends SchedulerApplication {
return this.appSchedulingInfo.getResource(priority);
}
public boolean isBlacklisted(String resourceName) {
return this.appSchedulingInfo.isBlacklisted(resourceName);
}
/**
* Is this application pending?
* @return true if it is else false.

View File

@ -138,7 +138,7 @@ public class FSSchedulerApp extends SchedulerApplication {
public synchronized void updateResourceRequests(
List<ResourceRequest> requests) {
this.appSchedulingInfo.updateResourceRequests(requests);
this.appSchedulingInfo.updateResourceRequests(requests, null, null);
}
public Map<String, ResourceRequest> getResourceRequests(Priority priority) {

View File

@ -718,7 +718,7 @@ public class FairScheduler implements ResourceScheduler {
@Override
public Allocation allocate(ApplicationAttemptId appAttemptId,
List<ResourceRequest> ask, List<ContainerId> release) {
List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
// Make sure this application exists
FSSchedulerApp application = applications.get(appAttemptId);

View File

@ -222,7 +222,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
@Override
public Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
List<ContainerId> release) {
List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
FiCaSchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
LOG.error("Calling allocate on removed " +
@ -268,7 +268,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
application.showRequests();
// Update application requests
application.updateResourceRequests(ask);
application.updateResourceRequests(ask, null, null);
LOG.debug("allocate: post-update" +
" applicationId=" + applicationAttemptId +

View File

@ -265,7 +265,7 @@ public class Application {
// Get resources from the ResourceManager
resourceManager.getResourceScheduler().allocate(applicationAttemptId,
new ArrayList<ResourceRequest>(ask), new ArrayList<ContainerId>());
new ArrayList<ResourceRequest>(ask), new ArrayList<ContainerId>(), null, null);
System.out.println("-=======" + applicationAttemptId);
System.out.println("----------" + resourceManager.getRMContext().getRMApps()
.get(applicationId).getRMAppAttempt(applicationAttemptId));

View File

@ -144,7 +144,7 @@ public class MockAM {
List<ResourceRequest> resourceRequest, List<ContainerId> releases)
throws Exception {
AllocateRequest req = AllocateRequest.newInstance(attemptId,
++responseId, 0F, resourceRequest, releases);
++responseId, 0F, resourceRequest, releases, null);
return amRMProtocol.allocate(req);
}

View File

@ -273,23 +273,23 @@ public class TestFifoScheduler {
List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1));
fs.allocate(appAttemptId1, ask1, emptyId);
fs.allocate(appAttemptId1, ask1, emptyId, null, null);
// Ask for a 2 GB container for app 2
List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
ResourceRequest.ANY, BuilderUtils.newResource(2 * GB, 1), 1));
fs.allocate(appAttemptId2, ask2, emptyId);
fs.allocate(appAttemptId2, ask2, emptyId, null, null);
// Trigger container assignment
fs.handle(new NodeUpdateSchedulerEvent(n1));
// Get the allocation for the applications and verify headroom
Allocation allocation1 = fs.allocate(appAttemptId1, emptyAsk, emptyId);
Allocation allocation1 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
Assert.assertEquals("Allocation headroom", 1 * GB,
allocation1.getResourceLimit().getMemory());
Allocation allocation2 = fs.allocate(appAttemptId2, emptyAsk, emptyId);
Allocation allocation2 = fs.allocate(appAttemptId2, emptyAsk, emptyId, null, null);
Assert.assertEquals("Allocation headroom", 1 * GB,
allocation2.getResourceLimit().getMemory());

View File

@ -108,7 +108,7 @@ public class TestAMRMRPCNodeUpdates {
// allocate request returns no updated node
AllocateRequest allocateRequest1 = AllocateRequest.newInstance(attempt1
.getAppAttemptId(), 0, 0F, null, null);
.getAppAttemptId(), 0, 0F, null, null, null);
AllocateResponse response1 = amService.allocate(allocateRequest1);
List<NodeReport> updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(0, updatedNodes.size());
@ -117,7 +117,7 @@ public class TestAMRMRPCNodeUpdates {
// allocate request returns updated node
allocateRequest1 = AllocateRequest.newInstance(attempt1
.getAppAttemptId(), response1.getResponseId(), 0F, null, null);
.getAppAttemptId(), response1.getResponseId(), 0F, null, null, null);
response1 = amService.allocate(allocateRequest1);
updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
@ -137,7 +137,7 @@ public class TestAMRMRPCNodeUpdates {
// subsequent allocate request returns delta
allocateRequest1 = AllocateRequest.newInstance(attempt1
.getAppAttemptId(), response1.getResponseId(), 0F, null, null);
.getAppAttemptId(), response1.getResponseId(), 0F, null, null, null);
response1 = amService.allocate(allocateRequest1);
updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
@ -157,7 +157,7 @@ public class TestAMRMRPCNodeUpdates {
// allocate request returns no updated node
AllocateRequest allocateRequest2 = AllocateRequest.newInstance(attempt2
.getAppAttemptId(), 0, 0F, null, null);
.getAppAttemptId(), 0, 0F, null, null, null);
AllocateResponse response2 = amService.allocate(allocateRequest2);
updatedNodes = response2.getUpdatedNodes();
Assert.assertEquals(0, updatedNodes.size());
@ -166,7 +166,7 @@ public class TestAMRMRPCNodeUpdates {
// both AM's should get delta updated nodes
allocateRequest1 = AllocateRequest.newInstance(attempt1
.getAppAttemptId(), response1.getResponseId(), 0F, null, null);
.getAppAttemptId(), response1.getResponseId(), 0F, null, null, null);
response1 = amService.allocate(allocateRequest1);
updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
@ -175,7 +175,7 @@ public class TestAMRMRPCNodeUpdates {
Assert.assertEquals(NodeState.RUNNING, nr.getNodeState());
allocateRequest2 = AllocateRequest.newInstance(attempt2
.getAppAttemptId(), response2.getResponseId(), 0F, null, null);
.getAppAttemptId(), response2.getResponseId(), 0F, null, null, null);
response2 = amService.allocate(allocateRequest2);
updatedNodes = response2.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
@ -185,7 +185,7 @@ public class TestAMRMRPCNodeUpdates {
// subsequent allocate calls should return no updated nodes
allocateRequest2 = AllocateRequest.newInstance(attempt2
.getAppAttemptId(), response2.getResponseId(), 0F, null, null);
.getAppAttemptId(), response2.getResponseId(), 0F, null, null, null);
response2 = amService.allocate(allocateRequest2);
updatedNodes = response2.getUpdatedNodes();
Assert.assertEquals(0, updatedNodes.size());

View File

@ -78,13 +78,13 @@ public class TestAMRMRPCResponseId {
am.registerAppAttempt();
AllocateRequest allocateRequest = AllocateRequest.newInstance(attempt
.getAppAttemptId(), 0, 0F, null, null);
.getAppAttemptId(), 0, 0F, null, null, null);
AllocateResponse response = amService.allocate(allocateRequest);
Assert.assertEquals(1, response.getResponseId());
Assert.assertFalse(response.getResync());
allocateRequest = AllocateRequest.newInstance(attempt
.getAppAttemptId(), response.getResponseId(), 0F, null, null);
.getAppAttemptId(), response.getResponseId(), 0F, null, null, null);
response = amService.allocate(allocateRequest);
Assert.assertEquals(2, response.getResponseId());
@ -94,7 +94,7 @@ public class TestAMRMRPCResponseId {
/** try sending old request again **/
allocateRequest = AllocateRequest.newInstance(attempt
.getAppAttemptId(), 0, 0F, null, null);
.getAppAttemptId(), 0, 0F, null, null, null);
response = amService.allocate(allocateRequest);
Assert.assertTrue(response.getResync());
}

View File

@ -330,7 +330,7 @@ public class TestRMAppAttemptTransitions {
applicationAttempt.getAppAttemptState());
verify(scheduler, times(expectedAllocateCount)).
allocate(any(ApplicationAttemptId.class),
any(List.class), any(List.class));
any(List.class), any(List.class), any(List.class), any(List.class));
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
assertNull(applicationAttempt.getMasterContainer());
@ -345,6 +345,7 @@ public class TestRMAppAttemptTransitions {
/**
* {@link RMAppAttemptState#ALLOCATED}
*/
@SuppressWarnings("unchecked")
private void testAppAttemptAllocatedState(Container amContainer) {
assertEquals(RMAppAttemptState.ALLOCATED,
applicationAttempt.getAppAttemptState());
@ -354,7 +355,9 @@ public class TestRMAppAttemptTransitions {
verify(applicationMasterLauncher).handle(any(AMLauncherEvent.class));
verify(scheduler, times(2)).
allocate(
any(ApplicationAttemptId.class), any(List.class), any(List.class));
any(
ApplicationAttemptId.class), any(List.class), any(List.class),
any(List.class), any(List.class));
}
/**
@ -465,6 +468,7 @@ public class TestRMAppAttemptTransitions {
testAppAttemptScheduledState();
}
@SuppressWarnings("unchecked")
private Container allocateApplicationAttempt() {
scheduleApplicationAttempt();
@ -481,6 +485,8 @@ public class TestRMAppAttemptTransitions {
scheduler.allocate(
any(ApplicationAttemptId.class),
any(List.class),
any(List.class),
any(List.class),
any(List.class))).
thenReturn(allocation);

View File

@ -22,20 +22,47 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import java.security.PrivilegedAction;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
public class TestSchedulerUtils {
private static final Log LOG = LogFactory.getLog(TestSchedulerUtils.class);
@Test (timeout = 30000)
public void testNormalizeRequest() {
ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
@ -240,4 +267,79 @@ public class TestSchedulerUtils {
}
}
@Test
public void testValidateResourceBlacklistRequest() throws Exception {
MyContainerManager containerManager = new MyContainerManager();
final MockRM rm =
new MockRMWithAMS(new YarnConfiguration(), containerManager);
rm.start();
MockNM nm1 = rm.registerNode("localhost:1234", 5120);
Map<ApplicationAccessType, String> acls =
new HashMap<ApplicationAccessType, String>(2);
acls.put(ApplicationAccessType.VIEW_APP, "*");
RMApp app = rm.submitApp(1024, "appname", "appuser", acls);
nm1.nodeHeartbeat(true);
RMAppAttempt attempt = app.getCurrentAppAttempt();
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
waitForLaunchedState(attempt);
// Create a client to the RM.
final Configuration conf = rm.getConfig();
final YarnRPC rpc = YarnRPC.create(conf);
UserGroupInformation currentUser =
UserGroupInformation.createRemoteUser(applicationAttemptId.toString());
AMRMProtocol client = currentUser
.doAs(new PrivilegedAction<AMRMProtocol>() {
@Override
public AMRMProtocol run() {
return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rm
.getApplicationMasterService().getBindAddress(), conf);
}
});
RegisterApplicationMasterRequest request = Records
.newRecord(RegisterApplicationMasterRequest.class);
request.setApplicationAttemptId(applicationAttemptId);
client.registerApplicationMaster(request);
ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(
Collections.singletonList(ResourceRequest.ANY), null);
AllocateRequest allocateRequest =
AllocateRequest.newInstance(applicationAttemptId, 0, 0.0f, null, null,
blacklistRequest);
boolean error = false;
try {
client.allocate(allocateRequest);
} catch (InvalidResourceBlacklistRequestException e) {
error = true;
}
rm.stop();
Assert.assertTrue(
"Didn't not catch InvalidResourceBlacklistRequestException", error);
}
private void waitForLaunchedState(RMAppAttempt attempt)
throws InterruptedException {
int waitCount = 0;
while (attempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED
&& waitCount++ < 20) {
LOG.info("Waiting for AppAttempt to reach LAUNCHED state. "
+ "Current state is " + attempt.getAppAttemptState());
Thread.sleep(1000);
}
Assert.assertEquals(attempt.getAppAttemptState(),
RMAppAttemptState.LAUNCHED);
}
}

View File

@ -513,7 +513,7 @@ public class TestApplicationLimits {
app_0_0_requests.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
true, priority_1, recordFactory));
app_0_0.updateResourceRequests(app_0_0_requests);
app_0_0.updateResourceRequests(app_0_0_requests, null, null);
// Schedule to compute
queue.assignContainers(clusterResource, node_0);
@ -532,7 +532,7 @@ public class TestApplicationLimits {
app_0_1_requests.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
true, priority_1, recordFactory));
app_0_1.updateResourceRequests(app_0_1_requests);
app_0_1.updateResourceRequests(app_0_1_requests, null, null);
// Schedule to compute
queue.assignContainers(clusterResource, node_0); // Schedule to compute
@ -551,7 +551,7 @@ public class TestApplicationLimits {
app_1_0_requests.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
true, priority_1, recordFactory));
app_1_0.updateResourceRequests(app_1_0_requests);
app_1_0.updateResourceRequests(app_1_0_requests, null, null);
// Schedule to compute
queue.assignContainers(clusterResource, node_0); // Schedule to compute

View File

@ -43,6 +43,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -293,7 +294,7 @@ public class TestLeafQueue {
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, true,
priority, recordFactory)));
priority, recordFactory)), null, null);
// Start testing...
@ -415,11 +416,11 @@ public class TestLeafQueue {
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, true,
priority, recordFactory)));
priority, recordFactory)), null, null);
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)));
priority, recordFactory)), null, null);
// Start testing...
@ -548,11 +549,11 @@ public class TestLeafQueue {
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
priority, recordFactory)));
priority, recordFactory)), null, null);
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)));
priority, recordFactory)), null, null);
/**
* Start testing...
@ -641,11 +642,11 @@ public class TestLeafQueue {
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
priority, recordFactory)));
priority, recordFactory)), null, null);
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)));
priority, recordFactory)), null, null);
/**
* Start testing...
@ -680,7 +681,7 @@ public class TestLeafQueue {
a.setMaxCapacity(.1f);
app_2.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
priority, recordFactory)));
priority, recordFactory)), null, null);
assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
// No more to user_0 since he is already over user-limit
@ -697,7 +698,7 @@ public class TestLeafQueue {
LOG.info("here");
app_1.updateResourceRequests(Collections.singletonList( // unset
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
priority, recordFactory)));
priority, recordFactory)), null, null);
assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
a.assignContainers(clusterResource, node_1);
assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap
@ -758,11 +759,11 @@ public class TestLeafQueue {
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, true,
priority, recordFactory)));
priority, recordFactory)), null, null);
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, true,
priority, recordFactory)));
priority, recordFactory)), null, null);
/**
* Start testing...
@ -792,11 +793,11 @@ public class TestLeafQueue {
app_2.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 1, true,
priority, recordFactory)));
priority, recordFactory)), null, null);
app_3.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)));
priority, recordFactory)), null, null);
// Now allocations should goto app_2 since
// user_0 is at limit inspite of high user-limit-factor
@ -920,11 +921,11 @@ public class TestLeafQueue {
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)));
priority, recordFactory)), null, null);
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
priority, recordFactory)));
priority, recordFactory)), null, null);
// Start testing...
@ -1022,7 +1023,7 @@ public class TestLeafQueue {
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
priority, recordFactory)));
priority, recordFactory)), null, null);
// Setup app_1 to request a 4GB container on host_0 and
// another 4GB container anywhere.
@ -1034,7 +1035,7 @@ public class TestLeafQueue {
true, priority, recordFactory));
appRequests_1.add(TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 2,
true, priority, recordFactory));
app_1.updateResourceRequests(appRequests_1);
app_1.updateResourceRequests(appRequests_1, null, null);
// Start testing...
@ -1128,11 +1129,11 @@ public class TestLeafQueue {
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)));
priority, recordFactory)), null, null);
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
priority, recordFactory)));
priority, recordFactory)), null, null);
// Start testing...
@ -1255,7 +1256,7 @@ public class TestLeafQueue {
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, // one extra
true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
app_0.updateResourceRequests(app_0_requests_0, null, null);
// Start testing...
CSAssignment assignment = null;
@ -1320,7 +1321,7 @@ public class TestLeafQueue {
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, // one extra
true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
app_0.updateResourceRequests(app_0_requests_0, null, null);
assertEquals(2, app_0.getTotalRequiredResources(priority));
String host_3 = "127.0.0.4"; // on rack_1
@ -1411,7 +1412,7 @@ public class TestLeafQueue {
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1,
true, priority_2, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
app_0.updateResourceRequests(app_0_requests_0, null, null);
// Start testing...
@ -1526,7 +1527,7 @@ public class TestLeafQueue {
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
app_0.updateResourceRequests(app_0_requests_0, null, null);
// Start testing...
@ -1535,7 +1536,7 @@ public class TestLeafQueue {
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
app_0.updateResourceRequests(app_0_requests_0, null, null);
// NODE_LOCAL - node_0_1
a.assignContainers(clusterResource, node_0_0);
@ -1558,7 +1559,7 @@ public class TestLeafQueue {
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
app_0.updateResourceRequests(app_0_requests_0, null, null);
// No allocation on node_0_1 even though it's node/rack local since
// required(rack_1) == 0
@ -1759,7 +1760,6 @@ public class TestLeafQueue {
// Setup some nodes and racks
String host_0_0 = "127.0.0.1";
String rack_0 = "rack_0";
FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 8*GB);
String host_0_1 = "127.0.0.2";
FiCaSchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB);
@ -1789,6 +1789,7 @@ public class TestLeafQueue {
// host_0_1: 8G
// host_1_0: 8G
// host_1_1: 8G
// Blacklist: <host_0_0>
Priority priority = TestUtils.createMockPriority(1);
List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
app_0_requests_0.add(
@ -1803,7 +1804,8 @@ public class TestLeafQueue {
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
false, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
app_0.updateResourceRequests(app_0_requests_0,
Collections.singletonList(host_0_0), null);
app_0_requests_0.clear();
//
@ -1830,6 +1832,7 @@ public class TestLeafQueue {
// host_0_1: 8G
// host_1_0: 8G
// host_1_1: 8G
// Blacklist: <host_0_0>
// node_1_1
// Shouldn't allocate since RR(rack_1) = relax: false
@ -1838,11 +1841,12 @@ public class TestLeafQueue {
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
// Allow rack-locality for rack_1
// Allow rack-locality for rack_1, but blacklist node_1_1
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
app_0.updateResourceRequests(app_0_requests_0,
Collections.singletonList(host_1_1), null);
app_0_requests_0.clear();
// resourceName: <priority, memory, #containers, relaxLocality>
@ -1851,15 +1855,69 @@ public class TestLeafQueue {
// rack_0: < null >
// host_1_0: < 1, 1GB, 1, true >
// host_1_1: < null >
// rack_1: < 1, 1GB, 1, true > <----
// rack_1: < 1, 1GB, 1, true >
// ANY: < 1, 1GB, 1, false >
// Availability:
// host_0_0: 8G
// host_0_1: 8G
// host_1_0: 8G
// host_1_1: 8G
// Blacklist: < host_0_0 , host_1_1 > <----
// node_1_1
// Shouldn't allocate since node_1_1 is blacklisted
a.assignContainers(clusterResource, node_1_1);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
// Now, remove node_1_1 from blacklist, but add rack_1 to blacklist
app_0.updateResourceRequests(app_0_requests_0,
Collections.singletonList(rack_1), Collections.singletonList(host_1_1));
app_0_requests_0.clear();
// resourceName: <priority, memory, #containers, relaxLocality>
// host_0_0: < 1, 1GB, 1, true >
// host_0_1: < null >
// rack_0: < null >
// host_1_0: < 1, 1GB, 1, true >
// host_1_1: < null >
// rack_1: < 1, 1GB, 1, true >
// ANY: < 1, 1GB, 1, false >
// Availability:
// host_0_0: 8G
// host_0_1: 8G
// host_1_0: 8G
// host_1_1: 8G
// Blacklist: < host_0_0 , rack_1 > <----
// node_1_1
// Shouldn't allocate since rack_1 is blacklisted
a.assignContainers(clusterResource, node_1_1);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
// Now remove rack_1 from blacklist
app_0.updateResourceRequests(app_0_requests_0,
null, Collections.singletonList(rack_1));
app_0_requests_0.clear();
// resourceName: <priority, memory, #containers, relaxLocality>
// host_0_0: < 1, 1GB, 1, true >
// host_0_1: < null >
// rack_0: < null >
// host_1_0: < 1, 1GB, 1, true >
// host_1_1: < null >
// rack_1: < 1, 1GB, 1, true >
// ANY: < 1, 1GB, 1, false >
// Availability:
// host_0_0: 8G
// host_0_1: 8G
// host_1_0: 8G
// host_1_1: 8G
// Blacklist: < host_0_0 > <----
// Now, should allocate since RR(rack_1) = relax: true
a.assignContainers(clusterResource, node_1_1);
verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1),
@ -1874,7 +1932,7 @@ public class TestLeafQueue {
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
false, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
app_0.updateResourceRequests(app_0_requests_0, null, null);
app_0_requests_0.clear();
// resourceName: <priority, memory, #containers, relaxLocality>

View File

@ -206,7 +206,7 @@ public class TestFairScheduler {
ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
priority, numContainers, true);
ask.add(request);
scheduler.allocate(id, ask, new ArrayList<ContainerId>());
scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null);
return id;
}
@ -221,7 +221,7 @@ public class TestFairScheduler {
ApplicationAttemptId attId) {
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ask.add(request);
scheduler.allocate(attId, ask, new ArrayList<ContainerId>());
scheduler.allocate(attId, ask, new ArrayList<ContainerId>(), null, null);
}
// TESTS
@ -528,7 +528,7 @@ public class TestFairScheduler {
ResourceRequest request1 =
createResourceRequest(minReqSize * 2, ResourceRequest.ANY, 1, 1, true);
ask1.add(request1);
scheduler.allocate(id11, ask1, new ArrayList<ContainerId>());
scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), null, null);
// Second ask, queue2 requests 1 large + (2 * minReqSize)
List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
@ -538,14 +538,14 @@ public class TestFairScheduler {
false);
ask2.add(request2);
ask2.add(request3);
scheduler.allocate(id21, ask2, new ArrayList<ContainerId>());
scheduler.allocate(id21, ask2, new ArrayList<ContainerId>(), null, null);
// Third ask, queue2 requests 1 large
List<ResourceRequest> ask3 = new ArrayList<ResourceRequest>();
ResourceRequest request4 =
createResourceRequest(2 * minReqSize, ResourceRequest.ANY, 1, 1, true);
ask3.add(request4);
scheduler.allocate(id22, ask3, new ArrayList<ContainerId>());
scheduler.allocate(id22, ask3, new ArrayList<ContainerId>(), null, null);
scheduler.update();
@ -1369,7 +1369,7 @@ public class TestFairScheduler {
// Complete container
scheduler.allocate(attId, new ArrayList<ResourceRequest>(),
Arrays.asList(containerId));
Arrays.asList(containerId), null, null);
assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB());
// Schedule at opening
@ -1444,7 +1444,7 @@ public class TestFairScheduler {
asks.add(createResourceRequest(1024, node3.getRackName(), 1, 1, true));
asks.add(createResourceRequest(1024, ResourceRequest.ANY, 1, 2, true));
scheduler.allocate(appId, asks, new ArrayList<ContainerId>());
scheduler.allocate(appId, asks, new ArrayList<ContainerId>(), null, null);
// node 1 checks in
scheduler.update();
@ -1799,7 +1799,7 @@ public class TestFairScheduler {
createResourceRequest(1024, node1.getHostName(), 1, 0, true),
createResourceRequest(1024, "rack1", 1, 0, true),
createResourceRequest(1024, ResourceRequest.ANY, 1, 1, true));
scheduler.allocate(attId1, update, new ArrayList<ContainerId>());
scheduler.allocate(attId1, update, new ArrayList<ContainerId>(), null, null);
// then node2 should get the container
scheduler.handle(node2UpdateEvent);
@ -1842,7 +1842,7 @@ public class TestFairScheduler {
anyRequest = createResourceRequest(1024, ResourceRequest.ANY,
1, 1, false);
scheduler.allocate(attId, Arrays.asList(rackRequest, anyRequest),
new ArrayList<ContainerId>());
new ArrayList<ContainerId>(), null, null);
scheduler.handle(nodeUpdateEvent);
assertEquals(0, app.getReservedContainers().size());

View File

@ -186,7 +186,7 @@ public class TestFifoScheduler {
ask.add(nodeLocal);
ask.add(rackLocal);
ask.add(any);
scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>());
scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0);

View File

@ -501,7 +501,7 @@ public class TestContainerManagerSecurity {
AllocateRequest allocateRequest = AllocateRequest.newInstance(
BuilderUtils.newApplicationAttemptId(appID, 1), 0, 0F, ask,
new ArrayList<ContainerId>());
new ArrayList<ContainerId>(), null);
List<Container> allocatedContainers = scheduler.allocate(allocateRequest)
.getAllocatedContainers();