YARN-6600. Introduce default and max lifetime of application at LeafQueue level. Contributed by Rohith Sharma K S.

This commit is contained in:
Sunil G 2017-09-08 19:20:52 +05:30
parent 796404c3ed
commit a39829c456
17 changed files with 503 additions and 23 deletions

View File

@ -578,7 +578,7 @@ public interface ApplicationClientProtocol extends ApplicationBaseProtocol {
* <b>Note:</b> If application timeout value is less than or equal to current * <b>Note:</b> If application timeout value is less than or equal to current
* time then update application throws YarnException. * time then update application throws YarnException.
* @param request to set ApplicationTimeouts of an application * @param request to set ApplicationTimeouts of an application
* @return an empty response that the update has completed successfully. * @return a response with updated timeouts.
* @throws YarnException if update request has empty values or application is * @throws YarnException if update request has empty values or application is
* in completing states. * in completing states.
* @throws IOException on IO failures * @throws IOException on IO failures

View File

@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
/** /**
@ -43,4 +44,22 @@ public abstract class UpdateApplicationTimeoutsResponse {
Records.newRecord(UpdateApplicationTimeoutsResponse.class); Records.newRecord(UpdateApplicationTimeoutsResponse.class);
return response; return response;
} }
/**
* Get <code>ApplicationTimeouts</code> of the application. Timeout value is
* in ISO8601 standard with format <b>yyyy-MM-dd'T'HH:mm:ss.SSSZ</b>.
* @return all <code>ApplicationTimeouts</code> of the application.
*/
public abstract Map<ApplicationTimeoutType, String> getApplicationTimeouts();
/**
* Set the <code>ApplicationTimeouts</code> for the application. Timeout value
* is absolute. Timeout value should meet ISO8601 format. Support ISO8601
* format is <b>yyyy-MM-dd'T'HH:mm:ss.SSSZ</b>. All pre-existing Map entries
* are cleared before adding the new Map.
* @param applicationTimeouts <code>ApplicationTimeouts</code>s for the
* application
*/
public abstract void setApplicationTimeouts(
Map<ApplicationTimeoutType, String> applicationTimeouts);
} }

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -359,10 +360,21 @@ public class ApplicationCLI extends YarnCLI {
+ timeoutType.toString() + " of an application " + applicationId); + timeoutType.toString() + " of an application " + applicationId);
UpdateApplicationTimeoutsRequest request = UpdateApplicationTimeoutsRequest UpdateApplicationTimeoutsRequest request = UpdateApplicationTimeoutsRequest
.newInstance(appId, Collections.singletonMap(timeoutType, newTimeout)); .newInstance(appId, Collections.singletonMap(timeoutType, newTimeout));
client.updateApplicationTimeouts(request); UpdateApplicationTimeoutsResponse updateApplicationTimeouts =
client.updateApplicationTimeouts(request);
String updatedTimeout =
updateApplicationTimeouts.getApplicationTimeouts().get(timeoutType);
if (timeoutType.equals(ApplicationTimeoutType.LIFETIME)
&& !newTimeout.equals(updatedTimeout)) {
sysout.println("Updated lifetime of an application " + applicationId
+ " to queue max/default lifetime." + " New expiry time is "
+ updatedTimeout);
return;
}
sysout.println( sysout.println(
"Successfully updated " + timeoutType.toString() + " of an application " "Successfully updated " + timeoutType.toString() + " of an application "
+ applicationId + ". New expiry time is " + newTimeout); + applicationId + ". New expiry time is " + updatedTimeout);
} }
/** /**

View File

@ -48,6 +48,7 @@ import org.apache.commons.cli.Options;
import org.apache.commons.lang.time.DateFormatUtils; import org.apache.commons.lang.time.DateFormatUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -2148,17 +2149,16 @@ public class TestYarnCLI {
ApplicationCLI cli = createAndGetAppCLI(); ApplicationCLI cli = createAndGetAppCLI();
ApplicationId applicationId = ApplicationId.newInstance(1234, 6); ApplicationId applicationId = ApplicationId.newInstance(1234, 6);
ApplicationReport appReport = ApplicationReport.newInstance(applicationId, UpdateApplicationTimeoutsResponse response =
ApplicationAttemptId.newInstance(applicationId, 1), "user", "queue", mock(UpdateApplicationTimeoutsResponse.class);
"appname", "host", 124, null, YarnApplicationState.RUNNING, String formatISO8601 =
"diagnostics", "url", 0, 0, FinalApplicationStatus.UNDEFINED, null, Times.formatISO8601(System.currentTimeMillis() + 5 * 1000);
"N/A", 0.53789f, "YARN", null); when(response.getApplicationTimeouts()).thenReturn(Collections
ApplicationTimeout timeout = ApplicationTimeout .singletonMap(ApplicationTimeoutType.LIFETIME, formatISO8601));
.newInstance(ApplicationTimeoutType.LIFETIME, "N/A", -1);
appReport.setApplicationTimeouts( when(client
Collections.singletonMap(timeout.getTimeoutType(), timeout)); .updateApplicationTimeouts(any(UpdateApplicationTimeoutsRequest.class)))
when(client.getApplicationReport(any(ApplicationId.class))) .thenReturn(response);
.thenReturn(appReport);
int result = cli.run(new String[] { "application", "-appId", int result = cli.run(new String[] { "application", "-appId",
applicationId.toString(), "-updateLifetime", "10" }); applicationId.toString(), "-updateLifetime", "10" });

View File

@ -18,10 +18,19 @@
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationUpdateTimeoutMapProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsResponseProtoOrBuilder;
import com.google.protobuf.TextFormat; import com.google.protobuf.TextFormat;
@ -33,6 +42,7 @@ public class UpdateApplicationTimeoutsResponsePBImpl
UpdateApplicationTimeoutsResponseProto.getDefaultInstance(); UpdateApplicationTimeoutsResponseProto.getDefaultInstance();
UpdateApplicationTimeoutsResponseProto.Builder builder = null; UpdateApplicationTimeoutsResponseProto.Builder builder = null;
boolean viaProto = false; boolean viaProto = false;
private Map<ApplicationTimeoutType, String> applicationTimeouts = null;
public UpdateApplicationTimeoutsResponsePBImpl() { public UpdateApplicationTimeoutsResponsePBImpl() {
builder = UpdateApplicationTimeoutsResponseProto.newBuilder(); builder = UpdateApplicationTimeoutsResponseProto.newBuilder();
@ -45,11 +55,34 @@ public class UpdateApplicationTimeoutsResponsePBImpl
} }
public UpdateApplicationTimeoutsResponseProto getProto() { public UpdateApplicationTimeoutsResponseProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build(); proto = viaProto ? proto : builder.build();
viaProto = true; viaProto = true;
return proto; return proto;
} }
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = UpdateApplicationTimeoutsResponseProto.newBuilder(proto);
}
viaProto = false;
}
private void mergeLocalToBuilder() {
if (this.applicationTimeouts != null) {
addApplicationTimeouts();
}
}
@Override @Override
public int hashCode() { public int hashCode() {
return getProto().hashCode(); return getProto().hashCode();
@ -70,4 +103,79 @@ public class UpdateApplicationTimeoutsResponsePBImpl
public String toString() { public String toString() {
return TextFormat.shortDebugString(getProto()); return TextFormat.shortDebugString(getProto());
} }
@Override
public Map<ApplicationTimeoutType, String> getApplicationTimeouts() {
initApplicationTimeout();
return this.applicationTimeouts;
}
private void initApplicationTimeout() {
if (this.applicationTimeouts != null) {
return;
}
UpdateApplicationTimeoutsResponseProtoOrBuilder p =
viaProto ? proto : builder;
List<ApplicationUpdateTimeoutMapProto> lists =
p.getApplicationTimeoutsList();
this.applicationTimeouts =
new HashMap<ApplicationTimeoutType, String>(lists.size());
for (ApplicationUpdateTimeoutMapProto timeoutProto : lists) {
this.applicationTimeouts.put(
ProtoUtils
.convertFromProtoFormat(timeoutProto.getApplicationTimeoutType()),
timeoutProto.getExpireTime());
}
}
@Override
public void setApplicationTimeouts(
Map<ApplicationTimeoutType, String> appTimeouts) {
if (appTimeouts == null) {
return;
}
initApplicationTimeout();
this.applicationTimeouts.clear();
this.applicationTimeouts.putAll(appTimeouts);
}
private void addApplicationTimeouts() {
maybeInitBuilder();
builder.clearApplicationTimeouts();
if (applicationTimeouts == null) {
return;
}
Iterable<? extends ApplicationUpdateTimeoutMapProto> values =
new Iterable<ApplicationUpdateTimeoutMapProto>() {
@Override
public Iterator<ApplicationUpdateTimeoutMapProto> iterator() {
return new Iterator<ApplicationUpdateTimeoutMapProto>() {
private Iterator<ApplicationTimeoutType> iterator =
applicationTimeouts.keySet().iterator();
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public ApplicationUpdateTimeoutMapProto next() {
ApplicationTimeoutType key = iterator.next();
return ApplicationUpdateTimeoutMapProto.newBuilder()
.setExpireTime(applicationTimeouts.get(key))
.setApplicationTimeoutType(
ProtoUtils.convertToProtoFormat(key))
.build();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
this.builder.addAllApplicationTimeouts(values);
}
} }

View File

@ -106,6 +106,41 @@
</description> </description>
</property> </property>
<property>
<name>yarn.scheduler.capacity.root.default.maximum-application-lifetime
</name>
<value>-1</value>
<description>
Maximum lifetime of an application which is submitted to a queue
in seconds. Any value less than or equal to zero will be considered as
disabled.
This will be a hard time limit for all applications in this
queue. If positive value is configured then any application submitted
to this queue will be killed after exceeds the configured lifetime.
User can also specify lifetime per application basis in
application submission context. But user lifetime will be
overridden if it exceeds queue maximum lifetime. It is point-in-time
configuration.
Note : Configuring too low value will result in killing application
sooner. This feature is applicable only for leaf queue.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.default-application-lifetime
</name>
<value>-1</value>
<description>
Default lifetime of an application which is submitted to a queue
in seconds. Any value less than or equal to zero will be considered as
disabled.
If the user has not submitted application with lifetime value then this
value will be taken. It is point-in-time configuration.
Note : Default lifetime can't exceed maximum lifetime. This feature is
applicable only for leaf queue.
</description>
</property>
<property> <property>
<name>yarn.scheduler.capacity.node-locality-delay</name> <name>yarn.scheduler.capacity.node-locality-delay</name>
<value>40</value> <value>40</value>

View File

@ -1654,6 +1654,7 @@ public class ClientRMService extends AbstractService implements
RMAuditLogger.logSuccess(callerUGI.getShortUserName(), RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
AuditConstants.UPDATE_APP_TIMEOUTS, "ClientRMService", AuditConstants.UPDATE_APP_TIMEOUTS, "ClientRMService",
applicationId); applicationId);
response.setApplicationTimeouts(applicationTimeouts);
return response; return response;
} }
String msg = String msg =
@ -1665,7 +1666,8 @@ public class ClientRMService extends AbstractService implements
} }
try { try {
rmAppManager.updateApplicationTimeout(application, applicationTimeouts); applicationTimeouts = rmAppManager.updateApplicationTimeout(application,
applicationTimeouts);
} catch (YarnException ex) { } catch (YarnException ex) {
RMAuditLogger.logFailure(callerUGI.getShortUserName(), RMAuditLogger.logFailure(callerUGI.getShortUserName(),
AuditConstants.UPDATE_APP_TIMEOUTS, "UNKNOWN", "ClientRMService", AuditConstants.UPDATE_APP_TIMEOUTS, "UNKNOWN", "ClientRMService",
@ -1675,6 +1677,7 @@ public class ClientRMService extends AbstractService implements
RMAuditLogger.logSuccess(callerUGI.getShortUserName(), RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
AuditConstants.UPDATE_APP_TIMEOUTS, "ClientRMService", applicationId); AuditConstants.UPDATE_APP_TIMEOUTS, "ClientRMService", applicationId);
response.setApplicationTimeouts(applicationTimeouts);
return response; return response;
} }

View File

@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Times;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
@ -551,18 +552,41 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
} }
// transaction method. // transaction method.
public void updateApplicationTimeout(RMApp app, public Map<ApplicationTimeoutType, String> updateApplicationTimeout(RMApp app,
Map<ApplicationTimeoutType, String> newTimeoutInISO8601Format) Map<ApplicationTimeoutType, String> newTimeoutInISO8601Format)
throws YarnException { throws YarnException {
ApplicationId applicationId = app.getApplicationId(); ApplicationId applicationId = app.getApplicationId();
synchronized (applicationId) { synchronized (applicationId) {
if (app.isAppInCompletedStates()) { if (app.isAppInCompletedStates()) {
return; return newTimeoutInISO8601Format;
} }
Map<ApplicationTimeoutType, Long> newExpireTime = RMServerUtils Map<ApplicationTimeoutType, Long> newExpireTime = RMServerUtils
.validateISO8601AndConvertToLocalTimeEpoch(newTimeoutInISO8601Format); .validateISO8601AndConvertToLocalTimeEpoch(newTimeoutInISO8601Format);
// validation is only for lifetime
Long updatedlifetimeInMillis =
newExpireTime.get(ApplicationTimeoutType.LIFETIME);
if (updatedlifetimeInMillis != null) {
long queueMaxLifetimeInSec =
scheduler.getMaximumApplicationLifetime(app.getQueue());
if (queueMaxLifetimeInSec > 0) {
if (updatedlifetimeInMillis > (app.getSubmitTime()
+ queueMaxLifetimeInSec * 1000)) {
updatedlifetimeInMillis =
app.getSubmitTime() + queueMaxLifetimeInSec * 1000;
// cut off to maximum queue lifetime if update lifetime is exceeding
// queue lifetime.
newExpireTime.put(ApplicationTimeoutType.LIFETIME,
updatedlifetimeInMillis);
newTimeoutInISO8601Format.put(ApplicationTimeoutType.LIFETIME,
Times.formatISO8601(updatedlifetimeInMillis.longValue()));
}
}
}
SettableFuture<Object> future = SettableFuture.create(); SettableFuture<Object> future = SettableFuture.create();
Map<ApplicationTimeoutType, Long> currentExpireTimeouts = Map<ApplicationTimeoutType, Long> currentExpireTimeouts =
@ -585,6 +609,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
// update in-memory // update in-memory
((RMAppImpl) app).updateApplicationTimeout(newExpireTime); ((RMAppImpl) app).updateApplicationTimeout(newExpireTime);
return newTimeoutInISO8601Format;
} }
} }

View File

@ -1167,6 +1167,8 @@ public class RMAppImpl implements RMApp, Recoverable {
long applicationLifetime = long applicationLifetime =
app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME); app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME);
applicationLifetime = app.scheduler
.checkAndGetApplicationLifetime(app.queue, applicationLifetime);
if (applicationLifetime > 0) { if (applicationLifetime > 0) {
// calculate next timeout value // calculate next timeout value
Long newTimeout = Long newTimeout =

View File

@ -1283,4 +1283,15 @@ public abstract class AbstractYarnScheduler
this.rmContext.getDispatcher().getEventHandler() this.rmContext.getDispatcher().getEventHandler()
.handle(new ReleaseContainerEvent(container)); .handle(new ReleaseContainerEvent(container));
} }
@Override
public long checkAndGetApplicationLifetime(String queueName, long lifetime) {
// -1 indicates, lifetime is not configured.
return -1;
}
@Override
public long getMaximumApplicationLifetime(String queueName) {
return -1;
}
} }

View File

@ -386,4 +386,24 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
* @return the normalized resource * @return the normalized resource
*/ */
Resource getNormalizedResource(Resource requestedResource); Resource getNormalizedResource(Resource requestedResource);
/**
* Verify whether a submitted application lifetime is valid as per configured
* Queue lifetime.
* @param queueName Name of the Queue
* @param lifetime configured application lifetime
* @return valid lifetime as per queue
*/
@Public
@Evolving
long checkAndGetApplicationLifetime(String queueName, long lifetime);
/**
* Get maximum lifetime for a queue.
* @param queueName to get lifetime
* @return maximum lifetime in seconds
*/
@Public
@Evolving
long getMaximumApplicationLifetime(String queueName);
} }

View File

@ -2565,4 +2565,47 @@ public class CapacityScheduler extends
writeLock.unlock(); writeLock.unlock();
} }
} }
@Override
public long checkAndGetApplicationLifetime(String queueName,
long lifetimeRequestedByApp) {
try {
readLock.lock();
CSQueue queue = getQueue(queueName);
if (queue == null || !(queue instanceof LeafQueue)) {
return lifetimeRequestedByApp;
}
long defaultApplicationLifetime =
((LeafQueue) queue).getDefaultApplicationLifetime();
long maximumApplicationLifetime =
((LeafQueue) queue).getMaximumApplicationLifetime();
// check only for maximum, that's enough because default cann't
// exceed maximum
if (maximumApplicationLifetime <= 0) {
return lifetimeRequestedByApp;
}
if (lifetimeRequestedByApp <= 0) {
return defaultApplicationLifetime;
} else if (lifetimeRequestedByApp > maximumApplicationLifetime) {
return maximumApplicationLifetime;
}
return lifetimeRequestedByApp;
} finally {
readLock.unlock();
}
}
@Override
public long getMaximumApplicationLifetime(String queueName) {
CSQueue queue = getQueue(queueName);
if (queue == null || !(queue instanceof LeafQueue)) {
LOG.error("Unknown queue: " + queueName);
return -1;
}
// In seconds
return ((LeafQueue) queue).getMaximumApplicationLifetime();
}
} }

View File

@ -1496,4 +1496,30 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public int getMaxAssignPerHeartbeat() { public int getMaxAssignPerHeartbeat() {
return getInt(MAX_ASSIGN_PER_HEARTBEAT, DEFAULT_MAX_ASSIGN_PER_HEARTBEAT); return getInt(MAX_ASSIGN_PER_HEARTBEAT, DEFAULT_MAX_ASSIGN_PER_HEARTBEAT);
} }
public static final String MAXIMUM_LIFETIME_SUFFIX =
"maximum-application-lifetime";
public static final String DEFAULT_LIFETIME_SUFFIX =
"default-application-lifetime";
public long getMaximumLifetimePerQueue(String queue) {
long maximumLifetimePerQueue = getLong(
getQueuePrefix(queue) + MAXIMUM_LIFETIME_SUFFIX, (long) UNDEFINED);
return maximumLifetimePerQueue;
}
public void setMaximumLifetimePerQueue(String queue, long maximumLifetime) {
setLong(getQueuePrefix(queue) + MAXIMUM_LIFETIME_SUFFIX, maximumLifetime);
}
public long getDefaultLifetimePerQueue(String queue) {
long maximumLifetimePerQueue = getLong(
getQueuePrefix(queue) + DEFAULT_LIFETIME_SUFFIX, (long) UNDEFINED);
return maximumLifetimePerQueue;
}
public void setDefaultLifetimePerQueue(String queue, long defaultLifetime) {
setLong(getQueuePrefix(queue) + DEFAULT_LIFETIME_SUFFIX, defaultLifetime);
}
} }

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
@ -143,6 +144,10 @@ public class LeafQueue extends AbstractCSQueue {
List<AppPriorityACLGroup> priorityAcls = List<AppPriorityACLGroup> priorityAcls =
new ArrayList<AppPriorityACLGroup>(); new ArrayList<AppPriorityACLGroup>();
// -1 indicates lifetime is disabled
private volatile long maxApplicationLifetime = -1;
private volatile long defaultApplicationLifetime = -1;
@SuppressWarnings({ "unchecked", "rawtypes" }) @SuppressWarnings({ "unchecked", "rawtypes" })
public LeafQueue(CapacitySchedulerContext cs, public LeafQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException { String queueName, CSQueue parent, CSQueue old) throws IOException {
@ -251,6 +256,18 @@ public class LeafQueue extends AbstractCSQueue {
defaultAppPriorityPerQueue = Priority.newInstance( defaultAppPriorityPerQueue = Priority.newInstance(
conf.getDefaultApplicationPriorityConfPerQueue(getQueuePath())); conf.getDefaultApplicationPriorityConfPerQueue(getQueuePath()));
maxApplicationLifetime =
conf.getMaximumLifetimePerQueue((getQueuePath()));
defaultApplicationLifetime =
conf.getDefaultLifetimePerQueue((getQueuePath()));
if (defaultApplicationLifetime > maxApplicationLifetime) {
throw new YarnRuntimeException(
"Default lifetime" + defaultApplicationLifetime
+ " can't exceed maximum lifetime " + maxApplicationLifetime);
}
defaultApplicationLifetime = defaultApplicationLifetime > 0
? defaultApplicationLifetime : maxApplicationLifetime;
// Validate leaf queue's user's weights. // Validate leaf queue's user's weights.
int queueUL = Math.min(100, conf.getUserLimit(getQueuePath())); int queueUL = Math.min(100, conf.getUserLimit(getQueuePath()));
for (Entry<String, Float> e : getUserWeights().entrySet()) { for (Entry<String, Float> e : getUserWeights().entrySet()) {
@ -306,7 +323,10 @@ public class LeafQueue extends AbstractCSQueue {
+ "reservationsContinueLooking = " + "reservationsContinueLooking = "
+ reservationsContinueLooking + "\n" + "preemptionDisabled = " + reservationsContinueLooking + "\n" + "preemptionDisabled = "
+ getPreemptionDisabled() + "\n" + "defaultAppPriorityPerQueue = " + getPreemptionDisabled() + "\n" + "defaultAppPriorityPerQueue = "
+ defaultAppPriorityPerQueue + "\npriority = " + priority); + defaultAppPriorityPerQueue + "\npriority = " + priority
+ "\nmaxLifetime = " + maxApplicationLifetime + " seconds"
+ "\ndefaultLifetime = "
+ defaultApplicationLifetime + " seconds");
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
@ -2099,4 +2119,12 @@ public class LeafQueue extends AbstractCSQueue {
this.userLimit = userLimit; this.userLimit = userLimit;
} }
} }
public long getMaximumApplicationLifetime() {
return maxApplicationLifetime;
}
public long getDefaultApplicationLifetime() {
return defaultApplicationLifetime;
}
} }

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -50,6 +51,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.util.Times;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
@ -67,6 +71,9 @@ public class TestApplicationLifetimeMonitor {
@Before @Before
public void setup() throws IOException { public void setup() throws IOException {
conf = new YarnConfiguration(); conf = new YarnConfiguration();
// Always run for CS, since other scheduler do not support this.
conf.setClass(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class, ResourceScheduler.class);
Logger rootLogger = LogManager.getRootLogger(); Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG); rootLogger.setLevel(Level.DEBUG);
UserGroupInformation.setConfiguration(conf); UserGroupInformation.setConfiguration(conf);
@ -78,8 +85,15 @@ public class TestApplicationLifetimeMonitor {
public void testApplicationLifetimeMonitor() throws Exception { public void testApplicationLifetimeMonitor() throws Exception {
MockRM rm = null; MockRM rm = null;
try { try {
long maxLifetime = 30L;
long defaultLifetime = 15L;
YarnConfiguration newConf =
new YarnConfiguration(setUpCSQueue(maxLifetime, defaultLifetime));
conf = new YarnConfiguration(newConf);
rm = new MockRM(conf); rm = new MockRM(conf);
rm.start(); rm.start();
Priority appPriority = Priority.newInstance(0); Priority appPriority = Priority.newInstance(0);
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * 1024); MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * 1024);
@ -92,6 +106,14 @@ public class TestApplicationLifetimeMonitor {
timeouts.put(ApplicationTimeoutType.LIFETIME, 20L); timeouts.put(ApplicationTimeoutType.LIFETIME, 20L);
RMApp app2 = rm.submitApp(1024, appPriority, timeouts); RMApp app2 = rm.submitApp(1024, appPriority, timeouts);
// user not set lifetime, so queue max lifetime will be considered.
RMApp app3 = rm.submitApp(1024, appPriority,
new HashMap<ApplicationTimeoutType, Long>());
// asc lifetime exceeds queue max lifetime
timeouts.put(ApplicationTimeoutType.LIFETIME, 40L);
RMApp app4 = rm.submitApp(1024, appPriority, timeouts);
nm1.nodeHeartbeat(true); nm1.nodeHeartbeat(true);
// Send launch Event // Send launch Event
MockAM am1 = MockAM am1 =
@ -103,8 +125,9 @@ public class TestApplicationLifetimeMonitor {
Map<ApplicationTimeoutType, String> updateTimeout = Map<ApplicationTimeoutType, String> updateTimeout =
new HashMap<ApplicationTimeoutType, String>(); new HashMap<ApplicationTimeoutType, String>();
long newLifetime = 10L; long newLifetime = 40L;
// update 10L seconds more to timeout // update 30L seconds more to timeout which is greater than queue max
// lifetime
String formatISO8601 = String formatISO8601 =
Times.formatISO8601(System.currentTimeMillis() + newLifetime * 1000); Times.formatISO8601(System.currentTimeMillis() + newLifetime * 1000);
updateTimeout.put(ApplicationTimeoutType.LIFETIME, formatISO8601); updateTimeout.put(ApplicationTimeoutType.LIFETIME, formatISO8601);
@ -142,8 +165,6 @@ public class TestApplicationLifetimeMonitor {
!appTimeouts.isEmpty()); !appTimeouts.isEmpty());
ApplicationTimeout timeout = ApplicationTimeout timeout =
appTimeouts.get(ApplicationTimeoutType.LIFETIME); appTimeouts.get(ApplicationTimeoutType.LIFETIME);
Assert.assertEquals("Application timeout string is incorrect.",
formatISO8601, timeout.getExpiryTime());
Assert.assertTrue("Application remaining time is incorrect", Assert.assertTrue("Application remaining time is incorrect",
timeout.getRemainingTime() > 0); timeout.getRemainingTime() > 0);
@ -152,6 +173,17 @@ public class TestApplicationLifetimeMonitor {
Assert.assertTrue("Application killed before lifetime value", Assert.assertTrue("Application killed before lifetime value",
app2.getFinishTime() > afterUpdate); app2.getFinishTime() > afterUpdate);
rm.waitForState(app3.getApplicationId(), RMAppState.KILLED);
// app4 submitted exceeding queue max lifetime, so killed after queue max
// lifetime.
rm.waitForState(app4.getApplicationId(), RMAppState.KILLED);
long totalTimeRun = (app4.getFinishTime() - app4.getSubmitTime()) / 1000;
Assert.assertTrue("Application killed before lifetime value",
totalTimeRun > maxLifetime);
Assert.assertTrue(
"Application killed before lifetime value " + totalTimeRun,
totalTimeRun < maxLifetime + 10L);
} finally { } finally {
stopRM(rm); stopRM(rm);
} }
@ -172,7 +204,7 @@ public class TestApplicationLifetimeMonitor {
nm1.registerNode(); nm1.registerNode();
nm1.nodeHeartbeat(true); nm1.nodeHeartbeat(true);
long appLifetime = 60L; long appLifetime = 30L;
Map<ApplicationTimeoutType, Long> timeouts = Map<ApplicationTimeoutType, Long> timeouts =
new HashMap<ApplicationTimeoutType, Long>(); new HashMap<ApplicationTimeoutType, Long>();
timeouts.put(ApplicationTimeoutType.LIFETIME, appLifetime); timeouts.put(ApplicationTimeoutType.LIFETIME, appLifetime);
@ -305,6 +337,21 @@ public class TestApplicationLifetimeMonitor {
} }
} }
private CapacitySchedulerConfiguration setUpCSQueue(long maxLifetime,
long defaultLifetime) {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] {"default"});
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".default", 100);
csConf.setMaximumLifetimePerQueue(
CapacitySchedulerConfiguration.ROOT + ".default", maxLifetime);
csConf.setDefaultLifetimePerQueue(
CapacitySchedulerConfiguration.ROOT + ".default", defaultLifetime);
return csConf;
}
private void stopRM(MockRM rm) { private void stopRM(MockRM rm) {
if (rm != null) { if (rm != null) {
rm.stop(); rm.stop();

View File

@ -4909,6 +4909,96 @@ public class TestCapacityScheduler {
rm.stop(); rm.stop();
} }
@Test(timeout = 30000)
public void testcheckAndGetApplicationLifetime() throws Exception {
long maxLifetime = 10;
long defaultLifetime = 5;
// positive integer value
CapacityScheduler cs = setUpCSQueue(maxLifetime, defaultLifetime);
Assert.assertEquals(maxLifetime,
cs.checkAndGetApplicationLifetime("default", 100));
Assert.assertEquals(9, cs.checkAndGetApplicationLifetime("default", 9));
Assert.assertEquals(defaultLifetime,
cs.checkAndGetApplicationLifetime("default", -1));
Assert.assertEquals(defaultLifetime,
cs.checkAndGetApplicationLifetime("default", 0));
Assert.assertEquals(maxLifetime,
cs.getMaximumApplicationLifetime("default"));
maxLifetime = -1;
defaultLifetime = -1;
// test for default values
cs = setUpCSQueue(maxLifetime, defaultLifetime);
Assert.assertEquals(100, cs.checkAndGetApplicationLifetime("default", 100));
Assert.assertEquals(defaultLifetime,
cs.checkAndGetApplicationLifetime("default", -1));
Assert.assertEquals(0, cs.checkAndGetApplicationLifetime("default", 0));
Assert.assertEquals(maxLifetime,
cs.getMaximumApplicationLifetime("default"));
maxLifetime = 10;
defaultLifetime = 10;
cs = setUpCSQueue(maxLifetime, defaultLifetime);
Assert.assertEquals(maxLifetime,
cs.checkAndGetApplicationLifetime("default", 100));
Assert.assertEquals(defaultLifetime,
cs.checkAndGetApplicationLifetime("default", -1));
Assert.assertEquals(defaultLifetime,
cs.checkAndGetApplicationLifetime("default", 0));
Assert.assertEquals(maxLifetime,
cs.getMaximumApplicationLifetime("default"));
maxLifetime = 0;
defaultLifetime = 0;
cs = setUpCSQueue(maxLifetime, defaultLifetime);
Assert.assertEquals(100, cs.checkAndGetApplicationLifetime("default", 100));
Assert.assertEquals(-1, cs.checkAndGetApplicationLifetime("default", -1));
Assert.assertEquals(0, cs.checkAndGetApplicationLifetime("default", 0));
maxLifetime = 10;
defaultLifetime = -1;
cs = setUpCSQueue(maxLifetime, defaultLifetime);
Assert.assertEquals(maxLifetime,
cs.checkAndGetApplicationLifetime("default", 100));
Assert.assertEquals(maxLifetime,
cs.checkAndGetApplicationLifetime("default", -1));
Assert.assertEquals(maxLifetime,
cs.checkAndGetApplicationLifetime("default", 0));
maxLifetime = 5;
defaultLifetime = 10;
try {
setUpCSQueue(maxLifetime, defaultLifetime);
Assert.fail("Expected to fails since maxLifetime < defaultLifetime.");
} catch (YarnRuntimeException ye) {
Assert.assertTrue(
ye.getMessage().contains("can't exceed maximum lifetime"));
}
}
private CapacityScheduler setUpCSQueue(long maxLifetime,
long defaultLifetime) {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] {"default"});
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".default", 100);
csConf.setMaximumLifetimePerQueue(
CapacitySchedulerConfiguration.ROOT + ".default", maxLifetime);
csConf.setDefaultLifetimePerQueue(
CapacitySchedulerConfiguration.ROOT + ".default", defaultLifetime);
YarnConfiguration conf = new YarnConfiguration(csConf);
CapacityScheduler cs = new CapacityScheduler();
RMContext rmContext = TestUtils.getMockRMContext();
cs.setConf(conf);
cs.setRMContext(rmContext);
cs.init(conf);
return cs;
}
private void waitforNMRegistered(ResourceScheduler scheduler, int nodecount, private void waitforNMRegistered(ResourceScheduler scheduler, int nodecount,
int timesec) throws InterruptedException { int timesec) throws InterruptedException {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();

View File

@ -171,6 +171,16 @@ Example:
</property> </property>
``` ```
* Queue lifetime for applications
The `CapacityScheduler` supports the following parameters to lifetime of an application:
| Property | Description |
|:---- |:---- |
| `yarn.scheduler.capacity.<queue-path>.maximum-application-lifetime` | Maximum lifetime of an application which is submitted to a queue in seconds. Any value less than or equal to zero will be considered as disabled. This will be a hard time limit for all applications in this queue. If positive value is configured then any application submitted to this queue will be killed after exceeds the configured lifetime. User can also specify lifetime per application basis in application submission context. But user lifetime will be overridden if it exceeds queue maximum lifetime. It is point-in-time configuration. Note : Configuring too low value will result in killing application sooner. This feature is applicable only for leaf queue. |
| `yarn.scheduler.capacity.root.<queue-path>.default-application-lifetime` | Default lifetime of an application which is submitted to a queue in seconds. Any value less than or equal to zero will be considered as disabled. If the user has not submitted application with lifetime value then this value will be taken. It is point-in-time configuration. Note : Default lifetime can't exceed maximum lifetime. This feature is applicable only for leaf queue.|
###Setup for application priority. ###Setup for application priority.
Application priority works only along with FIFO ordering policy. Default ordering policy is FIFO. Application priority works only along with FIFO ordering policy. Default ordering policy is FIFO.