YARN-4205. Add a service for monitoring application life time out. Contributed by Rohith Sharma K S

This commit is contained in:
Jian He 2016-09-29 22:00:31 +08:00
parent 1518cb9532
commit 2ae5a3a5bf
21 changed files with 738 additions and 16 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.api.records; package org.apache.hadoop.yarn.api.records;
import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@ -535,4 +536,24 @@ public abstract class ApplicationSubmissionContext {
@Public @Public
@Unstable @Unstable
public abstract void setReservationID(ReservationId reservationID); public abstract void setReservationID(ReservationId reservationID);
/**
* Get <code>ApplicationTimeouts</code> of the application. Timeout value is
* in seconds.
* @return all <code>ApplicationTimeouts</code> of the application.
*/
@Public
@Unstable
public abstract Map<ApplicationTimeoutType, Long> getApplicationTimeouts();
/**
* Set the <code>ApplicationTimeouts</code> for the application in seconds.
* All pre-existing Map entries are cleared before adding the new Map.
* @param applicationTimeouts <code>ApplicationTimeouts</code>s for the
* application
*/
@Public
@Unstable
public abstract void setApplicationTimeouts(
Map<ApplicationTimeoutType, Long> applicationTimeouts);
} }

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* Application timeout type.
*/
@Public
@Unstable
public enum ApplicationTimeoutType {
/**
* <p>
* Timeout imposed on overall application life time. It includes actual
* run-time plus non-runtime. Non-runtime delays are time elapsed by scheduler
* to allocate container, time taken to store in RMStateStore and etc.
* </p>
* If this is set, then timeout monitoring start from application submission
* time.
*/
LIFETIME;
}

View File

@ -1533,6 +1533,12 @@ public class YarnConfiguration extends Configuration {
false; false;
// Configurations for applicaiton life time monitor feature
public static final String RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS =
RM_PREFIX + "application-timeouts.lifetime-monitor.interval-ms";
public static final long DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS =
60000;
/** /**
* Interval of time the linux container executor should try cleaning up * Interval of time the linux container executor should try cleaning up

View File

@ -365,6 +365,16 @@ message ApplicationSubmissionContextProto {
optional ReservationIdProto reservation_id = 15; optional ReservationIdProto reservation_id = 15;
optional string node_label_expression = 16; optional string node_label_expression = 16;
optional ResourceRequestProto am_container_resource_request = 17; optional ResourceRequestProto am_container_resource_request = 17;
repeated ApplicationTimeoutMapProto application_timeouts = 18;
}
enum ApplicationTimeoutTypeProto {
APP_TIMEOUT_LIFETIME = 1;
}
message ApplicationTimeoutMapProto {
optional ApplicationTimeoutTypeProto application_timeout_type = 1;
optional int64 timeout = 2;
} }
message LogAggregationContextProto { message LogAggregationContextProto {

View File

@ -18,7 +18,11 @@
package org.apache.hadoop.yarn.api.records.impl.pb; package org.apache.hadoop.yarn.api.records.impl.pb;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -26,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
@ -36,6 +41,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
@ -63,6 +69,7 @@ extends ApplicationSubmissionContext {
private ResourceRequest amResourceRequest = null; private ResourceRequest amResourceRequest = null;
private LogAggregationContext logAggregationContext = null; private LogAggregationContext logAggregationContext = null;
private ReservationId reservationId = null; private ReservationId reservationId = null;
private Map<ApplicationTimeoutType, Long> applicationTimeouts = null;
public ApplicationSubmissionContextPBImpl() { public ApplicationSubmissionContextPBImpl() {
builder = ApplicationSubmissionContextProto.newBuilder(); builder = ApplicationSubmissionContextProto.newBuilder();
@ -131,6 +138,9 @@ extends ApplicationSubmissionContext {
if (this.reservationId != null) { if (this.reservationId != null) {
builder.setReservationId(convertToProtoFormat(this.reservationId)); builder.setReservationId(convertToProtoFormat(this.reservationId));
} }
if (this.applicationTimeouts != null) {
addApplicationTimeouts();
}
} }
private void mergeLocalToProto() { private void mergeLocalToProto() {
@ -548,4 +558,77 @@ extends ApplicationSubmissionContext {
private ReservationIdProto convertToProtoFormat(ReservationId t) { private ReservationIdProto convertToProtoFormat(ReservationId t) {
return ((ReservationIdPBImpl) t).getProto(); return ((ReservationIdPBImpl) t).getProto();
} }
@Override
public Map<ApplicationTimeoutType, Long> getApplicationTimeouts() {
initApplicationTimeout();
return this.applicationTimeouts;
}
private void initApplicationTimeout() {
if (this.applicationTimeouts != null) {
return;
}
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
List<ApplicationTimeoutMapProto> lists = p.getApplicationTimeoutsList();
this.applicationTimeouts =
new HashMap<ApplicationTimeoutType, Long>(lists.size());
for (ApplicationTimeoutMapProto timeoutProto : lists) {
this.applicationTimeouts.put(
ProtoUtils
.convertFromProtoFormat(timeoutProto.getApplicationTimeoutType()),
timeoutProto.getTimeout());
}
}
@Override
public void setApplicationTimeouts(
Map<ApplicationTimeoutType, Long> appTimeouts) {
if (appTimeouts == null) {
return;
}
initApplicationTimeout();
this.applicationTimeouts.clear();
this.applicationTimeouts.putAll(appTimeouts);
}
private void addApplicationTimeouts() {
maybeInitBuilder();
builder.clearApplicationTimeouts();
if (applicationTimeouts == null) {
return;
}
Iterable<? extends ApplicationTimeoutMapProto> values =
new Iterable<ApplicationTimeoutMapProto>() {
@Override
public Iterator<ApplicationTimeoutMapProto> iterator() {
return new Iterator<ApplicationTimeoutMapProto>() {
private Iterator<ApplicationTimeoutType> iterator =
applicationTimeouts.keySet().iterator();
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public ApplicationTimeoutMapProto next() {
ApplicationTimeoutType key = iterator.next();
return ApplicationTimeoutMapProto.newBuilder()
.setTimeout(applicationTimeouts.get(key))
.setApplicationTimeoutType(
ProtoUtils.convertToProtoFormat(key))
.build();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
this.builder.addAllApplicationTimeouts(values);
}
} }

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnProtos.AMCommandProto; import org.apache.hadoop.yarn.proto.YarnProtos.AMCommandProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
@ -260,6 +262,23 @@ public class ProtoUtils {
APP_ACCESS_TYPE_PREFIX, "")); APP_ACCESS_TYPE_PREFIX, ""));
} }
/*
* ApplicationTimeoutType
*/
private static String APP_TIMEOUT_TYPE_PREFIX = "APP_TIMEOUT_";
public static ApplicationTimeoutTypeProto convertToProtoFormat(
ApplicationTimeoutType e) {
return ApplicationTimeoutTypeProto
.valueOf(APP_TIMEOUT_TYPE_PREFIX + e.name());
}
public static ApplicationTimeoutType convertFromProtoFormat(
ApplicationTimeoutTypeProto e) {
return ApplicationTimeoutType
.valueOf(e.name().replace(APP_TIMEOUT_TYPE_PREFIX, ""));
}
/* /*
* Reservation Request interpreter type * Reservation Request interpreter type
*/ */

View File

@ -44,8 +44,8 @@ public abstract class AbstractLivelinessMonitor<O> extends AbstractService {
private Thread checkerThread; private Thread checkerThread;
private volatile boolean stopped; private volatile boolean stopped;
public static final int DEFAULT_EXPIRE = 5*60*1000;//5 mins public static final int DEFAULT_EXPIRE = 5*60*1000;//5 mins
private int expireInterval = DEFAULT_EXPIRE; private long expireInterval = DEFAULT_EXPIRE;
private int monitorInterval = expireInterval/3; private long monitorInterval = expireInterval / 3;
private final Clock clock; private final Clock clock;
@ -85,7 +85,12 @@ public abstract class AbstractLivelinessMonitor<O> extends AbstractService {
this.expireInterval = expireInterval; this.expireInterval = expireInterval;
} }
protected void setMonitorInterval(int monitorInterval) { protected long getExpireInterval(O o) {
// by-default return for all the registered object interval.
return this.expireInterval;
}
protected void setMonitorInterval(long monitorInterval) {
this.monitorInterval = monitorInterval; this.monitorInterval = monitorInterval;
} }
@ -97,7 +102,11 @@ public abstract class AbstractLivelinessMonitor<O> extends AbstractService {
} }
public synchronized void register(O ob) { public synchronized void register(O ob) {
running.put(ob, clock.getTime()); register(ob, clock.getTime());
}
public synchronized void register(O ob, long monitorStartTime) {
running.put(ob, monitorStartTime);
} }
public synchronized void unregister(O ob) { public synchronized void unregister(O ob) {
@ -117,19 +126,20 @@ public abstract class AbstractLivelinessMonitor<O> extends AbstractService {
public void run() { public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) { while (!stopped && !Thread.currentThread().isInterrupted()) {
synchronized (AbstractLivelinessMonitor.this) { synchronized (AbstractLivelinessMonitor.this) {
Iterator<Map.Entry<O, Long>> iterator = Iterator<Map.Entry<O, Long>> iterator = running.entrySet().iterator();
running.entrySet().iterator();
//avoid calculating current time everytime in loop // avoid calculating current time everytime in loop
long currentTime = clock.getTime(); long currentTime = clock.getTime();
while (iterator.hasNext()) { while (iterator.hasNext()) {
Map.Entry<O, Long> entry = iterator.next(); Map.Entry<O, Long> entry = iterator.next();
if (currentTime > entry.getValue() + expireInterval) { O key = entry.getKey();
long interval = getExpireInterval(key);
if (currentTime > entry.getValue() + interval) {
iterator.remove(); iterator.remove();
expire(entry.getKey()); expire(key);
LOG.info("Expired:" + entry.getKey().toString() + LOG.info("Expired:" + entry.getKey().toString()
" Timed out after " + expireInterval/1000 + " secs"); + " Timed out after " + interval / 1000 + " secs");
} }
} }
} }

View File

@ -3075,4 +3075,13 @@
<name>yarn.resourcemanager.node-removal-untracked.timeout-ms</name> <name>yarn.resourcemanager.node-removal-untracked.timeout-ms</name>
<value>60000</value> <value>60000</value>
</property> </property>
<property>
<description>
The RMAppLifetimeMonitor Service uses this value as lifetime monitor interval
</description>
<name>yarn.resourcemanager.application-timeouts.lifetime-monitor.interval-ms</name>
<value>60000</value>
</property>
</configuration> </configuration>

View File

@ -368,7 +368,7 @@ public class TestPBImplRecords {
return bytes[rand.nextInt(4)]; return bytes[rand.nextInt(4)];
} else if (type.equals(int.class) || type.equals(Integer.class)) { } else if (type.equals(int.class) || type.equals(Integer.class)) {
return rand.nextInt(1000000); return rand.nextInt(1000000);
} else if (type.equals(long.class)) { } else if (type.equals(long.class) || type.equals(Long.class)) {
return Long.valueOf(rand.nextInt(1000000)); return Long.valueOf(rand.nextInt(1000000));
} else if (type.equals(float.class)) { } else if (type.equals(float.class)) {
return rand.nextFloat(); return rand.nextFloat();

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetimeMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@ -105,6 +106,8 @@ public class RMActiveServiceContext {
private boolean isSchedulerReady = false; private boolean isSchedulerReady = false;
private PlacementManager queuePlacementManager = null; private PlacementManager queuePlacementManager = null;
private RMAppLifetimeMonitor rmAppLifetimeMonitor;
public RMActiveServiceContext() { public RMActiveServiceContext() {
queuePlacementManager = new PlacementManager(); queuePlacementManager = new PlacementManager();
} }
@ -467,4 +470,17 @@ public class RMActiveServiceContext {
public void setQueuePlacementManager(PlacementManager placementMgr) { public void setQueuePlacementManager(PlacementManager placementMgr) {
this.queuePlacementManager = placementMgr; this.queuePlacementManager = placementMgr;
} }
@Private
@Unstable
public void setRMAppLifetimeMonitor(
RMAppLifetimeMonitor lifetimeMonitor) {
this.rmAppLifetimeMonitor = lifetimeMonitor;
}
@Private
@Unstable
public RMAppLifetimeMonitor getRMAppLifetimeMonitor() {
return this.rmAppLifetimeMonitor;
}
} }

View File

@ -385,6 +385,10 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
} }
} }
// fail the submission if configured application timeout value is invalid
RMServerUtils.validateApplicationTimeouts(
submissionContext.getApplicationTimeouts());
// Create RMApp // Create RMApp
RMAppImpl application = RMAppImpl application =
new RMAppImpl(applicationId, rmContext, this.conf, new RMAppImpl(applicationId, rmContext, this.conf,

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetimeMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@ -149,4 +150,8 @@ public interface RMContext {
LeaderElectorService getLeaderElectorService(); LeaderElectorService getLeaderElectorService();
QueueLimitCalculator getNodeManagerQueueLimitCalculator(); QueueLimitCalculator getNodeManagerQueueLimitCalculator();
void setRMAppLifetimeMonitor(RMAppLifetimeMonitor rmAppLifetimeMonitor);
RMAppLifetimeMonitor getRMAppLifetimeMonitor();
} }

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetimeMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@ -499,4 +500,15 @@ public class RMContextImpl implements RMContext {
QueueLimitCalculator limitCalculator) { QueueLimitCalculator limitCalculator) {
this.queueLimitCalculator = limitCalculator; this.queueLimitCalculator = limitCalculator;
} }
@Override
public void setRMAppLifetimeMonitor(
RMAppLifetimeMonitor rmAppLifetimeMonitor) {
this.activeServiceContext.setRMAppLifetimeMonitor(rmAppLifetimeMonitor);
}
@Override
public RMAppLifetimeMonitor getRMAppLifetimeMonitor() {
return this.activeServiceContext.getRMAppLifetimeMonitor();
}
} }

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
import org.apache.hadoop.yarn.exceptions import org.apache.hadoop.yarn.exceptions
.InvalidResourceBlacklistRequestException; .InvalidResourceBlacklistRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -470,4 +472,18 @@ public class RMServerUtils {
conf.set(entry.getKey(), entry.getValue()); conf.set(entry.getKey(), entry.getValue());
} }
} }
public static void validateApplicationTimeouts(
Map<ApplicationTimeoutType, Long> timeouts) throws YarnException {
if (timeouts != null) {
for (Map.Entry<ApplicationTimeoutType, Long> timeout : timeouts
.entrySet()) {
if (timeout.getValue() < 0) {
String message = "Invalid application timeout, value="
+ timeout.getValue() + " for type=" + timeout.getKey();
throw new YarnException(message);
}
}
}
}
} }

View File

@ -96,6 +96,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessM
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetimeMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
@ -556,6 +557,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
addService(amFinishingMonitor); addService(amFinishingMonitor);
rmContext.setAMFinishingMonitor(amFinishingMonitor); rmContext.setAMFinishingMonitor(amFinishingMonitor);
RMAppLifetimeMonitor rmAppLifetimeMonitor = createRMAppLifetimeMonitor();
addService(rmAppLifetimeMonitor);
rmContext.setRMAppLifetimeMonitor(rmAppLifetimeMonitor);
RMNodeLabelsManager nlm = createNodeLabelManager(); RMNodeLabelsManager nlm = createNodeLabelManager();
nlm.setRMContext(rmContext); nlm.setRMContext(rmContext);
addService(nlm); addService(nlm);
@ -1398,4 +1403,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
out.println(" " out.println(" "
+ "[-remove-application-from-state-store <appId>]" + "\n"); + "[-remove-application-from-state-store <appId>]" + "\n");
} }
protected RMAppLifetimeMonitor createRMAppLifetimeMonitor() {
return new RMAppLifetimeMonitor(this.rmContext);
}
} }

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
@ -527,6 +528,8 @@ public class RMAppImpl implements RMApp, Recoverable {
DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD; DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD;
} }
} }
} }
/** /**
@ -1106,6 +1109,20 @@ public class RMAppImpl implements RMApp, Recoverable {
} }
} }
long applicationLifetime =
app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME);
if (applicationLifetime > 0) {
app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId,
ApplicationTimeoutType.LIFETIME, app.submitTime,
applicationLifetime * 1000);
if (LOG.isDebugEnabled()) {
LOG.debug("Application " + app.applicationId
+ " is registered for timeout monitor, type="
+ ApplicationTimeoutType.LIFETIME + " value="
+ applicationLifetime + " seconds");
}
}
// No existent attempts means the attempt associated with this app was not // No existent attempts means the attempt associated with this app was not
// started or started but not yet saved. // started or started but not yet saved.
if (app.attempts.isEmpty()) { if (app.attempts.isEmpty()) {
@ -1152,6 +1169,13 @@ public class RMAppImpl implements RMApp, Recoverable {
@Override @Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) { public RMAppState transition(RMAppImpl app, RMAppEvent event) {
Map<ApplicationTimeoutType, Long> timeouts =
app.submissionContext.getApplicationTimeouts();
if (timeouts != null && timeouts.size() > 0) {
app.rmContext.getRMAppLifetimeMonitor()
.unregisterApp(app.getApplicationId(), timeouts.keySet());
}
if (app.transitionTodo instanceof SingleArcTransition) { if (app.transitionTodo instanceof SingleArcTransition) {
((SingleArcTransition) app.transitionTodo).transition(app, ((SingleArcTransition) app.transitionTodo).transition(app,
app.eventCausingFinalSaving); app.eventCausingFinalSaving);
@ -1160,7 +1184,6 @@ public class RMAppImpl implements RMApp, Recoverable {
app.eventCausingFinalSaving); app.eventCausingFinalSaving);
} }
return app.targetedFinalState; return app.targetedFinalState;
} }
} }
@ -1209,6 +1232,18 @@ public class RMAppImpl implements RMApp, Recoverable {
@Override @Override
public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
long applicationLifetime =
app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME);
if (applicationLifetime > 0) {
app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId,
ApplicationTimeoutType.LIFETIME, app.submitTime,
applicationLifetime * 1000);
LOG.debug("Application " + app.applicationId
+ " is registered for timeout monitor, type="
+ ApplicationTimeoutType.LIFETIME + " value=" + applicationLifetime
+ " seconds");
}
// If recovery is enabled then store the application information in a // If recovery is enabled then store the application information in a
// non-blocking call so make sure that RM has stored the information // non-blocking call so make sure that RM has stored the information
// needed to restart the AM after RM restart without further client // needed to restart the AM after RM restart without further client
@ -1922,4 +1957,14 @@ public class RMAppImpl implements RMApp, Recoverable {
public int getNextAttemptId() { public int getNextAttemptId() {
return nextAttemptId; return nextAttemptId;
} }
private long getApplicationLifetime(ApplicationTimeoutType type) {
Map<ApplicationTimeoutType, Long> timeouts =
this.submissionContext.getApplicationTimeouts();
long applicationLifetime = -1;
if (timeouts != null && timeouts.containsKey(type)) {
applicationLifetime = timeouts.get(type);
}
return applicationLifetime;
}
} }

View File

@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
import org.apache.hadoop.yarn.util.SystemClock;
/**
* This service will monitor the applications against the lifetime value given.
* The applications will be killed if it running beyond the given time.
*/
public class RMAppLifetimeMonitor
extends AbstractLivelinessMonitor<RMAppToMonitor> {
private static final Log LOG = LogFactory.getLog(RMAppLifetimeMonitor.class);
private RMContext rmContext;
private Map<RMAppToMonitor, Long> monitoredApps =
new HashMap<RMAppToMonitor, Long>();
private static final EnumSet<RMAppState> COMPLETED_APP_STATES =
EnumSet.of(RMAppState.FINISHED, RMAppState.FINISHING, RMAppState.FAILED,
RMAppState.KILLED, RMAppState.FINAL_SAVING, RMAppState.KILLING);
public RMAppLifetimeMonitor(RMContext rmContext) {
super(RMAppLifetimeMonitor.class.getName(), SystemClock.getInstance());
this.rmContext = rmContext;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
long monitorInterval = conf.getLong(
YarnConfiguration.RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS);
if (monitorInterval <= 0) {
monitorInterval =
YarnConfiguration.DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS;
}
setMonitorInterval(monitorInterval);
LOG.info("Application lifelime monitor interval set to " + monitorInterval
+ " ms.");
super.serviceInit(conf);
}
@SuppressWarnings("unchecked")
@Override
protected synchronized void expire(RMAppToMonitor monitoredAppKey) {
Long remove = monitoredApps.remove(monitoredAppKey);
ApplicationId appId = monitoredAppKey.getApplicationId();
RMApp app = rmContext.getRMApps().get(appId);
if (app == null) {
return;
}
// Don't trigger a KILL event if application is in completed states
if (!COMPLETED_APP_STATES.contains(app.getState())) {
String diagnostics =
"Application killed due to exceeding its lifetime period " + remove
+ " milliseconds";
rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(appId, RMAppEventType.KILL, diagnostics));
} else {
LOG.info("Application " + appId
+ " is about to complete. So not killing the application.");
}
}
public synchronized void registerApp(ApplicationId appId,
ApplicationTimeoutType timeoutType, long monitorStartTime, long timeout) {
RMAppToMonitor appToMonitor = new RMAppToMonitor(appId, timeoutType);
register(appToMonitor, monitorStartTime);
monitoredApps.putIfAbsent(appToMonitor, timeout);
}
@Override
protected synchronized long getExpireInterval(
RMAppToMonitor monitoredAppKey) {
return monitoredApps.get(monitoredAppKey);
}
public synchronized void unregisterApp(ApplicationId appId,
ApplicationTimeoutType timeoutType) {
RMAppToMonitor appToRemove = new RMAppToMonitor(appId, timeoutType);
unregister(appToRemove);
monitoredApps.remove(appToRemove);
}
public synchronized void unregisterApp(ApplicationId appId,
Set<ApplicationTimeoutType> types) {
for (ApplicationTimeoutType type : types) {
unregisterApp(appId, type);
}
}
public synchronized void updateApplicationTimeouts(ApplicationId appId,
Map<ApplicationTimeoutType, Long> timeouts) {
// TODO in YARN-5611
}
}

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
/**
* This class used for monitor application with applicationId+appTimeoutType.
*/
public class RMAppToMonitor {
private ApplicationId applicationId;
private ApplicationTimeoutType appTimeoutType;
RMAppToMonitor(ApplicationId appId, ApplicationTimeoutType timeoutType) {
this.applicationId = appId;
this.appTimeoutType = timeoutType;
}
public ApplicationId getApplicationId() {
return applicationId;
}
public ApplicationTimeoutType getAppTimeoutType() {
return appTimeoutType;
}
@Override
public int hashCode() {
return applicationId.hashCode() + appTimeoutType.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
RMAppToMonitor other = (RMAppToMonitor) obj;
if (!this.applicationId.equals(other.getApplicationId())) {
return false;
}
if (this.appTimeoutType != other.getAppTimeoutType()) {
return false;
}
return true;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(applicationId.toString()).append("_").append(appTimeoutType);
return sb.toString();
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor contains
* classes related to application monitor.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
@ -460,7 +461,7 @@ public class MockRM extends ResourceManager {
return submitApp(resource, name, user, acls, false, queue, return submitApp(resource, name, user, acls, false, queue,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
false, null, 0, null, true, priority, amLabel); false, null, 0, null, true, priority, amLabel, null);
} }
public RMApp submitApp(Resource resource, String name, String user, public RMApp submitApp(Resource resource, String name, String user,
@ -561,7 +562,7 @@ public class MockRM extends ResourceManager {
return submitApp(capability, name, user, acls, unmanaged, queue, return submitApp(capability, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers, maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
isAppIdProvided, applicationId, attemptFailuresValidityInterval, isAppIdProvided, applicationId, attemptFailuresValidityInterval,
logAggregationContext, cancelTokensWhenComplete, priority, ""); logAggregationContext, cancelTokensWhenComplete, priority, "", null);
} }
public RMApp submitApp(Resource capability, String name, String user, public RMApp submitApp(Resource capability, String name, String user,
@ -570,7 +571,8 @@ public class MockRM extends ResourceManager {
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
ApplicationId applicationId, long attemptFailuresValidityInterval, ApplicationId applicationId, long attemptFailuresValidityInterval,
LogAggregationContext logAggregationContext, LogAggregationContext logAggregationContext,
boolean cancelTokensWhenComplete, Priority priority, String amLabel) boolean cancelTokensWhenComplete, Priority priority, String amLabel,
Map<ApplicationTimeoutType, Long> applicationTimeouts)
throws Exception { throws Exception {
ApplicationId appId = isAppIdProvided ? applicationId : null; ApplicationId appId = isAppIdProvided ? applicationId : null;
ApplicationClientProtocol client = getClientRMService(); ApplicationClientProtocol client = getClientRMService();
@ -587,6 +589,9 @@ public class MockRM extends ResourceManager {
sub.setApplicationId(appId); sub.setApplicationId(appId);
sub.setApplicationName(name); sub.setApplicationName(name);
sub.setMaxAppAttempts(maxAppAttempts); sub.setMaxAppAttempts(maxAppAttempts);
if (applicationTimeouts != null && applicationTimeouts.size() > 0) {
sub.setApplicationTimeouts(applicationTimeouts);
}
if (unmanaged) { if (unmanaged) {
sub.setUnmanagedAM(true); sub.setUnmanagedAM(true);
} }
@ -1073,4 +1078,15 @@ public class MockRM extends ResourceManager {
!apps.containsKey(appId)); !apps.containsKey(appId));
LOG.info("app is removed from scheduler, " + appId); LOG.info("app is removed from scheduler, " + appId);
} }
public RMApp submitApp(int masterMemory, Priority priority,
Map<ApplicationTimeoutType, Long> applicationTimeouts) throws Exception {
Resource resource = Resource.newInstance(masterMemory, 0);
return submitApp(
resource, "", UserGroupInformation.getCurrentUser().getShortUserName(),
null, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
false, false, null, 0, null, true, priority, null, applicationTimeouts);
}
} }

View File

@ -0,0 +1,165 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart;
import org.apache.hadoop.yarn.server.resourcemanager.TestWorkPreservingRMRestart;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Test class for application life time monitor feature test.
*/
public class TestApplicationLifetimeMonitor {
private YarnConfiguration conf;
@Before
public void setup() throws IOException {
conf = new YarnConfiguration();
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
UserGroupInformation.setConfiguration(conf);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
true);
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
conf.setLong(YarnConfiguration.RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS,
3000L);
}
@Test(timeout = 90000)
public void testApplicationLifetimeMonitor() throws Exception {
MockRM rm = null;
try {
rm = new MockRM(conf);
rm.start();
Priority appPriority = Priority.newInstance(0);
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * 1024);
Map<ApplicationTimeoutType, Long> timeouts =
new HashMap<ApplicationTimeoutType, Long>();
timeouts.put(ApplicationTimeoutType.LIFETIME, 10L);
RMApp app1 = rm.submitApp(1024, appPriority, timeouts);
nm1.nodeHeartbeat(true);
// Send launch Event
MockAM am1 =
rm.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId());
am1.registerAppAttempt();
rm.waitForState(app1.getApplicationId(), RMAppState.KILLED);
Assert.assertTrue("Applicaiton killed before lifetime value",
(System.currentTimeMillis() - app1.getSubmitTime()) > 10000);
} finally {
stopRM(rm);
}
}
@SuppressWarnings("rawtypes")
@Test(timeout = 180000)
public void testApplicationLifetimeOnRMRestart() throws Exception {
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
nm1.registerNode();
nm1.nodeHeartbeat(true);
long appLifetime = 60L;
Map<ApplicationTimeoutType, Long> timeouts =
new HashMap<ApplicationTimeoutType, Long>();
timeouts.put(ApplicationTimeoutType.LIFETIME, appLifetime);
RMApp app1 = rm1.submitApp(200, Priority.newInstance(0), timeouts);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// Re-start RM
MockRM rm2 = new MockRM(conf, memStore);
rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
// recover app
RMApp recoveredApp1 =
rm2.getRMContext().getRMApps().get(app1.getApplicationId());
NMContainerStatus amContainer = TestRMRestart.createNMContainerStatus(
am1.getApplicationAttemptId(), 1, ContainerState.RUNNING);
NMContainerStatus runningContainer = TestRMRestart.createNMContainerStatus(
am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
nm1.registerNode(Arrays.asList(amContainer, runningContainer), null);
// Wait for RM to settle down on recovering containers;
TestWorkPreservingRMRestart.waitForNumContainersToRecover(2, rm2,
am1.getApplicationAttemptId());
Set<ContainerId> launchedContainers =
((RMNodeImpl) rm2.getRMContext().getRMNodes().get(nm1.getNodeId()))
.getLaunchedContainers();
assertTrue(launchedContainers.contains(amContainer.getContainerId()));
assertTrue(launchedContainers.contains(runningContainer.getContainerId()));
// check RMContainers are re-recreated and the container state is correct.
rm2.waitForState(nm1, amContainer.getContainerId(),
RMContainerState.RUNNING);
rm2.waitForState(nm1, runningContainer.getContainerId(),
RMContainerState.RUNNING);
// re register attempt to rm2
rm2.waitForState(recoveredApp1.getApplicationId(), RMAppState.ACCEPTED);
am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
am1.registerAppAttempt();
rm2.waitForState(recoveredApp1.getApplicationId(), RMAppState.RUNNING);
// wait for app life time and application to be in killed state.
rm2.waitForState(recoveredApp1.getApplicationId(), RMAppState.KILLED);
Assert.assertTrue("Applicaiton killed before lifetime value",
(System.currentTimeMillis()
- recoveredApp1.getSubmitTime()) > appLifetime);
}
private void stopRM(MockRM rm) {
if (rm != null) {
rm.stop();
}
}
}