YARN-2569. Added the log handling APIs for the long running services. Contributed by Xuan Gong.
(cherry picked from commit 5338ac416a
)
This commit is contained in:
parent
ac3b62dae7
commit
19cfe2f800
|
@ -64,6 +64,9 @@ Release 2.6.0 - UNRELEASED
|
|||
YARN-1250. Generic history service should support application-acls. (Zhijie Shen
|
||||
via junping_du)
|
||||
|
||||
YARN-2569. Added the log handling APIs for the long running services. (Xuan
|
||||
Gong via zjshen)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-2242. Improve exception information on AM launch crashes. (Li Lu
|
||||
|
|
|
@ -54,6 +54,7 @@ import java.util.Set;
|
|||
* validityInterval into failure count. If failure count reaches to
|
||||
* maxAppAttempts, the application will be failed.
|
||||
* </li>
|
||||
* <li>Optional, application-specific {@link LogAggregationContext}</li>
|
||||
* </ul>
|
||||
* </p>
|
||||
*
|
||||
|
@ -128,6 +129,21 @@ public abstract class ApplicationSubmissionContext {
|
|||
return context;
|
||||
}
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
public static ApplicationSubmissionContext newInstance(
|
||||
ApplicationId applicationId, String applicationName, String queue,
|
||||
Priority priority, ContainerLaunchContext amContainer,
|
||||
boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
|
||||
int maxAppAttempts, Resource resource, String applicationType,
|
||||
boolean keepContainers, LogAggregationContext logAggregationContext) {
|
||||
ApplicationSubmissionContext context =
|
||||
newInstance(applicationId, applicationName, queue, priority,
|
||||
amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
|
||||
resource, applicationType, keepContainers);
|
||||
context.setLogAggregationContext(logAggregationContext);
|
||||
return context;
|
||||
}
|
||||
/**
|
||||
* Get the <code>ApplicationId</code> of the submitted application.
|
||||
* @return <code>ApplicationId</code> of the submitted application
|
||||
|
@ -381,4 +397,24 @@ public abstract class ApplicationSubmissionContext {
|
|||
@Stable
|
||||
public abstract void setAttemptFailuresValidityInterval(
|
||||
long attemptFailuresValidityInterval);
|
||||
|
||||
/**
|
||||
* Get <code>LogAggregationContext</code> of the application
|
||||
*
|
||||
* @return <code>LogAggregationContext</code> of the application
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public abstract LogAggregationContext getLogAggregationContext();
|
||||
|
||||
/**
|
||||
* Set <code>LogAggregationContext</code> for the application
|
||||
*
|
||||
* @param logAggregationContext
|
||||
* for the application
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public abstract void setLogAggregationContext(
|
||||
LogAggregationContext logAggregationContext);
|
||||
}
|
|
@ -0,0 +1,121 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api.records;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
/**
|
||||
* <p><code>LogAggregationContext</code> represents all of the
|
||||
* information needed by the <code>NodeManager</code> to handle
|
||||
* the logs for an application.</p>
|
||||
*
|
||||
* <p>It includes details such as:
|
||||
* <ul>
|
||||
* <li>includePattern. It uses Java Regex to filter the log files
|
||||
* which match the defined include pattern and those log files
|
||||
* will be uploaded. </li>
|
||||
* <li>excludePattern. It uses Java Regex to filter the log files
|
||||
* which match the defined exclude pattern and those log files
|
||||
* will not be uploaded. If the log file name matches both the
|
||||
* include and the exclude pattern, this file will be excluded eventually</li>
|
||||
* <li>rollingIntervalSeconds. The default value is -1. By default,
|
||||
* the logAggregationService only uploads container logs when
|
||||
* the application is finished. This configure defines
|
||||
* how often the logAggregationSerivce uploads container logs in seconds.
|
||||
* By setting this configure, the logAggregationSerivce can upload container
|
||||
* logs periodically when the application is running.
|
||||
* </li>
|
||||
* </ul>
|
||||
* </p>
|
||||
*
|
||||
* @see ApplicationSubmissionContext
|
||||
*/
|
||||
|
||||
@Evolving
|
||||
@Public
|
||||
public abstract class LogAggregationContext {
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public static LogAggregationContext newInstance(String includePattern,
|
||||
String excludePattern, long rollingIntervalSeconds) {
|
||||
LogAggregationContext context = Records.newRecord(LogAggregationContext.class);
|
||||
context.setIncludePattern(includePattern);
|
||||
context.setExcludePattern(excludePattern);
|
||||
context.setRollingIntervalSeconds(rollingIntervalSeconds);
|
||||
return context;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get include pattern
|
||||
*
|
||||
* @return include pattern
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract String getIncludePattern();
|
||||
|
||||
/**
|
||||
* Set include pattern
|
||||
*
|
||||
* @param includePattern
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract void setIncludePattern(String includePattern);
|
||||
|
||||
/**
|
||||
* Get exclude pattern
|
||||
*
|
||||
* @return exclude pattern
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract String getExcludePattern();
|
||||
|
||||
/**
|
||||
* Set exclude pattern
|
||||
*
|
||||
* @param excludePattern
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract void setExcludePattern(String excludePattern);
|
||||
|
||||
/**
|
||||
* Get rollingIntervalSeconds
|
||||
*
|
||||
* @return the rollingIntervalSeconds
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract long getRollingIntervalSeconds();
|
||||
|
||||
/**
|
||||
* Set rollingIntervalSeconds
|
||||
*
|
||||
* @param rollingIntervalSeconds
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract void setRollingIntervalSeconds(long rollingIntervalSeconds);
|
||||
}
|
|
@ -292,6 +292,13 @@ message ApplicationSubmissionContextProto {
|
|||
optional bool keep_containers_across_application_attempts = 11 [default = false];
|
||||
repeated string applicationTags = 12;
|
||||
optional int64 attempt_failures_validity_interval = 13 [default = -1];
|
||||
optional LogAggregationContextProto log_aggregation_context = 14;
|
||||
}
|
||||
|
||||
message LogAggregationContextProto {
|
||||
optional string include_pattern = 1 [default = ".*"];
|
||||
optional string exclude_pattern = 2 [default = ""];
|
||||
optional int64 rolling_interval_seconds = 3 [default = -1];
|
||||
}
|
||||
|
||||
enum ApplicationAccessTypeProto {
|
||||
|
|
|
@ -19,11 +19,13 @@
|
|||
package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||
|
||||
import com.google.common.base.CharMatcher;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
@ -31,6 +33,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
|||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||
|
||||
|
@ -53,6 +56,7 @@ extends ApplicationSubmissionContext {
|
|||
private ContainerLaunchContext amContainer = null;
|
||||
private Resource resource = null;
|
||||
private Set<String> applicationTags = null;
|
||||
private LogAggregationContext logAggregationContext = null;
|
||||
|
||||
public ApplicationSubmissionContextPBImpl() {
|
||||
builder = ApplicationSubmissionContextProto.newBuilder();
|
||||
|
@ -110,6 +114,10 @@ extends ApplicationSubmissionContext {
|
|||
builder.clearApplicationTags();
|
||||
builder.addAllApplicationTags(this.applicationTags);
|
||||
}
|
||||
if (this.logAggregationContext != null) {
|
||||
builder.setLogAggregationContext(
|
||||
convertToProtoFormat(this.logAggregationContext));
|
||||
}
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
|
@ -415,4 +423,36 @@ extends ApplicationSubmissionContext {
|
|||
maybeInitBuilder();
|
||||
builder.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
|
||||
}
|
||||
|
||||
private LogAggregationContextPBImpl convertFromProtoFormat(
|
||||
LogAggregationContextProto p) {
|
||||
return new LogAggregationContextPBImpl(p);
|
||||
}
|
||||
|
||||
private LogAggregationContextProto convertToProtoFormat(
|
||||
LogAggregationContext t) {
|
||||
return ((LogAggregationContextPBImpl) t).getProto();
|
||||
}
|
||||
|
||||
@Override
|
||||
public LogAggregationContext getLogAggregationContext() {
|
||||
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.logAggregationContext != null) {
|
||||
return this.logAggregationContext;
|
||||
} // Else via proto
|
||||
if (!p.hasLogAggregationContext()) {
|
||||
return null;
|
||||
}
|
||||
logAggregationContext = convertFromProtoFormat(p.getLogAggregationContext());
|
||||
return logAggregationContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setLogAggregationContext(
|
||||
LogAggregationContext logAggregationContext) {
|
||||
maybeInitBuilder();
|
||||
if (logAggregationContext == null)
|
||||
builder.clearLogAggregationContext();
|
||||
this.logAggregationContext = logAggregationContext;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,134 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProtoOrBuilder;
|
||||
import com.google.protobuf.TextFormat;
|
||||
|
||||
public class LogAggregationContextPBImpl extends LogAggregationContext{
|
||||
|
||||
LogAggregationContextProto proto = LogAggregationContextProto.getDefaultInstance();
|
||||
LogAggregationContextProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
public LogAggregationContextPBImpl() {
|
||||
builder = LogAggregationContextProto.newBuilder();
|
||||
}
|
||||
|
||||
public LogAggregationContextPBImpl(LogAggregationContextProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
public LogAggregationContextProto getProto() {
|
||||
mergeLocalToProto();
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
return proto;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return getProto().hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (other == null)
|
||||
return false;
|
||||
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||
return this.getProto().equals(this.getClass().cast(other).getProto());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return TextFormat.shortDebugString(getProto());
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
if (viaProto)
|
||||
maybeInitBuilder();
|
||||
proto = builder.build();
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
private void maybeInitBuilder() {
|
||||
if (viaProto || builder == null) {
|
||||
builder = LogAggregationContextProto.newBuilder(proto);
|
||||
}
|
||||
viaProto = false;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String getIncludePattern() {
|
||||
LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (! p.hasIncludePattern()) {
|
||||
return null;
|
||||
}
|
||||
return p.getIncludePattern();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setIncludePattern(String includePattern) {
|
||||
maybeInitBuilder();
|
||||
if (includePattern == null) {
|
||||
builder.clearIncludePattern();
|
||||
return;
|
||||
}
|
||||
builder.setIncludePattern(includePattern);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getExcludePattern() {
|
||||
LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (! p.hasExcludePattern()) {
|
||||
return null;
|
||||
}
|
||||
return p.getExcludePattern();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setExcludePattern(String excludePattern) {
|
||||
maybeInitBuilder();
|
||||
if (excludePattern == null) {
|
||||
builder.clearExcludePattern();
|
||||
return;
|
||||
}
|
||||
builder.setExcludePattern(excludePattern);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getRollingIntervalSeconds() {
|
||||
LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (! p.hasRollingIntervalSeconds()) {
|
||||
return -1;
|
||||
}
|
||||
return p.getRollingIntervalSeconds();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRollingIntervalSeconds(long rollingIntervalSeconds) {
|
||||
maybeInitBuilder();
|
||||
builder.setRollingIntervalSeconds(rollingIntervalSeconds);
|
||||
}
|
||||
}
|
|
@ -178,6 +178,7 @@ public class TestPBImplRecords {
|
|||
"http", "localhost", 8080, "file0"));
|
||||
typeValueCache.put(SerializedException.class,
|
||||
SerializedException.newInstance(new IOException("exception for test")));
|
||||
generateByNewInstance(LogAggregationContext.class);
|
||||
generateByNewInstance(ApplicationId.class);
|
||||
generateByNewInstance(ApplicationAttemptId.class);
|
||||
generateByNewInstance(ContainerId.class);
|
||||
|
|
Loading…
Reference in New Issue