Merge r1490392 from trunk to branch-2 for YARN-750. Allow for black-listing resources in YARN API and Impl in CS (acmurthy via bikas)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1490396 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7d1accf8a1
commit
09c12434a2
|
@ -96,7 +96,7 @@ public class LocalContainerAllocator extends RMCommunicator
|
|||
AllocateRequest allocateRequest = AllocateRequest.newInstance(
|
||||
this.applicationAttemptId, this.lastResponseID, super
|
||||
.getApplicationProgress(), new ArrayList<ResourceRequest>(),
|
||||
new ArrayList<ContainerId>());
|
||||
new ArrayList<ContainerId>(), null);
|
||||
AllocateResponse allocateResponse;
|
||||
try {
|
||||
allocateResponse = scheduler.allocate(allocateRequest);
|
||||
|
|
|
@ -148,7 +148,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
|
|||
AllocateRequest allocateRequest = AllocateRequest.newInstance(
|
||||
applicationAttemptId, lastResponseID, super.getApplicationProgress(),
|
||||
new ArrayList<ResourceRequest>(ask), new ArrayList<ContainerId>(
|
||||
release));
|
||||
release), null);
|
||||
AllocateResponse allocateResponse;
|
||||
try {
|
||||
allocateResponse = scheduler.allocate(allocateRequest);
|
||||
|
|
|
@ -1214,7 +1214,8 @@ public class TestRMContainerAllocator {
|
|||
@Override
|
||||
public synchronized Allocation allocate(
|
||||
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
|
||||
List<ContainerId> release) {
|
||||
List<ContainerId> release,
|
||||
List<String> blacklistAdditions, List<String> blacklistRemovals) {
|
||||
List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
|
||||
for (ResourceRequest req : ask) {
|
||||
ResourceRequest reqCopy = ResourceRequest.newInstance(req
|
||||
|
@ -1223,7 +1224,9 @@ public class TestRMContainerAllocator {
|
|||
askCopy.add(reqCopy);
|
||||
}
|
||||
lastAsk = ask;
|
||||
return super.allocate(applicationAttemptId, askCopy, release);
|
||||
return super.allocate(
|
||||
applicationAttemptId, askCopy, release,
|
||||
blacklistAdditions, blacklistRemovals);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
|
|||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
||||
/**
|
||||
* <p>The protocol between a live instance of <code>ApplicationMaster</code>
|
||||
|
@ -60,6 +61,8 @@ public interface AMRMProtocol {
|
|||
* @return registration respose
|
||||
* @throws YarnException
|
||||
* @throws IOException
|
||||
* @see RegisterApplicationMasterRequest
|
||||
* @see RegisterApplicationMasterResponse
|
||||
*/
|
||||
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||
RegisterApplicationMasterRequest request)
|
||||
|
@ -80,6 +83,8 @@ public interface AMRMProtocol {
|
|||
* @return completion response
|
||||
* @throws YarnException
|
||||
* @throws IOException
|
||||
* @see FinishApplicationMasterRequest
|
||||
* @see FinishApplicationMasterResponse
|
||||
*/
|
||||
public FinishApplicationMasterResponse finishApplicationMaster(
|
||||
FinishApplicationMasterRequest request)
|
||||
|
@ -91,12 +96,16 @@ public interface AMRMProtocol {
|
|||
*
|
||||
* <p>The <code>ApplicationMaster</code> uses this interface to provide a list
|
||||
* of {@link ResourceRequest} and returns unused {@link Container} allocated
|
||||
* to it via {@link AllocateRequest}.</p>
|
||||
* to it via {@link AllocateRequest}. Optionally, the
|
||||
* <code>ApplicationMaster</code> can also <em>blacklist</em> resources
|
||||
* which it doesn't want to use.</p>
|
||||
*
|
||||
* <p>This also doubles up as a <em>heartbeat</em> to let the
|
||||
* <code>ResourceManager</code> know that the <code>ApplicationMaster</code>
|
||||
* is alive. Thus, applications should periodically make this call to be kept
|
||||
* alive. The frequency depends on ??</p>
|
||||
* alive. The frequency depends on
|
||||
* {@link YarnConfiguration#RM_AM_EXPIRY_INTERVAL_MS} which defaults to
|
||||
* {@link YarnConfiguration#DEFAULT_RM_AM_EXPIRY_INTERVAL_MS}.</p>
|
||||
*
|
||||
* <p>The <code>ResourceManager</code> responds with list of allocated
|
||||
* {@link Container}, status of completed containers and headroom information
|
||||
|
@ -110,6 +119,8 @@ public interface AMRMProtocol {
|
|||
* @return allocation response
|
||||
* @throws YarnException
|
||||
* @throws IOException
|
||||
* @see AllocateRequest
|
||||
* @see AllocateResponse
|
||||
*/
|
||||
public AllocateResponse allocate(AllocateRequest request)
|
||||
throws YarnException, IOException;
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
|
|||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.yarn.api.AMRMProtocol;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
|
@ -61,13 +62,15 @@ public abstract class AllocateRequest {
|
|||
public static AllocateRequest newInstance(
|
||||
ApplicationAttemptId applicationAttemptId, int responseID,
|
||||
float appProgress, List<ResourceRequest> resourceAsk,
|
||||
List<ContainerId> containersToBeReleased) {
|
||||
List<ContainerId> containersToBeReleased,
|
||||
ResourceBlacklistRequest resourceBlacklistRequest) {
|
||||
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
|
||||
allocateRequest.setApplicationAttemptId(applicationAttemptId);
|
||||
allocateRequest.setResponseId(responseID);
|
||||
allocateRequest.setProgress(appProgress);
|
||||
allocateRequest.setAskList(resourceAsk);
|
||||
allocateRequest.setReleaseList(containersToBeReleased);
|
||||
allocateRequest.setResourceBlacklistRequest(resourceBlacklistRequest);
|
||||
return allocateRequest;
|
||||
}
|
||||
|
||||
|
@ -127,6 +130,7 @@ public abstract class AllocateRequest {
|
|||
* Get the list of <code>ResourceRequest</code> to update the
|
||||
* <code>ResourceManager</code> about the application's resource requirements.
|
||||
* @return the list of <code>ResourceRequest</code>
|
||||
* @see ResourceRequest
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
|
@ -138,6 +142,7 @@ public abstract class AllocateRequest {
|
|||
* @param resourceRequests list of <code>ResourceRequest</code> to update the
|
||||
* <code>ResourceManager</code> about the application's
|
||||
* resource requirements
|
||||
* @see ResourceRequest
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
|
@ -157,10 +162,37 @@ public abstract class AllocateRequest {
|
|||
* Set the list of <code>ContainerId</code> of containers being
|
||||
* released by the <code>ApplicationMaster</code>
|
||||
* @param releaseContainers list of <code>ContainerId</code> of
|
||||
* containers being released by the <
|
||||
* code>ApplicationMaster</code>
|
||||
* containers being released by the
|
||||
* <code>ApplicationMaster</code>
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public abstract void setReleaseList(List<ContainerId> releaseContainers);
|
||||
|
||||
/**
|
||||
* Get the <code>ResourceBlacklistRequest</code> being sent by the
|
||||
* <code>ApplicationMaster</code>.
|
||||
* @return the <code>ResourceBlacklistRequest</code> being sent by the
|
||||
* <code>ApplicationMaster</code>
|
||||
* @see ResourceBlacklistRequest
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public abstract ResourceBlacklistRequest getResourceBlacklistRequest();
|
||||
|
||||
/**
|
||||
* Set the <code>ResourceBlacklistRequest</code> to inform the
|
||||
* <code>ResourceManager</code> about the blacklist additions and removals
|
||||
* per the <code>ApplicationMaster</code>.
|
||||
*
|
||||
* @param resourceBlacklistRequest the <code>ResourceBlacklistRequest</code>
|
||||
* to inform the <code>ResourceManager</code> about
|
||||
* the blacklist additions and removals
|
||||
* per the <code>ApplicationMaster</code>
|
||||
* @see ResourceBlacklistRequest
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public abstract void setResourceBlacklistRequest(
|
||||
ResourceBlacklistRequest resourceBlacklistRequest);
|
||||
}
|
||||
|
|
|
@ -25,12 +25,15 @@ import java.util.List;
|
|||
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceBlacklistRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceBlacklistRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto;
|
||||
|
@ -46,6 +49,7 @@ public class AllocateRequestPBImpl extends AllocateRequest {
|
|||
private ApplicationAttemptId applicationAttemptID = null;
|
||||
private List<ResourceRequest> ask = null;
|
||||
private List<ContainerId> release = null;
|
||||
private ResourceBlacklistRequest blacklistRequest = null;
|
||||
|
||||
|
||||
public AllocateRequestPBImpl() {
|
||||
|
@ -94,6 +98,9 @@ public class AllocateRequestPBImpl extends AllocateRequest {
|
|||
if (this.release != null) {
|
||||
addReleasesToProto();
|
||||
}
|
||||
if (this.blacklistRequest != null) {
|
||||
builder.setBlacklistRequest(convertToProtoFormat(this.blacklistRequest));
|
||||
}
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
|
@ -161,6 +168,7 @@ public class AllocateRequestPBImpl extends AllocateRequest {
|
|||
initAsks();
|
||||
return this.ask;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAskList(final List<ResourceRequest> resourceRequests) {
|
||||
if(resourceRequests == null) {
|
||||
|
@ -171,6 +179,28 @@ public class AllocateRequestPBImpl extends AllocateRequest {
|
|||
this.ask.addAll(resourceRequests);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResourceBlacklistRequest getResourceBlacklistRequest() {
|
||||
AllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.blacklistRequest != null) {
|
||||
return this.blacklistRequest;
|
||||
}
|
||||
if (!p.hasBlacklistRequest()) {
|
||||
return null;
|
||||
}
|
||||
this.blacklistRequest = convertFromProtoFormat(p.getBlacklistRequest());
|
||||
return this.blacklistRequest;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setResourceBlacklistRequest(ResourceBlacklistRequest blacklistRequest) {
|
||||
maybeInitBuilder();
|
||||
if (blacklistRequest == null) {
|
||||
builder.clearBlacklistRequest();
|
||||
}
|
||||
this.blacklistRequest = blacklistRequest;
|
||||
}
|
||||
|
||||
private void initAsks() {
|
||||
if (this.ask != null) {
|
||||
return;
|
||||
|
@ -302,4 +332,14 @@ public class AllocateRequestPBImpl extends AllocateRequest {
|
|||
private ContainerIdProto convertToProtoFormat(ContainerId t) {
|
||||
return ((ContainerIdPBImpl)t).getProto();
|
||||
}
|
||||
|
||||
private ResourceBlacklistRequestPBImpl convertFromProtoFormat(ResourceBlacklistRequestProto p) {
|
||||
return new ResourceBlacklistRequestPBImpl(p);
|
||||
}
|
||||
|
||||
private ResourceBlacklistRequestProto convertToProtoFormat(ResourceBlacklistRequest t) {
|
||||
return ((ResourceBlacklistRequestPBImpl)t).getProto();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,83 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.api.records;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.yarn.api.AMRMProtocol;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
/**
|
||||
* {@link ResourceBlacklistRequest} encapsulates the list of resource-names
|
||||
* which should be added or removed from the <em>blacklist</em> of resources
|
||||
* for the application.
|
||||
*
|
||||
* @see ResourceRequest
|
||||
* @see AMRMProtocol#allocate(org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest)
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public abstract class ResourceBlacklistRequest {
|
||||
|
||||
public static ResourceBlacklistRequest newInstance(
|
||||
List<String> additions, List<String> removals) {
|
||||
ResourceBlacklistRequest blacklistRequest =
|
||||
Records.newRecord(ResourceBlacklistRequest.class);
|
||||
blacklistRequest.setBlacklistAdditions(additions);
|
||||
blacklistRequest.setBlacklistRemovals(removals);
|
||||
return blacklistRequest;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the list of resources which should be added to the
|
||||
* application blacklist.
|
||||
*
|
||||
* @return list of resources which should be added to the
|
||||
* application blacklist
|
||||
*/
|
||||
public abstract List<String> getBlacklistAdditions();
|
||||
|
||||
/**
|
||||
* Set list of resources which should be added to the application blacklist.
|
||||
*
|
||||
* @param resourceNames list of resources which should be added to the
|
||||
* application blacklist
|
||||
*/
|
||||
public abstract void setBlacklistAdditions(List<String> resourceNames);
|
||||
|
||||
/**
|
||||
* Get the list of resources which should be removed from the
|
||||
* application blacklist.
|
||||
*
|
||||
* @return list of resources which should be removed from the
|
||||
* application blacklist
|
||||
*/
|
||||
public abstract List<String> getBlacklistRemovals();
|
||||
|
||||
/**
|
||||
* Set list of resources which should be removed from the
|
||||
* application blacklist.
|
||||
*
|
||||
* @param resourceNames list of resources which should be removed from the
|
||||
* application blacklist
|
||||
*/
|
||||
public abstract void setBlacklistRemovals(List<String> resourceNames);
|
||||
|
||||
}
|
|
@ -40,9 +40,15 @@ import org.apache.hadoop.yarn.util.Records;
|
|||
* </li>
|
||||
* <li>{@link Resource} required for each request.</li>
|
||||
* <li>
|
||||
* Number of containers of such specifications which are required
|
||||
* Number of containers, of above specifications, which are required
|
||||
* by the application.
|
||||
* </li>
|
||||
* <li>
|
||||
* A boolean <em>relaxLocality</em> flag, defaulting to <code>true</code>,
|
||||
* which tells the <code>ResourceManager</code> if the application wants
|
||||
* locality to be loose (i.e. allows fall-through to rack or <em>any</em>)
|
||||
* or strict (i.e. specify hard constraint on resource allocation).
|
||||
* </li>
|
||||
* </ul>
|
||||
* </p>
|
||||
*
|
||||
|
|
|
@ -0,0 +1,159 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceBlacklistRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceBlacklistRequestProtoOrBuilder;
|
||||
|
||||
public class ResourceBlacklistRequestPBImpl extends ResourceBlacklistRequest {
|
||||
|
||||
ResourceBlacklistRequestProto proto = null;
|
||||
ResourceBlacklistRequestProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
List<String> blacklistAdditions = null;
|
||||
List<String> blacklistRemovals = null;
|
||||
|
||||
public ResourceBlacklistRequestPBImpl() {
|
||||
builder = ResourceBlacklistRequestProto.newBuilder();
|
||||
}
|
||||
|
||||
public ResourceBlacklistRequestPBImpl(ResourceBlacklistRequestProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
public ResourceBlacklistRequestProto getProto() {
|
||||
mergeLocalToProto();
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
return proto;
|
||||
}
|
||||
|
||||
private void maybeInitBuilder() {
|
||||
if (viaProto || builder == null) {
|
||||
builder = ResourceBlacklistRequestProto.newBuilder(proto);
|
||||
}
|
||||
viaProto = false;
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
if (viaProto) {
|
||||
maybeInitBuilder();
|
||||
}
|
||||
mergeLocalToBuilder();
|
||||
proto = builder.build();
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
private void mergeLocalToBuilder() {
|
||||
if (this.blacklistAdditions != null) {
|
||||
addBlacklistAdditionsToProto();
|
||||
}
|
||||
if (this.blacklistRemovals != null) {
|
||||
addBlacklistRemovalsToProto();
|
||||
}
|
||||
}
|
||||
|
||||
private void addBlacklistAdditionsToProto() {
|
||||
maybeInitBuilder();
|
||||
builder.clearBlacklistAdditions();
|
||||
if (this.blacklistAdditions == null) {
|
||||
return;
|
||||
}
|
||||
builder.addAllBlacklistAdditions(this.blacklistAdditions);
|
||||
}
|
||||
|
||||
private void addBlacklistRemovalsToProto() {
|
||||
maybeInitBuilder();
|
||||
builder.clearBlacklistAdditions();
|
||||
if (this.blacklistRemovals == null) {
|
||||
return;
|
||||
}
|
||||
builder.addAllBlacklistRemovals(this.blacklistRemovals);
|
||||
}
|
||||
|
||||
private void initBlacklistAdditions() {
|
||||
if (this.blacklistAdditions != null) {
|
||||
return;
|
||||
}
|
||||
ResourceBlacklistRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||
List<String> list = p.getBlacklistAdditionsList();
|
||||
this.blacklistAdditions = new ArrayList<String>();
|
||||
this.blacklistAdditions.addAll(list);
|
||||
}
|
||||
|
||||
private void initBlacklistRemovals() {
|
||||
if (this.blacklistRemovals != null) {
|
||||
return;
|
||||
}
|
||||
ResourceBlacklistRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||
List<String> list = p.getBlacklistRemovalsList();
|
||||
this.blacklistRemovals = new ArrayList<String>();
|
||||
this.blacklistRemovals.addAll(list);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getBlacklistAdditions() {
|
||||
initBlacklistAdditions();
|
||||
return this.blacklistAdditions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setBlacklistAdditions(List<String> resourceNames) {
|
||||
if (resourceNames == null) {
|
||||
if (this.blacklistAdditions != null) {
|
||||
this.blacklistAdditions.clear();
|
||||
}
|
||||
return;
|
||||
}
|
||||
initBlacklistAdditions();
|
||||
this.blacklistAdditions.clear();
|
||||
this.blacklistAdditions.addAll(resourceNames);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getBlacklistRemovals() {
|
||||
initBlacklistRemovals();
|
||||
return this.blacklistRemovals;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setBlacklistRemovals(List<String> resourceNames) {
|
||||
if (resourceNames == null) {
|
||||
if (this.blacklistRemovals != null) {
|
||||
this.blacklistRemovals.clear();
|
||||
}
|
||||
return;
|
||||
}
|
||||
initBlacklistRemovals();
|
||||
this.blacklistRemovals.clear();
|
||||
this.blacklistRemovals.addAll(resourceNames);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return getProto().hashCode();
|
||||
}
|
||||
|
||||
}
|
|
@ -229,6 +229,11 @@ message PreemptionResourceRequestProto {
|
|||
optional ResourceRequestProto resource = 1;
|
||||
}
|
||||
|
||||
message ResourceBlacklistRequestProto {
|
||||
repeated string blacklist_additions = 1;
|
||||
repeated string blacklist_removals = 2;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////
|
||||
////// From client_RM_Protocol /////////////////////////////////////////
|
||||
////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -54,8 +54,9 @@ message AllocateRequestProto {
|
|||
optional ApplicationAttemptIdProto application_attempt_id = 1;
|
||||
repeated ResourceRequestProto ask = 2;
|
||||
repeated ContainerIdProto release = 3;
|
||||
optional int32 response_id = 4;
|
||||
optional float progress = 5;
|
||||
optional ResourceBlacklistRequestProto blacklist_request = 4;
|
||||
optional int32 response_id = 5;
|
||||
optional float progress = 6;
|
||||
}
|
||||
|
||||
message AllocateResponseProto {
|
||||
|
|
|
@ -228,7 +228,7 @@ public class AMRMClientImpl<T extends ContainerRequest>
|
|||
release.clear();
|
||||
allocateRequest =
|
||||
AllocateRequest.newInstance(appAttemptId, lastResponseId,
|
||||
progressIndicator, askList, releaseList);
|
||||
progressIndicator, askList, releaseList, null);
|
||||
}
|
||||
|
||||
allocateResponse = rmClient.allocate(allocateRequest);
|
||||
|
|
|
@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.PreemptionContract;
|
|||
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
|
||||
import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
@ -69,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceBlacklistRequestException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceRequestException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
|
@ -287,17 +289,34 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
List<ResourceRequest> ask = request.getAskList();
|
||||
List<ContainerId> release = request.getReleaseList();
|
||||
|
||||
ResourceBlacklistRequest blacklistRequest = request.getResourceBlacklistRequest();
|
||||
List<String> blacklistAdditions =
|
||||
(blacklistRequest != null) ?
|
||||
blacklistRequest.getBlacklistAdditions() : null;
|
||||
List<String> blacklistRemovals =
|
||||
(blacklistRequest != null) ?
|
||||
blacklistRequest.getBlacklistRemovals() : null;
|
||||
|
||||
// sanity check
|
||||
try {
|
||||
SchedulerUtils.validateResourceRequests(ask,
|
||||
rScheduler.getMaximumResourceCapability());
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
LOG.warn("Invalid resource ask by application " + appAttemptId, e);
|
||||
throw RPCUtil.getRemoteException(e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
try {
|
||||
SchedulerUtils.validateBlacklistRequest(blacklistRequest);
|
||||
} catch (InvalidResourceBlacklistRequestException e) {
|
||||
LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
// Send new requests to appAttempt.
|
||||
Allocation allocation =
|
||||
this.rScheduler.allocate(appAttemptId, ask, release);
|
||||
this.rScheduler.allocate(appAttemptId, ask, release,
|
||||
blacklistAdditions, blacklistRemovals);
|
||||
|
||||
RMApp app = this.rmContext.getRMApps().get(
|
||||
appAttemptId.getApplicationId());
|
||||
|
|
|
@ -803,7 +803,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
// AM resource has been checked when submission
|
||||
Allocation amContainerAllocation = appAttempt.scheduler.allocate(
|
||||
appAttempt.applicationAttemptId,
|
||||
Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST);
|
||||
Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST, null, null);
|
||||
if (amContainerAllocation != null
|
||||
&& amContainerAllocation.getContainers() != null) {
|
||||
assert (amContainerAllocation.getContainers().size() == 0);
|
||||
|
@ -827,7 +827,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
// Acquire the AM container from the scheduler.
|
||||
Allocation amContainerAllocation = appAttempt.scheduler.allocate(
|
||||
appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST,
|
||||
EMPTY_CONTAINER_RELEASE_LIST);
|
||||
EMPTY_CONTAINER_RELEASE_LIST, null, null);
|
||||
|
||||
// Set the masterContainer
|
||||
appAttempt.setMasterContainer(amContainerAllocation.getContainers().get(
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
|||
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -59,6 +60,7 @@ public class AppSchedulingInfo {
|
|||
new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
|
||||
final Map<Priority, Map<String, ResourceRequest>> requests =
|
||||
new HashMap<Priority, Map<String, ResourceRequest>>();
|
||||
final Set<String> blacklist = new HashSet<String>();
|
||||
|
||||
//private final ApplicationStore store;
|
||||
private final ActiveUsersManager activeUsersManager;
|
||||
|
@ -114,12 +116,15 @@ public class AppSchedulingInfo {
|
|||
* application, by asking for more resources and releasing resources acquired
|
||||
* by the application.
|
||||
*
|
||||
* @param requests
|
||||
* resources to be acquired
|
||||
* @param requests resources to be acquired
|
||||
* @param blacklistAdditions resources to be added to the blacklist
|
||||
* @param blacklistRemovals resources to be removed from the blacklist
|
||||
*/
|
||||
synchronized public void updateResourceRequests(
|
||||
List<ResourceRequest> requests) {
|
||||
List<ResourceRequest> requests,
|
||||
List<String> blacklistAdditions, List<String> blacklistRemovals) {
|
||||
QueueMetrics metrics = queue.getMetrics();
|
||||
|
||||
// Update resource requests
|
||||
for (ResourceRequest request : requests) {
|
||||
Priority priority = request.getPriority();
|
||||
|
@ -175,6 +180,20 @@ public class AppSchedulingInfo {
|
|||
lastRequestContainers)));
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Update blacklist
|
||||
//
|
||||
|
||||
// Add to blacklist
|
||||
if (blacklistAdditions != null) {
|
||||
blacklist.addAll(blacklistAdditions);
|
||||
}
|
||||
|
||||
// Remove from blacklist
|
||||
if (blacklistRemovals != null) {
|
||||
blacklist.removeAll(blacklistRemovals);
|
||||
}
|
||||
}
|
||||
|
||||
synchronized public Collection<Priority> getPriorities() {
|
||||
|
@ -197,6 +216,10 @@ public class AppSchedulingInfo {
|
|||
return request.getCapability();
|
||||
}
|
||||
|
||||
public synchronized boolean isBlacklisted(String resourceName) {
|
||||
return blacklist.contains(resourceName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Resources have been allocated to this application by the resource
|
||||
* scheduler. Track them.
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
||||
/**
|
||||
* The exception is thrown when an application provides an invalid
|
||||
* specification for the blacklist.
|
||||
*
|
||||
* As an e.g., currently this exceptions is thrown when an application
|
||||
* tries to blacklist {@link ResourceRequest#ANY}.
|
||||
*/
|
||||
public class InvalidResourceBlacklistRequestException extends YarnException {
|
||||
|
||||
private static final long serialVersionUID = 384957911L;
|
||||
|
||||
public InvalidResourceBlacklistRequestException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
public InvalidResourceBlacklistRequestException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public InvalidResourceBlacklistRequestException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
}
|
|
@ -18,14 +18,17 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||
|
||||
import org.apache.hadoop.yarn.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
||||
/**
|
||||
* The exception is thrown when the requested resource is out of the range
|
||||
* The exception is thrown when a {@link ResourceRequest} is out of the range
|
||||
* of the configured lower and upper resource boundaries.
|
||||
*
|
||||
*/
|
||||
public class InvalidResourceRequestException extends YarnRuntimeException {
|
||||
public class InvalidResourceRequestException extends YarnException {
|
||||
|
||||
private static final long serialVersionUID = 13498237L;
|
||||
|
||||
public InvalidResourceRequestException(Throwable cause) {
|
||||
super(cause);
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.util.List;
|
|||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.ContainerExitStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
|
@ -152,4 +153,14 @@ public class SchedulerUtils {
|
|||
}
|
||||
}
|
||||
|
||||
public static void validateBlacklistRequest(ResourceBlacklistRequest blacklistRequest)
|
||||
throws InvalidResourceBlacklistRequestException {
|
||||
if (blacklistRequest != null) {
|
||||
List<String> plus = blacklistRequest.getBlacklistAdditions();
|
||||
if (plus != null && plus.contains(ResourceRequest.ANY)) {
|
||||
throw new InvalidResourceBlacklistRequestException(
|
||||
"Cannot add " + ResourceRequest.ANY + " to the blacklist!");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -95,6 +95,8 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
|
|||
* @param appAttemptId
|
||||
* @param ask
|
||||
* @param release
|
||||
* @param blacklistAdditions
|
||||
* @param blacklistRemovals
|
||||
* @return the {@link Allocation} for the application
|
||||
*/
|
||||
@Public
|
||||
|
@ -102,7 +104,9 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
|
|||
Allocation
|
||||
allocate(ApplicationAttemptId appAttemptId,
|
||||
List<ResourceRequest> ask,
|
||||
List<ContainerId> release);
|
||||
List<ContainerId> release,
|
||||
List<String> blacklistAdditions,
|
||||
List<String> blacklistRemovals);
|
||||
|
||||
/**
|
||||
* Get node resource usage report.
|
||||
|
|
|
@ -472,7 +472,8 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable {
|
|||
@Override
|
||||
@Lock(Lock.NoLock.class)
|
||||
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
|
||||
List<ResourceRequest> ask, List<ContainerId> release) {
|
||||
List<ResourceRequest> ask, List<ContainerId> release,
|
||||
List<String> blacklistAdditions, List<String> blacklistRemovals) {
|
||||
|
||||
FiCaSchedulerApp application = getApplication(applicationAttemptId);
|
||||
if (application == null) {
|
||||
|
@ -523,7 +524,8 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable {
|
|||
application.showRequests();
|
||||
|
||||
// Update application requests
|
||||
application.updateResourceRequests(ask);
|
||||
application.updateResourceRequests(ask,
|
||||
blacklistAdditions, blacklistRemovals);
|
||||
|
||||
LOG.debug("allocate: post-update");
|
||||
application.showRequests();
|
||||
|
|
|
@ -815,6 +815,11 @@ public class LeafQueue implements CSQueue {
|
|||
}
|
||||
|
||||
synchronized (application) {
|
||||
// Check if this resource is on the blacklist
|
||||
if (isBlacklisted(application, node)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Schedule in priority order
|
||||
for (Priority priority : application.getPriorities()) {
|
||||
// Required resource
|
||||
|
@ -898,6 +903,28 @@ public class LeafQueue implements CSQueue {
|
|||
|
||||
}
|
||||
|
||||
boolean isBlacklisted(FiCaSchedulerApp application, FiCaSchedulerNode node) {
|
||||
if (application.isBlacklisted(node.getHostName())) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Skipping 'host' " + node.getHostName() +
|
||||
" for " + application.getApplicationId() +
|
||||
" since it has been blacklisted");
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
if (application.isBlacklisted(node.getRackName())) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Skipping 'rack' " + node.getRackName() +
|
||||
" for " + application.getApplicationId() +
|
||||
" since it has been blacklisted");
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private synchronized CSAssignment
|
||||
assignReservedContainer(FiCaSchedulerApp application,
|
||||
FiCaSchedulerNode node, RMContainer rmContainer, Resource clusterResource) {
|
||||
|
|
|
@ -134,9 +134,11 @@ public class FiCaSchedulerApp extends SchedulerApplication {
|
|||
}
|
||||
|
||||
public synchronized void updateResourceRequests(
|
||||
List<ResourceRequest> requests) {
|
||||
List<ResourceRequest> requests,
|
||||
List<String> blacklistAdditions, List<String> blacklistRemovals) {
|
||||
if (!isStopped) {
|
||||
this.appSchedulingInfo.updateResourceRequests(requests);
|
||||
this.appSchedulingInfo.updateResourceRequests(requests,
|
||||
blacklistAdditions, blacklistRemovals);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -164,6 +166,10 @@ public class FiCaSchedulerApp extends SchedulerApplication {
|
|||
return this.appSchedulingInfo.getResource(priority);
|
||||
}
|
||||
|
||||
public boolean isBlacklisted(String resourceName) {
|
||||
return this.appSchedulingInfo.isBlacklisted(resourceName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Is this application pending?
|
||||
* @return true if it is else false.
|
||||
|
|
|
@ -138,7 +138,7 @@ public class FSSchedulerApp extends SchedulerApplication {
|
|||
|
||||
public synchronized void updateResourceRequests(
|
||||
List<ResourceRequest> requests) {
|
||||
this.appSchedulingInfo.updateResourceRequests(requests);
|
||||
this.appSchedulingInfo.updateResourceRequests(requests, null, null);
|
||||
}
|
||||
|
||||
public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
|
||||
|
|
|
@ -718,7 +718,7 @@ public class FairScheduler implements ResourceScheduler {
|
|||
|
||||
@Override
|
||||
public Allocation allocate(ApplicationAttemptId appAttemptId,
|
||||
List<ResourceRequest> ask, List<ContainerId> release) {
|
||||
List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
|
||||
|
||||
// Make sure this application exists
|
||||
FSSchedulerApp application = applications.get(appAttemptId);
|
||||
|
|
|
@ -222,7 +222,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
|
|||
@Override
|
||||
public Allocation allocate(
|
||||
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
|
||||
List<ContainerId> release) {
|
||||
List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
|
||||
FiCaSchedulerApp application = getApplication(applicationAttemptId);
|
||||
if (application == null) {
|
||||
LOG.error("Calling allocate on removed " +
|
||||
|
@ -268,7 +268,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
|
|||
application.showRequests();
|
||||
|
||||
// Update application requests
|
||||
application.updateResourceRequests(ask);
|
||||
application.updateResourceRequests(ask, null, null);
|
||||
|
||||
LOG.debug("allocate: post-update" +
|
||||
" applicationId=" + applicationAttemptId +
|
||||
|
|
|
@ -265,7 +265,7 @@ public class Application {
|
|||
|
||||
// Get resources from the ResourceManager
|
||||
resourceManager.getResourceScheduler().allocate(applicationAttemptId,
|
||||
new ArrayList<ResourceRequest>(ask), new ArrayList<ContainerId>());
|
||||
new ArrayList<ResourceRequest>(ask), new ArrayList<ContainerId>(), null, null);
|
||||
System.out.println("-=======" + applicationAttemptId);
|
||||
System.out.println("----------" + resourceManager.getRMContext().getRMApps()
|
||||
.get(applicationId).getRMAppAttempt(applicationAttemptId));
|
||||
|
|
|
@ -144,7 +144,7 @@ public class MockAM {
|
|||
List<ResourceRequest> resourceRequest, List<ContainerId> releases)
|
||||
throws Exception {
|
||||
AllocateRequest req = AllocateRequest.newInstance(attemptId,
|
||||
++responseId, 0F, resourceRequest, releases);
|
||||
++responseId, 0F, resourceRequest, releases, null);
|
||||
return amRMProtocol.allocate(req);
|
||||
}
|
||||
|
||||
|
|
|
@ -273,23 +273,23 @@ public class TestFifoScheduler {
|
|||
List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
|
||||
ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
|
||||
ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1));
|
||||
fs.allocate(appAttemptId1, ask1, emptyId);
|
||||
fs.allocate(appAttemptId1, ask1, emptyId, null, null);
|
||||
|
||||
// Ask for a 2 GB container for app 2
|
||||
List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
|
||||
ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
|
||||
ResourceRequest.ANY, BuilderUtils.newResource(2 * GB, 1), 1));
|
||||
fs.allocate(appAttemptId2, ask2, emptyId);
|
||||
fs.allocate(appAttemptId2, ask2, emptyId, null, null);
|
||||
|
||||
// Trigger container assignment
|
||||
fs.handle(new NodeUpdateSchedulerEvent(n1));
|
||||
|
||||
// Get the allocation for the applications and verify headroom
|
||||
Allocation allocation1 = fs.allocate(appAttemptId1, emptyAsk, emptyId);
|
||||
Allocation allocation1 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
|
||||
Assert.assertEquals("Allocation headroom", 1 * GB,
|
||||
allocation1.getResourceLimit().getMemory());
|
||||
|
||||
Allocation allocation2 = fs.allocate(appAttemptId2, emptyAsk, emptyId);
|
||||
Allocation allocation2 = fs.allocate(appAttemptId2, emptyAsk, emptyId, null, null);
|
||||
Assert.assertEquals("Allocation headroom", 1 * GB,
|
||||
allocation2.getResourceLimit().getMemory());
|
||||
|
||||
|
|
|
@ -108,7 +108,7 @@ public class TestAMRMRPCNodeUpdates {
|
|||
|
||||
// allocate request returns no updated node
|
||||
AllocateRequest allocateRequest1 = AllocateRequest.newInstance(attempt1
|
||||
.getAppAttemptId(), 0, 0F, null, null);
|
||||
.getAppAttemptId(), 0, 0F, null, null, null);
|
||||
AllocateResponse response1 = amService.allocate(allocateRequest1);
|
||||
List<NodeReport> updatedNodes = response1.getUpdatedNodes();
|
||||
Assert.assertEquals(0, updatedNodes.size());
|
||||
|
@ -117,7 +117,7 @@ public class TestAMRMRPCNodeUpdates {
|
|||
|
||||
// allocate request returns updated node
|
||||
allocateRequest1 = AllocateRequest.newInstance(attempt1
|
||||
.getAppAttemptId(), response1.getResponseId(), 0F, null, null);
|
||||
.getAppAttemptId(), response1.getResponseId(), 0F, null, null, null);
|
||||
response1 = amService.allocate(allocateRequest1);
|
||||
updatedNodes = response1.getUpdatedNodes();
|
||||
Assert.assertEquals(1, updatedNodes.size());
|
||||
|
@ -137,7 +137,7 @@ public class TestAMRMRPCNodeUpdates {
|
|||
|
||||
// subsequent allocate request returns delta
|
||||
allocateRequest1 = AllocateRequest.newInstance(attempt1
|
||||
.getAppAttemptId(), response1.getResponseId(), 0F, null, null);
|
||||
.getAppAttemptId(), response1.getResponseId(), 0F, null, null, null);
|
||||
response1 = amService.allocate(allocateRequest1);
|
||||
updatedNodes = response1.getUpdatedNodes();
|
||||
Assert.assertEquals(1, updatedNodes.size());
|
||||
|
@ -157,7 +157,7 @@ public class TestAMRMRPCNodeUpdates {
|
|||
|
||||
// allocate request returns no updated node
|
||||
AllocateRequest allocateRequest2 = AllocateRequest.newInstance(attempt2
|
||||
.getAppAttemptId(), 0, 0F, null, null);
|
||||
.getAppAttemptId(), 0, 0F, null, null, null);
|
||||
AllocateResponse response2 = amService.allocate(allocateRequest2);
|
||||
updatedNodes = response2.getUpdatedNodes();
|
||||
Assert.assertEquals(0, updatedNodes.size());
|
||||
|
@ -166,7 +166,7 @@ public class TestAMRMRPCNodeUpdates {
|
|||
|
||||
// both AM's should get delta updated nodes
|
||||
allocateRequest1 = AllocateRequest.newInstance(attempt1
|
||||
.getAppAttemptId(), response1.getResponseId(), 0F, null, null);
|
||||
.getAppAttemptId(), response1.getResponseId(), 0F, null, null, null);
|
||||
response1 = amService.allocate(allocateRequest1);
|
||||
updatedNodes = response1.getUpdatedNodes();
|
||||
Assert.assertEquals(1, updatedNodes.size());
|
||||
|
@ -175,7 +175,7 @@ public class TestAMRMRPCNodeUpdates {
|
|||
Assert.assertEquals(NodeState.RUNNING, nr.getNodeState());
|
||||
|
||||
allocateRequest2 = AllocateRequest.newInstance(attempt2
|
||||
.getAppAttemptId(), response2.getResponseId(), 0F, null, null);
|
||||
.getAppAttemptId(), response2.getResponseId(), 0F, null, null, null);
|
||||
response2 = amService.allocate(allocateRequest2);
|
||||
updatedNodes = response2.getUpdatedNodes();
|
||||
Assert.assertEquals(1, updatedNodes.size());
|
||||
|
@ -185,7 +185,7 @@ public class TestAMRMRPCNodeUpdates {
|
|||
|
||||
// subsequent allocate calls should return no updated nodes
|
||||
allocateRequest2 = AllocateRequest.newInstance(attempt2
|
||||
.getAppAttemptId(), response2.getResponseId(), 0F, null, null);
|
||||
.getAppAttemptId(), response2.getResponseId(), 0F, null, null, null);
|
||||
response2 = amService.allocate(allocateRequest2);
|
||||
updatedNodes = response2.getUpdatedNodes();
|
||||
Assert.assertEquals(0, updatedNodes.size());
|
||||
|
|
|
@ -78,13 +78,13 @@ public class TestAMRMRPCResponseId {
|
|||
am.registerAppAttempt();
|
||||
|
||||
AllocateRequest allocateRequest = AllocateRequest.newInstance(attempt
|
||||
.getAppAttemptId(), 0, 0F, null, null);
|
||||
.getAppAttemptId(), 0, 0F, null, null, null);
|
||||
|
||||
AllocateResponse response = amService.allocate(allocateRequest);
|
||||
Assert.assertEquals(1, response.getResponseId());
|
||||
Assert.assertFalse(response.getResync());
|
||||
allocateRequest = AllocateRequest.newInstance(attempt
|
||||
.getAppAttemptId(), response.getResponseId(), 0F, null, null);
|
||||
.getAppAttemptId(), response.getResponseId(), 0F, null, null, null);
|
||||
|
||||
response = amService.allocate(allocateRequest);
|
||||
Assert.assertEquals(2, response.getResponseId());
|
||||
|
@ -94,7 +94,7 @@ public class TestAMRMRPCResponseId {
|
|||
|
||||
/** try sending old request again **/
|
||||
allocateRequest = AllocateRequest.newInstance(attempt
|
||||
.getAppAttemptId(), 0, 0F, null, null);
|
||||
.getAppAttemptId(), 0, 0F, null, null, null);
|
||||
response = amService.allocate(allocateRequest);
|
||||
Assert.assertTrue(response.getResync());
|
||||
}
|
||||
|
|
|
@ -330,7 +330,7 @@ public class TestRMAppAttemptTransitions {
|
|||
applicationAttempt.getAppAttemptState());
|
||||
verify(scheduler, times(expectedAllocateCount)).
|
||||
allocate(any(ApplicationAttemptId.class),
|
||||
any(List.class), any(List.class));
|
||||
any(List.class), any(List.class), any(List.class), any(List.class));
|
||||
|
||||
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
|
||||
assertNull(applicationAttempt.getMasterContainer());
|
||||
|
@ -345,6 +345,7 @@ public class TestRMAppAttemptTransitions {
|
|||
/**
|
||||
* {@link RMAppAttemptState#ALLOCATED}
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private void testAppAttemptAllocatedState(Container amContainer) {
|
||||
assertEquals(RMAppAttemptState.ALLOCATED,
|
||||
applicationAttempt.getAppAttemptState());
|
||||
|
@ -354,7 +355,9 @@ public class TestRMAppAttemptTransitions {
|
|||
verify(applicationMasterLauncher).handle(any(AMLauncherEvent.class));
|
||||
verify(scheduler, times(2)).
|
||||
allocate(
|
||||
any(ApplicationAttemptId.class), any(List.class), any(List.class));
|
||||
any(
|
||||
ApplicationAttemptId.class), any(List.class), any(List.class),
|
||||
any(List.class), any(List.class));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -465,6 +468,7 @@ public class TestRMAppAttemptTransitions {
|
|||
testAppAttemptScheduledState();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private Container allocateApplicationAttempt() {
|
||||
scheduleApplicationAttempt();
|
||||
|
||||
|
@ -481,6 +485,8 @@ public class TestRMAppAttemptTransitions {
|
|||
scheduler.allocate(
|
||||
any(ApplicationAttemptId.class),
|
||||
any(List.class),
|
||||
any(List.class),
|
||||
any(List.class),
|
||||
any(List.class))).
|
||||
thenReturn(allocation);
|
||||
|
||||
|
|
|
@ -22,20 +22,47 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.AMRMProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.DominantResourceCalculator;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestSchedulerUtils {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestSchedulerUtils.class);
|
||||
|
||||
@Test (timeout = 30000)
|
||||
public void testNormalizeRequest() {
|
||||
ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
|
||||
|
@ -240,4 +267,79 @@ public class TestSchedulerUtils {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateResourceBlacklistRequest() throws Exception {
|
||||
|
||||
MyContainerManager containerManager = new MyContainerManager();
|
||||
final MockRM rm =
|
||||
new MockRMWithAMS(new YarnConfiguration(), containerManager);
|
||||
rm.start();
|
||||
|
||||
MockNM nm1 = rm.registerNode("localhost:1234", 5120);
|
||||
|
||||
Map<ApplicationAccessType, String> acls =
|
||||
new HashMap<ApplicationAccessType, String>(2);
|
||||
acls.put(ApplicationAccessType.VIEW_APP, "*");
|
||||
RMApp app = rm.submitApp(1024, "appname", "appuser", acls);
|
||||
|
||||
nm1.nodeHeartbeat(true);
|
||||
|
||||
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
||||
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
|
||||
waitForLaunchedState(attempt);
|
||||
|
||||
// Create a client to the RM.
|
||||
final Configuration conf = rm.getConfig();
|
||||
final YarnRPC rpc = YarnRPC.create(conf);
|
||||
|
||||
UserGroupInformation currentUser =
|
||||
UserGroupInformation.createRemoteUser(applicationAttemptId.toString());
|
||||
|
||||
AMRMProtocol client = currentUser
|
||||
.doAs(new PrivilegedAction<AMRMProtocol>() {
|
||||
@Override
|
||||
public AMRMProtocol run() {
|
||||
return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rm
|
||||
.getApplicationMasterService().getBindAddress(), conf);
|
||||
}
|
||||
});
|
||||
|
||||
RegisterApplicationMasterRequest request = Records
|
||||
.newRecord(RegisterApplicationMasterRequest.class);
|
||||
request.setApplicationAttemptId(applicationAttemptId);
|
||||
client.registerApplicationMaster(request);
|
||||
|
||||
ResourceBlacklistRequest blacklistRequest =
|
||||
ResourceBlacklistRequest.newInstance(
|
||||
Collections.singletonList(ResourceRequest.ANY), null);
|
||||
|
||||
AllocateRequest allocateRequest =
|
||||
AllocateRequest.newInstance(applicationAttemptId, 0, 0.0f, null, null,
|
||||
blacklistRequest);
|
||||
boolean error = false;
|
||||
try {
|
||||
client.allocate(allocateRequest);
|
||||
} catch (InvalidResourceBlacklistRequestException e) {
|
||||
error = true;
|
||||
}
|
||||
|
||||
rm.stop();
|
||||
|
||||
Assert.assertTrue(
|
||||
"Didn't not catch InvalidResourceBlacklistRequestException", error);
|
||||
}
|
||||
|
||||
private void waitForLaunchedState(RMAppAttempt attempt)
|
||||
throws InterruptedException {
|
||||
int waitCount = 0;
|
||||
while (attempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED
|
||||
&& waitCount++ < 20) {
|
||||
LOG.info("Waiting for AppAttempt to reach LAUNCHED state. "
|
||||
+ "Current state is " + attempt.getAppAttemptState());
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
Assert.assertEquals(attempt.getAppAttemptState(),
|
||||
RMAppAttemptState.LAUNCHED);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -513,7 +513,7 @@ public class TestApplicationLimits {
|
|||
app_0_0_requests.add(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
|
||||
true, priority_1, recordFactory));
|
||||
app_0_0.updateResourceRequests(app_0_0_requests);
|
||||
app_0_0.updateResourceRequests(app_0_0_requests, null, null);
|
||||
|
||||
// Schedule to compute
|
||||
queue.assignContainers(clusterResource, node_0);
|
||||
|
@ -532,7 +532,7 @@ public class TestApplicationLimits {
|
|||
app_0_1_requests.add(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
|
||||
true, priority_1, recordFactory));
|
||||
app_0_1.updateResourceRequests(app_0_1_requests);
|
||||
app_0_1.updateResourceRequests(app_0_1_requests, null, null);
|
||||
|
||||
// Schedule to compute
|
||||
queue.assignContainers(clusterResource, node_0); // Schedule to compute
|
||||
|
@ -551,7 +551,7 @@ public class TestApplicationLimits {
|
|||
app_1_0_requests.add(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
|
||||
true, priority_1, recordFactory));
|
||||
app_1_0.updateResourceRequests(app_1_0_requests);
|
||||
app_1_0.updateResourceRequests(app_1_0_requests, null, null);
|
||||
|
||||
// Schedule to compute
|
||||
queue.assignContainers(clusterResource, node_0); // Schedule to compute
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
|
@ -293,7 +294,7 @@ public class TestLeafQueue {
|
|||
Priority priority = TestUtils.createMockPriority(1);
|
||||
app_0.updateResourceRequests(Collections.singletonList(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, true,
|
||||
priority, recordFactory)));
|
||||
priority, recordFactory)), null, null);
|
||||
|
||||
// Start testing...
|
||||
|
||||
|
@ -415,11 +416,11 @@ public class TestLeafQueue {
|
|||
Priority priority = TestUtils.createMockPriority(1);
|
||||
app_0.updateResourceRequests(Collections.singletonList(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, true,
|
||||
priority, recordFactory)));
|
||||
priority, recordFactory)), null, null);
|
||||
|
||||
app_1.updateResourceRequests(Collections.singletonList(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
|
||||
priority, recordFactory)));
|
||||
priority, recordFactory)), null, null);
|
||||
|
||||
// Start testing...
|
||||
|
||||
|
@ -548,11 +549,11 @@ public class TestLeafQueue {
|
|||
Priority priority = TestUtils.createMockPriority(1);
|
||||
app_0.updateResourceRequests(Collections.singletonList(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
|
||||
priority, recordFactory)));
|
||||
priority, recordFactory)), null, null);
|
||||
|
||||
app_1.updateResourceRequests(Collections.singletonList(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
|
||||
priority, recordFactory)));
|
||||
priority, recordFactory)), null, null);
|
||||
|
||||
/**
|
||||
* Start testing...
|
||||
|
@ -641,11 +642,11 @@ public class TestLeafQueue {
|
|||
Priority priority = TestUtils.createMockPriority(1);
|
||||
app_0.updateResourceRequests(Collections.singletonList(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
|
||||
priority, recordFactory)));
|
||||
priority, recordFactory)), null, null);
|
||||
|
||||
app_1.updateResourceRequests(Collections.singletonList(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
|
||||
priority, recordFactory)));
|
||||
priority, recordFactory)), null, null);
|
||||
|
||||
/**
|
||||
* Start testing...
|
||||
|
@ -680,7 +681,7 @@ public class TestLeafQueue {
|
|||
a.setMaxCapacity(.1f);
|
||||
app_2.updateResourceRequests(Collections.singletonList(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
|
||||
priority, recordFactory)));
|
||||
priority, recordFactory)), null, null);
|
||||
assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
|
||||
|
||||
// No more to user_0 since he is already over user-limit
|
||||
|
@ -697,7 +698,7 @@ public class TestLeafQueue {
|
|||
LOG.info("here");
|
||||
app_1.updateResourceRequests(Collections.singletonList( // unset
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
|
||||
priority, recordFactory)));
|
||||
priority, recordFactory)), null, null);
|
||||
assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
|
||||
a.assignContainers(clusterResource, node_1);
|
||||
assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap
|
||||
|
@ -758,11 +759,11 @@ public class TestLeafQueue {
|
|||
Priority priority = TestUtils.createMockPriority(1);
|
||||
app_0.updateResourceRequests(Collections.singletonList(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, true,
|
||||
priority, recordFactory)));
|
||||
priority, recordFactory)), null, null);
|
||||
|
||||
app_1.updateResourceRequests(Collections.singletonList(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, true,
|
||||
priority, recordFactory)));
|
||||
priority, recordFactory)), null, null);
|
||||
|
||||
/**
|
||||
* Start testing...
|
||||
|
@ -792,11 +793,11 @@ public class TestLeafQueue {
|
|||
|
||||
app_2.updateResourceRequests(Collections.singletonList(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 1, true,
|
||||
priority, recordFactory)));
|
||||
priority, recordFactory)), null, null);
|
||||
|
||||
app_3.updateResourceRequests(Collections.singletonList(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
|
||||
priority, recordFactory)));
|
||||
priority, recordFactory)), null, null);
|
||||
|
||||
// Now allocations should goto app_2 since
|
||||
// user_0 is at limit inspite of high user-limit-factor
|
||||
|
@ -920,11 +921,11 @@ public class TestLeafQueue {
|
|||
Priority priority = TestUtils.createMockPriority(1);
|
||||
app_0.updateResourceRequests(Collections.singletonList(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
|
||||
priority, recordFactory)));
|
||||
priority, recordFactory)), null, null);
|
||||
|
||||
app_1.updateResourceRequests(Collections.singletonList(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
|
||||
priority, recordFactory)));
|
||||
priority, recordFactory)), null, null);
|
||||
|
||||
// Start testing...
|
||||
|
||||
|
@ -1022,7 +1023,7 @@ public class TestLeafQueue {
|
|||
Priority priority = TestUtils.createMockPriority(1);
|
||||
app_0.updateResourceRequests(Collections.singletonList(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
|
||||
priority, recordFactory)));
|
||||
priority, recordFactory)), null, null);
|
||||
|
||||
// Setup app_1 to request a 4GB container on host_0 and
|
||||
// another 4GB container anywhere.
|
||||
|
@ -1034,7 +1035,7 @@ public class TestLeafQueue {
|
|||
true, priority, recordFactory));
|
||||
appRequests_1.add(TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 2,
|
||||
true, priority, recordFactory));
|
||||
app_1.updateResourceRequests(appRequests_1);
|
||||
app_1.updateResourceRequests(appRequests_1, null, null);
|
||||
|
||||
// Start testing...
|
||||
|
||||
|
@ -1128,11 +1129,11 @@ public class TestLeafQueue {
|
|||
Priority priority = TestUtils.createMockPriority(1);
|
||||
app_0.updateResourceRequests(Collections.singletonList(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
|
||||
priority, recordFactory)));
|
||||
priority, recordFactory)), null, null);
|
||||
|
||||
app_1.updateResourceRequests(Collections.singletonList(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
|
||||
priority, recordFactory)));
|
||||
priority, recordFactory)), null, null);
|
||||
|
||||
// Start testing...
|
||||
|
||||
|
@ -1255,7 +1256,7 @@ public class TestLeafQueue {
|
|||
app_0_requests_0.add(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, // one extra
|
||||
true, priority, recordFactory));
|
||||
app_0.updateResourceRequests(app_0_requests_0);
|
||||
app_0.updateResourceRequests(app_0_requests_0, null, null);
|
||||
|
||||
// Start testing...
|
||||
CSAssignment assignment = null;
|
||||
|
@ -1320,7 +1321,7 @@ public class TestLeafQueue {
|
|||
app_0_requests_0.add(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, // one extra
|
||||
true, priority, recordFactory));
|
||||
app_0.updateResourceRequests(app_0_requests_0);
|
||||
app_0.updateResourceRequests(app_0_requests_0, null, null);
|
||||
assertEquals(2, app_0.getTotalRequiredResources(priority));
|
||||
|
||||
String host_3 = "127.0.0.4"; // on rack_1
|
||||
|
@ -1411,7 +1412,7 @@ public class TestLeafQueue {
|
|||
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1,
|
||||
true, priority_2, recordFactory));
|
||||
|
||||
app_0.updateResourceRequests(app_0_requests_0);
|
||||
app_0.updateResourceRequests(app_0_requests_0, null, null);
|
||||
|
||||
// Start testing...
|
||||
|
||||
|
@ -1526,7 +1527,7 @@ public class TestLeafQueue {
|
|||
app_0_requests_0.add(
|
||||
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
|
||||
true, priority, recordFactory));
|
||||
app_0.updateResourceRequests(app_0_requests_0);
|
||||
app_0.updateResourceRequests(app_0_requests_0, null, null);
|
||||
|
||||
// Start testing...
|
||||
|
||||
|
@ -1535,7 +1536,7 @@ public class TestLeafQueue {
|
|||
app_0_requests_0.add(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
|
||||
true, priority, recordFactory));
|
||||
app_0.updateResourceRequests(app_0_requests_0);
|
||||
app_0.updateResourceRequests(app_0_requests_0, null, null);
|
||||
|
||||
// NODE_LOCAL - node_0_1
|
||||
a.assignContainers(clusterResource, node_0_0);
|
||||
|
@ -1558,7 +1559,7 @@ public class TestLeafQueue {
|
|||
app_0_requests_0.add(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
|
||||
true, priority, recordFactory));
|
||||
app_0.updateResourceRequests(app_0_requests_0);
|
||||
app_0.updateResourceRequests(app_0_requests_0, null, null);
|
||||
|
||||
// No allocation on node_0_1 even though it's node/rack local since
|
||||
// required(rack_1) == 0
|
||||
|
@ -1759,7 +1760,6 @@ public class TestLeafQueue {
|
|||
// Setup some nodes and racks
|
||||
String host_0_0 = "127.0.0.1";
|
||||
String rack_0 = "rack_0";
|
||||
FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 8*GB);
|
||||
String host_0_1 = "127.0.0.2";
|
||||
FiCaSchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB);
|
||||
|
||||
|
@ -1789,6 +1789,7 @@ public class TestLeafQueue {
|
|||
// host_0_1: 8G
|
||||
// host_1_0: 8G
|
||||
// host_1_1: 8G
|
||||
// Blacklist: <host_0_0>
|
||||
Priority priority = TestUtils.createMockPriority(1);
|
||||
List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
|
||||
app_0_requests_0.add(
|
||||
|
@ -1803,7 +1804,8 @@ public class TestLeafQueue {
|
|||
app_0_requests_0.add(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
|
||||
false, priority, recordFactory));
|
||||
app_0.updateResourceRequests(app_0_requests_0);
|
||||
app_0.updateResourceRequests(app_0_requests_0,
|
||||
Collections.singletonList(host_0_0), null);
|
||||
app_0_requests_0.clear();
|
||||
|
||||
//
|
||||
|
@ -1830,6 +1832,7 @@ public class TestLeafQueue {
|
|||
// host_0_1: 8G
|
||||
// host_1_0: 8G
|
||||
// host_1_1: 8G
|
||||
// Blacklist: <host_0_0>
|
||||
|
||||
// node_1_1
|
||||
// Shouldn't allocate since RR(rack_1) = relax: false
|
||||
|
@ -1838,11 +1841,12 @@ public class TestLeafQueue {
|
|||
any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
||||
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
|
||||
|
||||
// Allow rack-locality for rack_1
|
||||
// Allow rack-locality for rack_1, but blacklist node_1_1
|
||||
app_0_requests_0.add(
|
||||
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
|
||||
true, priority, recordFactory));
|
||||
app_0.updateResourceRequests(app_0_requests_0);
|
||||
app_0.updateResourceRequests(app_0_requests_0,
|
||||
Collections.singletonList(host_1_1), null);
|
||||
app_0_requests_0.clear();
|
||||
|
||||
// resourceName: <priority, memory, #containers, relaxLocality>
|
||||
|
@ -1851,15 +1855,69 @@ public class TestLeafQueue {
|
|||
// rack_0: < null >
|
||||
// host_1_0: < 1, 1GB, 1, true >
|
||||
// host_1_1: < null >
|
||||
// rack_1: < 1, 1GB, 1, true > <----
|
||||
// rack_1: < 1, 1GB, 1, true >
|
||||
// ANY: < 1, 1GB, 1, false >
|
||||
// Availability:
|
||||
// host_0_0: 8G
|
||||
// host_0_1: 8G
|
||||
// host_1_0: 8G
|
||||
// host_1_1: 8G
|
||||
// Blacklist: < host_0_0 , host_1_1 > <----
|
||||
|
||||
// node_1_1
|
||||
// Shouldn't allocate since node_1_1 is blacklisted
|
||||
a.assignContainers(clusterResource, node_1_1);
|
||||
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1),
|
||||
any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
||||
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
|
||||
|
||||
// Now, remove node_1_1 from blacklist, but add rack_1 to blacklist
|
||||
app_0.updateResourceRequests(app_0_requests_0,
|
||||
Collections.singletonList(rack_1), Collections.singletonList(host_1_1));
|
||||
app_0_requests_0.clear();
|
||||
|
||||
// resourceName: <priority, memory, #containers, relaxLocality>
|
||||
// host_0_0: < 1, 1GB, 1, true >
|
||||
// host_0_1: < null >
|
||||
// rack_0: < null >
|
||||
// host_1_0: < 1, 1GB, 1, true >
|
||||
// host_1_1: < null >
|
||||
// rack_1: < 1, 1GB, 1, true >
|
||||
// ANY: < 1, 1GB, 1, false >
|
||||
// Availability:
|
||||
// host_0_0: 8G
|
||||
// host_0_1: 8G
|
||||
// host_1_0: 8G
|
||||
// host_1_1: 8G
|
||||
// Blacklist: < host_0_0 , rack_1 > <----
|
||||
|
||||
// node_1_1
|
||||
// Shouldn't allocate since rack_1 is blacklisted
|
||||
a.assignContainers(clusterResource, node_1_1);
|
||||
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1),
|
||||
any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
||||
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
|
||||
|
||||
// Now remove rack_1 from blacklist
|
||||
app_0.updateResourceRequests(app_0_requests_0,
|
||||
null, Collections.singletonList(rack_1));
|
||||
app_0_requests_0.clear();
|
||||
|
||||
// resourceName: <priority, memory, #containers, relaxLocality>
|
||||
// host_0_0: < 1, 1GB, 1, true >
|
||||
// host_0_1: < null >
|
||||
// rack_0: < null >
|
||||
// host_1_0: < 1, 1GB, 1, true >
|
||||
// host_1_1: < null >
|
||||
// rack_1: < 1, 1GB, 1, true >
|
||||
// ANY: < 1, 1GB, 1, false >
|
||||
// Availability:
|
||||
// host_0_0: 8G
|
||||
// host_0_1: 8G
|
||||
// host_1_0: 8G
|
||||
// host_1_1: 8G
|
||||
// Blacklist: < host_0_0 > <----
|
||||
|
||||
// Now, should allocate since RR(rack_1) = relax: true
|
||||
a.assignContainers(clusterResource, node_1_1);
|
||||
verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1),
|
||||
|
@ -1874,7 +1932,7 @@ public class TestLeafQueue {
|
|||
app_0_requests_0.add(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
|
||||
false, priority, recordFactory));
|
||||
app_0.updateResourceRequests(app_0_requests_0);
|
||||
app_0.updateResourceRequests(app_0_requests_0, null, null);
|
||||
app_0_requests_0.clear();
|
||||
|
||||
// resourceName: <priority, memory, #containers, relaxLocality>
|
||||
|
|
|
@ -206,7 +206,7 @@ public class TestFairScheduler {
|
|||
ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
|
||||
priority, numContainers, true);
|
||||
ask.add(request);
|
||||
scheduler.allocate(id, ask, new ArrayList<ContainerId>());
|
||||
scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null);
|
||||
return id;
|
||||
}
|
||||
|
||||
|
@ -221,7 +221,7 @@ public class TestFairScheduler {
|
|||
ApplicationAttemptId attId) {
|
||||
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
||||
ask.add(request);
|
||||
scheduler.allocate(attId, ask, new ArrayList<ContainerId>());
|
||||
scheduler.allocate(attId, ask, new ArrayList<ContainerId>(), null, null);
|
||||
}
|
||||
|
||||
// TESTS
|
||||
|
@ -528,7 +528,7 @@ public class TestFairScheduler {
|
|||
ResourceRequest request1 =
|
||||
createResourceRequest(minReqSize * 2, ResourceRequest.ANY, 1, 1, true);
|
||||
ask1.add(request1);
|
||||
scheduler.allocate(id11, ask1, new ArrayList<ContainerId>());
|
||||
scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), null, null);
|
||||
|
||||
// Second ask, queue2 requests 1 large + (2 * minReqSize)
|
||||
List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
|
||||
|
@ -538,14 +538,14 @@ public class TestFairScheduler {
|
|||
false);
|
||||
ask2.add(request2);
|
||||
ask2.add(request3);
|
||||
scheduler.allocate(id21, ask2, new ArrayList<ContainerId>());
|
||||
scheduler.allocate(id21, ask2, new ArrayList<ContainerId>(), null, null);
|
||||
|
||||
// Third ask, queue2 requests 1 large
|
||||
List<ResourceRequest> ask3 = new ArrayList<ResourceRequest>();
|
||||
ResourceRequest request4 =
|
||||
createResourceRequest(2 * minReqSize, ResourceRequest.ANY, 1, 1, true);
|
||||
ask3.add(request4);
|
||||
scheduler.allocate(id22, ask3, new ArrayList<ContainerId>());
|
||||
scheduler.allocate(id22, ask3, new ArrayList<ContainerId>(), null, null);
|
||||
|
||||
scheduler.update();
|
||||
|
||||
|
@ -1369,7 +1369,7 @@ public class TestFairScheduler {
|
|||
|
||||
// Complete container
|
||||
scheduler.allocate(attId, new ArrayList<ResourceRequest>(),
|
||||
Arrays.asList(containerId));
|
||||
Arrays.asList(containerId), null, null);
|
||||
assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB());
|
||||
|
||||
// Schedule at opening
|
||||
|
@ -1444,7 +1444,7 @@ public class TestFairScheduler {
|
|||
asks.add(createResourceRequest(1024, node3.getRackName(), 1, 1, true));
|
||||
asks.add(createResourceRequest(1024, ResourceRequest.ANY, 1, 2, true));
|
||||
|
||||
scheduler.allocate(appId, asks, new ArrayList<ContainerId>());
|
||||
scheduler.allocate(appId, asks, new ArrayList<ContainerId>(), null, null);
|
||||
|
||||
// node 1 checks in
|
||||
scheduler.update();
|
||||
|
@ -1799,7 +1799,7 @@ public class TestFairScheduler {
|
|||
createResourceRequest(1024, node1.getHostName(), 1, 0, true),
|
||||
createResourceRequest(1024, "rack1", 1, 0, true),
|
||||
createResourceRequest(1024, ResourceRequest.ANY, 1, 1, true));
|
||||
scheduler.allocate(attId1, update, new ArrayList<ContainerId>());
|
||||
scheduler.allocate(attId1, update, new ArrayList<ContainerId>(), null, null);
|
||||
|
||||
// then node2 should get the container
|
||||
scheduler.handle(node2UpdateEvent);
|
||||
|
@ -1842,7 +1842,7 @@ public class TestFairScheduler {
|
|||
anyRequest = createResourceRequest(1024, ResourceRequest.ANY,
|
||||
1, 1, false);
|
||||
scheduler.allocate(attId, Arrays.asList(rackRequest, anyRequest),
|
||||
new ArrayList<ContainerId>());
|
||||
new ArrayList<ContainerId>(), null, null);
|
||||
|
||||
scheduler.handle(nodeUpdateEvent);
|
||||
assertEquals(0, app.getReservedContainers().size());
|
||||
|
|
|
@ -186,7 +186,7 @@ public class TestFifoScheduler {
|
|||
ask.add(nodeLocal);
|
||||
ask.add(rackLocal);
|
||||
ask.add(any);
|
||||
scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>());
|
||||
scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
|
||||
|
||||
NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0);
|
||||
|
||||
|
|
|
@ -501,7 +501,7 @@ public class TestContainerManagerSecurity {
|
|||
|
||||
AllocateRequest allocateRequest = AllocateRequest.newInstance(
|
||||
BuilderUtils.newApplicationAttemptId(appID, 1), 0, 0F, ask,
|
||||
new ArrayList<ContainerId>());
|
||||
new ArrayList<ContainerId>(), null);
|
||||
List<Container> allocatedContainers = scheduler.allocate(allocateRequest)
|
||||
.getAllocatedContainers();
|
||||
|
||||
|
|
Loading…
Reference in New Issue