YARN-2569. Added the log handling APIs for the long running services. Contributed by Xuan Gong.

This commit is contained in:
Zhijie Shen 2014-09-23 10:36:57 -07:00
parent a1fd804a31
commit 5338ac416a
7 changed files with 342 additions and 0 deletions

View File

@ -91,6 +91,9 @@ Release 2.6.0 - UNRELEASED
YARN-1250. Generic history service should support application-acls. (Zhijie Shen
via junping_du)
YARN-2569. Added the log handling APIs for the long running services. (Xuan
Gong via zjshen)
IMPROVEMENTS
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc

View File

@ -54,6 +54,7 @@ import java.util.Set;
* validityInterval into failure count. If failure count reaches to
* maxAppAttempts, the application will be failed.
* </li>
* <li>Optional, application-specific {@link LogAggregationContext}</li>
* </ul>
* </p>
*
@ -128,6 +129,21 @@ public abstract class ApplicationSubmissionContext {
return context;
}
@Public
@Stable
public static ApplicationSubmissionContext newInstance(
ApplicationId applicationId, String applicationName, String queue,
Priority priority, ContainerLaunchContext amContainer,
boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
int maxAppAttempts, Resource resource, String applicationType,
boolean keepContainers, LogAggregationContext logAggregationContext) {
ApplicationSubmissionContext context =
newInstance(applicationId, applicationName, queue, priority,
amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
resource, applicationType, keepContainers);
context.setLogAggregationContext(logAggregationContext);
return context;
}
/**
* Get the <code>ApplicationId</code> of the submitted application.
* @return <code>ApplicationId</code> of the submitted application
@ -381,4 +397,24 @@ public abstract class ApplicationSubmissionContext {
@Stable
public abstract void setAttemptFailuresValidityInterval(
long attemptFailuresValidityInterval);
/**
* Get <code>LogAggregationContext</code> of the application
*
* @return <code>LogAggregationContext</code> of the application
*/
@Public
@Stable
public abstract LogAggregationContext getLogAggregationContext();
/**
* Set <code>LogAggregationContext</code> for the application
*
* @param logAggregationContext
* for the application
*/
@Public
@Stable
public abstract void setLogAggregationContext(
LogAggregationContext logAggregationContext);
}

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
/**
* <p><code>LogAggregationContext</code> represents all of the
* information needed by the <code>NodeManager</code> to handle
* the logs for an application.</p>
*
* <p>It includes details such as:
* <ul>
* <li>includePattern. It uses Java Regex to filter the log files
* which match the defined include pattern and those log files
* will be uploaded. </li>
* <li>excludePattern. It uses Java Regex to filter the log files
* which match the defined exclude pattern and those log files
* will not be uploaded. If the log file name matches both the
* include and the exclude pattern, this file will be excluded eventually</li>
* <li>rollingIntervalSeconds. The default value is -1. By default,
* the logAggregationService only uploads container logs when
* the application is finished. This configure defines
* how often the logAggregationSerivce uploads container logs in seconds.
* By setting this configure, the logAggregationSerivce can upload container
* logs periodically when the application is running.
* </li>
* </ul>
* </p>
*
* @see ApplicationSubmissionContext
*/
@Evolving
@Public
public abstract class LogAggregationContext {
@Public
@Unstable
public static LogAggregationContext newInstance(String includePattern,
String excludePattern, long rollingIntervalSeconds) {
LogAggregationContext context = Records.newRecord(LogAggregationContext.class);
context.setIncludePattern(includePattern);
context.setExcludePattern(excludePattern);
context.setRollingIntervalSeconds(rollingIntervalSeconds);
return context;
}
/**
* Get include pattern
*
* @return include pattern
*/
@Public
@Unstable
public abstract String getIncludePattern();
/**
* Set include pattern
*
* @param includePattern
*/
@Public
@Unstable
public abstract void setIncludePattern(String includePattern);
/**
* Get exclude pattern
*
* @return exclude pattern
*/
@Public
@Unstable
public abstract String getExcludePattern();
/**
* Set exclude pattern
*
* @param excludePattern
*/
@Public
@Unstable
public abstract void setExcludePattern(String excludePattern);
/**
* Get rollingIntervalSeconds
*
* @return the rollingIntervalSeconds
*/
@Public
@Unstable
public abstract long getRollingIntervalSeconds();
/**
* Set rollingIntervalSeconds
*
* @param rollingIntervalSeconds
*/
@Public
@Unstable
public abstract void setRollingIntervalSeconds(long rollingIntervalSeconds);
}

View File

@ -292,6 +292,13 @@ message ApplicationSubmissionContextProto {
optional bool keep_containers_across_application_attempts = 11 [default = false];
repeated string applicationTags = 12;
optional int64 attempt_failures_validity_interval = 13 [default = -1];
optional LogAggregationContextProto log_aggregation_context = 14;
}
message LogAggregationContextProto {
optional string include_pattern = 1 [default = ".*"];
optional string exclude_pattern = 2 [default = ""];
optional int64 rolling_interval_seconds = 3 [default = -1];
}
enum ApplicationAccessTypeProto {

View File

@ -19,11 +19,13 @@
package org.apache.hadoop.yarn.api.records.impl.pb;
import com.google.common.base.CharMatcher;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -31,6 +33,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
@ -53,6 +56,7 @@ extends ApplicationSubmissionContext {
private ContainerLaunchContext amContainer = null;
private Resource resource = null;
private Set<String> applicationTags = null;
private LogAggregationContext logAggregationContext = null;
public ApplicationSubmissionContextPBImpl() {
builder = ApplicationSubmissionContextProto.newBuilder();
@ -110,6 +114,10 @@ extends ApplicationSubmissionContext {
builder.clearApplicationTags();
builder.addAllApplicationTags(this.applicationTags);
}
if (this.logAggregationContext != null) {
builder.setLogAggregationContext(
convertToProtoFormat(this.logAggregationContext));
}
}
private void mergeLocalToProto() {
@ -415,4 +423,36 @@ extends ApplicationSubmissionContext {
maybeInitBuilder();
builder.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
}
private LogAggregationContextPBImpl convertFromProtoFormat(
LogAggregationContextProto p) {
return new LogAggregationContextPBImpl(p);
}
private LogAggregationContextProto convertToProtoFormat(
LogAggregationContext t) {
return ((LogAggregationContextPBImpl) t).getProto();
}
@Override
public LogAggregationContext getLogAggregationContext() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
if (this.logAggregationContext != null) {
return this.logAggregationContext;
} // Else via proto
if (!p.hasLogAggregationContext()) {
return null;
}
logAggregationContext = convertFromProtoFormat(p.getLogAggregationContext());
return logAggregationContext;
}
@Override
public void setLogAggregationContext(
LogAggregationContext logAggregationContext) {
maybeInitBuilder();
if (logAggregationContext == null)
builder.clearLogAggregationContext();
this.logAggregationContext = logAggregationContext;
}
}

View File

@ -0,0 +1,134 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.impl.pb;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProtoOrBuilder;
import com.google.protobuf.TextFormat;
public class LogAggregationContextPBImpl extends LogAggregationContext{
LogAggregationContextProto proto = LogAggregationContextProto.getDefaultInstance();
LogAggregationContextProto.Builder builder = null;
boolean viaProto = false;
public LogAggregationContextPBImpl() {
builder = LogAggregationContextProto.newBuilder();
}
public LogAggregationContextPBImpl(LogAggregationContextProto proto) {
this.proto = proto;
viaProto = true;
}
public LogAggregationContextProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = LogAggregationContextProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public String getIncludePattern() {
LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
if (! p.hasIncludePattern()) {
return null;
}
return p.getIncludePattern();
}
@Override
public void setIncludePattern(String includePattern) {
maybeInitBuilder();
if (includePattern == null) {
builder.clearIncludePattern();
return;
}
builder.setIncludePattern(includePattern);
}
@Override
public String getExcludePattern() {
LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
if (! p.hasExcludePattern()) {
return null;
}
return p.getExcludePattern();
}
@Override
public void setExcludePattern(String excludePattern) {
maybeInitBuilder();
if (excludePattern == null) {
builder.clearExcludePattern();
return;
}
builder.setExcludePattern(excludePattern);
}
@Override
public long getRollingIntervalSeconds() {
LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
if (! p.hasRollingIntervalSeconds()) {
return -1;
}
return p.getRollingIntervalSeconds();
}
@Override
public void setRollingIntervalSeconds(long rollingIntervalSeconds) {
maybeInitBuilder();
builder.setRollingIntervalSeconds(rollingIntervalSeconds);
}
}

View File

@ -178,6 +178,7 @@ public class TestPBImplRecords {
"http", "localhost", 8080, "file0"));
typeValueCache.put(SerializedException.class,
SerializedException.newInstance(new IOException("exception for test")));
generateByNewInstance(LogAggregationContext.class);
generateByNewInstance(ApplicationId.class);
generateByNewInstance(ApplicationAttemptId.class);
generateByNewInstance(ContainerId.class);