YARN-7900. [AMRMProxy] AMRMClientRelayer for stateful FederationInterceptor. (Botong Huang via asuresh)
This commit is contained in:
parent
734cbdd6b2
commit
113e2d6801
|
@ -16,7 +16,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.utils;
|
||||
package org.apache.hadoop.yarn.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
|
|
@ -0,0 +1,336 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
|
||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet;
|
||||
import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSetKey;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* A component that sits in between AMRMClient(Impl) and Yarn RM. It remembers
|
||||
* pending requests similar to AMRMClient, and handles RM re-sync automatically
|
||||
* without propagate the re-sync exception back to AMRMClient.
|
||||
*/
|
||||
public class AMRMClientRelayer extends AbstractService
|
||||
implements ApplicationMasterProtocol {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(AMRMClientRelayer.class);
|
||||
|
||||
private ApplicationMasterProtocol rmClient;
|
||||
|
||||
/**
|
||||
* The original registration request that was sent by the AM. This instance is
|
||||
* reused to register/re-register with all the sub-cluster RMs.
|
||||
*/
|
||||
private RegisterApplicationMasterRequest amRegistrationRequest;
|
||||
|
||||
/**
|
||||
* Similar to AMRMClientImpl, all data structures below have two versions:
|
||||
*
|
||||
* The remote ones are all the pending requests that RM has not fulfill yet.
|
||||
* Whenever RM fails over, we re-register and then full re-send all these
|
||||
* pending requests.
|
||||
*
|
||||
* The non-remote ones are the requests that RM has not received yet. When RM
|
||||
* throws non-fail-over exception back, the request is considered not received
|
||||
* by RM. We will merge with new requests and re-send in the next heart beat.
|
||||
*/
|
||||
private Map<ResourceRequestSetKey, ResourceRequestSet> remotePendingAsks =
|
||||
new HashMap<>();
|
||||
/**
|
||||
* Same as AMRMClientImpl, we need to use a custom comparator that does not
|
||||
* look at ResourceRequest.getNumContainers() here. TreeSet allows a custom
|
||||
* comparator.
|
||||
*/
|
||||
private Set<ResourceRequest> ask =
|
||||
new TreeSet<>(new ResourceRequest.ResourceRequestComparator());
|
||||
|
||||
private Set<ContainerId> remotePendingRelease = new HashSet<>();
|
||||
private Set<ContainerId> release = new HashSet<>();
|
||||
|
||||
private Set<String> remoteBlacklistedNodes = new HashSet<>();
|
||||
private Set<String> blacklistAdditions = new HashSet<>();
|
||||
private Set<String> blacklistRemovals = new HashSet<>();
|
||||
|
||||
private Map<ContainerId, UpdateContainerRequest> remotePendingChange =
|
||||
new HashMap<>();
|
||||
private Map<ContainerId, UpdateContainerRequest> change = new HashMap<>();
|
||||
|
||||
public AMRMClientRelayer() {
|
||||
super(AMRMClientRelayer.class.getName());
|
||||
}
|
||||
|
||||
public AMRMClientRelayer(ApplicationMasterProtocol rmClient) {
|
||||
this();
|
||||
this.rmClient = rmClient;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
final YarnConfiguration conf = new YarnConfiguration(getConfig());
|
||||
try {
|
||||
if (this.rmClient == null) {
|
||||
this.rmClient =
|
||||
ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new YarnRuntimeException(e);
|
||||
}
|
||||
super.serviceStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
if (this.rmClient != null) {
|
||||
RPC.stopProxy(this.rmClient);
|
||||
}
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||
RegisterApplicationMasterRequest request)
|
||||
throws YarnException, IOException {
|
||||
this.amRegistrationRequest = request;
|
||||
return this.rmClient.registerApplicationMaster(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FinishApplicationMasterResponse finishApplicationMaster(
|
||||
FinishApplicationMasterRequest request)
|
||||
throws YarnException, IOException {
|
||||
try {
|
||||
return this.rmClient.finishApplicationMaster(request);
|
||||
} catch (ApplicationMasterNotRegisteredException e) {
|
||||
LOG.warn("Out of sync with ResourceManager, hence resyncing.");
|
||||
// re register with RM
|
||||
registerApplicationMaster(this.amRegistrationRequest);
|
||||
return finishApplicationMaster(request);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public AllocateResponse allocate(AllocateRequest allocateRequest)
|
||||
throws YarnException, IOException {
|
||||
AllocateResponse allocateResponse = null;
|
||||
try {
|
||||
synchronized (this) {
|
||||
// update the data structures first
|
||||
addNewAsks(allocateRequest.getAskList());
|
||||
|
||||
if (allocateRequest.getReleaseList() != null) {
|
||||
this.remotePendingRelease.addAll(allocateRequest.getReleaseList());
|
||||
this.release.addAll(allocateRequest.getReleaseList());
|
||||
}
|
||||
|
||||
if (allocateRequest.getResourceBlacklistRequest() != null) {
|
||||
if (allocateRequest.getResourceBlacklistRequest()
|
||||
.getBlacklistAdditions() != null) {
|
||||
this.remoteBlacklistedNodes.addAll(allocateRequest
|
||||
.getResourceBlacklistRequest().getBlacklistAdditions());
|
||||
this.blacklistAdditions.addAll(allocateRequest
|
||||
.getResourceBlacklistRequest().getBlacklistAdditions());
|
||||
}
|
||||
if (allocateRequest.getResourceBlacklistRequest()
|
||||
.getBlacklistRemovals() != null) {
|
||||
this.remoteBlacklistedNodes.removeAll(allocateRequest
|
||||
.getResourceBlacklistRequest().getBlacklistRemovals());
|
||||
this.blacklistRemovals.addAll(allocateRequest
|
||||
.getResourceBlacklistRequest().getBlacklistRemovals());
|
||||
}
|
||||
}
|
||||
|
||||
if (allocateRequest.getUpdateRequests() != null) {
|
||||
for (UpdateContainerRequest update : allocateRequest
|
||||
.getUpdateRequests()) {
|
||||
this.remotePendingChange.put(update.getContainerId(), update);
|
||||
this.change.put(update.getContainerId(), update);
|
||||
}
|
||||
}
|
||||
|
||||
ArrayList<ResourceRequest> askList = new ArrayList<>(ask.size());
|
||||
for (ResourceRequest r : ask) {
|
||||
// create a copy of ResourceRequest as we might change it while the
|
||||
// RPC layer is using it to send info across
|
||||
askList.add(ResourceRequest.newBuilder().priority(r.getPriority())
|
||||
.resourceName(r.getResourceName()).capability(r.getCapability())
|
||||
.numContainers(r.getNumContainers())
|
||||
.relaxLocality(r.getRelaxLocality())
|
||||
.nodeLabelExpression(r.getNodeLabelExpression())
|
||||
.executionTypeRequest(r.getExecutionTypeRequest())
|
||||
.allocationRequestId(r.getAllocationRequestId()).build());
|
||||
}
|
||||
|
||||
allocateRequest = AllocateRequest.newBuilder()
|
||||
.responseId(allocateRequest.getResponseId())
|
||||
.progress(allocateRequest.getProgress()).askList(askList)
|
||||
.releaseList(new ArrayList<>(this.release))
|
||||
.resourceBlacklistRequest(ResourceBlacklistRequest.newInstance(
|
||||
new ArrayList<>(this.blacklistAdditions),
|
||||
new ArrayList<>(this.blacklistRemovals)))
|
||||
.updateRequests(new ArrayList<>(this.change.values())).build();
|
||||
}
|
||||
|
||||
// Do the actual allocate call
|
||||
try {
|
||||
allocateResponse = this.rmClient.allocate(allocateRequest);
|
||||
} catch (ApplicationMasterNotRegisteredException e) {
|
||||
LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
|
||||
+ " hence resyncing.");
|
||||
|
||||
synchronized (this) {
|
||||
// Add all remotePending data into to-send data structures
|
||||
for (ResourceRequestSet requestSet : this.remotePendingAsks
|
||||
.values()) {
|
||||
for (ResourceRequest rr : requestSet.getRRs()) {
|
||||
addResourceRequestToAsk(rr);
|
||||
}
|
||||
}
|
||||
this.release.addAll(this.remotePendingRelease);
|
||||
this.blacklistAdditions.addAll(this.remoteBlacklistedNodes);
|
||||
this.change.putAll(this.remotePendingChange);
|
||||
}
|
||||
|
||||
// re register with RM, then retry allocate recursively
|
||||
registerApplicationMaster(this.amRegistrationRequest);
|
||||
return allocate(allocateRequest);
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
// Process the allocate response from RM
|
||||
if (allocateResponse.getCompletedContainersStatuses() != null) {
|
||||
for (ContainerStatus container : allocateResponse
|
||||
.getCompletedContainersStatuses()) {
|
||||
this.remotePendingRelease.remove(container.getContainerId());
|
||||
this.remotePendingChange.remove(container.getContainerId());
|
||||
}
|
||||
}
|
||||
|
||||
if (allocateResponse.getUpdatedContainers() != null) {
|
||||
for (UpdatedContainer updatedContainer : allocateResponse
|
||||
.getUpdatedContainers()) {
|
||||
this.remotePendingChange
|
||||
.remove(updatedContainer.getContainer().getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} finally {
|
||||
synchronized (this) {
|
||||
/*
|
||||
* If allocateResponse is null, it means exception happened and RM did
|
||||
* not accept the request. Don't clear any data structures so that they
|
||||
* will be re-sent next time.
|
||||
*
|
||||
* Otherwise request was accepted by RM, we are safe to clear these.
|
||||
*/
|
||||
if (allocateResponse != null) {
|
||||
this.ask.clear();
|
||||
this.release.clear();
|
||||
|
||||
this.blacklistAdditions.clear();
|
||||
this.blacklistRemovals.clear();
|
||||
|
||||
this.change.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
return allocateResponse;
|
||||
}
|
||||
|
||||
private void addNewAsks(List<ResourceRequest> asks) throws YarnException {
|
||||
Set<ResourceRequestSetKey> touchedKeys = new HashSet<>();
|
||||
for (ResourceRequest rr : asks) {
|
||||
addResourceRequestToAsk(rr);
|
||||
|
||||
ResourceRequestSetKey key = new ResourceRequestSetKey(rr);
|
||||
touchedKeys.add(key);
|
||||
|
||||
ResourceRequestSet askSet = this.remotePendingAsks.get(key);
|
||||
if (askSet == null) {
|
||||
askSet = new ResourceRequestSet(key);
|
||||
this.remotePendingAsks.put(key, askSet);
|
||||
}
|
||||
askSet.addAndOverrideRR(rr);
|
||||
}
|
||||
|
||||
// Cleanup properly if needed
|
||||
for (ResourceRequestSetKey key : touchedKeys) {
|
||||
ResourceRequestSet askSet = this.remotePendingAsks.get(key);
|
||||
if (askSet.getNumContainers() == 0) {
|
||||
this.remotePendingAsks.remove(key);
|
||||
} else {
|
||||
// Remove non-any zero RRs
|
||||
askSet.cleanupZeroNonAnyRR();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
|
||||
// The ResourceRequestComparator doesn't look at container count when
|
||||
// comparing. So we need to make sure the new RR override the old if any
|
||||
this.ask.remove(remoteRequest);
|
||||
this.ask.add(remoteRequest);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected Map<ResourceRequestSetKey, ResourceRequestSet>
|
||||
getRemotePendingAsks() {
|
||||
return this.remotePendingAsks;
|
||||
}
|
||||
|
||||
}
|
|
@ -27,12 +27,11 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||
import org.apache.hadoop.yarn.client.AMRMClientUtils;
|
||||
import org.apache.hadoop.yarn.client.RMFailoverProxyProvider;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server;
|
|
@ -0,0 +1,206 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.scheduler;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
||||
/**
|
||||
* A set of resource requests of the same scheduler key
|
||||
* {@link ResourceRequestSetKey}.
|
||||
*/
|
||||
public class ResourceRequestSet {
|
||||
|
||||
private ResourceRequestSetKey key;
|
||||
private int numContainers;
|
||||
// ResourceName -> RR
|
||||
private Map<String, ResourceRequest> asks;
|
||||
|
||||
/**
|
||||
* Create a empty set with given key.
|
||||
*
|
||||
* @param key the key of the request set
|
||||
* @throws YarnException if fails
|
||||
*/
|
||||
public ResourceRequestSet(ResourceRequestSetKey key) throws YarnException {
|
||||
this.key = key;
|
||||
// leave it zero for now, as if it is a cancel
|
||||
this.numContainers = 0;
|
||||
this.asks = new HashMap<>();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a shallow copy of the request set.
|
||||
*
|
||||
* @param other the set of copy from
|
||||
*/
|
||||
public ResourceRequestSet(ResourceRequestSet other) {
|
||||
this.key = other.key;
|
||||
this.numContainers = other.numContainers;
|
||||
this.asks = new HashMap<>();
|
||||
// The assumption is that the RR objects should not be modified without
|
||||
// making a copy
|
||||
this.asks.putAll(other.asks);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a {@link ResourceRequest} into the requestSet. If there's already an RR
|
||||
* with the same resource name, override it and update accordingly.
|
||||
*
|
||||
* @param ask the new {@link ResourceRequest}
|
||||
* @throws YarnException
|
||||
*/
|
||||
public void addAndOverrideRR(ResourceRequest ask) throws YarnException {
|
||||
if (!this.key.equals(new ResourceRequestSetKey(ask))) {
|
||||
throw new YarnException(
|
||||
"None compatible asks: \n" + ask + "\n" + this.key);
|
||||
}
|
||||
|
||||
// Override directly if exists
|
||||
this.asks.put(ask.getResourceName(), ask);
|
||||
|
||||
if (this.key.getExeType().equals(ExecutionType.GUARANTEED)) {
|
||||
// For G requestSet, update the numContainers only for ANY RR
|
||||
if (ask.getResourceName().equals(ResourceRequest.ANY)) {
|
||||
this.numContainers = ask.getNumContainers();
|
||||
}
|
||||
} else {
|
||||
// The assumption we made about O asks is that all RR in a requestSet has
|
||||
// the same numContainers value. So we just take the value of the last RR
|
||||
this.numContainers = ask.getNumContainers();
|
||||
}
|
||||
if (this.numContainers < 0) {
|
||||
throw new YarnException("numContainers becomes " + this.numContainers
|
||||
+ " when adding ask " + ask + "\n requestSet: " + toString());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge a requestSet into this one.
|
||||
*
|
||||
* @param requestSet the requestSet to merge
|
||||
* @throws YarnException
|
||||
*/
|
||||
public void addAndOverrideRRSet(ResourceRequestSet requestSet)
|
||||
throws YarnException {
|
||||
if (requestSet == null) {
|
||||
return;
|
||||
}
|
||||
for (ResourceRequest rr : requestSet.getRRs()) {
|
||||
addAndOverrideRR(rr);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove all non-Any ResourceRequests from the set. This is necessary cleanup
|
||||
* to avoid requestSet getting too big.
|
||||
*/
|
||||
public void cleanupZeroNonAnyRR() {
|
||||
Iterator<Entry<String, ResourceRequest>> iter =
|
||||
this.asks.entrySet().iterator();
|
||||
while (iter.hasNext()) {
|
||||
Entry<String, ResourceRequest> entry = iter.next();
|
||||
if (entry.getKey().equals(ResourceRequest.ANY)) {
|
||||
// Do not delete ANY RR
|
||||
continue;
|
||||
}
|
||||
if (entry.getValue().getNumContainers() == 0) {
|
||||
iter.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public Map<String, ResourceRequest> getAsks() {
|
||||
return this.asks;
|
||||
}
|
||||
|
||||
public Collection<ResourceRequest> getRRs() {
|
||||
return this.asks.values();
|
||||
}
|
||||
|
||||
public int getNumContainers() {
|
||||
return this.numContainers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Force set the # of containers to ask for this requestSet to a given value.
|
||||
*
|
||||
* @param newValue the new # of containers value
|
||||
* @throws YarnException
|
||||
*/
|
||||
public void setNumContainers(int newValue) throws YarnException {
|
||||
if (this.numContainers == 0) {
|
||||
throw new YarnException("should not set numContainers to " + newValue
|
||||
+ " for a cancel requestSet: " + toString());
|
||||
}
|
||||
|
||||
// Clone the ResourceRequest object whenever we need to change it
|
||||
int oldValue = this.numContainers;
|
||||
this.numContainers = newValue;
|
||||
if (this.key.getExeType().equals(ExecutionType.OPPORTUNISTIC)) {
|
||||
// The assumption we made about O asks is that all RR in a requestSet has
|
||||
// the same numContainers value
|
||||
Map<String, ResourceRequest> newAsks = new HashMap<>();
|
||||
for (ResourceRequest rr : this.asks.values()) {
|
||||
ResourceRequest clone = cloneResourceRequest(rr);
|
||||
clone.setNumContainers(newValue);
|
||||
newAsks.put(clone.getResourceName(), clone);
|
||||
}
|
||||
this.asks = newAsks;
|
||||
} else {
|
||||
ResourceRequest rr = this.asks.get(ResourceRequest.ANY);
|
||||
if (rr == null) {
|
||||
throw new YarnException(
|
||||
"No ANY RR found in requestSet with numContainers=" + oldValue);
|
||||
}
|
||||
ResourceRequest clone = cloneResourceRequest(rr);
|
||||
clone.setNumContainers(newValue);
|
||||
this.asks.put(ResourceRequest.ANY, clone);
|
||||
}
|
||||
}
|
||||
|
||||
private ResourceRequest cloneResourceRequest(ResourceRequest rr) {
|
||||
return ResourceRequest.newBuilder().priority(rr.getPriority())
|
||||
.resourceName(rr.getResourceName()).capability(rr.getCapability())
|
||||
.numContainers(rr.getNumContainers())
|
||||
.relaxLocality(rr.getRelaxLocality())
|
||||
.nodeLabelExpression(rr.getNodeLabelExpression())
|
||||
.executionTypeRequest(rr.getExecutionTypeRequest())
|
||||
.allocationRequestId(rr.getAllocationRequestId()).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
builder.append("{" + this.key.toString());
|
||||
for (Entry<String, ResourceRequest> entry : this.asks.entrySet()) {
|
||||
builder.append(
|
||||
" " + entry.getValue().getNumContainers() + ":" + entry.getKey());
|
||||
}
|
||||
builder.append("}");
|
||||
return builder.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,133 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.scheduler;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
||||
/**
|
||||
* The scheduler key for a group of {@link ResourceRequest}.
|
||||
*
|
||||
* TODO: after YARN-7631 is fixed by adding Resource and ExecType into
|
||||
* SchedulerRequestKey, then we can directly use that.
|
||||
*/
|
||||
public class ResourceRequestSetKey extends SchedulerRequestKey {
|
||||
|
||||
// More ResourceRequest key fields on top of SchedulerRequestKey
|
||||
private final Resource resource;
|
||||
private final ExecutionType execType;
|
||||
|
||||
/**
|
||||
* Create the key object from a {@link ResourceRequest}.
|
||||
*
|
||||
* @param rr Resource request object
|
||||
* @throws YarnException if fails
|
||||
*/
|
||||
public ResourceRequestSetKey(ResourceRequest rr) throws YarnException {
|
||||
this(rr.getAllocationRequestId(), rr.getPriority(), rr.getCapability(),
|
||||
((rr.getExecutionTypeRequest() == null) ? ExecutionType.GUARANTEED
|
||||
: rr.getExecutionTypeRequest().getExecutionType()));
|
||||
if (rr.getPriority() == null) {
|
||||
throw new YarnException("Null priority in RR: " + rr);
|
||||
}
|
||||
if (rr.getCapability() == null) {
|
||||
throw new YarnException("Null resource in RR: " + rr);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the key object from member objects.
|
||||
*
|
||||
* @param allocationRequestId allocate request id of the ask
|
||||
* @param priority the priority of the ask
|
||||
* @param resource the resource size of the ask
|
||||
* @param execType the execution type of the ask
|
||||
*/
|
||||
public ResourceRequestSetKey(long allocationRequestId, Priority priority,
|
||||
Resource resource, ExecutionType execType) {
|
||||
super(priority, allocationRequestId, null);
|
||||
|
||||
if (resource == null) {
|
||||
this.resource = Resource.newInstance(0, 0);
|
||||
} else {
|
||||
this.resource = resource;
|
||||
}
|
||||
if (execType == null) {
|
||||
this.execType = ExecutionType.GUARANTEED;
|
||||
} else {
|
||||
this.execType = execType;
|
||||
}
|
||||
}
|
||||
|
||||
public Resource getResource() {
|
||||
return this.resource;
|
||||
}
|
||||
|
||||
public ExecutionType getExeType() {
|
||||
return this.execType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null || !(obj instanceof SchedulerRequestKey)) {
|
||||
return false;
|
||||
}
|
||||
if (!(obj instanceof ResourceRequestSetKey)) {
|
||||
return super.equals(obj);
|
||||
}
|
||||
ResourceRequestSetKey other = (ResourceRequestSetKey) obj;
|
||||
return super.equals(other) && this.resource.equals(other.resource)
|
||||
&& this.execType.equals(other.execType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return ((super.hashCode() * 37 + this.resource.hashCode()) * 41)
|
||||
+ this.execType.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(SchedulerRequestKey other) {
|
||||
int ret = super.compareTo(other);
|
||||
if (ret != 0) {
|
||||
return ret;
|
||||
}
|
||||
if (!(other instanceof ResourceRequestSetKey)) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
ResourceRequestSetKey otherKey = (ResourceRequestSetKey) other;
|
||||
ret = this.resource.compareTo(otherKey.resource);
|
||||
if (ret != 0) {
|
||||
return ret;
|
||||
}
|
||||
return this.execType.compareTo(otherKey.execType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "[id:" + getAllocationRequestId() + " p:"
|
||||
+ getPriority().getPriority()
|
||||
+ (this.execType.equals(ExecutionType.GUARANTEED) ? " G"
|
||||
: " O" + " r:" + this.resource + "]");
|
||||
}
|
||||
}
|
|
@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
|||
* Composite key for outstanding scheduler requests for any schedulable entity.
|
||||
* Currently it includes {@link Priority}.
|
||||
*/
|
||||
public final class SchedulerRequestKey implements
|
||||
public class SchedulerRequestKey implements
|
||||
Comparable<SchedulerRequestKey> {
|
||||
|
||||
private final Priority priority;
|
||||
|
@ -62,8 +62,6 @@ public final class SchedulerRequestKey implements
|
|||
container.getAllocationRequestId(), null);
|
||||
}
|
||||
|
||||
|
||||
|
||||
public SchedulerRequestKey(Priority priority, long allocationRequestId,
|
||||
ContainerId containerToUpdate) {
|
||||
this.priority = priority;
|
||||
|
|
|
@ -46,9 +46,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.client.AMRMClientUtils;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
|
||||
import org.apache.hadoop.yarn.util.AsyncCallback;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
|
@ -56,13 +56,13 @@ import org.apache.hadoop.yarn.api.records.NMToken;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.client.AMRMClientUtils;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
|
||||
import org.apache.hadoop.yarn.util.AsyncCallback;
|
||||
|
|
|
@ -120,6 +120,7 @@ import org.apache.hadoop.yarn.api.records.Token;
|
|||
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.client.AMRMClientUtils;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
|
||||
|
@ -152,7 +153,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequ
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Assert;
|
||||
import org.slf4j.Logger;
|
||||
|
|
|
@ -0,0 +1,275 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Unit test for AMRMClientRelayer.
|
||||
*/
|
||||
public class TestAMRMClientRelayer {
|
||||
|
||||
/**
|
||||
* Mocked ApplicationMasterService in RM.
|
||||
*/
|
||||
public static class MockApplicationMasterService
|
||||
implements ApplicationMasterProtocol {
|
||||
|
||||
// Whether this mockRM will throw failover exception upon next heartbeat
|
||||
// from AM
|
||||
private boolean failover = false;
|
||||
private List<ResourceRequest> lastAsk;
|
||||
private List<ContainerId> lastRelease;
|
||||
private List<String> lastBlacklistAdditions;
|
||||
private List<String> lastBlacklistRemovals;
|
||||
|
||||
@Override
|
||||
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||
RegisterApplicationMasterRequest request)
|
||||
throws YarnException, IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FinishApplicationMasterResponse finishApplicationMaster(
|
||||
FinishApplicationMasterRequest request)
|
||||
throws YarnException, IOException {
|
||||
if (this.failover) {
|
||||
this.failover = false;
|
||||
throw new ApplicationMasterNotRegisteredException("Mock RM restarted");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AllocateResponse allocate(AllocateRequest request)
|
||||
throws YarnException, IOException {
|
||||
if (this.failover) {
|
||||
this.failover = false;
|
||||
throw new ApplicationMasterNotRegisteredException("Mock RM restarted");
|
||||
}
|
||||
this.lastAsk = request.getAskList();
|
||||
this.lastRelease = request.getReleaseList();
|
||||
this.lastBlacklistAdditions =
|
||||
request.getResourceBlacklistRequest().getBlacklistAdditions();
|
||||
this.lastBlacklistRemovals =
|
||||
request.getResourceBlacklistRequest().getBlacklistRemovals();
|
||||
return AllocateResponse.newInstance(0, null, null,
|
||||
new ArrayList<NodeReport>(), Resource.newInstance(0, 0), null, 0,
|
||||
null, null);
|
||||
}
|
||||
|
||||
public void setFailoverFlag() {
|
||||
this.failover = true;
|
||||
}
|
||||
}
|
||||
|
||||
private Configuration conf;
|
||||
private MockApplicationMasterService mockAMS;
|
||||
private AMRMClientRelayer relayer;
|
||||
|
||||
// Buffer of asks that will be sent to RM in the next AM heartbeat
|
||||
private List<ResourceRequest> asks = new ArrayList<>();
|
||||
private List<ContainerId> releases = new ArrayList<>();
|
||||
private List<String> blacklistAdditions = new ArrayList<>();
|
||||
private List<String> blacklistRemoval = new ArrayList<>();
|
||||
|
||||
@Before
|
||||
public void setup() throws YarnException, IOException {
|
||||
this.conf = new Configuration();
|
||||
|
||||
this.mockAMS = new MockApplicationMasterService();
|
||||
this.relayer = new AMRMClientRelayer(this.mockAMS);
|
||||
|
||||
this.relayer.init(conf);
|
||||
this.relayer.start();
|
||||
|
||||
this.relayer.registerApplicationMaster(
|
||||
RegisterApplicationMasterRequest.newInstance("", 0, ""));
|
||||
|
||||
clearAllocateRequestLists();
|
||||
}
|
||||
|
||||
private void assertAsksAndReleases(int expectedAsk, int expectedRelease) {
|
||||
Assert.assertEquals(expectedAsk, this.mockAMS.lastAsk.size());
|
||||
Assert.assertEquals(expectedRelease, this.mockAMS.lastRelease.size());
|
||||
}
|
||||
|
||||
private void assertBlacklistAdditionsAndRemovals(int expectedAdditions,
|
||||
int expectedRemovals) {
|
||||
Assert.assertEquals(expectedAdditions,
|
||||
this.mockAMS.lastBlacklistAdditions.size());
|
||||
Assert.assertEquals(expectedRemovals,
|
||||
this.mockAMS.lastBlacklistRemovals.size());
|
||||
}
|
||||
|
||||
private AllocateRequest getAllocateRequest() {
|
||||
// Need to create a new one every time because rather than directly
|
||||
// referring the lists, the protobuf impl makes a copy of the lists
|
||||
return AllocateRequest.newInstance(0, 0, asks, releases,
|
||||
ResourceBlacklistRequest.newInstance(blacklistAdditions,
|
||||
blacklistRemoval));
|
||||
}
|
||||
|
||||
private void clearAllocateRequestLists() {
|
||||
this.asks.clear();
|
||||
this.releases.clear();
|
||||
this.blacklistAdditions.clear();
|
||||
this.blacklistRemoval.clear();
|
||||
}
|
||||
|
||||
private static ContainerId createContainerId(int id) {
|
||||
return ContainerId.newContainerId(
|
||||
ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
|
||||
id);
|
||||
}
|
||||
|
||||
protected ResourceRequest createResourceRequest(long id, String resource,
|
||||
int memory, int vCores, int priority, ExecutionType execType,
|
||||
int containers) {
|
||||
ResourceRequest req = Records.newRecord(ResourceRequest.class);
|
||||
req.setAllocationRequestId(id);
|
||||
req.setResourceName(resource);
|
||||
req.setCapability(Resource.newInstance(memory, vCores));
|
||||
req.setPriority(Priority.newInstance(priority));
|
||||
req.setExecutionTypeRequest(ExecutionTypeRequest.newInstance(execType));
|
||||
req.setNumContainers(containers);
|
||||
return req;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test the proper handling of removal/cancel of resource requests.
|
||||
*/
|
||||
@Test
|
||||
public void testResourceRequestCleanup() throws YarnException, IOException {
|
||||
// Ask for two containers, one with location preference
|
||||
this.asks.add(createResourceRequest(0, "node", 2048, 1, 1,
|
||||
ExecutionType.GUARANTEED, 1));
|
||||
this.asks.add(createResourceRequest(0, "rack", 2048, 1, 1,
|
||||
ExecutionType.GUARANTEED, 1));
|
||||
this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1,
|
||||
ExecutionType.GUARANTEED, 2));
|
||||
this.relayer.allocate(getAllocateRequest());
|
||||
|
||||
assertAsksAndReleases(3, 0);
|
||||
Assert.assertEquals(1, this.relayer.getRemotePendingAsks().size());
|
||||
ResourceRequestSet set =
|
||||
this.relayer.getRemotePendingAsks().values().iterator().next();
|
||||
Assert.assertEquals(3, set.getAsks().size());
|
||||
clearAllocateRequestLists();
|
||||
|
||||
// Cancel one ask
|
||||
this.asks.add(createResourceRequest(0, "node", 2048, 1, 1,
|
||||
ExecutionType.GUARANTEED, 0));
|
||||
this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1,
|
||||
ExecutionType.GUARANTEED, 1));
|
||||
this.relayer.allocate(getAllocateRequest());
|
||||
|
||||
assertAsksAndReleases(2, 0);
|
||||
Assert.assertEquals(1, relayer.getRemotePendingAsks().size());
|
||||
set = this.relayer.getRemotePendingAsks().values().iterator().next();
|
||||
Assert.assertEquals(2, set.getAsks().size());
|
||||
clearAllocateRequestLists();
|
||||
|
||||
// Cancel the other ask, the pending askSet should be removed
|
||||
this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1,
|
||||
ExecutionType.GUARANTEED, 0));
|
||||
this.relayer.allocate(AllocateRequest.newInstance(0, 0, asks, null, null));
|
||||
|
||||
assertAsksAndReleases(1, 0);
|
||||
Assert.assertEquals(0, this.relayer.getRemotePendingAsks().size());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test the full pending resend after RM fails over.
|
||||
*/
|
||||
@Test
|
||||
public void testResendRequestsOnRMRestart()
|
||||
throws YarnException, IOException {
|
||||
ContainerId c1 = createContainerId(1);
|
||||
ContainerId c2 = createContainerId(2);
|
||||
ContainerId c3 = createContainerId(3);
|
||||
|
||||
// Ask for two containers, one with location preference
|
||||
this.asks.add(createResourceRequest(0, "node1", 2048, 1, 1,
|
||||
ExecutionType.GUARANTEED, 1));
|
||||
this.asks.add(createResourceRequest(0, "rack", 2048, 1, 1,
|
||||
ExecutionType.GUARANTEED, 1));
|
||||
this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1,
|
||||
ExecutionType.GUARANTEED, 2));
|
||||
|
||||
this.releases.add(c1);
|
||||
this.blacklistAdditions.add("node1");
|
||||
this.blacklistRemoval.add("node0");
|
||||
|
||||
// 1. a fully loaded request
|
||||
this.relayer.allocate(getAllocateRequest());
|
||||
assertAsksAndReleases(3, 1);
|
||||
assertBlacklistAdditionsAndRemovals(1, 1);
|
||||
clearAllocateRequestLists();
|
||||
|
||||
// 2. empty request
|
||||
this.relayer.allocate(getAllocateRequest());
|
||||
assertAsksAndReleases(0, 0);
|
||||
assertBlacklistAdditionsAndRemovals(0, 0);
|
||||
clearAllocateRequestLists();
|
||||
|
||||
// Set RM restart and failover flag
|
||||
this.mockAMS.setFailoverFlag();
|
||||
|
||||
// More requests
|
||||
this.blacklistAdditions.add("node2");
|
||||
this.releases.add(c2);
|
||||
this.relayer.allocate(getAllocateRequest());
|
||||
|
||||
// verify pending requests are fully re-sent
|
||||
assertAsksAndReleases(3, 2);
|
||||
assertBlacklistAdditionsAndRemovals(2, 0);
|
||||
clearAllocateRequestLists();
|
||||
}
|
||||
|
||||
}
|
|
@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
|||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
|
||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||
import org.apache.hadoop.yarn.client.AMRMClientUtils;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
@ -78,7 +79,6 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
|||
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||
import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
|
||||
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
|
||||
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
|
||||
import org.apache.hadoop.yarn.util.AsyncCallback;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
|
|
|
@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.client.AMRMClientUtils;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
||||
|
@ -69,7 +70,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
|
||||
import org.apache.hadoop.yarn.server.security.MasterKeyData;
|
||||
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
|
||||
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
|
|
|
@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
|
|||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.SerializedException;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.client.AMRMClientUtils;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
||||
|
@ -78,7 +79,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.LogManager;
|
||||
|
|
Loading…
Reference in New Issue