YARN-6777. Support for ApplicationMasterService processing chain of interceptors. (asuresh)

This commit is contained in:
Arun Suresh 2017-07-17 17:02:22 -07:00
parent 3556e36be3
commit 077fcf6a96
10 changed files with 446 additions and 81 deletions

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.ams;
/**
* This is a marker interface for a context object that is injected into
* the ApplicationMasterService processor. The processor implementation
* is free to type cast this based on the availability of the context's
* implementation in the classpath.
*/
public interface ApplicationMasterServiceContext {
}

View File

@ -37,35 +37,45 @@
*/ */
public interface ApplicationMasterServiceProcessor { public interface ApplicationMasterServiceProcessor {
/**
* Initialize with and ApplicationMasterService Context as well as the
* next processor in the chain.
* @param amsContext AMSContext.
* @param nextProcessor next ApplicationMasterServiceProcessor
*/
void init(ApplicationMasterServiceContext amsContext,
ApplicationMasterServiceProcessor nextProcessor);
/** /**
* Register AM attempt. * Register AM attempt.
* @param applicationAttemptId applicationAttemptId. * @param applicationAttemptId applicationAttemptId.
* @param request Register Request. * @param request Register Request.
* @return Register Response. * @param response Register Response.
* @throws IOException IOException. * @throws IOException IOException.
*/ */
RegisterApplicationMasterResponse registerApplicationMaster( void registerApplicationMaster(
ApplicationAttemptId applicationAttemptId, ApplicationAttemptId applicationAttemptId,
RegisterApplicationMasterRequest request) throws IOException; RegisterApplicationMasterRequest request,
RegisterApplicationMasterResponse response) throws IOException;
/** /**
* Allocate call. * Allocate call.
* @param appAttemptId appAttemptId. * @param appAttemptId appAttemptId.
* @param request Allocate Request. * @param request Allocate Request.
* @return Allocate Response. * @param response Allocate Response.
* @throws YarnException YarnException. * @throws YarnException YarnException.
*/ */
AllocateResponse allocate(ApplicationAttemptId appAttemptId, void allocate(ApplicationAttemptId appAttemptId,
AllocateRequest request) throws YarnException; AllocateRequest request, AllocateResponse response) throws YarnException;
/** /**
* Finish AM. * Finish AM.
* @param applicationAttemptId applicationAttemptId. * @param applicationAttemptId applicationAttemptId.
* @param request Finish AM Request. * @param request Finish AM Request.
* @return Finish AM response. * @param response Finish AM Response.
*/ */
FinishApplicationMasterResponse finishApplicationMaster( void finishApplicationMaster(
ApplicationAttemptId applicationAttemptId, ApplicationAttemptId applicationAttemptId,
FinishApplicationMasterRequest request); FinishApplicationMasterRequest request,
FinishApplicationMasterResponse response);
} }

View File

@ -103,7 +103,7 @@ private static void addDeprecatedKeys() {
YarnConfiguration.NM_PREFIX + "log-container-debug-info.enabled"; YarnConfiguration.NM_PREFIX + "log-container-debug-info.enabled";
public static final boolean DEFAULT_NM_LOG_CONTAINER_DEBUG_INFO = false; public static final boolean DEFAULT_NM_LOG_CONTAINER_DEBUG_INFO = false;
//////////////////////////////// ////////////////////////////////
// IPC Configs // IPC Configs
//////////////////////////////// ////////////////////////////////
@ -150,6 +150,9 @@ private static void addDeprecatedKeys() {
public static final String DEFAULT_RM_ADDRESS = public static final String DEFAULT_RM_ADDRESS =
"0.0.0.0:" + DEFAULT_RM_PORT; "0.0.0.0:" + DEFAULT_RM_PORT;
public static final String RM_APPLICATION_MASTER_SERVICE_PROCESSORS =
RM_PREFIX + "application-master-service.processors";
/** The actual bind address for the RM.*/ /** The actual bind address for the RM.*/
public static final String RM_BIND_HOST = public static final String RM_BIND_HOST =
RM_PREFIX + "bind-host"; RM_PREFIX + "bind-host";

View File

@ -122,6 +122,16 @@
<value>50</value> <value>50</value>
</property> </property>
<property>
<description>
Comma separated class names of ApplicationMasterServiceProcessor
implementations. The processors will be applied in the order
they are specified.
</description>
<name>yarn.resourcemanager.application-master-service.processors</name>
<value></value>
</property>
<property> <property>
<description> <description>
This configures the HTTP endpoint for Yarn Daemons.The following This configures the HTTP endpoint for Yarn Daemons.The following

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import java.io.IOException;
/**
* This maintains a chain of {@link ApplicationMasterServiceProcessor}s.
*/
class AMSProcessingChain implements ApplicationMasterServiceProcessor {
private static final Log LOG = LogFactory.getLog(AMSProcessingChain.class);
private ApplicationMasterServiceProcessor head;
private RMContext rmContext;
/**
* This has to be initialized with at-least 1 Processor.
* @param rootProcessor Root processor.
*/
AMSProcessingChain(ApplicationMasterServiceProcessor rootProcessor) {
if (rootProcessor == null) {
throw new YarnRuntimeException("No root ApplicationMasterService" +
"Processor specified for the processing chain..");
}
this.head = rootProcessor;
}
@Override
public void init(ApplicationMasterServiceContext amsContext,
ApplicationMasterServiceProcessor nextProcessor) {
LOG.info("Initializing AMS Processing chain. Root Processor=["
+ this.head.getClass().getName() + "].");
this.rmContext = (RMContext)amsContext;
// The head is initialized with a null 'next' processor
this.head.init(amsContext, null);
}
/**
* Add an processor to the top of the chain.
* @param processor ApplicationMasterServiceProcessor
*/
public synchronized void addProcessor(
ApplicationMasterServiceProcessor processor) {
LOG.info("Adding [" + processor.getClass().getName() + "] tp top of" +
" AMS Processing chain. ");
processor.init(this.rmContext, this.head);
this.head = processor;
}
@Override
public void registerApplicationMaster(
ApplicationAttemptId applicationAttemptId,
RegisterApplicationMasterRequest request,
RegisterApplicationMasterResponse resp) throws IOException {
this.head.registerApplicationMaster(applicationAttemptId, request, resp);
}
@Override
public void allocate(ApplicationAttemptId appAttemptId,
AllocateRequest request, AllocateResponse response) throws YarnException {
this.head.allocate(appAttemptId, request, response);
}
@Override
public void finishApplicationMaster(
ApplicationAttemptId applicationAttemptId,
FinishApplicationMasterRequest request,
FinishApplicationMasterResponse response) {
this.head.finishApplicationMaster(applicationAttemptId, request, response);
}
}

View File

@ -22,6 +22,7 @@
import java.io.InputStream; import java.io.InputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -88,7 +89,7 @@ public class ApplicationMasterService extends AbstractService implements
private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap = private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap =
new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>(); new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
protected final RMContext rmContext; protected final RMContext rmContext;
private final ApplicationMasterServiceProcessor amsProcessor; private final AMSProcessingChain amsProcessingChain;
public ApplicationMasterService(RMContext rmContext, public ApplicationMasterService(RMContext rmContext,
YarnScheduler scheduler) { YarnScheduler scheduler) {
@ -101,11 +102,7 @@ public ApplicationMasterService(String name, RMContext rmContext,
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor(); this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
this.rScheduler = scheduler; this.rScheduler = scheduler;
this.rmContext = rmContext; this.rmContext = rmContext;
this.amsProcessor = createProcessor(); this.amsProcessingChain = new AMSProcessingChain(new DefaultAMSProcessor());
}
protected ApplicationMasterServiceProcessor createProcessor() {
return new DefaultAMSProcessor(rmContext, rScheduler);
} }
@Override @Override
@ -115,6 +112,21 @@ protected void serviceInit(Configuration conf) throws Exception {
YarnConfiguration.RM_SCHEDULER_ADDRESS, YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
amsProcessingChain.init(rmContext, null);
List<ApplicationMasterServiceProcessor> processors = getProcessorList(conf);
if (processors != null) {
Collections.reverse(processors);
for (ApplicationMasterServiceProcessor p : processors) {
this.amsProcessingChain.addProcessor(p);
}
}
}
protected List<ApplicationMasterServiceProcessor> getProcessorList(
Configuration conf) {
return conf.getInstances(
YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS,
ApplicationMasterServiceProcessor.class);
} }
@Override @Override
@ -165,6 +177,10 @@ protected Server getServer(YarnRPC rpc, Configuration serverConf,
YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT)); YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
} }
protected AMSProcessingChain getProcessingChain() {
return this.amsProcessingChain;
}
@Private @Private
public InetSocketAddress getBindAddress() { public InetSocketAddress getBindAddress() {
return this.masterServiceAddress; return this.masterServiceAddress;
@ -214,8 +230,12 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
lastResponse.setResponseId(0); lastResponse.setResponseId(0);
lock.setAllocateResponse(lastResponse); lock.setAllocateResponse(lastResponse);
return this.amsProcessor.registerApplicationMaster( RegisterApplicationMasterResponse response =
amrmTokenIdentifier.getApplicationAttemptId(), request); recordFactory.newRecordInstance(
RegisterApplicationMasterResponse.class);
this.amsProcessingChain.registerApplicationMaster(
amrmTokenIdentifier.getApplicationAttemptId(), request, response);
return response;
} }
} }
@ -265,8 +285,11 @@ public FinishApplicationMasterResponse finishApplicationMaster(
} }
this.amLivelinessMonitor.receivedPing(applicationAttemptId); this.amLivelinessMonitor.receivedPing(applicationAttemptId);
return this.amsProcessor.finishApplicationMaster( FinishApplicationMasterResponse response =
applicationAttemptId, request); FinishApplicationMasterResponse.newInstance(false);
this.amsProcessingChain.finishApplicationMaster(
applicationAttemptId, request, response);
return response;
} }
} }
@ -346,8 +369,10 @@ public AllocateResponse allocate(AllocateRequest request)
throw new InvalidApplicationMasterRequestException(message); throw new InvalidApplicationMasterRequestException(message);
} }
AllocateResponse response = this.amsProcessor.allocate( AllocateResponse response =
amrmTokenIdentifier.getApplicationAttemptId(), request); recordFactory.newRecordInstance(AllocateResponse.class);
this.amsProcessingChain.allocate(
amrmTokenIdentifier.getApplicationAttemptId(), request, response);
// update AMRMToken if the token is rolled-up // update AMRMToken if the token is rolled-up
MasterKeyData nextMasterKey = MasterKeyData nextMasterKey =

View File

@ -21,6 +21,7 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils; import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor; import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@ -81,7 +82,11 @@
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
class DefaultAMSProcessor implements ApplicationMasterServiceProcessor { /**
* This is the default Application Master Service processor. It has be the
* last processor in the @{@link AMSProcessingChain}.
*/
final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
private static final Log LOG = LogFactory.getLog(DefaultAMSProcessor.class); private static final Log LOG = LogFactory.getLog(DefaultAMSProcessor.class);
@ -93,17 +98,19 @@ class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
private final RecordFactory recordFactory = private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null); RecordFactoryProvider.getRecordFactory(null);
private final RMContext rmContext; private RMContext rmContext;
private final YarnScheduler scheduler;
DefaultAMSProcessor(RMContext rmContext, YarnScheduler scheduler) { @Override
this.rmContext = rmContext; public void init(ApplicationMasterServiceContext amsContext,
this.scheduler = scheduler; ApplicationMasterServiceProcessor nextProcessor) {
this.rmContext = (RMContext)amsContext;
} }
public RegisterApplicationMasterResponse registerApplicationMaster( @Override
public void registerApplicationMaster(
ApplicationAttemptId applicationAttemptId, ApplicationAttemptId applicationAttemptId,
RegisterApplicationMasterRequest request) throws IOException { RegisterApplicationMasterRequest request,
RegisterApplicationMasterResponse response) throws IOException {
RMApp app = getRmContext().getRMApps().get( RMApp app = getRmContext().getRMApps().get(
applicationAttemptId.getApplicationId()); applicationAttemptId.getApplicationId());
@ -116,8 +123,6 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
RMAuditLogger.AuditConstants.REGISTER_AM, RMAuditLogger.AuditConstants.REGISTER_AM,
"ApplicationMasterService", app.getApplicationId(), "ApplicationMasterService", app.getApplicationId(),
applicationAttemptId); applicationAttemptId);
RegisterApplicationMasterResponse response = recordFactory
.newRecordInstance(RegisterApplicationMasterResponse.class);
response.setMaximumResourceCapability(getScheduler() response.setMaximumResourceCapability(getScheduler()
.getMaximumResourceCapability(app.getQueue())); .getMaximumResourceCapability(app.getQueue()));
response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId) response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId)
@ -165,11 +170,11 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
response.setSchedulerResourceTypes(getScheduler() response.setSchedulerResourceTypes(getScheduler()
.getSchedulingResourceTypes()); .getSchedulingResourceTypes());
return response;
} }
public AllocateResponse allocate(ApplicationAttemptId appAttemptId, @Override
AllocateRequest request) throws YarnException { public void allocate(ApplicationAttemptId appAttemptId,
AllocateRequest request, AllocateResponse response) throws YarnException {
handleProgress(appAttemptId, request); handleProgress(appAttemptId, request);
@ -259,50 +264,46 @@ public AllocateResponse allocate(ApplicationAttemptId appAttemptId,
"blacklistRemovals: " + blacklistRemovals); "blacklistRemovals: " + blacklistRemovals);
} }
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
AllocateResponse allocateResponse =
recordFactory.newRecordInstance(AllocateResponse.class);
if (allocation.getNMTokens() != null && if (allocation.getNMTokens() != null &&
!allocation.getNMTokens().isEmpty()) { !allocation.getNMTokens().isEmpty()) {
allocateResponse.setNMTokens(allocation.getNMTokens()); response.setNMTokens(allocation.getNMTokens());
} }
// Notify the AM of container update errors // Notify the AM of container update errors
ApplicationMasterServiceUtils.addToUpdateContainerErrors( ApplicationMasterServiceUtils.addToUpdateContainerErrors(
allocateResponse, updateErrors); response, updateErrors);
// update the response with the deltas of node status changes // update the response with the deltas of node status changes
handleNodeUpdates(app, allocateResponse); handleNodeUpdates(app, response);
ApplicationMasterServiceUtils.addToAllocatedContainers( ApplicationMasterServiceUtils.addToAllocatedContainers(
allocateResponse, allocation.getContainers()); response, allocation.getContainers());
allocateResponse.setCompletedContainersStatuses(appAttempt response.setCompletedContainersStatuses(appAttempt
.pullJustFinishedContainers()); .pullJustFinishedContainers());
allocateResponse.setAvailableResources(allocation.getResourceLimit()); response.setAvailableResources(allocation.getResourceLimit());
addToContainerUpdates(allocateResponse, allocation, addToContainerUpdates(response, allocation,
((AbstractYarnScheduler)getScheduler()) ((AbstractYarnScheduler)getScheduler())
.getApplicationAttempt(appAttemptId).pullUpdateContainerErrors()); .getApplicationAttempt(appAttemptId).pullUpdateContainerErrors());
allocateResponse.setNumClusterNodes(getScheduler().getNumClusterNodes()); response.setNumClusterNodes(getScheduler().getNumClusterNodes());
// add collector address for this application // add collector address for this application
if (YarnConfiguration.timelineServiceV2Enabled( if (YarnConfiguration.timelineServiceV2Enabled(
getRmContext().getYarnConfiguration())) { getRmContext().getYarnConfiguration())) {
allocateResponse.setCollectorAddr( response.setCollectorAddr(
getRmContext().getRMApps().get(appAttemptId.getApplicationId()) getRmContext().getRMApps().get(appAttemptId.getApplicationId())
.getCollectorAddr()); .getCollectorAddr());
} }
// add preemption to the allocateResponse message (if any) // add preemption to the allocateResponse message (if any)
allocateResponse response.setPreemptionMessage(generatePreemptionMessage(allocation));
.setPreemptionMessage(generatePreemptionMessage(allocation));
// Set application priority // Set application priority
allocateResponse.setApplicationPriority(app response.setApplicationPriority(app
.getApplicationPriority()); .getApplicationPriority());
return allocateResponse;
} }
private void handleNodeUpdates(RMApp app, AllocateResponse allocateResponse) { private void handleNodeUpdates(RMApp app, AllocateResponse allocateResponse) {
@ -351,20 +352,20 @@ private void handleProgress(ApplicationAttemptId appAttemptId,
.getProgress())); .getProgress()));
} }
public FinishApplicationMasterResponse finishApplicationMaster( @Override
public void finishApplicationMaster(
ApplicationAttemptId applicationAttemptId, ApplicationAttemptId applicationAttemptId,
FinishApplicationMasterRequest request) { FinishApplicationMasterRequest request,
FinishApplicationMasterResponse response) {
RMApp app = RMApp app =
getRmContext().getRMApps().get(applicationAttemptId.getApplicationId()); getRmContext().getRMApps().get(applicationAttemptId.getApplicationId());
// For UnmanagedAMs, return true so they don't retry // For UnmanagedAMs, return true so they don't retry
FinishApplicationMasterResponse response = response.setIsUnregistered(
FinishApplicationMasterResponse.newInstance(
app.getApplicationSubmissionContext().getUnmanagedAM()); app.getApplicationSubmissionContext().getUnmanagedAM());
getRmContext().getDispatcher().getEventHandler().handle( getRmContext().getDispatcher().getEventHandler().handle(
new RMAppAttemptUnregistrationEvent(applicationAttemptId, request new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
.getTrackingUrl(), request.getFinalApplicationStatus(), request .getTrackingUrl(), request.getFinalApplicationStatus(), request
.getDiagnostics())); .getDiagnostics()));
return response;
} }
private PreemptionMessage generatePreemptionMessage(Allocation allocation){ private PreemptionMessage generatePreemptionMessage(Allocation allocation){
@ -424,7 +425,7 @@ protected RMContext getRmContext() {
} }
protected YarnScheduler getScheduler() { protected YarnScheduler getScheduler() {
return scheduler; return rmContext.getScheduler();
} }
private static void addToContainerUpdates(AllocateResponse allocateResponse, private static void addToContainerUpdates(AllocateResponse allocateResponse,

View File

@ -23,9 +23,13 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor; import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils; import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords
.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
@ -101,17 +105,29 @@ public class OpportunisticContainerAllocatorAMService
private volatile List<RemoteNode> cachedNodes; private volatile List<RemoteNode> cachedNodes;
private volatile long lastCacheUpdateTime; private volatile long lastCacheUpdateTime;
class OpportunisticAMSProcessor extends DefaultAMSProcessor { class OpportunisticAMSProcessor implements
ApplicationMasterServiceProcessor {
OpportunisticAMSProcessor(RMContext rmContext, YarnScheduler private ApplicationMasterServiceContext context;
scheduler) { private ApplicationMasterServiceProcessor nextProcessor;
super(rmContext, scheduler);
private YarnScheduler getScheduler() {
return ((RMContext)context).getScheduler();
} }
@Override @Override
public RegisterApplicationMasterResponse registerApplicationMaster( public void init(ApplicationMasterServiceContext amsContext,
ApplicationMasterServiceProcessor next) {
this.context = amsContext;
// The AMSProcessingChain guarantees that 'next' is not null.
this.nextProcessor = next;
}
@Override
public void registerApplicationMaster(
ApplicationAttemptId applicationAttemptId, ApplicationAttemptId applicationAttemptId,
RegisterApplicationMasterRequest request) throws IOException { RegisterApplicationMasterRequest request,
RegisterApplicationMasterResponse response) throws IOException {
SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler) SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler)
getScheduler()).getApplicationAttempt(applicationAttemptId); getScheduler()).getApplicationAttempt(applicationAttemptId);
if (appAttempt.getOpportunisticContainerContext() == null) { if (appAttempt.getOpportunisticContainerContext() == null) {
@ -135,12 +151,14 @@ public long generateContainerId() {
tokenExpiryInterval); tokenExpiryInterval);
appAttempt.setOpportunisticContainerContext(opCtx); appAttempt.setOpportunisticContainerContext(opCtx);
} }
return super.registerApplicationMaster(applicationAttemptId, request); nextProcessor.registerApplicationMaster(
applicationAttemptId, request, response);
} }
@Override @Override
public AllocateResponse allocate(ApplicationAttemptId appAttemptId, public void allocate(ApplicationAttemptId appAttemptId,
AllocateRequest request) throws YarnException { AllocateRequest request, AllocateResponse response)
throws YarnException {
// Partition requests to GUARANTEED and OPPORTUNISTIC. // Partition requests to GUARANTEED and OPPORTUNISTIC.
OpportunisticContainerAllocator.PartitionedResourceRequests OpportunisticContainerAllocator.PartitionedResourceRequests
partitionedAsks = partitionedAsks =
@ -165,17 +183,22 @@ public AllocateResponse allocate(ApplicationAttemptId appAttemptId,
if (!oppContainers.isEmpty()) { if (!oppContainers.isEmpty()) {
handleNewContainers(oppContainers, false); handleNewContainers(oppContainers, false);
appAttempt.updateNMTokens(oppContainers); appAttempt.updateNMTokens(oppContainers);
ApplicationMasterServiceUtils.addToAllocatedContainers(
response, oppContainers);
} }
// Allocate GUARANTEED containers. // Allocate GUARANTEED containers.
request.setAskList(partitionedAsks.getGuaranteed()); request.setAskList(partitionedAsks.getGuaranteed());
nextProcessor.allocate(appAttemptId, request, response);
}
AllocateResponse response = super.allocate(appAttemptId, request); @Override
if (!oppContainers.isEmpty()) { public void finishApplicationMaster(
ApplicationMasterServiceUtils.addToAllocatedContainers( ApplicationAttemptId applicationAttemptId,
response, oppContainers); FinishApplicationMasterRequest request,
} FinishApplicationMasterResponse response) {
return response; nextProcessor.finishApplicationMaster(applicationAttemptId,
request, response);
} }
} }
@ -236,11 +259,6 @@ public OpportunisticContainerAllocatorAMService(RMContext rmContext,
this.nodeMonitor = topKSelector; this.nodeMonitor = topKSelector;
} }
@Override
protected ApplicationMasterServiceProcessor createProcessor() {
return new OpportunisticAMSProcessor(rmContext, rmContext.getScheduler());
}
@Override @Override
public Server getServer(YarnRPC rpc, Configuration serverConf, public Server getServer(YarnRPC rpc, Configuration serverConf,
InetSocketAddress addr, AMRMTokenSecretManager secretManager) { InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
@ -261,6 +279,15 @@ public Server getServer(YarnRPC rpc, Configuration serverConf,
return super.getServer(rpc, serverConf, addr, secretManager); return super.getServer(rpc, serverConf, addr, secretManager);
} }
@Override
protected List<ApplicationMasterServiceProcessor> getProcessorList(
Configuration conf) {
List<ApplicationMasterServiceProcessor> retVal =
super.getProcessorList(conf);
retVal.add(new OpportunisticAMSProcessor());
return retVal;
}
@Override @Override
public RegisterDistributedSchedulingAMResponse public RegisterDistributedSchedulingAMResponse
registerApplicationMasterForDistributedScheduling( registerApplicationMasterForDistributedScheduling(

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.conf.ConfigurationProvider;
@ -53,7 +54,7 @@
/** /**
* Context of the ResourceManager. * Context of the ResourceManager.
*/ */
public interface RMContext { public interface RMContext extends ApplicationMasterServiceContext {
Dispatcher getDispatcher(); Dispatcher getDispatcher();

View File

@ -20,20 +20,29 @@
import static java.lang.Thread.sleep; import static java.lang.Thread.sleep;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords
.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
@ -44,6 +53,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -61,7 +71,7 @@
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
public class TestApplicationMasterService { public class TestApplicationMasterService {
@ -71,13 +81,160 @@ public class TestApplicationMasterService {
private final int GB = 1024; private final int GB = 1024;
private static YarnConfiguration conf; private static YarnConfiguration conf;
@BeforeClass private static AtomicInteger beforeRegCount = new AtomicInteger(0);
public static void setup() { private static AtomicInteger afterRegCount = new AtomicInteger(0);
private static AtomicInteger beforeAllocCount = new AtomicInteger(0);
private static AtomicInteger afterAllocCount = new AtomicInteger(0);
private static AtomicInteger beforeFinishCount = new AtomicInteger(0);
private static AtomicInteger afterFinishCount = new AtomicInteger(0);
private static AtomicInteger initCount = new AtomicInteger(0);
static class TestInterceptor1 implements
ApplicationMasterServiceProcessor {
private ApplicationMasterServiceProcessor nextProcessor;
@Override
public void init(ApplicationMasterServiceContext amsContext,
ApplicationMasterServiceProcessor next) {
initCount.incrementAndGet();
this.nextProcessor = next;
}
@Override
public void registerApplicationMaster(ApplicationAttemptId
applicationAttemptId, RegisterApplicationMasterRequest request,
RegisterApplicationMasterResponse response) throws IOException {
nextProcessor.registerApplicationMaster(
applicationAttemptId, request, response);
}
@Override
public void allocate(ApplicationAttemptId appAttemptId,
AllocateRequest request,
AllocateResponse response) throws YarnException {
beforeAllocCount.incrementAndGet();
nextProcessor.allocate(appAttemptId, request, response);
afterAllocCount.incrementAndGet();
}
@Override
public void finishApplicationMaster(
ApplicationAttemptId applicationAttemptId,
FinishApplicationMasterRequest request,
FinishApplicationMasterResponse response) {
beforeFinishCount.incrementAndGet();
afterFinishCount.incrementAndGet();
}
}
static class TestInterceptor2 implements
ApplicationMasterServiceProcessor {
private ApplicationMasterServiceProcessor nextProcessor;
@Override
public void init(ApplicationMasterServiceContext amsContext,
ApplicationMasterServiceProcessor next) {
initCount.incrementAndGet();
this.nextProcessor = next;
}
@Override
public void registerApplicationMaster(
ApplicationAttemptId applicationAttemptId,
RegisterApplicationMasterRequest request,
RegisterApplicationMasterResponse response) throws IOException {
beforeRegCount.incrementAndGet();
nextProcessor.registerApplicationMaster(applicationAttemptId,
request, response);
afterRegCount.incrementAndGet();
}
@Override
public void allocate(ApplicationAttemptId appAttemptId,
AllocateRequest request, AllocateResponse response)
throws YarnException {
beforeAllocCount.incrementAndGet();
nextProcessor.allocate(appAttemptId, request, response);
afterAllocCount.incrementAndGet();
}
@Override
public void finishApplicationMaster(
ApplicationAttemptId applicationAttemptId,
FinishApplicationMasterRequest request,
FinishApplicationMasterResponse response) {
beforeFinishCount.incrementAndGet();
nextProcessor.finishApplicationMaster(
applicationAttemptId, request, response);
afterFinishCount.incrementAndGet();
}
}
@Before
public void setup() {
conf = new YarnConfiguration(); conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
ResourceScheduler.class); ResourceScheduler.class);
} }
@Test(timeout = 300000)
public void testApplicationMasterInterceptor() throws Exception {
conf.set(YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS,
TestInterceptor1.class.getName() + ","
+ TestInterceptor2.class.getName());
MockRM rm = new MockRM(conf);
rm.start();
// Register node1
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
// Submit an application
RMApp app1 = rm.submitApp(2048);
// kick the scheduling
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
int allocCount = 0;
am1.addRequests(new String[] {"127.0.0.1"}, GB, 1, 1);
AllocateResponse alloc1Response = am1.schedule(); // send the request
allocCount++;
// kick the scheduler
nm1.nodeHeartbeat(true);
while (alloc1Response.getAllocatedContainers().size() < 1) {
LOG.info("Waiting for containers to be created for app 1...");
sleep(1000);
alloc1Response = am1.schedule();
allocCount++;
}
// assert RMIdentifer is set properly in allocated containers
Container allocatedContainer =
alloc1Response.getAllocatedContainers().get(0);
ContainerTokenIdentifier tokenId =
BuilderUtils.newContainerTokenIdentifier(allocatedContainer
.getContainerToken());
am1.unregisterAppAttempt();
Assert.assertEquals(1, beforeRegCount.get());
Assert.assertEquals(1, afterRegCount.get());
// The allocate calls should be incremented twice
Assert.assertEquals(allocCount * 2, beforeAllocCount.get());
Assert.assertEquals(allocCount * 2, afterAllocCount.get());
// Finish should only be called once, since the FirstInterceptor
// does not forward the call.
Assert.assertEquals(1, beforeFinishCount.get());
Assert.assertEquals(1, afterFinishCount.get());
rm.stop();
}
@Test(timeout = 3000000) @Test(timeout = 3000000)
public void testRMIdentifierOnContainerAllocation() throws Exception { public void testRMIdentifierOnContainerAllocation() throws Exception {
MockRM rm = new MockRM(conf); MockRM rm = new MockRM(conf);