YARN-4205. Add a service for monitoring application life time out. Contributed by Rohith Sharma K S

(cherry picked from commit 2ae5a3a5bf5ea355370469a53eeccff0b5220081)
Jian He 2016-09-29 22:00:31 +08:00
parent 2b6b4fd779
commit 39f896e1a1
21 changed files with 738 additions and 16 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.api.records;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@ -535,4 +536,24 @@ public abstract void setLogAggregationContext(
@Public
@Unstable
public abstract void setReservationID(ReservationId reservationID);
/**
* Get <code>ApplicationTimeouts</code> of the application. Timeout value is
* in seconds.
* @return all <code>ApplicationTimeouts</code> of the application.
*/
@Public
@Unstable
public abstract Map<ApplicationTimeoutType, Long> getApplicationTimeouts();
/**
* Set the <code>ApplicationTimeouts</code> for the application in seconds.
* All pre-existing Map entries are cleared before adding the new Map.
* @param applicationTimeouts <code>ApplicationTimeouts</code> for the
* application
*/
@Public
@Unstable
public abstract void setApplicationTimeouts(
Map<ApplicationTimeoutType, Long> applicationTimeouts);
}
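
For context, a minimal client-side sketch of how these accessors might be used when building a submission context. The way the context itself is obtained (e.g. from YarnClient#createApplication()) is assumed here and is not part of this change:

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;

public class LifetimeSubmissionExample {
  // Illustrative helper: attach a LIFETIME timeout (in seconds) to a
  // submission context obtained elsewhere.
  static void applyLifetime(ApplicationSubmissionContext context,
      long lifetimeSeconds) {
    Map<ApplicationTimeoutType, Long> timeouts =
        new HashMap<ApplicationTimeoutType, Long>();
    timeouts.put(ApplicationTimeoutType.LIFETIME, lifetimeSeconds);
    // Replaces any timeouts previously set on the context.
    context.setApplicationTimeouts(timeouts);
  }
}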

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* Application timeout type.
*/
@Public
@Unstable
public enum ApplicationTimeoutType {
/**
* <p>
* Timeout imposed on the overall application lifetime. It covers the actual
* run time plus non-run-time delays, such as the time the scheduler takes to
* allocate containers and the time taken to persist the application in the
* RMStateStore.
* </p>
* If this is set, timeout monitoring starts from the application submission
* time.
*/
LIFETIME;
}
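
To make the LIFETIME semantics concrete: because monitoring starts at submission time, the effective deadline is the submission timestamp plus the configured value. The helper below is purely illustrative (the name is made up); RMAppImpl further below performs the equivalent submitTime + lifetime * 1000 calculation when registering the application with the monitor:

final class LifetimeDeadlineExample {
  // Illustrative only: LIFETIME counts wall-clock time from submission,
  // not from when the ApplicationMaster starts running.
  static long lifetimeDeadlineMillis(long submitTimeMillis,
      long lifetimeSeconds) {
    return submitTimeMillis + lifetimeSeconds * 1000L;
  }
}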

View File

@ -1507,6 +1507,12 @@ public static boolean isAclEnabled(Configuration conf) {
false;
// Configurations for the application lifetime monitor feature
public static final String RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS =
RM_PREFIX + "application-timeouts.lifetime-monitor.interval-ms";
public static final long DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS =
60000;
/**
* Interval of time the linux container executor should try cleaning up

View File

@ -370,6 +370,16 @@ message ApplicationSubmissionContextProto {
optional ReservationIdProto reservation_id = 15;
optional string node_label_expression = 16;
optional ResourceRequestProto am_container_resource_request = 17;
repeated ApplicationTimeoutMapProto application_timeouts = 18;
}
enum ApplicationTimeoutTypeProto {
APP_TIMEOUT_LIFETIME = 1;
}
message ApplicationTimeoutMapProto {
optional ApplicationTimeoutTypeProto application_timeout_type = 1;
optional int64 timeout = 2;
}
message LogAggregationContextProto {

View File

@ -18,7 +18,11 @@
package org.apache.hadoop.yarn.api.records.impl.pb;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -26,6 +30,7 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.Priority;
@ -36,6 +41,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
@ -63,6 +69,7 @@ public class ApplicationSubmissionContextPBImpl
private ResourceRequest amResourceRequest = null;
private LogAggregationContext logAggregationContext = null;
private ReservationId reservationId = null;
private Map<ApplicationTimeoutType, Long> applicationTimeouts = null;
public ApplicationSubmissionContextPBImpl() {
builder = ApplicationSubmissionContextProto.newBuilder();
@ -131,6 +138,9 @@ private void mergeLocalToBuilder() {
if (this.reservationId != null) {
builder.setReservationId(convertToProtoFormat(this.reservationId));
}
if (this.applicationTimeouts != null) {
addApplicationTimeouts();
}
}
private void mergeLocalToProto() {
@ -548,4 +558,77 @@ private ReservationIdPBImpl convertFromProtoFormat(ReservationIdProto p) {
private ReservationIdProto convertToProtoFormat(ReservationId t) {
return ((ReservationIdPBImpl) t).getProto();
}
@Override
public Map<ApplicationTimeoutType, Long> getApplicationTimeouts() {
initApplicationTimeout();
return this.applicationTimeouts;
}
private void initApplicationTimeout() {
if (this.applicationTimeouts != null) {
return;
}
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
List<ApplicationTimeoutMapProto> lists = p.getApplicationTimeoutsList();
this.applicationTimeouts =
new HashMap<ApplicationTimeoutType, Long>(lists.size());
for (ApplicationTimeoutMapProto timeoutProto : lists) {
this.applicationTimeouts.put(
ProtoUtils
.convertFromProtoFormat(timeoutProto.getApplicationTimeoutType()),
timeoutProto.getTimeout());
}
}
@Override
public void setApplicationTimeouts(
Map<ApplicationTimeoutType, Long> appTimeouts) {
if (appTimeouts == null) {
return;
}
initApplicationTimeout();
this.applicationTimeouts.clear();
this.applicationTimeouts.putAll(appTimeouts);
}
private void addApplicationTimeouts() {
maybeInitBuilder();
builder.clearApplicationTimeouts();
if (applicationTimeouts == null) {
return;
}
Iterable<? extends ApplicationTimeoutMapProto> values =
new Iterable<ApplicationTimeoutMapProto>() {
@Override
public Iterator<ApplicationTimeoutMapProto> iterator() {
return new Iterator<ApplicationTimeoutMapProto>() {
private Iterator<ApplicationTimeoutType> iterator =
applicationTimeouts.keySet().iterator();
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public ApplicationTimeoutMapProto next() {
ApplicationTimeoutType key = iterator.next();
return ApplicationTimeoutMapProto.newBuilder()
.setTimeout(applicationTimeouts.get(key))
.setApplicationTimeoutType(
ProtoUtils.convertToProtoFormat(key))
.build();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
this.builder.addAllApplicationTimeouts(values);
}
}
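
A note on the PB representation: the proto definition has no map type for this field, so the Java Map is flattened into repeated ApplicationTimeoutMapProto entries. addApplicationTimeouts above does this lazily through an anonymous Iterable that addAllApplicationTimeouts drains. An eager equivalent is shown below purely for illustration; the method name is made up and a java.util.ArrayList import is assumed in addition to the imports already in this file:

private void addApplicationTimeoutsEagerly() {
  maybeInitBuilder();
  builder.clearApplicationTimeouts();
  if (applicationTimeouts == null) {
    return;
  }
  List<ApplicationTimeoutMapProto> entries =
      new ArrayList<ApplicationTimeoutMapProto>(applicationTimeouts.size());
  for (Map.Entry<ApplicationTimeoutType, Long> e
      : applicationTimeouts.entrySet()) {
    entries.add(ApplicationTimeoutMapProto.newBuilder()
        .setApplicationTimeoutType(ProtoUtils.convertToProtoFormat(e.getKey()))
        .setTimeout(e.getValue())
        .build());
  }
  builder.addAllApplicationTimeouts(entries);
}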

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
@ -51,6 +52,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.AMCommandProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
@ -259,6 +261,23 @@ public static ApplicationAccessType convertFromProtoFormat(
return ApplicationAccessType.valueOf(e.name().replace(
APP_ACCESS_TYPE_PREFIX, ""));
}
/*
* ApplicationTimeoutType
*/
private static String APP_TIMEOUT_TYPE_PREFIX = "APP_TIMEOUT_";
public static ApplicationTimeoutTypeProto convertToProtoFormat(
ApplicationTimeoutType e) {
return ApplicationTimeoutTypeProto
.valueOf(APP_TIMEOUT_TYPE_PREFIX + e.name());
}
public static ApplicationTimeoutType convertFromProtoFormat(
ApplicationTimeoutTypeProto e) {
return ApplicationTimeoutType
.valueOf(e.name().replace(APP_TIMEOUT_TYPE_PREFIX, ""));
}
/*
* Reservation Request interpreter type

View File

@ -44,8 +44,8 @@ public abstract class AbstractLivelinessMonitor<O> extends AbstractService {
private Thread checkerThread;
private volatile boolean stopped;
public static final int DEFAULT_EXPIRE = 5*60*1000;//5 mins
private int expireInterval = DEFAULT_EXPIRE;
private int monitorInterval = expireInterval/3;
private long expireInterval = DEFAULT_EXPIRE;
private long monitorInterval = expireInterval / 3;
private final Clock clock;
@ -85,7 +85,12 @@ protected void setExpireInterval(int expireInterval) {
this.expireInterval = expireInterval;
}
protected void setMonitorInterval(int monitorInterval) {
protected long getExpireInterval(O o) {
// By default, return the same expire interval for all registered objects.
return this.expireInterval;
}
protected void setMonitorInterval(long monitorInterval) {
this.monitorInterval = monitorInterval;
}
@ -97,7 +102,11 @@ public synchronized void receivedPing(O ob) {
}
public synchronized void register(O ob) {
running.put(ob, clock.getTime());
register(ob, clock.getTime());
}
public synchronized void register(O ob, long monitorStartTime) {
running.put(ob, monitorStartTime);
}
public synchronized void unregister(O ob) {
@ -117,19 +126,20 @@ private class PingChecker implements Runnable {
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
synchronized (AbstractLivelinessMonitor.this) {
Iterator<Map.Entry<O, Long>> iterator =
running.entrySet().iterator();
Iterator<Map.Entry<O, Long>> iterator = running.entrySet().iterator();
//avoid calculating current time everytime in loop
// avoid calculating current time everytime in loop
long currentTime = clock.getTime();
while (iterator.hasNext()) {
Map.Entry<O, Long> entry = iterator.next();
if (currentTime > entry.getValue() + expireInterval) {
O key = entry.getKey();
long interval = getExpireInterval(key);
if (currentTime > entry.getValue() + interval) {
iterator.remove();
expire(entry.getKey());
LOG.info("Expired:" + entry.getKey().toString() +
" Timed out after " + expireInterval/1000 + " secs");
expire(key);
LOG.info("Expired:" + entry.getKey().toString()
+ " Timed out after " + interval / 1000 + " secs");
}
}
}
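
The two additions here, register(ob, monitorStartTime) and the overridable getExpireInterval(O), let a subclass start the clock at an arbitrary point in time and give each monitored object its own expiry interval instead of the single global one. A minimal illustrative subclass is sketched below (RMAppLifetimeMonitor, added further down in this commit, is the real user of these hooks):

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
import org.apache.hadoop.yarn.util.SystemClock;

class PerObjectTimeoutMonitor extends AbstractLivelinessMonitor<String> {
  private final Map<String, Long> timeouts = new HashMap<String, Long>();

  PerObjectTimeoutMonitor() {
    super("PerObjectTimeoutMonitor", SystemClock.getInstance());
  }

  synchronized void track(String id, long startTime, long timeoutMs) {
    timeouts.put(id, timeoutMs);
    register(id, startTime); // monitoring starts from the supplied time
  }

  @Override
  protected synchronized long getExpireInterval(String id) {
    return timeouts.get(id); // per-object interval instead of the global one
  }

  @Override
  protected void expire(String id) {
    // react to the timeout, e.g. dispatch a kill event
  }
}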

View File

@ -3005,4 +3005,13 @@
<name>yarn.resourcemanager.node-removal-untracked.timeout-ms</name>
<value>60000</value>
</property>
<property>
<description>
The RMAppLifetimeMonitor service uses this value, in milliseconds, as its lifetime monitoring interval
</description>
<name>yarn.resourcemanager.application-timeouts.lifetime-monitor.interval-ms</name>
<value>60000</value>
</property>
</configuration>
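
For reference, tests and programmatic setups can shorten this interval through Configuration (the test added below uses exactly this pattern), and a deployed cluster would override the same property name in yarn-site.xml. Note that RMAppLifetimeMonitor#serviceInit falls back to the 60000 ms default when a non-positive value is configured. A minimal sketch with an arbitrary 10-second interval, assuming the usual Configuration/YarnConfiguration imports:

Configuration conf = new YarnConfiguration();
// Check registered applications every 10 seconds instead of the 60s default.
conf.setLong(
    YarnConfiguration.RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS, 10000L);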

View File

@ -368,7 +368,7 @@ private static Object genTypeValue(Type type) {
return bytes[rand.nextInt(4)];
} else if (type.equals(int.class) || type.equals(Integer.class)) {
return rand.nextInt(1000000);
} else if (type.equals(long.class)) {
} else if (type.equals(long.class) || type.equals(Long.class)) {
return Long.valueOf(rand.nextInt(1000000));
} else if (type.equals(float.class)) {
return rand.nextFloat();

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetimeMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@ -104,6 +105,8 @@ public class RMActiveServiceContext {
private boolean isSchedulerReady = false;
private PlacementManager queuePlacementManager = null;
private RMAppLifetimeMonitor rmAppLifetimeMonitor;
public RMActiveServiceContext() {
queuePlacementManager = new PlacementManager();
}
@ -453,4 +456,17 @@ public PlacementManager getQueuePlacementManager() {
public void setQueuePlacementManager(PlacementManager placementMgr) {
this.queuePlacementManager = placementMgr;
}
@Private
@Unstable
public void setRMAppLifetimeMonitor(
RMAppLifetimeMonitor lifetimeMonitor) {
this.rmAppLifetimeMonitor = lifetimeMonitor;
}
@Private
@Unstable
public RMAppLifetimeMonitor getRMAppLifetimeMonitor() {
return this.rmAppLifetimeMonitor;
}
}

View File

@ -383,6 +383,10 @@ private RMAppImpl createAndPopulateNewRMApp(
}
}
// fail the submission if configured application timeout value is invalid
RMServerUtils.validateApplicationTimeouts(
submissionContext.getApplicationTimeouts());
// Create RMApp
RMAppImpl application = new RMAppImpl(applicationId, rmContext, this.conf,
submissionContext.getApplicationName(), user,

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetimeMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@ -143,4 +144,8 @@ void setRMDelegatedNodeLabelsUpdater(
LeaderElectorService getLeaderElectorService();
QueueLimitCalculator getNodeManagerQueueLimitCalculator();
void setRMAppLifetimeMonitor(RMAppLifetimeMonitor rmAppLifetimeMonitor);
RMAppLifetimeMonitor getRMAppLifetimeMonitor();
}

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetimeMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@ -486,4 +487,15 @@ public void setContainerQueueLimitCalculator(
QueueLimitCalculator limitCalculator) {
this.queueLimitCalculator = limitCalculator;
}
@Override
public void setRMAppLifetimeMonitor(
RMAppLifetimeMonitor rmAppLifetimeMonitor) {
this.activeServiceContext.setRMAppLifetimeMonitor(rmAppLifetimeMonitor);
}
@Override
public RMAppLifetimeMonitor getRMAppLifetimeMonitor() {
return this.activeServiceContext.getRMAppLifetimeMonitor();
}
}

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@ -51,6 +52,7 @@
import org.apache.hadoop.yarn.exceptions
.InvalidResourceBlacklistRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -470,4 +472,18 @@ public static void processRMProxyUsersConf(Configuration conf) {
conf.set(entry.getKey(), entry.getValue());
}
}
public static void validateApplicationTimeouts(
Map<ApplicationTimeoutType, Long> timeouts) throws YarnException {
if (timeouts != null) {
for (Map.Entry<ApplicationTimeoutType, Long> timeout : timeouts
.entrySet()) {
if (timeout.getValue() < 0) {
String message = "Invalid application timeout, value="
+ timeout.getValue() + " for type=" + timeout.getKey();
throw new YarnException(message);
}
}
}
}
}
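
Validation is intentionally minimal: only negative values are rejected at submission time, while a missing or zero LIFETIME entry simply leaves the application unmonitored (RMAppImpl only registers an application with the monitor when the value is greater than zero). An illustrative fragment, assuming the usual java.util imports:

Map<ApplicationTimeoutType, Long> timeouts =
    new HashMap<ApplicationTimeoutType, Long>();
timeouts.put(ApplicationTimeoutType.LIFETIME, -10L);
// Throws YarnException:
// "Invalid application timeout, value=-10 for type=LIFETIME"
RMServerUtils.validateApplicationTimeouts(timeouts);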

View File

@ -84,6 +84,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetimeMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
@ -519,6 +520,10 @@ protected void serviceInit(Configuration configuration) throws Exception {
addService(amFinishingMonitor);
rmContext.setAMFinishingMonitor(amFinishingMonitor);
RMAppLifetimeMonitor rmAppLifetimeMonitor = createRMAppLifetimeMonitor();
addService(rmAppLifetimeMonitor);
rmContext.setRMAppLifetimeMonitor(rmAppLifetimeMonitor);
RMNodeLabelsManager nlm = createNodeLabelManager();
nlm.setRMContext(rmContext);
addService(nlm);
@ -1361,4 +1366,8 @@ private static void printUsage(PrintStream out) {
out.println(" "
+ "[-remove-application-from-state-store <appId>]" + "\n");
}
protected RMAppLifetimeMonitor createRMAppLifetimeMonitor() {
return new RMAppLifetimeMonitor(this.rmContext);
}
}

View File

@ -56,6 +56,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -494,6 +495,8 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD;
}
}
}
@Override
@ -1021,6 +1024,20 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
}
}
long applicationLifetime =
app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME);
if (applicationLifetime > 0) {
app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId,
ApplicationTimeoutType.LIFETIME, app.submitTime,
applicationLifetime * 1000);
if (LOG.isDebugEnabled()) {
LOG.debug("Application " + app.applicationId
+ " is registered for timeout monitor, type="
+ ApplicationTimeoutType.LIFETIME + " value="
+ applicationLifetime + " seconds");
}
}
// No existent attempts means the attempt associated with this app was not
// started or started but not yet saved.
if (app.attempts.isEmpty()) {
@ -1067,6 +1084,13 @@ private static final class FinalStateSavedTransition implements
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
Map<ApplicationTimeoutType, Long> timeouts =
app.submissionContext.getApplicationTimeouts();
if (timeouts != null && timeouts.size() > 0) {
app.rmContext.getRMAppLifetimeMonitor()
.unregisterApp(app.getApplicationId(), timeouts.keySet());
}
if (app.transitionTodo instanceof SingleArcTransition) {
((SingleArcTransition) app.transitionTodo).transition(app,
app.eventCausingFinalSaving);
@ -1075,7 +1099,6 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
app.eventCausingFinalSaving);
}
return app.targetedFinalState;
}
}
@ -1124,6 +1147,18 @@ private static final class RMAppNewlySavingTransition extends RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
long applicationLifetime =
app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME);
if (applicationLifetime > 0) {
app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId,
ApplicationTimeoutType.LIFETIME, app.submitTime,
applicationLifetime * 1000);
LOG.debug("Application " + app.applicationId
+ " is registered for timeout monitor, type="
+ ApplicationTimeoutType.LIFETIME + " value=" + applicationLifetime
+ " seconds");
}
// If recovery is enabled then store the application information in a
// non-blocking call so make sure that RM has stored the information
// needed to restart the AM after RM restart without further client
@ -1837,4 +1872,14 @@ private void sendATSCreateEvent() {
public int getNextAttemptId() {
return nextAttemptId;
}
private long getApplicationLifetime(ApplicationTimeoutType type) {
Map<ApplicationTimeoutType, Long> timeouts =
this.submissionContext.getApplicationTimeouts();
long applicationLifetime = -1;
if (timeouts != null && timeouts.containsKey(type)) {
applicationLifetime = timeouts.get(type);
}
return applicationLifetime;
}
}

View File

@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
import org.apache.hadoop.yarn.util.SystemClock;
/**
* This service monitors applications against their configured lifetime value.
* An application is killed if it runs beyond the given time.
*/
public class RMAppLifetimeMonitor
extends AbstractLivelinessMonitor<RMAppToMonitor> {
private static final Log LOG = LogFactory.getLog(RMAppLifetimeMonitor.class);
private RMContext rmContext;
private Map<RMAppToMonitor, Long> monitoredApps =
new HashMap<RMAppToMonitor, Long>();
private static final EnumSet<RMAppState> COMPLETED_APP_STATES =
EnumSet.of(RMAppState.FINISHED, RMAppState.FINISHING, RMAppState.FAILED,
RMAppState.KILLED, RMAppState.FINAL_SAVING, RMAppState.KILLING);
public RMAppLifetimeMonitor(RMContext rmContext) {
super(RMAppLifetimeMonitor.class.getName(), SystemClock.getInstance());
this.rmContext = rmContext;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
long monitorInterval = conf.getLong(
YarnConfiguration.RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS);
if (monitorInterval <= 0) {
monitorInterval =
YarnConfiguration.DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS;
}
setMonitorInterval(monitorInterval);
LOG.info("Application lifelime monitor interval set to " + monitorInterval
+ " ms.");
super.serviceInit(conf);
}
@SuppressWarnings("unchecked")
@Override
protected synchronized void expire(RMAppToMonitor monitoredAppKey) {
Long remove = monitoredApps.remove(monitoredAppKey);
ApplicationId appId = monitoredAppKey.getApplicationId();
RMApp app = rmContext.getRMApps().get(appId);
if (app == null) {
return;
}
// Don't trigger a KILL event if application is in completed states
if (!COMPLETED_APP_STATES.contains(app.getState())) {
String diagnostics =
"Application killed due to exceeding its lifetime period " + remove
+ " milliseconds";
rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(appId, RMAppEventType.KILL, diagnostics));
} else {
LOG.info("Application " + appId
+ " is about to complete. So not killing the application.");
}
}
public synchronized void registerApp(ApplicationId appId,
ApplicationTimeoutType timeoutType, long monitorStartTime, long timeout) {
RMAppToMonitor appToMonitor = new RMAppToMonitor(appId, timeoutType);
register(appToMonitor, monitorStartTime);
monitoredApps.putIfAbsent(appToMonitor, timeout);
}
@Override
protected synchronized long getExpireInterval(
RMAppToMonitor monitoredAppKey) {
return monitoredApps.get(monitoredAppKey);
}
public synchronized void unregisterApp(ApplicationId appId,
ApplicationTimeoutType timeoutType) {
RMAppToMonitor appToRemove = new RMAppToMonitor(appId, timeoutType);
unregister(appToRemove);
monitoredApps.remove(appToRemove);
}
public synchronized void unregisterApp(ApplicationId appId,
Set<ApplicationTimeoutType> types) {
for (ApplicationTimeoutType type : types) {
unregisterApp(appId, type);
}
}
public synchronized void updateApplicationTimeouts(ApplicationId appId,
Map<ApplicationTimeoutType, Long> timeouts) {
// TODO in YARN-5611
}
}

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
/**
* This class identifies a monitored application by its applicationId plus appTimeoutType.
*/
public class RMAppToMonitor {
private ApplicationId applicationId;
private ApplicationTimeoutType appTimeoutType;
RMAppToMonitor(ApplicationId appId, ApplicationTimeoutType timeoutType) {
this.applicationId = appId;
this.appTimeoutType = timeoutType;
}
public ApplicationId getApplicationId() {
return applicationId;
}
public ApplicationTimeoutType getAppTimeoutType() {
return appTimeoutType;
}
@Override
public int hashCode() {
return applicationId.hashCode() + appTimeoutType.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
RMAppToMonitor other = (RMAppToMonitor) obj;
if (!this.applicationId.equals(other.getApplicationId())) {
return false;
}
if (this.appTimeoutType != other.getAppTimeoutType()) {
return false;
}
return true;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(applicationId.toString()).append("_").append(appTimeoutType);
return sb.toString();
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor contains
* classes related to application lifetime monitoring.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -50,6 +50,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
@ -460,7 +461,7 @@ public RMApp submitApp(int masterMemory, String name, String user,
return submitApp(resource, name, user, acls, false, queue,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
false, null, 0, null, true, priority, amLabel);
false, null, 0, null, true, priority, amLabel, null);
}
public RMApp submitApp(Resource resource, String name, String user,
@ -561,7 +562,7 @@ public RMApp submitApp(Resource capability, String name, String user,
return submitApp(capability, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
isAppIdProvided, applicationId, attemptFailuresValidityInterval,
logAggregationContext, cancelTokensWhenComplete, priority, "");
logAggregationContext, cancelTokensWhenComplete, priority, "", null);
}
public RMApp submitApp(Resource capability, String name, String user,
@ -570,7 +571,8 @@ public RMApp submitApp(Resource capability, String name, String user,
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
ApplicationId applicationId, long attemptFailuresValidityInterval,
LogAggregationContext logAggregationContext,
boolean cancelTokensWhenComplete, Priority priority, String amLabel)
boolean cancelTokensWhenComplete, Priority priority, String amLabel,
Map<ApplicationTimeoutType, Long> applicationTimeouts)
throws Exception {
ApplicationId appId = isAppIdProvided ? applicationId : null;
ApplicationClientProtocol client = getClientRMService();
@ -587,6 +589,9 @@ public RMApp submitApp(Resource capability, String name, String user,
sub.setApplicationId(appId);
sub.setApplicationName(name);
sub.setMaxAppAttempts(maxAppAttempts);
if (applicationTimeouts != null && applicationTimeouts.size() > 0) {
sub.setApplicationTimeouts(applicationTimeouts);
}
if (unmanaged) {
sub.setUnmanagedAM(true);
}
@ -1073,4 +1078,15 @@ public void waitForAppRemovedFromScheduler(ApplicationId appId,
!apps.containsKey(appId));
LOG.info("app is removed from scheduler, " + appId);
}
public RMApp submitApp(int masterMemory, Priority priority,
Map<ApplicationTimeoutType, Long> applicationTimeouts) throws Exception {
Resource resource = Resource.newInstance(masterMemory, 0);
return submitApp(
resource, "", UserGroupInformation.getCurrentUser().getShortUserName(),
null, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
false, false, null, 0, null, true, priority, null, applicationTimeouts);
}
}

View File

@ -0,0 +1,165 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart;
import org.apache.hadoop.yarn.server.resourcemanager.TestWorkPreservingRMRestart;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Test class for the application lifetime monitor feature.
*/
public class TestApplicationLifetimeMonitor {
private YarnConfiguration conf;
@Before
public void setup() throws IOException {
conf = new YarnConfiguration();
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
UserGroupInformation.setConfiguration(conf);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
true);
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
conf.setLong(YarnConfiguration.RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS,
3000L);
}
@Test(timeout = 90000)
public void testApplicationLifetimeMonitor() throws Exception {
MockRM rm = null;
try {
rm = new MockRM(conf);
rm.start();
Priority appPriority = Priority.newInstance(0);
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * 1024);
Map<ApplicationTimeoutType, Long> timeouts =
new HashMap<ApplicationTimeoutType, Long>();
timeouts.put(ApplicationTimeoutType.LIFETIME, 10L);
RMApp app1 = rm.submitApp(1024, appPriority, timeouts);
nm1.nodeHeartbeat(true);
// Send launch Event
MockAM am1 =
rm.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId());
am1.registerAppAttempt();
rm.waitForState(app1.getApplicationId(), RMAppState.KILLED);
Assert.assertTrue("Applicaiton killed before lifetime value",
(System.currentTimeMillis() - app1.getSubmitTime()) > 10000);
} finally {
stopRM(rm);
}
}
@SuppressWarnings("rawtypes")
@Test(timeout = 180000)
public void testApplicationLifetimeOnRMRestart() throws Exception {
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
nm1.registerNode();
nm1.nodeHeartbeat(true);
long appLifetime = 60L;
Map<ApplicationTimeoutType, Long> timeouts =
new HashMap<ApplicationTimeoutType, Long>();
timeouts.put(ApplicationTimeoutType.LIFETIME, appLifetime);
RMApp app1 = rm1.submitApp(200, Priority.newInstance(0), timeouts);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// Re-start RM
MockRM rm2 = new MockRM(conf, memStore);
rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
// recover app
RMApp recoveredApp1 =
rm2.getRMContext().getRMApps().get(app1.getApplicationId());
NMContainerStatus amContainer = TestRMRestart.createNMContainerStatus(
am1.getApplicationAttemptId(), 1, ContainerState.RUNNING);
NMContainerStatus runningContainer = TestRMRestart.createNMContainerStatus(
am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
nm1.registerNode(Arrays.asList(amContainer, runningContainer), null);
// Wait for RM to settle down on recovering containers;
TestWorkPreservingRMRestart.waitForNumContainersToRecover(2, rm2,
am1.getApplicationAttemptId());
Set<ContainerId> launchedContainers =
((RMNodeImpl) rm2.getRMContext().getRMNodes().get(nm1.getNodeId()))
.getLaunchedContainers();
assertTrue(launchedContainers.contains(amContainer.getContainerId()));
assertTrue(launchedContainers.contains(runningContainer.getContainerId()));
// check RMContainers are re-recreated and the container state is correct.
rm2.waitForState(nm1, amContainer.getContainerId(),
RMContainerState.RUNNING);
rm2.waitForState(nm1, runningContainer.getContainerId(),
RMContainerState.RUNNING);
// re register attempt to rm2
rm2.waitForState(recoveredApp1.getApplicationId(), RMAppState.ACCEPTED);
am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
am1.registerAppAttempt();
rm2.waitForState(recoveredApp1.getApplicationId(), RMAppState.RUNNING);
// wait for app life time and application to be in killed state.
rm2.waitForState(recoveredApp1.getApplicationId(), RMAppState.KILLED);
Assert.assertTrue("Applicaiton killed before lifetime value",
(System.currentTimeMillis()
- recoveredApp1.getSubmitTime()) > appLifetime);
}
private void stopRM(MockRM rm) {
if (rm != null) {
rm.stop();
}
}
}