YARN-1461. Added tags for YARN applications and changed RM to handle them. Contributed by Karthik Kambatla.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1564633 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhijie Shen 2014-02-05 04:55:00 +00:00
parent 747cce814c
commit ebe0c17a95
27 changed files with 528 additions and 24 deletions

View File

@ -93,6 +93,9 @@ Release 2.4.0 - UNRELEASED
YARN-1634. Added a testable in-memory implementation of
ApplicationTimelineStore. (Zhijie Shen via vinodkv)
YARN-1461. Added tags for YARN applications and changed RM to handle them.
(Karthik Kambatla via zjshen)
IMPROVEMENTS
YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Enumeration that controls the scope of applications fetched
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public enum ApplicationsRequestScope {
/** All jobs */
ALL,
/** Jobs viewable by current user */
VIEWABLE,
/** Jobs owned by current user */
OWN
}

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.api.protocolrecords;
import java.util.EnumSet;
import java.util.Set;
import org.apache.commons.collections.buffer.UnboundedFifoBuffer;
import org.apache.commons.lang.math.LongRange;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
@ -49,6 +48,86 @@ public abstract class GetApplicationsRequest {
return request;
}
/**
* <p>
* The request from clients to get a report of Applications matching the
* giving application types in the cluster from the
* <code>ResourceManager</code>.
* </p>
*
* @see ApplicationClientProtocol#getApplications(GetApplicationsRequest)
*
* <p>Setting any of the parameters to null, would just disable that
* filter</p>
*
* @param scope {@link ApplicationsRequestScope} to filter by
* @param users list of users to filter by
* @param queues list of scheduler queues to filter by
* @param applicationTypes types of applications
* @param applicationTags application tags to filter by
* @param applicationStates application states to filter by
* @param startRange range of application start times to filter by
* @param finishRange range of application finish times to filter by
* @param limit number of applications to limit to
* @return {@link GetApplicationsRequest} to be used with
* {@link ApplicationClientProtocol#getApplications(GetApplicationsRequest)}
*/
@Public
@Stable
public static GetApplicationsRequest newInstance(
ApplicationsRequestScope scope,
Set<String> users,
Set<String> queues,
Set<String> applicationTypes,
Set<String> applicationTags,
EnumSet<YarnApplicationState> applicationStates,
LongRange startRange,
LongRange finishRange,
Long limit) {
GetApplicationsRequest request =
Records.newRecord(GetApplicationsRequest.class);
if (scope != null) {
request.setScope(scope);
}
request.setUsers(users);
request.setQueues(queues);
request.setApplicationTypes(applicationTypes);
request.setApplicationTags(applicationTags);
request.setApplicationStates(applicationStates);
if (startRange != null) {
request.setStartRange(
startRange.getMinimumLong(), startRange.getMaximumLong());
}
if (finishRange != null) {
request.setFinishRange(
finishRange.getMinimumLong(), finishRange.getMaximumLong());
}
if (limit != null) {
request.setLimit(limit);
}
return request;
}
/**
* <p>
* The request from clients to get a report of Applications matching the
* giving application types in the cluster from the
* <code>ResourceManager</code>.
* </p>
*
* @param scope {@link ApplicationsRequestScope} to filter by
* @see ApplicationClientProtocol#getApplications(GetApplicationsRequest)
*/
@Public
@Stable
public static GetApplicationsRequest newInstance(
ApplicationsRequestScope scope) {
GetApplicationsRequest request =
Records.newRecord(GetApplicationsRequest.class);
request.setScope(scope);
return request;
}
/**
* <p>
* The request from clients to get a report of Applications matching the
@ -257,4 +336,40 @@ public abstract class GetApplicationsRequest {
@Private
@Unstable
public abstract void setFinishRange(long begin, long end);
/**
* Get the tags to filter applications on
*
* @return list of tags to filter on
*/
@Private
@Unstable
public abstract Set<String> getApplicationTags();
/**
* Set the list of tags to filter applications on
*
* @param tags list of tags to filter on
*/
@Private
@Unstable
public abstract void setApplicationTags(Set<String> tags);
/**
* Get the {@link ApplicationsRequestScope} of applications to be filtered.
*
* @return {@link ApplicationsRequestScope} of applications to return.
*/
@Private
@Unstable
public abstract ApplicationsRequestScope getScope();
/**
* Set the {@link ApplicationsRequestScope} of applications to filter.
*
* @param scope scope to use for filtering applications
*/
@Private
@Unstable
public abstract void setScope(ApplicationsRequestScope scope);
}

View File

@ -25,6 +25,8 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.util.Records;
import java.util.Set;
/**
* <p><code>ApplicationReport</code> is a report of an application.</p>
*
@ -321,6 +323,18 @@ public abstract class ApplicationReport {
@Unstable
public abstract void setApplicationType(String applicationType);
/**
* Get all tags corresponding to the application
* @return Application's tags
*/
@Public
@Stable
public abstract Set<String> getApplicationTags();
@Private
@Unstable
public abstract void setApplicationTags(Set<String> tags);
@Private
@Stable
public abstract void setAMRMToken(Token amRmToken);

View File

@ -25,8 +25,11 @@ import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;
import java.util.Set;
/**
* <p><code>ApplicationSubmissionContext</code> represents all of the
* information needed by the <code>ResourceManager</code> to launch
@ -284,7 +287,6 @@ public abstract class ApplicationSubmissionContext {
@Stable
public abstract void setApplicationType(String applicationType);
/**
* Get the flag which indicates whether to keep containers across application
* attempts or not.
@ -314,4 +316,26 @@ public abstract class ApplicationSubmissionContext {
@Stable
public abstract void setKeepContainersAcrossApplicationAttempts(
boolean keepContainers);
/**
* Get tags for the application
*
* @return the application tags
*/
@Public
@Stable
public abstract Set<String> getApplicationTags();
/**
* Set tags for the application. A maximum of
* {@link YarnConfiguration#APPLICATION_MAX_TAGS} are allowed
* per application. Each tag can be at most
* {@link YarnConfiguration#APPLICATION_MAX_TAG_LENGTH}
* characters, and can contain only ASCII characters.
*
* @param tags tags to set
*/
@Public
@Stable
public abstract void setApplicationTags(Set<String> tags);
}

View File

@ -25,6 +25,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@ -50,6 +51,12 @@ public class YarnConfiguration extends Configuration {
@Private
public static final String CORE_SITE_CONFIGURATION_FILE = "core-site.xml";
@Evolving
public static final int APPLICATION_MAX_TAGS = 10;
@Evolving
public static final int APPLICATION_MAX_TAG_LENGTH = 100;
private static final String YARN_DEFAULT_XML_FILE = "yarn-default.xml";
static {

View File

@ -190,6 +190,7 @@ message ApplicationReportProto {
optional float progress = 17;
optional string applicationType = 18;
optional hadoop.common.TokenProto am_rm_token = 19;
repeated string applicationTags = 20;
}
message ApplicationAttemptReportProto {
@ -287,6 +288,7 @@ message ApplicationSubmissionContextProto {
optional ResourceProto resource = 9;
optional string applicationType = 10 [default = "YARN"];
optional bool keep_containers_across_application_attempts = 11 [default = false];
repeated string applicationTags = 12;
}
enum ApplicationAccessTypeProto {

View File

@ -136,6 +136,12 @@ message MoveApplicationAcrossQueuesRequestProto {
message MoveApplicationAcrossQueuesResponseProto {
}
enum ApplicationsRequestScopeProto {
ALL = 0;
VIEWABLE = 1;
OWN = 2;
}
message GetApplicationsRequestProto {
repeated string application_types = 1;
repeated YarnApplicationStateProto application_states = 2;
@ -146,6 +152,8 @@ message GetApplicationsRequestProto {
optional int64 start_end = 7;
optional int64 finish_begin = 8;
optional int64 finish_end = 9;
repeated string applicationTags = 10;
optional ApplicationsRequestScopeProto scope = 11 [default = ALL];
}
message GetApplicationsResponseProto {

View File

@ -27,6 +27,7 @@ import java.util.Set;
import org.apache.commons.lang.math.LongRange;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
@ -49,6 +50,8 @@ public class GetApplicationsRequestPBImpl extends GetApplicationsRequest {
Set<String> queues = null;
long limit = Long.MAX_VALUE;
LongRange start = null, finish = null;
private Set<String> applicationTags;
private ApplicationsRequestScope scope;
public GetApplicationsRequestPBImpl() {
builder = GetApplicationsRequestProto.newBuilder();
@ -112,6 +115,12 @@ public class GetApplicationsRequestPBImpl extends GetApplicationsRequest {
};
builder.addAllApplicationStates(iterable);
}
if (this.applicationTags != null && !this.applicationTags.isEmpty()) {
builder.addAllApplicationTags(this.applicationTags);
}
if (this.scope != null) {
builder.setScope(ProtoUtils.convertToProtoFormat(scope));
}
}
private void addLocalApplicationTypesToProto() {
@ -187,12 +196,64 @@ public class GetApplicationsRequestPBImpl extends GetApplicationsRequest {
this.applicationTypes = applicationTypes;
}
private void initApplicationTags() {
if (this.applicationTags != null) {
return;
}
GetApplicationsRequestProtoOrBuilder p = viaProto ? proto : builder;
this.applicationTags = new HashSet<String>();
this.applicationTags.addAll(p.getApplicationTagsList());
}
@Override
public Set<String> getApplicationTags() {
initApplicationTags();
return this.applicationTags;
}
@Override
public void setApplicationTags(Set<String> tags) {
maybeInitBuilder();
if (tags == null || tags.isEmpty()) {
builder.clearApplicationTags();
this.applicationTags = null;
return;
}
// Convert applicationTags to lower case and add
this.applicationTags = new HashSet<String>();
for (String tag : tags) {
this.applicationTags.add(tag.toLowerCase());
}
}
@Override
public EnumSet<YarnApplicationState> getApplicationStates() {
initApplicationStates();
return this.applicationStates;
}
private void initScope() {
if (this.scope != null) {
return;
}
GetApplicationsRequestProtoOrBuilder p = viaProto ? proto : builder;
this.scope = ProtoUtils.convertFromProtoFormat(p.getScope());
}
@Override
public ApplicationsRequestScope getScope() {
initScope();
return this.scope;
}
public void setScope(ApplicationsRequestScope scope) {
maybeInitBuilder();
if (scope == null) {
builder.clearScope();
}
this.scope = scope;
}
@Override
public void setApplicationStates(EnumSet<YarnApplicationState> applicationStates) {
maybeInitBuilder();
@ -223,7 +284,6 @@ public class GetApplicationsRequestPBImpl extends GetApplicationsRequest {
return this.users;
}
@Override
public void setUsers(Set<String> users) {
maybeInitBuilder();
if (users == null) {

View File

@ -38,6 +38,9 @@ import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
import com.google.protobuf.TextFormat;
import java.util.HashSet;
import java.util.Set;
@Private
@Unstable
public class ApplicationReportPBImpl extends ApplicationReport {
@ -49,6 +52,7 @@ public class ApplicationReportPBImpl extends ApplicationReport {
private ApplicationAttemptId currentApplicationAttemptId;
private Token clientToAMToken = null;
private Token amRmToken = null;
private Set<String> applicationTags = null;
public ApplicationReportPBImpl() {
builder = ApplicationReportProto.newBuilder();
@ -245,6 +249,21 @@ public class ApplicationReportPBImpl extends ApplicationReport {
return amRmToken;
}
private void initApplicationTags() {
if (this.applicationTags != null) {
return;
}
ApplicationReportProtoOrBuilder p = viaProto ? proto : builder;
this.applicationTags = new HashSet<String>();
this.applicationTags.addAll(p.getApplicationTagsList());
}
@Override
public Set<String> getApplicationTags() {
initApplicationTags();
return this.applicationTags;
}
@Override
public void setApplicationId(ApplicationId applicationId) {
maybeInitBuilder();
@ -355,6 +374,15 @@ public class ApplicationReportPBImpl extends ApplicationReport {
builder.setApplicationType((applicationType));
}
@Override
public void setApplicationTags(Set<String> tags) {
maybeInitBuilder();
if (tags == null || tags.isEmpty()) {
builder.clearApplicationTags();
}
this.applicationTags = tags;
}
@Override
public void setDiagnostics(String diagnostics) {
maybeInitBuilder();
@ -450,6 +478,9 @@ public class ApplicationReportPBImpl extends ApplicationReport {
builder.getAmRmToken())) {
builder.setAmRmToken(convertToProtoFormat(this.amRmToken));
}
if (this.applicationTags != null && !this.applicationTags.isEmpty()) {
builder.addAllApplicationTags(this.applicationTags);
}
}
private void mergeLocalToProto() {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.api.records.impl.pb;
import com.google.common.base.CharMatcher;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -25,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder;
@ -34,6 +36,9 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import com.google.protobuf.TextFormat;
import java.util.HashSet;
import java.util.Set;
@Private
@Unstable
public class ApplicationSubmissionContextPBImpl
@ -47,6 +52,7 @@ extends ApplicationSubmissionContext {
private Priority priority = null;
private ContainerLaunchContext amContainer = null;
private Resource resource = null;
private Set<String> applicationTags = null;
public ApplicationSubmissionContextPBImpl() {
builder = ApplicationSubmissionContextProto.newBuilder();
@ -100,6 +106,9 @@ extends ApplicationSubmissionContext {
builder.getResource())) {
builder.setResource(convertToProtoFormat(this.resource));
}
if (this.applicationTags != null && !this.applicationTags.isEmpty()) {
builder.addAllApplicationTags(this.applicationTags);
}
}
private void mergeLocalToProto() {
@ -197,6 +206,21 @@ extends ApplicationSubmissionContext {
return (p.getApplicationType());
}
private void initApplicationTags() {
if (this.applicationTags != null) {
return;
}
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
this.applicationTags = new HashSet<String>();
this.applicationTags.addAll(p.getApplicationTagsList());
}
@Override
public Set<String> getApplicationTags() {
initApplicationTags();
return this.applicationTags;
}
@Override
public void setQueue(String queue) {
maybeInitBuilder();
@ -217,6 +241,40 @@ extends ApplicationSubmissionContext {
builder.setApplicationType((applicationType));
}
private void checkTags(Set<String> tags) {
if (tags.size() > YarnConfiguration.APPLICATION_MAX_TAGS) {
throw new IllegalArgumentException("Too many applicationTags, a maximum of only "
+ YarnConfiguration.APPLICATION_MAX_TAGS + " are allowed!");
}
for (String tag : tags) {
if (tag.length() > YarnConfiguration.APPLICATION_MAX_TAG_LENGTH) {
throw new IllegalArgumentException("Tag " + tag + " is too long, " +
"maximum allowed length of a tag is " +
YarnConfiguration.APPLICATION_MAX_TAG_LENGTH);
}
if (!CharMatcher.ASCII.matchesAllOf(tag)) {
throw new IllegalArgumentException("A tag can only have ASCII " +
"characters! Invalid tag - " + tag);
}
}
}
@Override
public void setApplicationTags(Set<String> tags) {
maybeInitBuilder();
if (tags == null || tags.isEmpty()) {
builder.clearApplicationTags();
this.applicationTags = null;
return;
}
checkTags(tags);
// Convert applicationTags to lower case and add
this.applicationTags = new HashSet<String>();
for (String tag : tags) {
this.applicationTags.add(tag.toLowerCase());
}
}
@Override
public ContainerLaunchContext getAMContainerSpec() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;

View File

@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@ -50,6 +51,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
import com.google.protobuf.ByteString;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
@Private
@Unstable
@ -113,6 +115,18 @@ public class ProtoUtils {
YARN_APPLICATION_ATTEMPT_STATE_PREFIX, ""));
}
/*
* ApplicationsRequestScope
*/
public static YarnServiceProtos.ApplicationsRequestScopeProto
convertToProtoFormat(ApplicationsRequestScope e) {
return YarnServiceProtos.ApplicationsRequestScopeProto.valueOf(e.name());
}
public static ApplicationsRequestScope convertFromProtoFormat
(YarnServiceProtos.ApplicationsRequestScopeProto e) {
return ApplicationsRequestScope.valueOf(e.name());
}
/*
* ApplicationResourceUsageReport
*/

View File

@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.io.Text;
@ -312,7 +313,7 @@ public class BuilderUtils {
String url, long startTime, long finishTime,
FinalApplicationStatus finalStatus,
ApplicationResourceUsageReport appResources, String origTrackingUrl,
float progress, String appType, Token amRmToken) {
float progress, String appType, Token amRmToken, Set<String> tags) {
ApplicationReport report = recordFactory
.newRecordInstance(ApplicationReport.class);
report.setApplicationId(applicationId);
@ -334,6 +335,7 @@ public class BuilderUtils {
report.setProgress(progress);
report.setApplicationType(appType);
report.setAMRMToken(amRmToken);
report.setApplicationTags(tags);
return report;
}

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@ -445,9 +446,11 @@ public class ClientRMService extends AbstractService implements
request.getApplicationStates();
Set<String> users = request.getUsers();
Set<String> queues = request.getQueues();
Set<String> tags = request.getApplicationTags();
long limit = request.getLimit();
LongRange start = request.getStartRange();
LongRange finish = request.getFinishRange();
ApplicationsRequestScope scope = request.getScope();
final Map<ApplicationId, RMApp> apps = rmContext.getRMApps();
Iterator<RMApp> appsIter;
@ -494,6 +497,17 @@ public class ClientRMService extends AbstractService implements
List<ApplicationReport> reports = new ArrayList<ApplicationReport>();
while (appsIter.hasNext() && reports.size() < limit) {
RMApp application = appsIter.next();
// Check if current application falls under the specified scope
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
ApplicationAccessType.VIEW_APP, application);
if (scope == ApplicationsRequestScope.OWN &&
!callerUGI.getUserName().equals(application.getUser())) {
continue;
} else if (scope == ApplicationsRequestScope.VIEWABLE && !allowAccess) {
continue;
}
if (applicationTypes != null && !applicationTypes.isEmpty()) {
String appTypeToMatch = caseSensitive
? application.getApplicationType()
@ -523,8 +537,23 @@ public class ClientRMService extends AbstractService implements
continue;
}
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
ApplicationAccessType.VIEW_APP, application);
if (tags != null && !tags.isEmpty()) {
Set<String> appTags = application.getApplicationTags();
if (appTags == null || appTags.isEmpty()) {
continue;
}
boolean match = false;
for (String tag : tags) {
if (appTags.contains(tag)) {
match = true;
break;
}
}
if (!match) {
continue;
}
}
reports.add(application.createAndGetApplicationReport(
callerUGI.getUserName(), allowAccess));
}

View File

@ -320,7 +320,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
submissionContext.getApplicationName(), user,
submissionContext.getQueue(),
submissionContext, this.scheduler, this.masterService,
submitTime, submissionContext.getApplicationType());
submitTime, submissionContext.getApplicationType(),
submissionContext.getApplicationTags());
// Concurrent app submissions with same applicationId will fail here
// Concurrent app submissions with different applicationIds will not

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -196,6 +197,12 @@ public interface RMApp extends EventHandler<RMAppEvent> {
*/
String getApplicationType();
/**
* Get tags for the application
* @return tags corresponding to the application
*/
Set<String> getApplicationTags();
/**
* Check whether this application is safe to terminate.
* An application is deemed to be safe to terminate if it is an unmanaged

View File

@ -104,6 +104,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private final long submitTime;
private final Set<RMNode> updatedNodes = new HashSet<RMNode>();
private final String applicationType;
private final Set<String> applicationTags;
// Mutable fields
private long startTime;
@ -302,9 +303,9 @@ public class RMAppImpl implements RMApp, Recoverable {
public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
Configuration config, String name, String user, String queue,
ApplicationSubmissionContext submissionContext,
YarnScheduler scheduler,
ApplicationMasterService masterService, long submitTime, String applicationType) {
ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
ApplicationMasterService masterService, long submitTime,
String applicationType, Set<String> applicationTags) {
this.applicationId = applicationId;
this.name = name;
@ -320,6 +321,7 @@ public class RMAppImpl implements RMApp, Recoverable {
this.submitTime = submitTime;
this.startTime = System.currentTimeMillis();
this.applicationType = applicationType;
this.applicationTags = applicationTags;
int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@ -553,7 +555,7 @@ public class RMAppImpl implements RMApp, Recoverable {
createApplicationState(), diags,
trackingUrl, this.startTime, this.finishTime, finishState,
appUsageReport, origTrackingUrl, progress, this.applicationType,
amrmToken);
amrmToken, applicationTags);
} finally {
this.readLock.unlock();
}
@ -1085,6 +1087,11 @@ public class RMAppImpl implements RMApp, Recoverable {
return this.applicationType;
}
@Override
public Set<String> getApplicationTags() {
return this.applicationTags;
}
@Override
public boolean isAppSafeToTerminate() {
RMAppState state = getState();

View File

@ -110,6 +110,7 @@ public class AppBlock extends HtmlBlock {
_("User:", app.getUser()).
_("Name:", app.getName()).
_("Application Type:", app.getApplicationType()).
_("Application Tags:", app.getApplicationTags()).
_("State:", app.getState()).
_("FinalStatus:", app.getFinalStatus()).
_("Started:", Times.format(app.getStartTime())).

View File

@ -261,12 +261,14 @@ public class RMWebServices {
@QueryParam("startedTimeEnd") String startedEnd,
@QueryParam("finishedTimeBegin") String finishBegin,
@QueryParam("finishedTimeEnd") String finishEnd,
@QueryParam("applicationTypes") Set<String> applicationTypes) {
@QueryParam("applicationTypes") Set<String> applicationTypes,
@QueryParam("applicationTags") Set<String> applicationTags) {
boolean checkCount = false;
boolean checkStart = false;
boolean checkEnd = false;
boolean checkAppTypes = false;
boolean checkAppStates = false;
boolean checkAppTags = false;
long countNum = 0;
// set values suitable in case both of begin/end not specified
@ -327,6 +329,11 @@ public class RMWebServices {
checkAppTypes = true;
}
Set<String> appTags = parseQueries(applicationTags, false);
if (!appTags.isEmpty()) {
checkAppTags = true;
}
// stateQuery is deprecated.
if (stateQuery != null && !stateQuery.isEmpty()) {
statesQuery.add(stateQuery);
@ -354,6 +361,10 @@ public class RMWebServices {
request.setApplicationTypes(appTypes);
}
if (checkAppTags) {
request.setApplicationTags(appTags);
}
if (checkAppStates) {
request.setApplicationStates(appStates);
}

View File

@ -24,6 +24,7 @@ import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;
import com.google.common.base.Joiner;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@ -33,7 +34,6 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Times;
@ -67,6 +67,7 @@ public class AppInfo {
protected String diagnostics;
protected long clusterId;
protected String applicationType;
protected String applicationTags = "";
// these are only allowed if acls allow
protected long startedTime;
@ -117,6 +118,9 @@ public class AppInfo {
if (diagnostics == null || diagnostics.isEmpty()) {
this.diagnostics = "";
}
if (app.getApplicationTags() != null && !app.getApplicationTags().isEmpty()) {
this.applicationTags = Joiner.on(',').join(app.getApplicationTags());
}
this.finalStatus = app.getFinalApplicationStatus();
this.clusterId = ResourceManager.getClusterTimeStamp();
if (hasAccess) {
@ -240,6 +244,10 @@ public class AppInfo {
return this.applicationType;
}
public String getApplicationTags() {
return this.applicationTags;
}
public int getRunningContainers() {
return this.runningContainers;
}

View File

@ -41,8 +41,10 @@ import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import com.google.common.collect.Sets;
import junit.framework.Assert;
import org.apache.commons.lang.math.LongRange;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -51,6 +53,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
@ -72,6 +75,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
@ -465,6 +469,7 @@ public class TestClientRMService {
{MockApps.newAppName(), MockApps.newAppName(), MockApps.newAppName()};
ApplicationId[] appIds =
{getApplicationId(101), getApplicationId(102), getApplicationId(103)};
List<String> tags = Arrays.asList("Tag1", "Tag2", "Tag3");
// Submit applications
for (int i = 0; i < appIds.length; i++) {
@ -472,7 +477,8 @@ public class TestClientRMService {
when(mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(),
ApplicationAccessType.VIEW_APP, null, appId)).thenReturn(true);
SubmitApplicationRequest submitRequest = mockSubmitAppRequest(
appId, appNames[i], queues[i % queues.length]);
appId, appNames[i], queues[i % queues.length],
new HashSet<String>(tags.subList(0, i + 1)));
rmService.submitApplication(submitRequest);
}
@ -513,6 +519,41 @@ public class TestClientRMService {
userSet.add(UserGroupInformation.getCurrentUser().getShortUserName());
assertEquals("Incorrect number of applications for user", 3,
rmService.getApplications(request).getApplicationList().size());
// Check tags
request = GetApplicationsRequest.newInstance(
ApplicationsRequestScope.ALL, null, null, null, null, null, null,
null, null);
Set<String> tagSet = new HashSet<String>();
request.setApplicationTags(tagSet);
assertEquals("Incorrect number of matching tags", 6,
rmService.getApplications(request).getApplicationList().size());
tagSet = Sets.newHashSet(tags.get(0));
request.setApplicationTags(tagSet);
assertEquals("Incorrect number of matching tags", 3,
rmService.getApplications(request).getApplicationList().size());
tagSet = Sets.newHashSet(tags.get(1));
request.setApplicationTags(tagSet);
assertEquals("Incorrect number of matching tags", 2,
rmService.getApplications(request).getApplicationList().size());
tagSet = Sets.newHashSet(tags.get(2));
request.setApplicationTags(tagSet);
assertEquals("Incorrect number of matching tags", 1,
rmService.getApplications(request).getApplicationList().size());
// Check scope
request = GetApplicationsRequest.newInstance(
ApplicationsRequestScope.VIEWABLE);
assertEquals("Incorrect number of applications for the scope", 6,
rmService.getApplications(request).getApplicationList().size());
request = GetApplicationsRequest.newInstance(
ApplicationsRequestScope.OWN);
assertEquals("Incorrect number of applications for the scope", 3,
rmService.getApplications(request).getApplicationList().size());
}
@Test(timeout=4000)
@ -583,6 +624,11 @@ public class TestClientRMService {
private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId,
String name, String queue) {
return mockSubmitAppRequest(appId, name, queue, null);
}
private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId,
String name, String queue, Set<String> tags) {
ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
Resource resource = Resources.createResource(
@ -596,6 +642,7 @@ public class TestClientRMService {
submissionContext.setApplicationId(appId);
submissionContext.setResource(resource);
submissionContext.setApplicationType(appType);
submissionContext.setApplicationTags(tags);
SubmitApplicationRequest submitRequest =
recordFactory.newRecordInstance(SubmitApplicationRequest.class);
@ -664,7 +711,7 @@ public class TestClientRMService {
when(asContext.getMaxAppAttempts()).thenReturn(1);
RMAppImpl app = spy(new RMAppImpl(applicationId3, rmContext, config, null, null,
queueName, asContext, yarnScheduler, null , System
.currentTimeMillis(), "YARN"));
.currentTimeMillis(), "YARN", null));
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(applicationId3, 1);
RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId,
rmContext, yarnScheduler, null, asContext, config, false);

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.yarn.MockApps;
@ -139,6 +140,11 @@ public abstract class MockAsm extends MockApps {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public Set<String> getApplicationTags() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public void setQueue(String name) {
throw new UnsupportedOperationException("Not supported yet.");
@ -236,6 +242,10 @@ public abstract class MockAsm extends MockApps {
return maxAppAttempts;
}
@Override
public Set<String> getApplicationTags() {
return null;
}
};
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -217,6 +218,11 @@ public class MockRMApp implements RMApp {
return YarnConfiguration.DEFAULT_APPLICATION_TYPE;
}
@Override
public Set<String> getApplicationTags() {
return null;
}
@Override
public boolean isAppSafeToTerminate() {
return true;

View File

@ -230,7 +230,7 @@ public class TestRMAppTransitions {
RMApp application =
new RMAppImpl(applicationId, rmContext, conf, name, user, queue,
submissionContext, scheduler, masterService,
System.currentTimeMillis(), "YARN");
System.currentTimeMillis(), "YARN", null);
testAppStartState(applicationId, user, name, queue, application);
this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(),

View File

@ -621,7 +621,7 @@ public class TestFairScheduler {
ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
RMApp rmApp = new RMAppImpl(appAttemptId.getApplicationId(), rmContext, conf,
null, null, null, ApplicationSubmissionContext.newInstance(null, null,
null, null, null, false, false, 0, null, null), null, null, 0, null);
null, null, null, false, false, 0, null, null), null, null, 0, null, null);
appsMap.put(appAttemptId.getApplicationId(), rmApp);
AppAddedSchedulerEvent appAddedEvent =
@ -647,7 +647,7 @@ public class TestFairScheduler {
ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
RMApp rmApp = new RMAppImpl(appAttemptId.getApplicationId(), rmContext, conf,
null, null, null, ApplicationSubmissionContext.newInstance(null, null,
null, null, null, false, false, 0, null, null), null, null, 0, null);
null, null, null, false, false, 0, null, null), null, null, 0, null, null);
appsMap.put(appAttemptId.getApplicationId(), rmApp);
AppAddedSchedulerEvent appAddedEvent =
@ -1765,7 +1765,7 @@ public class TestFairScheduler {
RMApp application =
new RMAppImpl(applicationId, resourceManager.getRMContext(), conf, name, user,
queue, submissionContext, scheduler, masterService,
System.currentTimeMillis(), "YARN");
System.currentTimeMillis(), "YARN", null);
resourceManager.getRMContext().getRMApps().putIfAbsent(applicationId, application);
application.handle(new RMAppEvent(applicationId, RMAppEventType.START));

View File

@ -1317,8 +1317,8 @@ public class TestRMWebServicesApps extends JerseyTest {
public void verifyAppInfo(JSONObject info, RMApp app) throws JSONException,
Exception {
// 15 because trackingUrl not assigned yet
assertEquals("incorrect number of elements", 19, info.length());
// 20 because trackingUrl not assigned yet
assertEquals("incorrect number of elements", 20, info.length());
verifyAppInfoGeneric(app, info.getString("id"), info.getString("user"),
info.getString("name"), info.getString("applicationType"), info.getString("queue"),

View File

@ -1123,6 +1123,7 @@ ResourceManager REST API's.
* finishedTimeBegin - applications with finish time beginning with this time, specified in ms since epoch
* finishedTimeEnd - applications with finish time ending with this time, specified in ms since epoch
* applicationTypes - applications matching the given application types, specified as a comma-separated list.
* applicationTags - applications matching any of the given application tags, specified as a comma-separated list.
------
** Elements of the <apps> (Applications) object