YARN-6776. Refactor ApplicaitonMasterService to move actual processing logic to a separate class. (asuresh)

This commit is contained in:
Arun Suresh 2017-07-10 14:34:58 -07:00
parent 09653ea098
commit 5496a34c0c
7 changed files with 754 additions and 481 deletions

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.ams;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords
.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords
.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import java.io.IOException;
/**
* Interface to abstract out the the actual processing logic of the
* Application Master Service.
*/
public interface ApplicationMasterServiceProcessor {
/**
* Register AM attempt.
* @param applicationAttemptId applicationAttemptId.
* @param request Register Request.
* @return Register Response.
* @throws IOException IOException.
*/
RegisterApplicationMasterResponse registerApplicationMaster(
ApplicationAttemptId applicationAttemptId,
RegisterApplicationMasterRequest request) throws IOException;
/**
* Allocate call.
* @param appAttemptId appAttemptId.
* @param request Allocate Request.
* @return Allocate Response.
* @throws YarnException YarnException.
*/
AllocateResponse allocate(ApplicationAttemptId appAttemptId,
AllocateRequest request) throws YarnException;
/**
* Finish AM.
* @param applicationAttemptId applicationAttemptId.
* @param request Finish AM Request.
* @return Finish AM response.
*/
FinishApplicationMasterResponse finishApplicationMaster(
ApplicationAttemptId applicationAttemptId,
FinishApplicationMasterRequest request);
}

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.ams;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import java.util.ArrayList;
import java.util.List;
/**
* Utility methods to be used by {@link ApplicationMasterServiceProcessor}.
*/
public final class ApplicationMasterServiceUtils {
private ApplicationMasterServiceUtils() { }
/**
* Add update container errors to {@link AllocateResponse}.
* @param allocateResponse Allocate Response.
* @param updateContainerErrors Errors.
*/
public static void addToUpdateContainerErrors(
AllocateResponse allocateResponse,
List<UpdateContainerError> updateContainerErrors) {
if (!updateContainerErrors.isEmpty()) {
if (allocateResponse.getUpdateErrors() != null
&& !allocateResponse.getUpdateErrors().isEmpty()) {
updateContainerErrors.addAll(allocateResponse.getUpdateErrors());
}
allocateResponse.setUpdateErrors(updateContainerErrors);
}
}
/**
* Add updated containers to {@link AllocateResponse}.
* @param allocateResponse Allocate Response.
* @param updateType Update Type.
* @param updatedContainers Updated Containers.
*/
public static void addToUpdatedContainers(AllocateResponse allocateResponse,
ContainerUpdateType updateType, List<Container> updatedContainers) {
if (updatedContainers != null && updatedContainers.size() > 0) {
ArrayList<UpdatedContainer> containersToSet = new ArrayList<>();
if (allocateResponse.getUpdatedContainers() != null &&
!allocateResponse.getUpdatedContainers().isEmpty()) {
containersToSet.addAll(allocateResponse.getUpdatedContainers());
}
for (Container updatedContainer : updatedContainers) {
containersToSet.add(
UpdatedContainer.newInstance(updateType, updatedContainer));
}
allocateResponse.setUpdatedContainers(containersToSet);
}
}
/**
* Add allocated containers to {@link AllocateResponse}.
* @param allocateResponse Allocate Response.
* @param allocatedContainers Allocated Containers.
*/
public static void addToAllocatedContainers(AllocateResponse allocateResponse,
List<Container> allocatedContainers) {
if (allocateResponse.getAllocatedContainers() != null
&& !allocateResponse.getAllocatedContainers().isEmpty()) {
allocatedContainers.addAll(allocateResponse.getAllocatedContainers());
}
allocateResponse.setAllocatedContainers(allocatedContainers);
}
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Public api for Application Master Service interceptors.
*/
@InterfaceAudience.Public
package org.apache.hadoop.yarn.ams;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -21,12 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -37,10 +33,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@ -52,30 +48,11 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionContainer;
import org.apache.hadoop.yarn.api.records.PreemptionContract;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -86,23 +63,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security
.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -122,6 +88,12 @@ public class ApplicationMasterService extends AbstractService implements
private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap =
new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
protected final RMContext rmContext;
private final ApplicationMasterServiceProcessor amsProcessor;
public ApplicationMasterService(RMContext rmContext,
YarnScheduler scheduler) {
this(ApplicationMasterService.class.getName(), rmContext, scheduler);
}
public ApplicationMasterService(String name, RMContext rmContext,
YarnScheduler scheduler) {
@ -129,11 +101,11 @@ public class ApplicationMasterService extends AbstractService implements
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
this.rScheduler = scheduler;
this.rmContext = rmContext;
this.amsProcessor = createProcessor();
}
public ApplicationMasterService(RMContext rmContext,
YarnScheduler scheduler) {
this(ApplicationMasterService.class.getName(), rmContext, scheduler);
protected ApplicationMasterServiceProcessor createProcessor() {
return new DefaultAMSProcessor(rmContext, rScheduler);
}
@Override
@ -228,82 +200,22 @@ public class ApplicationMasterService extends AbstractService implements
+ appID;
LOG.warn(message);
RMAuditLogger.logFailure(
this.rmContext.getRMApps()
.get(appID).getUser(),
AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message,
appID, applicationAttemptId);
this.rmContext.getRMApps()
.get(appID).getUser(),
AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message,
appID, applicationAttemptId);
throw new InvalidApplicationMasterRequestException(message);
}
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
RMApp app = this.rmContext.getRMApps().get(appID);
// Setting the response id to 0 to identify if the
// application master is register for the respective attemptid
lastResponse.setResponseId(0);
lock.setAllocateResponse(lastResponse);
LOG.info("AM registration " + applicationAttemptId);
this.rmContext
.getDispatcher()
.getEventHandler()
.handle(
new RMAppAttemptRegistrationEvent(applicationAttemptId, request
.getHost(), request.getRpcPort(), request.getTrackingUrl()));
RMAuditLogger.logSuccess(app.getUser(), AuditConstants.REGISTER_AM,
"ApplicationMasterService", appID, applicationAttemptId);
// Pick up min/max resource from scheduler...
RegisterApplicationMasterResponse response = recordFactory
.newRecordInstance(RegisterApplicationMasterResponse.class);
response.setMaximumResourceCapability(rScheduler
.getMaximumResourceCapability(app.getQueue()));
response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId)
.getSubmissionContext().getAMContainerSpec().getApplicationACLs());
response.setQueue(app.getQueue());
if (UserGroupInformation.isSecurityEnabled()) {
LOG.info("Setting client token master key");
response.setClientToAMTokenMasterKey(java.nio.ByteBuffer.wrap(rmContext
.getClientToAMTokenSecretManager()
.getMasterKey(applicationAttemptId).getEncoded()));
}
// For work-preserving AM restart, retrieve previous attempts' containers
// and corresponding NM tokens.
if (app.getApplicationSubmissionContext()
.getKeepContainersAcrossApplicationAttempts()) {
List<Container> transferredContainers = rScheduler
.getTransferredContainers(applicationAttemptId);
if (!transferredContainers.isEmpty()) {
response.setContainersFromPreviousAttempts(transferredContainers);
List<NMToken> nmTokens = new ArrayList<NMToken>();
for (Container container : transferredContainers) {
try {
NMToken token = rmContext.getNMTokenSecretManager()
.createAndGetNMToken(app.getUser(), applicationAttemptId,
container);
if (null != token) {
nmTokens.add(token);
}
} catch (IllegalArgumentException e) {
// if it's a DNS issue, throw UnknowHostException directly and
// that
// will be automatically retried by RMProxy in RPC layer.
if (e.getCause() instanceof UnknownHostException) {
throw (UnknownHostException) e.getCause();
}
}
}
response.setNMTokensFromPreviousAttempts(nmTokens);
LOG.info("Application " + appID + " retrieved "
+ transferredContainers.size() + " containers from previous"
+ " attempts and " + nmTokens.size() + " NM tokens.");
}
}
response.setSchedulerResourceTypes(rScheduler
.getSchedulingResourceTypes());
return response;
return this.amsProcessor.registerApplicationMaster(
amrmTokenIdentifier.getApplicationAttemptId(), request);
}
}
@ -353,15 +265,8 @@ public class ApplicationMasterService extends AbstractService implements
}
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
.getTrackingUrl(), request.getFinalApplicationStatus(), request
.getDiagnostics()));
// For UnmanagedAMs, return true so they don't retry
return FinishApplicationMasterResponse.newInstance(
rmApp.getApplicationSubmissionContext().getUnmanagedAM());
return this.amsProcessor.finishApplicationMaster(
applicationAttemptId, request);
}
}
@ -441,10 +346,8 @@ public class ApplicationMasterService extends AbstractService implements
throw new InvalidApplicationMasterRequestException(message);
}
AllocateResponse response =
recordFactory.newRecordInstance(AllocateResponse.class);
allocateInternal(amrmTokenIdentifier.getApplicationAttemptId(),
request, response);
AllocateResponse response = this.amsProcessor.allocate(
amrmTokenIdentifier.getApplicationAttemptId(), request);
// update AMRMToken if the token is rolled-up
MasterKeyData nextMasterKey =
@ -480,291 +383,7 @@ public class ApplicationMasterService extends AbstractService implements
response.setResponseId(lastResponse.getResponseId() + 1);
lock.setAllocateResponse(response);
return response;
}
}
protected void allocateInternal(ApplicationAttemptId appAttemptId,
AllocateRequest request, AllocateResponse allocateResponse)
throws YarnException {
//filter illegal progress values
float filteredProgress = request.getProgress();
if (Float.isNaN(filteredProgress) ||
filteredProgress == Float.NEGATIVE_INFINITY ||
filteredProgress < 0) {
request.setProgress(0);
} else if (filteredProgress > 1 ||
filteredProgress == Float.POSITIVE_INFINITY) {
request.setProgress(1);
}
// Send the status update to the appAttempt.
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptStatusupdateEvent(appAttemptId, request
.getProgress()));
List<ResourceRequest> ask = request.getAskList();
List<ContainerId> release = request.getReleaseList();
ResourceBlacklistRequest blacklistRequest =
request.getResourceBlacklistRequest();
List<String> blacklistAdditions =
(blacklistRequest != null) ?
blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST;
List<String> blacklistRemovals =
(blacklistRequest != null) ?
blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST;
RMApp app =
this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
// set label expression for Resource Requests if resourceName=ANY
ApplicationSubmissionContext asc = app.getApplicationSubmissionContext();
for (ResourceRequest req : ask) {
if (null == req.getNodeLabelExpression()
&& ResourceRequest.ANY.equals(req.getResourceName())) {
req.setNodeLabelExpression(asc.getNodeLabelExpression());
}
}
Resource maximumCapacity = rScheduler.getMaximumResourceCapability();
// sanity check
try {
RMServerUtils.normalizeAndValidateRequests(ask,
maximumCapacity, app.getQueue(),
rScheduler, rmContext);
} catch (InvalidResourceRequestException e) {
LOG.warn("Invalid resource ask by application " + appAttemptId, e);
throw e;
}
try {
RMServerUtils.validateBlacklistRequest(blacklistRequest);
} catch (InvalidResourceBlacklistRequestException e) {
LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
throw e;
}
// In the case of work-preserving AM restart, it's possible for the
// AM to release containers from the earlier attempt.
if (!app.getApplicationSubmissionContext()
.getKeepContainersAcrossApplicationAttempts()) {
try {
RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
} catch (InvalidContainerReleaseException e) {
LOG.warn("Invalid container release by application " + appAttemptId,
e);
throw e;
}
}
// Split Update Resource Requests into increase and decrease.
// No Exceptions are thrown here. All update errors are aggregated
// and returned to the AM.
List<UpdateContainerError> updateErrors = new ArrayList<>();
ContainerUpdates containerUpdateRequests =
RMServerUtils.validateAndSplitUpdateResourceRequests(
rmContext, request, maximumCapacity, updateErrors);
// Send new requests to appAttempt.
Allocation allocation;
RMAppAttemptState state =
app.getRMAppAttempt(appAttemptId).getAppAttemptState();
if (state.equals(RMAppAttemptState.FINAL_SAVING) ||
state.equals(RMAppAttemptState.FINISHING) ||
app.isAppFinalStateStored()) {
LOG.warn(appAttemptId + " is in " + state +
" state, ignore container allocate request.");
allocation = EMPTY_ALLOCATION;
} else {
allocation =
this.rScheduler.allocate(appAttemptId, ask, release,
blacklistAdditions, blacklistRemovals,
containerUpdateRequests);
}
if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
LOG.info("blacklist are updated in Scheduler." +
"blacklistAdditions: " + blacklistAdditions + ", " +
"blacklistRemovals: " + blacklistRemovals);
}
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
if (allocation.getNMTokens() != null &&
!allocation.getNMTokens().isEmpty()) {
allocateResponse.setNMTokens(allocation.getNMTokens());
}
// Notify the AM of container update errors
addToUpdateContainerErrors(allocateResponse, updateErrors);
// update the response with the deltas of node status changes
List<RMNode> updatedNodes = new ArrayList<RMNode>();
if(app.pullRMNodeUpdates(updatedNodes) > 0) {
List<NodeReport> updatedNodeReports = new ArrayList<NodeReport>();
for(RMNode rmNode: updatedNodes) {
SchedulerNodeReport schedulerNodeReport =
rScheduler.getNodeReport(rmNode.getNodeID());
Resource used = BuilderUtils.newResource(0, 0);
int numContainers = 0;
if (schedulerNodeReport != null) {
used = schedulerNodeReport.getUsedResource();
numContainers = schedulerNodeReport.getNumContainers();
}
NodeId nodeId = rmNode.getNodeID();
NodeReport report =
BuilderUtils.newNodeReport(nodeId, rmNode.getState(),
rmNode.getHttpAddress(), rmNode.getRackName(), used,
rmNode.getTotalCapability(), numContainers,
rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
rmNode.getNodeLabels());
updatedNodeReports.add(report);
}
allocateResponse.setUpdatedNodes(updatedNodeReports);
}
addToAllocatedContainers(allocateResponse, allocation.getContainers());
allocateResponse.setCompletedContainersStatuses(appAttempt
.pullJustFinishedContainers());
allocateResponse.setAvailableResources(allocation.getResourceLimit());
addToContainerUpdates(appAttemptId, allocateResponse, allocation);
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
// add collector address for this application
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
allocateResponse.setCollectorAddr(
this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
.getCollectorAddr());
}
// add preemption to the allocateResponse message (if any)
allocateResponse
.setPreemptionMessage(generatePreemptionMessage(allocation));
// Set application priority
allocateResponse.setApplicationPriority(app
.getApplicationPriority());
}
private void addToContainerUpdates(ApplicationAttemptId appAttemptId,
AllocateResponse allocateResponse, Allocation allocation) {
// Handling increased containers
addToUpdatedContainers(
allocateResponse, ContainerUpdateType.INCREASE_RESOURCE,
allocation.getIncreasedContainers());
// Handling decreased containers
addToUpdatedContainers(
allocateResponse, ContainerUpdateType.DECREASE_RESOURCE,
allocation.getDecreasedContainers());
// Handling promoted containers
addToUpdatedContainers(
allocateResponse, ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
allocation.getPromotedContainers());
// Handling demoted containers
addToUpdatedContainers(
allocateResponse, ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
allocation.getDemotedContainers());
addToUpdateContainerErrors(allocateResponse,
((AbstractYarnScheduler)rScheduler)
.getApplicationAttempt(appAttemptId).pullUpdateContainerErrors());
}
protected void addToUpdateContainerErrors(AllocateResponse allocateResponse,
List<UpdateContainerError> updateContainerErrors) {
if (!updateContainerErrors.isEmpty()) {
if (allocateResponse.getUpdateErrors() != null
&& !allocateResponse.getUpdateErrors().isEmpty()) {
updateContainerErrors = new ArrayList<>(updateContainerErrors);
updateContainerErrors.addAll(allocateResponse.getUpdateErrors());
}
allocateResponse.setUpdateErrors(updateContainerErrors);
}
}
protected void addToUpdatedContainers(AllocateResponse allocateResponse,
ContainerUpdateType updateType, List<Container> updatedContainers) {
if (updatedContainers != null && updatedContainers.size() > 0) {
ArrayList<UpdatedContainer> containersToSet = new ArrayList<>();
if (allocateResponse.getUpdatedContainers() != null &&
!allocateResponse.getUpdatedContainers().isEmpty()) {
containersToSet.addAll(allocateResponse.getUpdatedContainers());
}
for (Container updatedContainer : updatedContainers) {
containersToSet.add(
UpdatedContainer.newInstance(updateType, updatedContainer));
}
allocateResponse.setUpdatedContainers(containersToSet);
}
}
protected void addToAllocatedContainers(AllocateResponse allocateResponse,
List<Container> allocatedContainers) {
if (allocateResponse.getAllocatedContainers() != null
&& !allocateResponse.getAllocatedContainers().isEmpty()) {
allocatedContainers = new ArrayList<>(allocatedContainers);
allocatedContainers.addAll(allocateResponse.getAllocatedContainers());
}
allocateResponse.setAllocatedContainers(allocatedContainers);
}
private PreemptionMessage generatePreemptionMessage(Allocation allocation){
PreemptionMessage pMsg = null;
// assemble strict preemption request
if (allocation.getStrictContainerPreemptions() != null) {
pMsg =
recordFactory.newRecordInstance(PreemptionMessage.class);
StrictPreemptionContract pStrict =
recordFactory.newRecordInstance(StrictPreemptionContract.class);
Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>();
for (ContainerId cId : allocation.getStrictContainerPreemptions()) {
PreemptionContainer pc =
recordFactory.newRecordInstance(PreemptionContainer.class);
pc.setId(cId);
pCont.add(pc);
}
pStrict.setContainers(pCont);
pMsg.setStrictContract(pStrict);
}
// assemble negotiable preemption request
if (allocation.getResourcePreemptions() != null &&
allocation.getResourcePreemptions().size() > 0 &&
allocation.getContainerPreemptions() != null &&
allocation.getContainerPreemptions().size() > 0) {
if (pMsg == null) {
pMsg =
recordFactory.newRecordInstance(PreemptionMessage.class);
}
PreemptionContract contract =
recordFactory.newRecordInstance(PreemptionContract.class);
Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>();
for (ContainerId cId : allocation.getContainerPreemptions()) {
PreemptionContainer pc =
recordFactory.newRecordInstance(PreemptionContainer.class);
pc.setId(cId);
pCont.add(pc);
}
List<PreemptionResourceRequest> pRes = new ArrayList<PreemptionResourceRequest>();
for (ResourceRequest crr : allocation.getResourcePreemptions()) {
PreemptionResourceRequest prr =
recordFactory.newRecordInstance(PreemptionResourceRequest.class);
prr.setResourceRequest(crr);
pRes.add(prr);
}
contract.setContainers(pCont);
contract.setResourceRequest(pRes);
pMsg.setContract(contract);
}
return pMsg;
}
public void registerAppAttempt(ApplicationAttemptId attemptId) {

View File

@ -0,0 +1,455 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionContainer;
import org.apache.hadoop.yarn.api.records.PreemptionContract;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event
.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event
.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
private static final Log LOG = LogFactory.getLog(DefaultAMSProcessor.class);
private final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private final RMContext rmContext;
private final YarnScheduler scheduler;
DefaultAMSProcessor(RMContext rmContext, YarnScheduler scheduler) {
this.rmContext = rmContext;
this.scheduler = scheduler;
}
public RegisterApplicationMasterResponse registerApplicationMaster(
ApplicationAttemptId applicationAttemptId,
RegisterApplicationMasterRequest request) throws IOException {
RMApp app = getRmContext().getRMApps().get(
applicationAttemptId.getApplicationId());
LOG.info("AM registration " + applicationAttemptId);
getRmContext().getDispatcher().getEventHandler()
.handle(
new RMAppAttemptRegistrationEvent(applicationAttemptId, request
.getHost(), request.getRpcPort(), request.getTrackingUrl()));
RMAuditLogger.logSuccess(app.getUser(),
RMAuditLogger.AuditConstants.REGISTER_AM,
"ApplicationMasterService", app.getApplicationId(),
applicationAttemptId);
RegisterApplicationMasterResponse response = recordFactory
.newRecordInstance(RegisterApplicationMasterResponse.class);
response.setMaximumResourceCapability(getScheduler()
.getMaximumResourceCapability(app.getQueue()));
response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId)
.getSubmissionContext().getAMContainerSpec().getApplicationACLs());
response.setQueue(app.getQueue());
if (UserGroupInformation.isSecurityEnabled()) {
LOG.info("Setting client token master key");
response.setClientToAMTokenMasterKey(java.nio.ByteBuffer.wrap(
getRmContext().getClientToAMTokenSecretManager()
.getMasterKey(applicationAttemptId).getEncoded()));
}
// For work-preserving AM restart, retrieve previous attempts' containers
// and corresponding NM tokens.
if (app.getApplicationSubmissionContext()
.getKeepContainersAcrossApplicationAttempts()) {
List<Container> transferredContainers = getScheduler()
.getTransferredContainers(applicationAttemptId);
if (!transferredContainers.isEmpty()) {
response.setContainersFromPreviousAttempts(transferredContainers);
List<NMToken> nmTokens = new ArrayList<NMToken>();
for (Container container : transferredContainers) {
try {
NMToken token = getRmContext().getNMTokenSecretManager()
.createAndGetNMToken(app.getUser(), applicationAttemptId,
container);
if (null != token) {
nmTokens.add(token);
}
} catch (IllegalArgumentException e) {
// if it's a DNS issue, throw UnknowHostException directly and
// that
// will be automatically retried by RMProxy in RPC layer.
if (e.getCause() instanceof UnknownHostException) {
throw (UnknownHostException) e.getCause();
}
}
}
response.setNMTokensFromPreviousAttempts(nmTokens);
LOG.info("Application " + app.getApplicationId() + " retrieved "
+ transferredContainers.size() + " containers from previous"
+ " attempts and " + nmTokens.size() + " NM tokens.");
}
}
response.setSchedulerResourceTypes(getScheduler()
.getSchedulingResourceTypes());
return response;
}
public AllocateResponse allocate(ApplicationAttemptId appAttemptId,
AllocateRequest request) throws YarnException {
handleProgress(appAttemptId, request);
List<ResourceRequest> ask = request.getAskList();
List<ContainerId> release = request.getReleaseList();
ResourceBlacklistRequest blacklistRequest =
request.getResourceBlacklistRequest();
List<String> blacklistAdditions =
(blacklistRequest != null) ?
blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST;
List<String> blacklistRemovals =
(blacklistRequest != null) ?
blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST;
RMApp app =
getRmContext().getRMApps().get(appAttemptId.getApplicationId());
// set label expression for Resource Requests if resourceName=ANY
ApplicationSubmissionContext asc = app.getApplicationSubmissionContext();
for (ResourceRequest req : ask) {
if (null == req.getNodeLabelExpression()
&& ResourceRequest.ANY.equals(req.getResourceName())) {
req.setNodeLabelExpression(asc.getNodeLabelExpression());
}
}
Resource maximumCapacity = getScheduler().getMaximumResourceCapability();
// sanity check
try {
RMServerUtils.normalizeAndValidateRequests(ask,
maximumCapacity, app.getQueue(),
getScheduler(), getRmContext());
} catch (InvalidResourceRequestException e) {
LOG.warn("Invalid resource ask by application " + appAttemptId, e);
throw e;
}
try {
RMServerUtils.validateBlacklistRequest(blacklistRequest);
} catch (InvalidResourceBlacklistRequestException e) {
LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
throw e;
}
// In the case of work-preserving AM restart, it's possible for the
// AM to release containers from the earlier attempt.
if (!app.getApplicationSubmissionContext()
.getKeepContainersAcrossApplicationAttempts()) {
try {
RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
} catch (InvalidContainerReleaseException e) {
LOG.warn("Invalid container release by application " + appAttemptId,
e);
throw e;
}
}
// Split Update Resource Requests into increase and decrease.
// No Exceptions are thrown here. All update errors are aggregated
// and returned to the AM.
List<UpdateContainerError> updateErrors = new ArrayList<>();
ContainerUpdates containerUpdateRequests =
RMServerUtils.validateAndSplitUpdateResourceRequests(
getRmContext(), request, maximumCapacity, updateErrors);
// Send new requests to appAttempt.
Allocation allocation;
RMAppAttemptState state =
app.getRMAppAttempt(appAttemptId).getAppAttemptState();
if (state.equals(RMAppAttemptState.FINAL_SAVING) ||
state.equals(RMAppAttemptState.FINISHING) ||
app.isAppFinalStateStored()) {
LOG.warn(appAttemptId + " is in " + state +
" state, ignore container allocate request.");
allocation = EMPTY_ALLOCATION;
} else {
allocation =
getScheduler().allocate(appAttemptId, ask, release,
blacklistAdditions, blacklistRemovals,
containerUpdateRequests);
}
if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
LOG.info("blacklist are updated in Scheduler." +
"blacklistAdditions: " + blacklistAdditions + ", " +
"blacklistRemovals: " + blacklistRemovals);
}
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
AllocateResponse allocateResponse =
recordFactory.newRecordInstance(AllocateResponse.class);
if (allocation.getNMTokens() != null &&
!allocation.getNMTokens().isEmpty()) {
allocateResponse.setNMTokens(allocation.getNMTokens());
}
// Notify the AM of container update errors
ApplicationMasterServiceUtils.addToUpdateContainerErrors(
allocateResponse, updateErrors);
// update the response with the deltas of node status changes
handleNodeUpdates(app, allocateResponse);
ApplicationMasterServiceUtils.addToAllocatedContainers(
allocateResponse, allocation.getContainers());
allocateResponse.setCompletedContainersStatuses(appAttempt
.pullJustFinishedContainers());
allocateResponse.setAvailableResources(allocation.getResourceLimit());
addToContainerUpdates(allocateResponse, allocation,
((AbstractYarnScheduler)getScheduler())
.getApplicationAttempt(appAttemptId).pullUpdateContainerErrors());
allocateResponse.setNumClusterNodes(getScheduler().getNumClusterNodes());
// add collector address for this application
if (YarnConfiguration.timelineServiceV2Enabled(
getRmContext().getYarnConfiguration())) {
allocateResponse.setCollectorAddr(
getRmContext().getRMApps().get(appAttemptId.getApplicationId())
.getCollectorAddr());
}
// add preemption to the allocateResponse message (if any)
allocateResponse
.setPreemptionMessage(generatePreemptionMessage(allocation));
// Set application priority
allocateResponse.setApplicationPriority(app
.getApplicationPriority());
return allocateResponse;
}
private void handleNodeUpdates(RMApp app, AllocateResponse allocateResponse) {
List<RMNode> updatedNodes = new ArrayList<>();
if(app.pullRMNodeUpdates(updatedNodes) > 0) {
List<NodeReport> updatedNodeReports = new ArrayList<>();
for(RMNode rmNode: updatedNodes) {
SchedulerNodeReport schedulerNodeReport =
getScheduler().getNodeReport(rmNode.getNodeID());
Resource used = BuilderUtils.newResource(0, 0);
int numContainers = 0;
if (schedulerNodeReport != null) {
used = schedulerNodeReport.getUsedResource();
numContainers = schedulerNodeReport.getNumContainers();
}
NodeId nodeId = rmNode.getNodeID();
NodeReport report =
BuilderUtils.newNodeReport(nodeId, rmNode.getState(),
rmNode.getHttpAddress(), rmNode.getRackName(), used,
rmNode.getTotalCapability(), numContainers,
rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
rmNode.getNodeLabels());
updatedNodeReports.add(report);
}
allocateResponse.setUpdatedNodes(updatedNodeReports);
}
}
private void handleProgress(ApplicationAttemptId appAttemptId,
AllocateRequest request) {
//filter illegal progress values
float filteredProgress = request.getProgress();
if (Float.isNaN(filteredProgress) ||
filteredProgress == Float.NEGATIVE_INFINITY ||
filteredProgress < 0) {
request.setProgress(0);
} else if (filteredProgress > 1 ||
filteredProgress == Float.POSITIVE_INFINITY) {
request.setProgress(1);
}
// Send the status update to the appAttempt.
getRmContext().getDispatcher().getEventHandler().handle(
new RMAppAttemptStatusupdateEvent(appAttemptId, request
.getProgress()));
}
public FinishApplicationMasterResponse finishApplicationMaster(
ApplicationAttemptId applicationAttemptId,
FinishApplicationMasterRequest request) {
RMApp app =
getRmContext().getRMApps().get(applicationAttemptId.getApplicationId());
// For UnmanagedAMs, return true so they don't retry
FinishApplicationMasterResponse response =
FinishApplicationMasterResponse.newInstance(
app.getApplicationSubmissionContext().getUnmanagedAM());
getRmContext().getDispatcher().getEventHandler().handle(
new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
.getTrackingUrl(), request.getFinalApplicationStatus(), request
.getDiagnostics()));
return response;
}
private PreemptionMessage generatePreemptionMessage(Allocation allocation){
PreemptionMessage pMsg = null;
// assemble strict preemption request
if (allocation.getStrictContainerPreemptions() != null) {
pMsg =
recordFactory.newRecordInstance(PreemptionMessage.class);
StrictPreemptionContract pStrict =
recordFactory.newRecordInstance(StrictPreemptionContract.class);
Set<PreemptionContainer> pCont = new HashSet<>();
for (ContainerId cId : allocation.getStrictContainerPreemptions()) {
PreemptionContainer pc =
recordFactory.newRecordInstance(PreemptionContainer.class);
pc.setId(cId);
pCont.add(pc);
}
pStrict.setContainers(pCont);
pMsg.setStrictContract(pStrict);
}
// assemble negotiable preemption request
if (allocation.getResourcePreemptions() != null &&
allocation.getResourcePreemptions().size() > 0 &&
allocation.getContainerPreemptions() != null &&
allocation.getContainerPreemptions().size() > 0) {
if (pMsg == null) {
pMsg =
recordFactory.newRecordInstance(PreemptionMessage.class);
}
PreemptionContract contract =
recordFactory.newRecordInstance(PreemptionContract.class);
Set<PreemptionContainer> pCont = new HashSet<>();
for (ContainerId cId : allocation.getContainerPreemptions()) {
PreemptionContainer pc =
recordFactory.newRecordInstance(PreemptionContainer.class);
pc.setId(cId);
pCont.add(pc);
}
List<PreemptionResourceRequest> pRes = new ArrayList<>();
for (ResourceRequest crr : allocation.getResourcePreemptions()) {
PreemptionResourceRequest prr =
recordFactory.newRecordInstance(PreemptionResourceRequest.class);
prr.setResourceRequest(crr);
pRes.add(prr);
}
contract.setContainers(pCont);
contract.setResourceRequest(pRes);
pMsg.setContract(contract);
}
return pMsg;
}
protected RMContext getRmContext() {
return rmContext;
}
protected YarnScheduler getScheduler() {
return scheduler;
}
private static void addToContainerUpdates(AllocateResponse allocateResponse,
Allocation allocation, List<UpdateContainerError> updateContainerErrors) {
// Handling increased containers
ApplicationMasterServiceUtils.addToUpdatedContainers(
allocateResponse, ContainerUpdateType.INCREASE_RESOURCE,
allocation.getIncreasedContainers());
// Handling decreased containers
ApplicationMasterServiceUtils.addToUpdatedContainers(
allocateResponse, ContainerUpdateType.DECREASE_RESOURCE,
allocation.getDecreasedContainers());
// Handling promoted containers
ApplicationMasterServiceUtils.addToUpdatedContainers(
allocateResponse, ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
allocation.getPromotedContainers());
// Handling demoted containers
ApplicationMasterServiceUtils.addToUpdatedContainers(
allocateResponse, ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
allocation.getDemotedContainers());
ApplicationMasterServiceUtils.addToUpdateContainerErrors(
allocateResponse, updateContainerErrors);
}
}

View File

@ -23,6 +23,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
@ -37,8 +39,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@ -51,7 +51,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@ -70,7 +69,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretMan
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import java.io.IOException;
@ -103,6 +101,84 @@ public class OpportunisticContainerAllocatorAMService
private volatile List<RemoteNode> cachedNodes;
private volatile long lastCacheUpdateTime;
class OpportunisticAMSProcessor extends DefaultAMSProcessor {
OpportunisticAMSProcessor(RMContext rmContext, YarnScheduler
scheduler) {
super(rmContext, scheduler);
}
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
ApplicationAttemptId applicationAttemptId,
RegisterApplicationMasterRequest request) throws IOException {
SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler)
getScheduler()).getApplicationAttempt(applicationAttemptId);
if (appAttempt.getOpportunisticContainerContext() == null) {
OpportunisticContainerContext opCtx =
new OpportunisticContainerContext();
opCtx.setContainerIdGenerator(new OpportunisticContainerAllocator
.ContainerIdGenerator() {
@Override
public long generateContainerId() {
return appAttempt.getAppSchedulingInfo().getNewContainerId();
}
});
int tokenExpiryInterval = getConfig()
.getInt(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS,
YarnConfiguration.
DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS);
opCtx.updateAllocationParams(
getScheduler().getMinimumResourceCapability(),
getScheduler().getMaximumResourceCapability(),
getScheduler().getMinimumResourceCapability(),
tokenExpiryInterval);
appAttempt.setOpportunisticContainerContext(opCtx);
}
return super.registerApplicationMaster(applicationAttemptId, request);
}
@Override
public AllocateResponse allocate(ApplicationAttemptId appAttemptId,
AllocateRequest request) throws YarnException {
// Partition requests to GUARANTEED and OPPORTUNISTIC.
OpportunisticContainerAllocator.PartitionedResourceRequests
partitionedAsks =
oppContainerAllocator.partitionAskList(request.getAskList());
// Allocate OPPORTUNISTIC containers.
SchedulerApplicationAttempt appAttempt =
((AbstractYarnScheduler)rmContext.getScheduler())
.getApplicationAttempt(appAttemptId);
OpportunisticContainerContext oppCtx =
appAttempt.getOpportunisticContainerContext();
oppCtx.updateNodeList(getLeastLoadedNodes());
List<Container> oppContainers =
oppContainerAllocator.allocateContainers(
request.getResourceBlacklistRequest(),
partitionedAsks.getOpportunistic(), appAttemptId, oppCtx,
ResourceManager.getClusterTimeStamp(), appAttempt.getUser());
// Create RMContainers and update the NMTokens.
if (!oppContainers.isEmpty()) {
handleNewContainers(oppContainers, false);
appAttempt.updateNMTokens(oppContainers);
}
// Allocate GUARANTEED containers.
request.setAskList(partitionedAsks.getGuaranteed());
AllocateResponse response = super.allocate(appAttemptId, request);
if (!oppContainers.isEmpty()) {
ApplicationMasterServiceUtils.addToAllocatedContainers(
response, oppContainers);
}
return response;
}
}
public OpportunisticContainerAllocatorAMService(RMContext rmContext,
YarnScheduler scheduler) {
super(OpportunisticContainerAllocatorAMService.class.getName(),
@ -160,6 +236,11 @@ public class OpportunisticContainerAllocatorAMService
this.nodeMonitor = topKSelector;
}
@Override
protected ApplicationMasterServiceProcessor createProcessor() {
return new OpportunisticAMSProcessor(rmContext, rmContext.getScheduler());
}
@Override
public Server getServer(YarnRPC rpc, Configuration serverConf,
InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
@ -180,80 +261,6 @@ public class OpportunisticContainerAllocatorAMService
return super.getServer(rpc, serverConf, addr, secretManager);
}
@Override
public RegisterApplicationMasterResponse registerApplicationMaster
(RegisterApplicationMasterRequest request) throws YarnException,
IOException {
final ApplicationAttemptId appAttemptId = getAppAttemptId();
SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler)
rmContext.getScheduler()).getApplicationAttempt(appAttemptId);
if (appAttempt.getOpportunisticContainerContext() == null) {
OpportunisticContainerContext opCtx = new OpportunisticContainerContext();
opCtx.setContainerIdGenerator(new OpportunisticContainerAllocator
.ContainerIdGenerator() {
@Override
public long generateContainerId() {
return appAttempt.getAppSchedulingInfo().getNewContainerId();
}
});
int tokenExpiryInterval = getConfig()
.getInt(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS);
opCtx.updateAllocationParams(
rmContext.getScheduler().getMinimumResourceCapability(),
rmContext.getScheduler().getMaximumResourceCapability(),
rmContext.getScheduler().getMinimumResourceCapability(),
tokenExpiryInterval);
appAttempt.setOpportunisticContainerContext(opCtx);
}
return super.registerApplicationMaster(request);
}
@Override
public FinishApplicationMasterResponse finishApplicationMaster
(FinishApplicationMasterRequest request) throws YarnException,
IOException {
return super.finishApplicationMaster(request);
}
@Override
protected void allocateInternal(ApplicationAttemptId appAttemptId,
AllocateRequest request, AllocateResponse allocateResponse)
throws YarnException {
// Partition requests to GUARANTEED and OPPORTUNISTIC.
OpportunisticContainerAllocator.PartitionedResourceRequests
partitionedAsks =
oppContainerAllocator.partitionAskList(request.getAskList());
// Allocate OPPORTUNISTIC containers.
SchedulerApplicationAttempt appAttempt =
((AbstractYarnScheduler)rmContext.getScheduler())
.getApplicationAttempt(appAttemptId);
OpportunisticContainerContext oppCtx =
appAttempt.getOpportunisticContainerContext();
oppCtx.updateNodeList(getLeastLoadedNodes());
List<Container> oppContainers =
oppContainerAllocator.allocateContainers(
request.getResourceBlacklistRequest(),
partitionedAsks.getOpportunistic(), appAttemptId, oppCtx,
ResourceManager.getClusterTimeStamp(), appAttempt.getUser());
// Create RMContainers and update the NMTokens.
if (!oppContainers.isEmpty()) {
handleNewContainers(oppContainers, false);
appAttempt.updateNMTokens(oppContainers);
addToAllocatedContainers(allocateResponse, oppContainers);
}
// Allocate GUARANTEED containers.
request.setAskList(partitionedAsks.getGuaranteed());
super.allocateInternal(appAttemptId, request, allocateResponse);
}
@Override
public RegisterDistributedSchedulingAMResponse
registerApplicationMasterForDistributedScheduling(

View File

@ -79,6 +79,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo
.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -652,6 +655,11 @@ public class TestOpportunisticContainerAllocatorAMService {
public RMContainerTokenSecretManager getContainerTokenSecretManager() {
return new RMContainerTokenSecretManager(conf);
}
@Override
public ResourceScheduler getScheduler() {
return new FifoScheduler();
}
};
Container c = factory.newRecordInstance(Container.class);
c.setExecutionType(ExecutionType.OPPORTUNISTIC);