applicationTimeouts);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationTimeoutType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationTimeoutType.java
new file mode 100644
index 00000000000..edde1b0f4d6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationTimeoutType.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Application timeout type.
+ */
+@Public
+@Unstable
+public enum ApplicationTimeoutType {
+
+ /**
+ *
+ * Timeout imposed on overall application life time. It includes actual
+ * run-time plus non-runtime. Non-runtime delays are time elapsed by scheduler
+ * to allocate container, time taken to store in RMStateStore and etc.
+ *
+ * If this is set, then timeout monitoring start from application submission
+ * time.
+ */
+ LIFETIME;
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 1421873895d..4d433572683 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1533,6 +1533,12 @@ public class YarnConfiguration extends Configuration {
false;
+ // Configurations for applicaiton life time monitor feature
+ public static final String RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS =
+ RM_PREFIX + "application-timeouts.lifetime-monitor.interval-ms";
+
+ public static final long DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS =
+ 60000;
/**
* Interval of time the linux container executor should try cleaning up
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 2d6007efe83..f7882953fe4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -365,6 +365,16 @@ message ApplicationSubmissionContextProto {
optional ReservationIdProto reservation_id = 15;
optional string node_label_expression = 16;
optional ResourceRequestProto am_container_resource_request = 17;
+ repeated ApplicationTimeoutMapProto application_timeouts = 18;
+}
+
+enum ApplicationTimeoutTypeProto {
+ APP_TIMEOUT_LIFETIME = 1;
+}
+
+message ApplicationTimeoutMapProto {
+ optional ApplicationTimeoutTypeProto application_timeout_type = 1;
+ optional int64 timeout = 2;
}
message LogAggregationContextProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
index 67e3a84ce6c..62b54e7ed8e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
@@ -18,7 +18,11 @@
package org.apache.hadoop.yarn.api.records.impl.pb;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -26,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -36,6 +41,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
@@ -63,6 +69,7 @@ extends ApplicationSubmissionContext {
private ResourceRequest amResourceRequest = null;
private LogAggregationContext logAggregationContext = null;
private ReservationId reservationId = null;
+ private Map applicationTimeouts = null;
public ApplicationSubmissionContextPBImpl() {
builder = ApplicationSubmissionContextProto.newBuilder();
@@ -131,6 +138,9 @@ extends ApplicationSubmissionContext {
if (this.reservationId != null) {
builder.setReservationId(convertToProtoFormat(this.reservationId));
}
+ if (this.applicationTimeouts != null) {
+ addApplicationTimeouts();
+ }
}
private void mergeLocalToProto() {
@@ -548,4 +558,77 @@ extends ApplicationSubmissionContext {
private ReservationIdProto convertToProtoFormat(ReservationId t) {
return ((ReservationIdPBImpl) t).getProto();
}
+
+ @Override
+ public Map getApplicationTimeouts() {
+ initApplicationTimeout();
+ return this.applicationTimeouts;
+ }
+
+ private void initApplicationTimeout() {
+ if (this.applicationTimeouts != null) {
+ return;
+ }
+ ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
+ List lists = p.getApplicationTimeoutsList();
+ this.applicationTimeouts =
+ new HashMap(lists.size());
+ for (ApplicationTimeoutMapProto timeoutProto : lists) {
+ this.applicationTimeouts.put(
+ ProtoUtils
+ .convertFromProtoFormat(timeoutProto.getApplicationTimeoutType()),
+ timeoutProto.getTimeout());
+ }
+ }
+
+ @Override
+ public void setApplicationTimeouts(
+ Map appTimeouts) {
+ if (appTimeouts == null) {
+ return;
+ }
+ initApplicationTimeout();
+ this.applicationTimeouts.clear();
+ this.applicationTimeouts.putAll(appTimeouts);
+ }
+
+ private void addApplicationTimeouts() {
+ maybeInitBuilder();
+ builder.clearApplicationTimeouts();
+ if (applicationTimeouts == null) {
+ return;
+ }
+ Iterable extends ApplicationTimeoutMapProto> values =
+ new Iterable() {
+
+ @Override
+ public Iterator iterator() {
+ return new Iterator() {
+ private Iterator iterator =
+ applicationTimeouts.keySet().iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public ApplicationTimeoutMapProto next() {
+ ApplicationTimeoutType key = iterator.next();
+ return ApplicationTimeoutMapProto.newBuilder()
+ .setTimeout(applicationTimeouts.get(key))
+ .setApplicationTimeoutType(
+ ProtoUtils.convertToProtoFormat(key))
+ .build();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ this.builder.addAllApplicationTimeouts(values);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
index 128120e4719..ab283e7c831 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
@@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnProtos.AMCommandProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
@@ -259,6 +261,23 @@ public class ProtoUtils {
return ApplicationAccessType.valueOf(e.name().replace(
APP_ACCESS_TYPE_PREFIX, ""));
}
+
+ /*
+ * ApplicationTimeoutType
+ */
+ private static String APP_TIMEOUT_TYPE_PREFIX = "APP_TIMEOUT_";
+
+ public static ApplicationTimeoutTypeProto convertToProtoFormat(
+ ApplicationTimeoutType e) {
+ return ApplicationTimeoutTypeProto
+ .valueOf(APP_TIMEOUT_TYPE_PREFIX + e.name());
+ }
+
+ public static ApplicationTimeoutType convertFromProtoFormat(
+ ApplicationTimeoutTypeProto e) {
+ return ApplicationTimeoutType
+ .valueOf(e.name().replace(APP_TIMEOUT_TYPE_PREFIX, ""));
+ }
/*
* Reservation Request interpreter type
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java
index e80d032e1bc..b6050261519 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java
@@ -44,8 +44,8 @@ public abstract class AbstractLivelinessMonitor extends AbstractService {
private Thread checkerThread;
private volatile boolean stopped;
public static final int DEFAULT_EXPIRE = 5*60*1000;//5 mins
- private int expireInterval = DEFAULT_EXPIRE;
- private int monitorInterval = expireInterval/3;
+ private long expireInterval = DEFAULT_EXPIRE;
+ private long monitorInterval = expireInterval / 3;
private final Clock clock;
@@ -85,7 +85,12 @@ public abstract class AbstractLivelinessMonitor extends AbstractService {
this.expireInterval = expireInterval;
}
- protected void setMonitorInterval(int monitorInterval) {
+ protected long getExpireInterval(O o) {
+ // by-default return for all the registered object interval.
+ return this.expireInterval;
+ }
+
+ protected void setMonitorInterval(long monitorInterval) {
this.monitorInterval = monitorInterval;
}
@@ -97,7 +102,11 @@ public abstract class AbstractLivelinessMonitor extends AbstractService {
}
public synchronized void register(O ob) {
- running.put(ob, clock.getTime());
+ register(ob, clock.getTime());
+ }
+
+ public synchronized void register(O ob, long monitorStartTime) {
+ running.put(ob, monitorStartTime);
}
public synchronized void unregister(O ob) {
@@ -117,19 +126,20 @@ public abstract class AbstractLivelinessMonitor extends AbstractService {
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
synchronized (AbstractLivelinessMonitor.this) {
- Iterator> iterator =
- running.entrySet().iterator();
+ Iterator> iterator = running.entrySet().iterator();
- //avoid calculating current time everytime in loop
+ // avoid calculating current time everytime in loop
long currentTime = clock.getTime();
while (iterator.hasNext()) {
Map.Entry entry = iterator.next();
- if (currentTime > entry.getValue() + expireInterval) {
+ O key = entry.getKey();
+ long interval = getExpireInterval(key);
+ if (currentTime > entry.getValue() + interval) {
iterator.remove();
- expire(entry.getKey());
- LOG.info("Expired:" + entry.getKey().toString() +
- " Timed out after " + expireInterval/1000 + " secs");
+ expire(key);
+ LOG.info("Expired:" + entry.getKey().toString()
+ + " Timed out after " + interval / 1000 + " secs");
}
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 965b57547e3..524afecd852 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3075,4 +3075,13 @@
yarn.resourcemanager.node-removal-untracked.timeout-ms
60000
+
+
+
+ The RMAppLifetimeMonitor Service uses this value as lifetime monitor interval
+
+ yarn.resourcemanager.application-timeouts.lifetime-monitor.interval-ms
+ 60000
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
index e57a5a205b3..527048650c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
@@ -368,7 +368,7 @@ public class TestPBImplRecords {
return bytes[rand.nextInt(4)];
} else if (type.equals(int.class) || type.equals(Integer.class)) {
return rand.nextInt(1000000);
- } else if (type.equals(long.class)) {
+ } else if (type.equals(long.class) || type.equals(Long.class)) {
return Long.valueOf(rand.nextInt(1000000));
} else if (type.equals(float.class)) {
return rand.nextFloat();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index caa0ff13721..0e305a9417d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetimeMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -105,6 +106,8 @@ public class RMActiveServiceContext {
private boolean isSchedulerReady = false;
private PlacementManager queuePlacementManager = null;
+ private RMAppLifetimeMonitor rmAppLifetimeMonitor;
+
public RMActiveServiceContext() {
queuePlacementManager = new PlacementManager();
}
@@ -467,4 +470,17 @@ public class RMActiveServiceContext {
public void setQueuePlacementManager(PlacementManager placementMgr) {
this.queuePlacementManager = placementMgr;
}
+
+ @Private
+ @Unstable
+ public void setRMAppLifetimeMonitor(
+ RMAppLifetimeMonitor lifetimeMonitor) {
+ this.rmAppLifetimeMonitor = lifetimeMonitor;
+ }
+
+ @Private
+ @Unstable
+ public RMAppLifetimeMonitor getRMAppLifetimeMonitor() {
+ return this.rmAppLifetimeMonitor;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 136dee04373..7352a284c48 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -385,6 +385,10 @@ public class RMAppManager implements EventHandler,
}
}
+ // fail the submission if configured application timeout value is invalid
+ RMServerUtils.validateApplicationTimeouts(
+ submissionContext.getApplicationTimeouts());
+
// Create RMApp
RMAppImpl application =
new RMAppImpl(applicationId, rmContext, this.conf,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index 2ba445cb6a2..c9d185f9054 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetimeMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -149,4 +150,8 @@ public interface RMContext {
LeaderElectorService getLeaderElectorService();
QueueLimitCalculator getNodeManagerQueueLimitCalculator();
+
+ void setRMAppLifetimeMonitor(RMAppLifetimeMonitor rmAppLifetimeMonitor);
+
+ RMAppLifetimeMonitor getRMAppLifetimeMonitor();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 1e702de9342..dc8f7d1ea0b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetimeMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -499,4 +500,15 @@ public class RMContextImpl implements RMContext {
QueueLimitCalculator limitCalculator) {
this.queueLimitCalculator = limitCalculator;
}
+
+ @Override
+ public void setRMAppLifetimeMonitor(
+ RMAppLifetimeMonitor rmAppLifetimeMonitor) {
+ this.activeServiceContext.setRMAppLifetimeMonitor(rmAppLifetimeMonitor);
+ }
+
+ @Override
+ public RMAppLifetimeMonitor getRMAppLifetimeMonitor() {
+ return this.activeServiceContext.getRMAppLifetimeMonitor();
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index 7fcabab7d46..b90e499601d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
import org.apache.hadoop.yarn.exceptions
.InvalidResourceBlacklistRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -470,4 +472,18 @@ public class RMServerUtils {
conf.set(entry.getKey(), entry.getValue());
}
}
+
+ public static void validateApplicationTimeouts(
+ Map timeouts) throws YarnException {
+ if (timeouts != null) {
+ for (Map.Entry timeout : timeouts
+ .entrySet()) {
+ if (timeout.getValue() < 0) {
+ String message = "Invalid application timeout, value="
+ + timeout.getValue() + " for type=" + timeout.getKey();
+ throw new YarnException(message);
+ }
+ }
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 8a6997d8c67..5e9bece33c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -96,6 +96,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessM
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetimeMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
@@ -556,6 +557,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
addService(amFinishingMonitor);
rmContext.setAMFinishingMonitor(amFinishingMonitor);
+ RMAppLifetimeMonitor rmAppLifetimeMonitor = createRMAppLifetimeMonitor();
+ addService(rmAppLifetimeMonitor);
+ rmContext.setRMAppLifetimeMonitor(rmAppLifetimeMonitor);
+
RMNodeLabelsManager nlm = createNodeLabelManager();
nlm.setRMContext(rmContext);
addService(nlm);
@@ -1398,4 +1403,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
out.println(" "
+ "[-remove-application-from-state-store ]" + "\n");
}
+
+ protected RMAppLifetimeMonitor createRMAppLifetimeMonitor() {
+ return new RMAppLifetimeMonitor(this.rmContext);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index e5bde32b282..727703b690e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -527,6 +528,8 @@ public class RMAppImpl implements RMApp, Recoverable {
DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD;
}
}
+
+
}
/**
@@ -1106,6 +1109,20 @@ public class RMAppImpl implements RMApp, Recoverable {
}
}
+ long applicationLifetime =
+ app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME);
+ if (applicationLifetime > 0) {
+ app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId,
+ ApplicationTimeoutType.LIFETIME, app.submitTime,
+ applicationLifetime * 1000);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Application " + app.applicationId
+ + " is registered for timeout monitor, type="
+ + ApplicationTimeoutType.LIFETIME + " value="
+ + applicationLifetime + " seconds");
+ }
+ }
+
// No existent attempts means the attempt associated with this app was not
// started or started but not yet saved.
if (app.attempts.isEmpty()) {
@@ -1152,6 +1169,13 @@ public class RMAppImpl implements RMApp, Recoverable {
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
+ Map timeouts =
+ app.submissionContext.getApplicationTimeouts();
+ if (timeouts != null && timeouts.size() > 0) {
+ app.rmContext.getRMAppLifetimeMonitor()
+ .unregisterApp(app.getApplicationId(), timeouts.keySet());
+ }
+
if (app.transitionTodo instanceof SingleArcTransition) {
((SingleArcTransition) app.transitionTodo).transition(app,
app.eventCausingFinalSaving);
@@ -1160,7 +1184,6 @@ public class RMAppImpl implements RMApp, Recoverable {
app.eventCausingFinalSaving);
}
return app.targetedFinalState;
-
}
}
@@ -1209,6 +1232,18 @@ public class RMAppImpl implements RMApp, Recoverable {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
+ long applicationLifetime =
+ app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME);
+ if (applicationLifetime > 0) {
+ app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId,
+ ApplicationTimeoutType.LIFETIME, app.submitTime,
+ applicationLifetime * 1000);
+ LOG.debug("Application " + app.applicationId
+ + " is registered for timeout monitor, type="
+ + ApplicationTimeoutType.LIFETIME + " value=" + applicationLifetime
+ + " seconds");
+ }
+
// If recovery is enabled then store the application information in a
// non-blocking call so make sure that RM has stored the information
// needed to restart the AM after RM restart without further client
@@ -1922,4 +1957,14 @@ public class RMAppImpl implements RMApp, Recoverable {
public int getNextAttemptId() {
return nextAttemptId;
}
+
+ private long getApplicationLifetime(ApplicationTimeoutType type) {
+ Map timeouts =
+ this.submissionContext.getApplicationTimeouts();
+ long applicationLifetime = -1;
+ if (timeouts != null && timeouts.containsKey(type)) {
+ applicationLifetime = timeouts.get(type);
+ }
+ return applicationLifetime;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppLifetimeMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppLifetimeMonitor.java
new file mode 100644
index 00000000000..e550c972ae4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppLifetimeMonitor.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor;
+
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+/**
+ * This service will monitor the applications against the lifetime value given.
+ * The applications will be killed if it running beyond the given time.
+ */
+public class RMAppLifetimeMonitor
+ extends AbstractLivelinessMonitor {
+
+ private static final Log LOG = LogFactory.getLog(RMAppLifetimeMonitor.class);
+
+ private RMContext rmContext;
+ private Map monitoredApps =
+ new HashMap();
+
+ private static final EnumSet COMPLETED_APP_STATES =
+ EnumSet.of(RMAppState.FINISHED, RMAppState.FINISHING, RMAppState.FAILED,
+ RMAppState.KILLED, RMAppState.FINAL_SAVING, RMAppState.KILLING);
+
+ public RMAppLifetimeMonitor(RMContext rmContext) {
+ super(RMAppLifetimeMonitor.class.getName(), SystemClock.getInstance());
+ this.rmContext = rmContext;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ long monitorInterval = conf.getLong(
+ YarnConfiguration.RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS);
+ if (monitorInterval <= 0) {
+ monitorInterval =
+ YarnConfiguration.DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS;
+ }
+ setMonitorInterval(monitorInterval);
+ LOG.info("Application lifelime monitor interval set to " + monitorInterval
+ + " ms.");
+ super.serviceInit(conf);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected synchronized void expire(RMAppToMonitor monitoredAppKey) {
+ Long remove = monitoredApps.remove(monitoredAppKey);
+ ApplicationId appId = monitoredAppKey.getApplicationId();
+ RMApp app = rmContext.getRMApps().get(appId);
+ if (app == null) {
+ return;
+ }
+ // Don't trigger a KILL event if application is in completed states
+ if (!COMPLETED_APP_STATES.contains(app.getState())) {
+ String diagnostics =
+ "Application killed due to exceeding its lifetime period " + remove
+ + " milliseconds";
+ rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppEvent(appId, RMAppEventType.KILL, diagnostics));
+ } else {
+ LOG.info("Application " + appId
+ + " is about to complete. So not killing the application.");
+ }
+ }
+
+ public synchronized void registerApp(ApplicationId appId,
+ ApplicationTimeoutType timeoutType, long monitorStartTime, long timeout) {
+ RMAppToMonitor appToMonitor = new RMAppToMonitor(appId, timeoutType);
+ register(appToMonitor, monitorStartTime);
+ monitoredApps.putIfAbsent(appToMonitor, timeout);
+ }
+
+ @Override
+ protected synchronized long getExpireInterval(
+ RMAppToMonitor monitoredAppKey) {
+ return monitoredApps.get(monitoredAppKey);
+ }
+
+ public synchronized void unregisterApp(ApplicationId appId,
+ ApplicationTimeoutType timeoutType) {
+ RMAppToMonitor appToRemove = new RMAppToMonitor(appId, timeoutType);
+ unregister(appToRemove);
+ monitoredApps.remove(appToRemove);
+ }
+
+ public synchronized void unregisterApp(ApplicationId appId,
+ Set types) {
+ for (ApplicationTimeoutType type : types) {
+ unregisterApp(appId, type);
+ }
+ }
+
+ public synchronized void updateApplicationTimeouts(ApplicationId appId,
+ Map timeouts) {
+ // TODO in YARN-5611
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppToMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppToMonitor.java
new file mode 100644
index 00000000000..1cf2132659f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppToMonitor.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
+
+/**
+ * This class used for monitor application with applicationId+appTimeoutType.
+ */
+public class RMAppToMonitor {
+
+ private ApplicationId applicationId;
+ private ApplicationTimeoutType appTimeoutType;
+
+ RMAppToMonitor(ApplicationId appId, ApplicationTimeoutType timeoutType) {
+ this.applicationId = appId;
+ this.appTimeoutType = timeoutType;
+ }
+
+ public ApplicationId getApplicationId() {
+ return applicationId;
+ }
+
+ public ApplicationTimeoutType getAppTimeoutType() {
+ return appTimeoutType;
+ }
+
+ @Override
+ public int hashCode() {
+ return applicationId.hashCode() + appTimeoutType.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ RMAppToMonitor other = (RMAppToMonitor) obj;
+ if (!this.applicationId.equals(other.getApplicationId())) {
+ return false;
+ }
+ if (this.appTimeoutType != other.getAppTimeoutType()) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(applicationId.toString()).append("_").append(appTimeoutType);
+ return sb.toString();
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/package-info.java
new file mode 100644
index 00000000000..a3cc7ef2cdc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor contains
+ * classes related to application monitor.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index f84326139e7..25a82885162 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -460,7 +461,7 @@ public class MockRM extends ResourceManager {
return submitApp(resource, name, user, acls, false, queue,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
- false, null, 0, null, true, priority, amLabel);
+ false, null, 0, null, true, priority, amLabel, null);
}
public RMApp submitApp(Resource resource, String name, String user,
@@ -561,7 +562,7 @@ public class MockRM extends ResourceManager {
return submitApp(capability, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
isAppIdProvided, applicationId, attemptFailuresValidityInterval,
- logAggregationContext, cancelTokensWhenComplete, priority, "");
+ logAggregationContext, cancelTokensWhenComplete, priority, "", null);
}
public RMApp submitApp(Resource capability, String name, String user,
@@ -570,7 +571,8 @@ public class MockRM extends ResourceManager {
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
ApplicationId applicationId, long attemptFailuresValidityInterval,
LogAggregationContext logAggregationContext,
- boolean cancelTokensWhenComplete, Priority priority, String amLabel)
+ boolean cancelTokensWhenComplete, Priority priority, String amLabel,
+ Map applicationTimeouts)
throws Exception {
ApplicationId appId = isAppIdProvided ? applicationId : null;
ApplicationClientProtocol client = getClientRMService();
@@ -587,6 +589,9 @@ public class MockRM extends ResourceManager {
sub.setApplicationId(appId);
sub.setApplicationName(name);
sub.setMaxAppAttempts(maxAppAttempts);
+ if (applicationTimeouts != null && applicationTimeouts.size() > 0) {
+ sub.setApplicationTimeouts(applicationTimeouts);
+ }
if (unmanaged) {
sub.setUnmanagedAM(true);
}
@@ -1073,4 +1078,15 @@ public class MockRM extends ResourceManager {
!apps.containsKey(appId));
LOG.info("app is removed from scheduler, " + appId);
}
+
+ public RMApp submitApp(int masterMemory, Priority priority,
+ Map applicationTimeouts) throws Exception {
+ Resource resource = Resource.newInstance(masterMemory, 0);
+ return submitApp(
+ resource, "", UserGroupInformation.getCurrentUser().getShortUserName(),
+ null, false, null,
+ super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
+ false, false, null, 0, null, true, priority, null, applicationTimeouts);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java
new file mode 100644
index 00000000000..3f2db1dd779
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart;
+import org.apache.hadoop.yarn.server.resourcemanager.TestWorkPreservingRMRestart;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test class for application life time monitor feature test.
+ */
+public class TestApplicationLifetimeMonitor {
+ private YarnConfiguration conf;
+
+ @Before
+ public void setup() throws IOException {
+ conf = new YarnConfiguration();
+ Logger rootLogger = LogManager.getRootLogger();
+ rootLogger.setLevel(Level.DEBUG);
+ UserGroupInformation.setConfiguration(conf);
+ conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
+ true);
+ conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+ conf.setLong(YarnConfiguration.RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS,
+ 3000L);
+ }
+
+ @Test(timeout = 90000)
+ public void testApplicationLifetimeMonitor() throws Exception {
+ MockRM rm = null;
+ try {
+ rm = new MockRM(conf);
+ rm.start();
+ Priority appPriority = Priority.newInstance(0);
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * 1024);
+
+ Map timeouts =
+ new HashMap();
+ timeouts.put(ApplicationTimeoutType.LIFETIME, 10L);
+ RMApp app1 = rm.submitApp(1024, appPriority, timeouts);
+ nm1.nodeHeartbeat(true);
+ // Send launch Event
+ MockAM am1 =
+ rm.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId());
+ am1.registerAppAttempt();
+ rm.waitForState(app1.getApplicationId(), RMAppState.KILLED);
+ Assert.assertTrue("Applicaiton killed before lifetime value",
+ (System.currentTimeMillis() - app1.getSubmitTime()) > 10000);
+ } finally {
+ stopRM(rm);
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test(timeout = 180000)
+ public void testApplicationLifetimeOnRMRestart() throws Exception {
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ nm1.nodeHeartbeat(true);
+
+ long appLifetime = 60L;
+ Map timeouts =
+ new HashMap();
+ timeouts.put(ApplicationTimeoutType.LIFETIME, appLifetime);
+ RMApp app1 = rm1.submitApp(200, Priority.newInstance(0), timeouts);
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // Re-start RM
+ MockRM rm2 = new MockRM(conf, memStore);
+ rm2.start();
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+
+ // recover app
+ RMApp recoveredApp1 =
+ rm2.getRMContext().getRMApps().get(app1.getApplicationId());
+
+ NMContainerStatus amContainer = TestRMRestart.createNMContainerStatus(
+ am1.getApplicationAttemptId(), 1, ContainerState.RUNNING);
+ NMContainerStatus runningContainer = TestRMRestart.createNMContainerStatus(
+ am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
+
+ nm1.registerNode(Arrays.asList(amContainer, runningContainer), null);
+
+ // Wait for RM to settle down on recovering containers;
+ TestWorkPreservingRMRestart.waitForNumContainersToRecover(2, rm2,
+ am1.getApplicationAttemptId());
+ Set launchedContainers =
+ ((RMNodeImpl) rm2.getRMContext().getRMNodes().get(nm1.getNodeId()))
+ .getLaunchedContainers();
+ assertTrue(launchedContainers.contains(amContainer.getContainerId()));
+ assertTrue(launchedContainers.contains(runningContainer.getContainerId()));
+
+ // check RMContainers are re-recreated and the container state is correct.
+ rm2.waitForState(nm1, amContainer.getContainerId(),
+ RMContainerState.RUNNING);
+ rm2.waitForState(nm1, runningContainer.getContainerId(),
+ RMContainerState.RUNNING);
+
+ // re register attempt to rm2
+ rm2.waitForState(recoveredApp1.getApplicationId(), RMAppState.ACCEPTED);
+ am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
+ am1.registerAppAttempt();
+ rm2.waitForState(recoveredApp1.getApplicationId(), RMAppState.RUNNING);
+
+ // wait for app life time and application to be in killed state.
+ rm2.waitForState(recoveredApp1.getApplicationId(), RMAppState.KILLED);
+ Assert.assertTrue("Applicaiton killed before lifetime value",
+ (System.currentTimeMillis()
+ - recoveredApp1.getSubmitTime()) > appLifetime);
+ }
+
+ private void stopRM(MockRM rm) {
+ if (rm != null) {
+ rm.stop();
+ }
+ }
+}