YARN-221. NM should provide a way for AM to tell it not to aggregate

logs. Contributed by Ming Ma
This commit is contained in:
Xuan 2015-08-22 16:25:24 -07:00
parent 490bb5ebd6
commit 37e1c3d82a
26 changed files with 1344 additions and 172 deletions

View File

@ -315,7 +315,18 @@ public class StringUtils {
* @return the arraylist of the comma seperated string values
*/
public static String[] getStrings(String str){
Collection<String> values = getStringCollection(str);
String delim = ",";
return getStrings(str, delim);
}
/**
* Returns an arraylist of strings.
* @param str the string values
* @param delim delimiter to separate the values
* @return the arraylist of the seperated string values
*/
public static String[] getStrings(String str, String delim){
Collection<String> values = getStringCollection(str, delim);
if(values.size() == 0) {
return null;
}

View File

@ -178,6 +178,9 @@ Release 2.8.0 - UNRELEASED
YARN-2923. Support configuration based NodeLabelsProvider Service in Distributed
Node Label Configuration Setup. (Naganarasimha G R)
YARN-221. NM should provide a way for AM to tell it not to aggregate logs.
(Ming Ma via xgong)
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -54,6 +54,43 @@ import org.apache.hadoop.yarn.util.Records;
* name matches both the include and the exclude pattern, this file
* will be excluded eventually.
* </li>
* <li>
* policyClassName. The policy class name that implements
* ContainerLogAggregationPolicy. At runtime, nodemanager will the policy
* if a given container's log should be aggregated based on the
* ContainerType and other runtime state such as exit code by calling
* ContainerLogAggregationPolicy#shouldDoLogAggregation.
* This is useful when the app only wants to aggregate logs of a subset of
* containers. Here are the available policies. Please make sure to specify
* the canonical name by prefixing org.apache.hadoop.yarn.server.
* nodemanager.containermanager.logaggregation.
* to the class simple name below.
* NoneContainerLogAggregationPolicy: skip aggregation for all containers.
* AllContainerLogAggregationPolicy: aggregate all containers.
* AMOrFailedContainerLogAggregationPolicy: aggregate application master
* or failed containers.
* FailedOrKilledContainerLogAggregationPolicy: aggregate failed or killed
* containers
* FailedContainerLogAggregationPolicy: aggregate failed containers
* AMOnlyLogAggregationPolicy: aggregate application master containers
* SampleContainerLogAggregationPolicy: sample logs of successful worker
* containers, in addition to application master and failed/killed
* containers.
* If it isn't specified, it will use the cluster-wide default policy
* defined by configuration yarn.nodemanager.log-aggregation.policy.class.
* The default value of yarn.nodemanager.log-aggregation.policy.class is
* AllContainerLogAggregationPolicy.
* </li>
* <li>
* policyParameters. The parameters passed to the policy class via
* ContainerLogAggregationPolicy#parseParameters during the policy object
* initialization. This is optional. Some policy class might use parameters
* to adjust its settings. It is up to policy class to define the scheme of
* parameters.
* For example, SampleContainerLogAggregationPolicy supports the format of
* "SR:0.5,MIN:50", which means sample rate of 50% beyond the first 50
* successful worker containers.
* </li>
* </ul>
*
* @see ApplicationSubmissionContext
@ -87,6 +124,23 @@ public abstract class LogAggregationContext {
return context;
}
@Public
@Unstable
public static LogAggregationContext newInstance(String includePattern,
String excludePattern, String rolledLogsIncludePattern,
String rolledLogsExcludePattern, String policyClassName,
String policyParameters) {
LogAggregationContext context =
Records.newRecord(LogAggregationContext.class);
context.setIncludePattern(includePattern);
context.setExcludePattern(excludePattern);
context.setRolledLogsIncludePattern(rolledLogsIncludePattern);
context.setRolledLogsExcludePattern(rolledLogsExcludePattern);
context.setLogAggregationPolicyClassName(policyClassName);
context.setLogAggregationPolicyParameters(policyParameters);
return context;
}
/**
* Get include pattern. This includePattern only takes affect
* on logs that exist at the time of application finish.
@ -164,4 +218,45 @@ public abstract class LogAggregationContext {
@Unstable
public abstract void setRolledLogsExcludePattern(
String rolledLogsExcludePattern);
/**
* Get the log aggregation policy class.
*
* @return log aggregation policy class
*/
@Public
@Unstable
public abstract String getLogAggregationPolicyClassName();
/**
* Set the log aggregation policy class.
*
* @param className
*/
@Public
@Unstable
public abstract void setLogAggregationPolicyClassName(
String className);
/**
* Get the log aggregation policy parameters.
*
* @return log aggregation policy parameters
*/
@Public
@Unstable
public abstract String getLogAggregationPolicyParameters();
/**
* Set the log aggregation policy parameters.
* There is no schema defined for the parameters string.
* It is up to the log aggregation policy class to decide how to parse
* the parameters string.
*
* @param parameters
*/
@Public
@Unstable
public abstract void setLogAggregationPolicyParameters(
String parameters);
}

View File

@ -1237,6 +1237,12 @@ public class YarnConfiguration extends Configuration {
NM_RECOVERY_PREFIX + "supervised";
public static final boolean DEFAULT_NM_RECOVERY_SUPERVISED = false;
public static final String NM_LOG_AGG_POLICY_CLASS =
NM_PREFIX + "log-aggregation.policy.class";
public static final String NM_LOG_AGG_POLICY_CLASS_PARAMETERS = NM_PREFIX
+ "log-aggregation.policy.parameters";
////////////////////////////////
// Web Proxy Configs
////////////////////////////////

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* This API is used by NodeManager to decide if a given container's logs
* should be aggregated at run time.
*/
@Public
@Unstable
public interface ContainerLogAggregationPolicy {
/**
* <p>
* The method used by the NodeManager log aggregation service
* to initial the policy object with parameters specified by the application
* or the cluster-wide setting.
* </p>
*
* @param parameters parameters with scheme defined by the policy class.
*/
void parseParameters(String parameters);
/**
* <p>
* The method used by the NodeManager log aggregation service
* to ask the policy object if a given container's logs should be aggregated.
* </p>
*
* @param logContext ContainerLogContext
* @return Whether or not the container's logs should be aggregated.
*/
boolean shouldDoLogAggregation(ContainerLogContext logContext);
}

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ContainerId;
/**
* Context class for {@link ContainerLogAggregationPolicy}.
*/
@Public
@Unstable
public class ContainerLogContext {
private final ContainerId containerId;
private final ContainerType containerType;
private int exitCode;
@Public
@Unstable
public ContainerLogContext(ContainerId containerId,
ContainerType containerType, int exitCode) {
this.containerId = containerId;
this.containerType = containerType;
this.exitCode = exitCode;
}
/**
* Get {@link ContainerId} of the container.
*
* @return the container ID
*/
public ContainerId getContainerId() {
return containerId;
}
/**
* Get {@link ContainerType} the type of the container.
*
* @return the type of the container
*/
public ContainerType getContainerType() {
return containerType;
}
/**
* Get the exit code of the container.
*
* @return the exit code
*/
public int getExitCode() {
return exitCode;
}
}

View File

@ -343,6 +343,8 @@ message LogAggregationContextProto {
optional string exclude_pattern = 2 [default = ""];
optional string rolled_logs_include_pattern = 3 [default = ""];
optional string rolled_logs_exclude_pattern = 4 [default = ".*"];
optional string log_aggregation_policy_class_name = 5;
optional string log_aggregation_policy_parameters = 6;
}
enum ApplicationAccessTypeProto {

View File

@ -155,4 +155,44 @@ public class LogAggregationContextPBImpl extends LogAggregationContext{
}
builder.setRolledLogsExcludePattern(rolledLogsExcludePattern);
}
@Override
public String getLogAggregationPolicyClassName() {
LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
if (! p.hasLogAggregationPolicyClassName()) {
return null;
}
return p.getLogAggregationPolicyClassName();
}
@Override
public void setLogAggregationPolicyClassName(
String className) {
maybeInitBuilder();
if (className == null) {
builder.clearLogAggregationPolicyClassName();
return;
}
builder.setLogAggregationPolicyClassName(className);
}
@Override
public String getLogAggregationPolicyParameters() {
LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
if (! p.hasLogAggregationPolicyParameters()) {
return null;
}
return p.getLogAggregationPolicyParameters();
}
@Override
public void setLogAggregationPolicyParameters(
String config) {
maybeInitBuilder();
if (config == null) {
builder.clearLogAggregationPolicyParameters();
return;
}
builder.setLogAggregationPolicyParameters(config);
}
}

View File

@ -16,14 +16,15 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation;
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
@Private
/**
* This API is not exposed to end-users yet.
*/
public enum ContainerLogsRetentionPolicy {
APPLICATION_MASTER_ONLY, AM_AND_FAILED_CONTAINERS_ONLY, ALL_CONTAINERS
}
public class AllContainerLogAggregationPolicy extends
AbstractContainerLogAggregationPolicy {
public boolean shouldDoLogAggregation(ContainerLogContext logContext) {
return true;
}
}

View File

@ -2226,4 +2226,28 @@
<value>0</value>
</property>
<property>
<description>
The default log aggregation policy class. Applications can
override it via LogAggregationContext. This configuration can provide
some cluster-side default behavior so that if the application doesn't
specify any policy via LogAggregationContext administrators of the cluster
can adjust the policy globally.
</description>
<name>yarn.nodemanager.log-aggregation.policy.class</name>
<value>org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AllContainerLogAggregationPolicy</value>
</property>
<property>
<description>
The default parameters for the log aggregation policy. Applications can
override it via LogAggregationContext. This configuration can provide
some cluster-side default behavior so that if the application doesn't
specify any policy via LogAggregationContext administrators of the cluster
can adjust the policy globally.
</description>
<name>yarn.nodemanager.log-aggregation.policy.parameters</name>
<value></value>
</property>
</configuration>

View File

@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
@ -242,8 +241,8 @@ public class ApplicationImpl implements Application {
app.logAggregationContext = initEvent.getLogAggregationContext();
app.dispatcher.getEventHandler().handle(
new LogHandlerAppStartedEvent(app.appId, app.user,
app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
app.applicationACLs, app.logAggregationContext));
app.credentials, app.applicationACLs,
app.logAggregationContext));
}
}

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
import org.apache.hadoop.yarn.server.api.ContainerType;
@Private
public class AMOnlyLogAggregationPolicy extends
AbstractContainerLogAggregationPolicy {
public boolean shouldDoLogAggregation(ContainerLogContext logContext) {
return logContext.getContainerType() == ContainerType.APPLICATION_MASTER;
}
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
@Private
public class AMOrFailedContainerLogAggregationPolicy extends
AbstractContainerLogAggregationPolicy {
public boolean shouldDoLogAggregation(ContainerLogContext logContext) {
int exitCode = logContext.getExitCode();
return logContext.getContainerType() == ContainerType.APPLICATION_MASTER ||
(exitCode != 0 && exitCode != ExitCode.FORCE_KILLED.getExitCode()
&& exitCode != ExitCode.TERMINATED.getExitCode());
}
}

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy;
// The class provides no-op implementation for parseParameters. Polices
// that don't need parameters can derive from this class.
@Private
public abstract class AbstractContainerLogAggregationPolicy implements
ContainerLogAggregationPolicy {
public void parseParameters(String parameters) {
}
}

View File

@ -18,12 +18,11 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
public interface AppLogAggregator extends Runnable {
void startContainerLogAggregation(ContainerId containerId,
boolean wasContainerSuccessful);
void startContainerLogAggregation(ContainerLogContext logContext);
void abortLogAggregation();

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -56,9 +57,12 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
@ -107,7 +111,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
private final UserGroupInformation userUgi;
private final Path remoteNodeLogFileForApp;
private final Path remoteNodeTmpLogFileForApp;
private final ContainerLogsRetentionPolicy retentionPolicy;
private final BlockingQueue<ContainerId> pendingContainers;
private final AtomicBoolean appFinishing = new AtomicBoolean();
@ -128,12 +131,12 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
private final Map<ContainerId, ContainerLogAggregator> containerLogAggregators =
new HashMap<ContainerId, ContainerLogAggregator>();
private final ContainerLogAggregationPolicy logAggPolicy;
public AppLogAggregatorImpl(Dispatcher dispatcher,
DeletionService deletionService, Configuration conf,
ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId,
LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
ContainerLogsRetentionPolicy retentionPolicy,
Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext, Context context,
FileContext lfs) {
@ -146,7 +149,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
this.dirsHandler = dirsHandler;
this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
this.retentionPolicy = retentionPolicy;
this.pendingContainers = new LinkedBlockingQueue<ContainerId>();
this.appAcls = appAcls;
this.lfs = lfs;
@ -204,6 +206,66 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|| this.logAggregationContext.getRolledLogsIncludePattern() == null
|| this.logAggregationContext.getRolledLogsIncludePattern()
.isEmpty() ? false : true;
this.logAggPolicy = getLogAggPolicy(conf);
}
private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) {
ContainerLogAggregationPolicy policy = getLogAggPolicyInstance(conf);
String params = getLogAggPolicyParameters(conf);
if (params != null) {
policy.parseParameters(params);
}
return policy;
}
// Use the policy class specified in LogAggregationContext if available.
// Otherwise use the cluster-wide default policy class.
private ContainerLogAggregationPolicy getLogAggPolicyInstance(
Configuration conf) {
Class<? extends ContainerLogAggregationPolicy> policyClass = null;
if (this.logAggregationContext != null) {
String className =
this.logAggregationContext.getLogAggregationPolicyClassName();
if (className != null) {
try {
Class<?> policyFromContext = conf.getClassByName(className);
if (ContainerLogAggregationPolicy.class.isAssignableFrom(
policyFromContext)) {
policyClass = policyFromContext.asSubclass(
ContainerLogAggregationPolicy.class);
} else {
LOG.warn(this.appId + " specified invalid log aggregation policy " +
className);
}
} catch (ClassNotFoundException cnfe) {
// We don't fail the app if the policy class isn't valid.
LOG.warn(this.appId + " specified invalid log aggregation policy " +
className);
}
}
}
if (policyClass == null) {
policyClass = conf.getClass(YarnConfiguration.NM_LOG_AGG_POLICY_CLASS,
AllContainerLogAggregationPolicy.class,
ContainerLogAggregationPolicy.class);
} else {
LOG.info(this.appId + " specifies ContainerLogAggregationPolicy of "
+ policyClass);
}
return ReflectionUtils.newInstance(policyClass, conf);
}
// Use the policy parameters specified in LogAggregationContext if available.
// Otherwise use the cluster-wide default policy parameters.
private String getLogAggPolicyParameters(Configuration conf) {
String params = null;
if (this.logAggregationContext != null) {
params = this.logAggregationContext.getLogAggregationPolicyParameters();
}
if (params == null) {
params = conf.get(YarnConfiguration.NM_LOG_AGG_POLICY_CLASS_PARAMETERS);
}
return params;
}
private void uploadLogsForContainers(boolean appFinished) {
@ -228,21 +290,22 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
// Create a set of Containers whose logs will be uploaded in this cycle.
// It includes:
// a) all containers in pendingContainers: those containers are finished
// and satisfy the retentionPolicy.
// and satisfy the ContainerLogAggregationPolicy.
// b) some set of running containers: For all the Running containers,
// we have ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
// so simply set wasContainerSuccessful as true to
// bypass FAILED_CONTAINERS check and find the running containers
// which satisfy the retentionPolicy.
// we use exitCode of 0 to find those which satisfy the
// ContainerLogAggregationPolicy.
Set<ContainerId> pendingContainerInThisCycle = new HashSet<ContainerId>();
this.pendingContainers.drainTo(pendingContainerInThisCycle);
Set<ContainerId> finishedContainers =
new HashSet<ContainerId>(pendingContainerInThisCycle);
if (this.context.getApplications().get(this.appId) != null) {
for (ContainerId container : this.context.getApplications()
.get(this.appId).getContainers().keySet()) {
if (shouldUploadLogs(container, true)) {
pendingContainerInThisCycle.add(container);
for (Container container : this.context.getApplications()
.get(this.appId).getContainers().values()) {
ContainerType containerType =
container.getContainerTokenIdentifier().getContainerType();
if (shouldUploadLogs(new ContainerLogContext(
container.getContainerId(), containerType, 0))) {
pendingContainerInThisCycle.add(container.getContainerId());
}
}
}
@ -506,46 +569,16 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
// TODO: The condition: containerId.getId() == 1 to determine an AM container
// is not always true.
private boolean shouldUploadLogs(ContainerId containerId,
boolean wasContainerSuccessful) {
// All containers
if (this.retentionPolicy
.equals(ContainerLogsRetentionPolicy.ALL_CONTAINERS)) {
return true;
}
// AM Container only
if (this.retentionPolicy
.equals(ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY)) {
if ((containerId.getContainerId()
& ContainerId.CONTAINER_ID_BITMASK)== 1) {
return true;
}
return false;
}
// AM + Failing containers
if (this.retentionPolicy
.equals(ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY)) {
if ((containerId.getContainerId()
& ContainerId.CONTAINER_ID_BITMASK) == 1) {
return true;
} else if(!wasContainerSuccessful) {
return true;
}
return false;
}
return false;
private boolean shouldUploadLogs(ContainerLogContext logContext) {
return logAggPolicy.shouldDoLogAggregation(logContext);
}
@Override
public void startContainerLogAggregation(ContainerId containerId,
boolean wasContainerSuccessful) {
if (shouldUploadLogs(containerId, wasContainerSuccessful)) {
LOG.info("Considering container " + containerId
public void startContainerLogAggregation(ContainerLogContext logContext) {
if (shouldUploadLogs(logContext)) {
LOG.info("Considering container " + logContext.getContainerId()
+ " for log-aggregation");
this.pendingContainers.add(containerId);
this.pendingContainers.add(logContext.getContainerId());
}
}

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
@Private
public class FailedContainerLogAggregationPolicy extends
AbstractContainerLogAggregationPolicy {
public boolean shouldDoLogAggregation(ContainerLogContext logContext) {
int exitCode = logContext.getExitCode();
return exitCode != 0 && exitCode != ExitCode.FORCE_KILLED.getExitCode()
&& exitCode != ExitCode.TERMINATED.getExitCode();
}
}

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
@Private
public class FailedOrKilledContainerLogAggregationPolicy extends
AbstractContainerLogAggregationPolicy {
public boolean shouldDoLogAggregation(ContainerLogContext logContext) {
return logContext.getExitCode() != 0;
}
}

View File

@ -48,8 +48,9 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
@ -314,13 +315,12 @@ public class LogAggregationService extends AbstractService implements
@SuppressWarnings("unchecked")
private void initApp(final ApplicationId appId, String user,
Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
Map<ApplicationAccessType, String> appAcls,
Credentials credentials, Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext) {
ApplicationEvent eventResponse;
try {
verifyAndCreateRemoteLogDir(getConfig());
initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls,
initAppAggregator(appId, user, credentials, appAcls,
logAggregationContext);
eventResponse = new ApplicationEvent(appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
@ -342,8 +342,7 @@ public class LogAggregationService extends AbstractService implements
protected void initAppAggregator(final ApplicationId appId, String user,
Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
Map<ApplicationAccessType, String> appAcls,
Credentials credentials, Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext) {
// Get user's FileSystem credentials
@ -357,7 +356,7 @@ public class LogAggregationService extends AbstractService implements
final AppLogAggregator appLogAggregator =
new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
getConfig(), appId, userUgi, this.nodeId, dirsHandler,
getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
getRemoteNodeLogFileForApp(appId, user),
appAcls, logAggregationContext, this.context,
getLocalFileContext(getConfig()));
if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
@ -420,7 +419,10 @@ public class LogAggregationService extends AbstractService implements
+ ", did it fail to start?");
return;
}
aggregator.startContainerLogAggregation(containerId, exitCode == 0);
ContainerType containerType = context.getContainers().get(
containerId).getContainerTokenIdentifier().getContainerType();
aggregator.startContainerLogAggregation(
new ContainerLogContext(containerId, containerType, exitCode));
}
private void stopApp(ApplicationId appId) {
@ -445,7 +447,6 @@ public class LogAggregationService extends AbstractService implements
(LogHandlerAppStartedEvent) event;
initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
appStartEvent.getCredentials(),
appStartEvent.getLogRetentionPolicy(),
appStartEvent.getApplicationAcls(),
appStartEvent.getLogAggregationContext());
break;

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
@Private
public class NoneContainerLogAggregationPolicy extends
AbstractContainerLogAggregationPolicy {
public boolean shouldDoLogAggregation(ContainerLogContext logContext) {
return false;
}
}

View File

@ -0,0 +1,124 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
import org.apache.hadoop.yarn.server.api.ContainerType;
/**
* The sample policy samples logs of successful worker containers to aggregate.
* It always aggregates AM container and failed/killed worker
* containers' logs. To make sure small applications have enough logs, it only
* applies sampling beyond minimal number of containers. The parameters can be
* configured by SAMPLE_RATE and MIN_THRESHOLD. For example if SAMPLE_RATE is
* 0.2 and MIN_THRESHOLD is 20, for an application with 100 successful
* worker containers, 20 + (100-20) * 0.2 = 36 containers's logs will be
* aggregated.
*/
@Private
public class SampleContainerLogAggregationPolicy implements
ContainerLogAggregationPolicy {
private static final Log LOG =
LogFactory.getLog(SampleContainerLogAggregationPolicy.class);
static String SAMPLE_RATE = "SR";
public static final float DEFAULT_SAMPLE_RATE = 0.2f;
static String MIN_THRESHOLD = "MIN";
public static final int DEFAULT_SAMPLE_MIN_THRESHOLD = 20;
private float sampleRate = DEFAULT_SAMPLE_RATE;
private int minThreshold = DEFAULT_SAMPLE_MIN_THRESHOLD;
static public String buildParameters(float sampleRate, int minThreshold) {
StringBuilder sb = new StringBuilder();
sb.append(SAMPLE_RATE).append(":").append(sampleRate).append(",").
append(MIN_THRESHOLD).append(":").append(minThreshold);
return sb.toString();
}
// Parameters are comma separated properties, for example
// "SR:0.5,MIN:50"
public void parseParameters(String parameters) {
Collection<String> params = StringUtils.getStringCollection(parameters);
for(String param : params) {
// The first element is the property name.
// The second element is the property value.
String[] property = StringUtils.getStrings(param, ":");
if (property == null || property.length != 2) {
continue;
}
if (property[0].equals(SAMPLE_RATE)) {
try {
float sampleRate = Float.parseFloat(property[1]);
if (sampleRate >= 0.0 && sampleRate <= 1.0) {
this.sampleRate = sampleRate;
} else {
LOG.warn("The format isn't valid. Sample rate falls back to the " +
"default value " + DEFAULT_SAMPLE_RATE);
}
} catch (NumberFormatException nfe) {
LOG.warn("The format isn't valid. Sample rate falls back to the " +
"default value " + DEFAULT_SAMPLE_RATE);
}
} else if (property[0].equals(MIN_THRESHOLD)) {
try {
int minThreshold = Integer.parseInt(property[1]);
if (minThreshold >= 0) {
this.minThreshold = minThreshold;
} else {
LOG.warn("The format isn't valid. Min threshold falls back to " +
"the default value " + DEFAULT_SAMPLE_MIN_THRESHOLD);
}
} catch (NumberFormatException nfe) {
LOG.warn("The format isn't valid. Min threshold falls back to the " +
"default value " + DEFAULT_SAMPLE_MIN_THRESHOLD);
}
}
}
}
public boolean shouldDoLogAggregation(ContainerLogContext logContext) {
if (logContext.getContainerType() ==
ContainerType.APPLICATION_MASTER || logContext.getExitCode() != 0) {
// If it is AM or failed or killed container, enable log aggregation.
return true;
}
// Only sample log aggregation for large applications.
// We assume the container id is continuously allocated from number 1 and
// Worker containers start from id 2. So logs of worker containers with ids
// in [2, minThreshold + 1] will be aggregated.
if ((logContext.getContainerId().getContainerId() &
ContainerId.CONTAINER_ID_BITMASK) < minThreshold + 2) {
return true;
}
// Sample log aggregation for the rest of successful worker containers
return (sampleRate != 0 &&
logContext.getContainerId().hashCode() % (1/sampleRate) == 0);
}
}

View File

@ -24,32 +24,27 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
public class LogHandlerAppStartedEvent extends LogHandlerEvent {
private final ApplicationId applicationId;
private final ContainerLogsRetentionPolicy retentionPolicy;
private final String user;
private final Credentials credentials;
private final Map<ApplicationAccessType, String> appAcls;
private final LogAggregationContext logAggregationContext;
public LogHandlerAppStartedEvent(ApplicationId appId, String user,
Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy,
Map<ApplicationAccessType, String> appAcls) {
this(appId, user, credentials, retentionPolicy, appAcls, null);
Credentials credentials, Map<ApplicationAccessType, String> appAcls) {
this(appId, user, credentials, appAcls, null);
}
public LogHandlerAppStartedEvent(ApplicationId appId, String user,
Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy,
Map<ApplicationAccessType, String> appAcls,
Credentials credentials, Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext) {
super(LogHandlerEventType.APPLICATION_STARTED);
this.applicationId = appId;
this.user = user;
this.credentials = credentials;
this.retentionPolicy = retentionPolicy;
this.appAcls = appAcls;
this.logAggregationContext = logAggregationContext;
}
@ -62,10 +57,6 @@ public class LogHandlerAppStartedEvent extends LogHandlerEvent {
return this.credentials;
}
public ContainerLogsRetentionPolicy getLogRetentionPolicy() {
return this.retentionPolicy;
}
public String getUser() {
return this.user;
}

View File

@ -87,6 +87,7 @@ public class TestAuxServices {
this.stoppedApps = new ArrayList<Integer>();
}
@SuppressWarnings("unchecked")
public ArrayList<Integer> getAppIdsStopped() {
return (ArrayList<Integer>)this.stoppedApps.clone();
}

View File

@ -86,6 +86,8 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.DrainDispatcher;
@ -99,11 +101,13 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
@ -191,12 +195,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
app1LogDir.mkdir();
logAggregationService
.handle(new LogHandlerAppStartedEvent(
application1, this.user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
application1, this.user, null, this.acls));
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(application1, 1);
ContainerId container11 = BuilderUtils.newContainerId(appAttemptId, 1);
ContainerId container11 = createContainer(appAttemptId, 1,
ContainerType.APPLICATION_MASTER);
// Simulate log-file creation
writeContainerLogs(app1LogDir, container11, new String[] { "stdout",
"stderr", "syslog" });
@ -302,11 +306,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
LogAggregationContext context =
LogAggregationContext.newInstance("HOST*", "sys*");
logAggregationService.handle(new LogHandlerAppStartedEvent(app, this.user,
null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls, context));
null, this.acls, context));
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(app, 1);
ContainerId cont = BuilderUtils.newContainerId(appAttemptId, 1);
ContainerId cont = createContainer(appAttemptId, 1,
ContainerType.APPLICATION_MASTER);
writeContainerLogs(appLogDir, cont, new String[] { "stdout",
"stderr", "syslog" });
logAggregationService.handle(new LogHandlerContainerFinishedEvent(cont, 0));
@ -337,8 +342,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
app1LogDir.mkdir();
logAggregationService
.handle(new LogHandlerAppStartedEvent(
application1, this.user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
application1, this.user, null, this.acls));
logAggregationService.handle(new LogHandlerAppFinishedEvent(
application1));
@ -388,13 +392,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
app1LogDir.mkdir();
logAggregationService
.handle(new LogHandlerAppStartedEvent(
application1, this.user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
application1, this.user, null, this.acls));
ApplicationAttemptId appAttemptId1 =
BuilderUtils.newApplicationAttemptId(application1, 1);
ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1);
ContainerId container11 = createContainer(appAttemptId1, 1,
ContainerType.APPLICATION_MASTER);
// Simulate log-file creation
writeContainerLogs(app1LogDir, container11, fileNames);
logAggregationService.handle(
@ -407,18 +411,23 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
File app2LogDir =
new File(localLogDir, ConverterUtils.toString(application2));
app2LogDir.mkdir();
logAggregationService.handle(new LogHandlerAppStartedEvent(
application2, this.user, null,
ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY, this.acls));
LogAggregationContext contextWithAMOnly =
Records.newRecord(LogAggregationContext.class);
contextWithAMOnly.setLogAggregationPolicyClassName(
AMOnlyLogAggregationPolicy.class.getName());
logAggregationService.handle(new LogHandlerAppStartedEvent(
application2, this.user, null, this.acls, contextWithAMOnly));
ContainerId container21 = createContainer(appAttemptId2, 1,
ContainerType.APPLICATION_MASTER);
ContainerId container21 = BuilderUtils.newContainerId(appAttemptId2, 1);
writeContainerLogs(app2LogDir, container21, fileNames);
logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container21, 0));
ContainerId container12 = BuilderUtils.newContainerId(appAttemptId1, 2);
ContainerId container12 = createContainer(appAttemptId1, 2,
ContainerType.TASK);
writeContainerLogs(app1LogDir, container12, fileNames);
logAggregationService.handle(
@ -431,9 +440,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
File app3LogDir =
new File(localLogDir, ConverterUtils.toString(application3));
app3LogDir.mkdir();
LogAggregationContext contextWithAMAndFailed =
Records.newRecord(LogAggregationContext.class);
contextWithAMAndFailed.setLogAggregationPolicyClassName(
AMOrFailedContainerLogAggregationPolicy.class.getName());
logAggregationService.handle(new LogHandlerAppStartedEvent(application3,
this.user, null,
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
this.user, null, this.acls, contextWithAMAndFailed));
dispatcher.await();
ApplicationEvent expectedInitEvents[] = new ApplicationEvent[]{
@ -450,22 +463,26 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
checkEvents(appEventHandler, expectedInitEvents, false, "getType", "getApplicationID");
reset(appEventHandler);
ContainerId container31 = BuilderUtils.newContainerId(appAttemptId3, 1);
ContainerId container31 = createContainer(appAttemptId3, 1,
ContainerType.APPLICATION_MASTER);
writeContainerLogs(app3LogDir, container31, fileNames);
logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container31, 0));
ContainerId container32 = BuilderUtils.newContainerId(appAttemptId3, 2);
ContainerId container32 = createContainer(appAttemptId3, 2,
ContainerType.TASK);
writeContainerLogs(app3LogDir, container32, fileNames);
logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container32, 1)); // Failed
ContainerId container22 = BuilderUtils.newContainerId(appAttemptId2, 2);
ContainerId container22 = createContainer(appAttemptId2, 2,
ContainerType.TASK);
writeContainerLogs(app2LogDir, container22, fileNames);
logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container22, 0));
ContainerId container33 = BuilderUtils.newContainerId(appAttemptId3, 3);
ContainerId container33 = createContainer(appAttemptId3, 3,
ContainerType.TASK);
writeContainerLogs(app3LogDir, container33, fileNames);
logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container33, 0));
@ -528,10 +545,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
ApplicationId appId =
BuilderUtils.newApplicationId(System.currentTimeMillis(),
(int) (Math.random() * 1000));
LogAggregationContext contextWithAMAndFailed =
Records.newRecord(LogAggregationContext.class);
contextWithAMAndFailed.setLogAggregationPolicyClassName(
AMOrFailedContainerLogAggregationPolicy.class.getName());
logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
this.user, null,
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
this.acls));
this.user, null, this.acls, contextWithAMAndFailed));
dispatcher.await();
// Verify that it failed
@ -551,11 +571,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
File appLogDir =
new File(localLogDir, ConverterUtils.toString(appId2));
appLogDir.mkdir();
logAggregationService.handle(new LogHandlerAppStartedEvent(appId2,
this.user, null,
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
this.acls));
this.user, null, this.acls, contextWithAMAndFailed));
dispatcher.await();
// Verify that it worked
@ -627,8 +644,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
remoteRootLogDir.getAbsolutePath(), this.user));
Path suffixDir = new Path(userDir, logSuffix);
Path appDir = new Path(suffixDir, appId.toString());
LogAggregationContext contextWithAllContainers =
Records.newRecord(LogAggregationContext.class);
contextWithAllContainers.setLogAggregationPolicyClassName(
AllContainerLogAggregationPolicy.class.getName());
aggSvc.handle(new LogHandlerAppStartedEvent(appId, this.user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
this.acls, contextWithAllContainers));
verify(spyFs).mkdirs(eq(userDir), isA(FsPermission.class));
verify(spyFs).mkdirs(eq(suffixDir), isA(FsPermission.class));
verify(spyFs).mkdirs(eq(appDir), isA(FsPermission.class));
@ -637,7 +658,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
ApplicationId appId2 = BuilderUtils.newApplicationId(1, 2);
Path appDir2 = new Path(suffixDir, appId2.toString());
aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
this.acls, contextWithAllContainers));
verify(spyFs).mkdirs(eq(appDir2), isA(FsPermission.class));
// start another application with the app dir already created and verify
@ -646,7 +667,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
Path appDir3 = new Path(suffixDir, appId3.toString());
new File(appDir3.toUri().getPath()).mkdir();
aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
this.acls, contextWithAllContainers));
verify(spyFs, never()).mkdirs(eq(appDir3), isA(FsPermission.class));
aggSvc.stop();
aggSvc.close();
@ -674,13 +695,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
doThrow(new YarnRuntimeException("KABOOM!"))
.when(logAggregationService).initAppAggregator(
eq(appId), eq(user), any(Credentials.class),
any(ContainerLogsRetentionPolicy.class), anyMap(),
any(LogAggregationContext.class));
anyMap(), any(LogAggregationContext.class));
LogAggregationContext contextWithAMAndFailed =
Records.newRecord(LogAggregationContext.class);
contextWithAMAndFailed.setLogAggregationPolicyClassName(
AMOrFailedContainerLogAggregationPolicy.class.getName());
logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
this.user, null,
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
this.acls));
this.user, null, this.acls, contextWithAMAndFailed));
dispatcher.await();
ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
@ -724,10 +745,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
doThrow(e)
.when(logAggregationService).createAppDir(any(String.class),
any(ApplicationId.class), any(UserGroupInformation.class));
LogAggregationContext contextWithAMAndFailed =
Records.newRecord(LogAggregationContext.class);
contextWithAMAndFailed.setLogAggregationPolicyClassName(
AMOrFailedContainerLogAggregationPolicy.class.getName());
logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
this.user, null,
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
this.user, null, this.acls, contextWithAMAndFailed));
dispatcher.await();
ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
new ApplicationEvent(appId,
@ -765,10 +789,27 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
}
}
private LogFileStatusInLastCycle verifyContainerLogs(LogAggregationService logAggregationService,
private LogFileStatusInLastCycle verifyContainerLogs(
LogAggregationService logAggregationService,
ApplicationId appId, ContainerId[] expectedContainerIds,
String[] logFiles, int numOfContainerLogs, boolean multiLogs)
throws IOException {
String[] logFiles, int numOfLogsPerContainer,
boolean multiLogs) throws IOException {
return verifyContainerLogs(logAggregationService, appId,
expectedContainerIds, expectedContainerIds.length,
expectedContainerIds.length, logFiles, numOfLogsPerContainer,
multiLogs);
}
// expectedContainerIds is the minimal set of containers to check.
// The actual list of containers could be more than that.
// Verify the size of the actual list is in the range of
// [minNumOfContainers, maxNumOfContainers].
private LogFileStatusInLastCycle verifyContainerLogs(
LogAggregationService logAggregationService,
ApplicationId appId, ContainerId[] expectedContainerIds,
int minNumOfContainers, int maxNumOfContainers,
String[] logFiles, int numOfLogsPerContainer, boolean multiLogs)
throws IOException {
Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, this.user);
RemoteIterator<FileStatus> nodeFiles = null;
try {
@ -780,6 +821,10 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
} catch (FileNotFoundException fnf) {
Assert.fail("Should have log files");
}
if (numOfLogsPerContainer == 0) {
Assert.assertTrue(!nodeFiles.hasNext());
return null;
}
Assert.assertTrue(nodeFiles.hasNext());
FileStatus targetNodeFile = null;
@ -865,11 +910,14 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
}
// 1 for each container
Assert.assertEquals(expectedContainerIds.length, logMap.size());
Assert.assertTrue("number of containers with logs should be at least " +
minNumOfContainers,logMap.size() >= minNumOfContainers);
Assert.assertTrue("number of containers with logs should be at most " +
minNumOfContainers,logMap.size() <= maxNumOfContainers);
for (ContainerId cId : expectedContainerIds) {
String containerStr = ConverterUtils.toString(cId);
Map<String, String> thisContainerMap = logMap.remove(containerStr);
Assert.assertEquals(numOfContainerLogs, thisContainerMap.size());
Assert.assertEquals(numOfLogsPerContainer, thisContainerMap.size());
for (String fileType : logFiles) {
String expectedValue =
containerStr + " Hello " + fileType + "!End of LogType:"
@ -882,8 +930,15 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
}
Assert.assertEquals(0, thisContainerMap.size());
}
Assert.assertEquals(0, logMap.size());
return new LogFileStatusInLastCycle(targetNodeFile.getPath().getName(), fileTypes);
Assert.assertTrue("number of remaining containers should be at least " +
(minNumOfContainers - expectedContainerIds.length),
logMap.size() >= minNumOfContainers - expectedContainerIds.length);
Assert.assertTrue("number of remaining containers should be at most " +
(maxNumOfContainers - expectedContainerIds.length),
logMap.size() <= maxNumOfContainers - expectedContainerIds.length);
return new LogFileStatusInLastCycle(targetNodeFile.getPath().getName(),
fileTypes);
} finally {
reader.close();
}
@ -991,9 +1046,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
logAggregationService.start();
ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
LogAggregationContext contextWithAllContainers =
Records.newRecord(LogAggregationContext.class);
contextWithAllContainers.setLogAggregationPolicyClassName(
AllContainerLogAggregationPolicy.class.getName());
logAggregationService.handle(new LogHandlerAppStartedEvent(
application1, this.user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
application1, this.user, null, this.acls, contextWithAllContainers));
logAggregationService.stop();
assertEquals(0, logAggregationService.getNumAggregators());
@ -1015,8 +1073,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
logAggregationService.handle(new LogHandlerAppStartedEvent(
application1, this.user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
application1, this.user, null, this.acls));
logAggregationService.handle(new LogHandlerAppFinishedEvent(application1));
dispatcher.await();
@ -1216,12 +1273,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
new File(localLogDir, ConverterUtils.toString(application1));
appLogDir1.mkdir();
logAggregationService.handle(new LogHandlerAppStartedEvent(application1,
this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls,
this.user, null, this.acls,
logAggregationContextWithIncludePatterns));
ApplicationAttemptId appAttemptId1 =
BuilderUtils.newApplicationAttemptId(application1, 1);
ContainerId container1 = BuilderUtils.newContainerId(appAttemptId1, 1);
ContainerId container1 = createContainer(appAttemptId1, 1,
ContainerType.APPLICATION_MASTER);
// Simulate log-file creation
writeContainerLogs(appLogDir1, container1, new String[] { "stdout",
@ -1239,10 +1297,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
File app2LogDir =
new File(localLogDir, ConverterUtils.toString(application2));
app2LogDir.mkdir();
LogAggregationContextWithExcludePatterns.setLogAggregationPolicyClassName(
AMOnlyLogAggregationPolicy.class.getName());
logAggregationService.handle(new LogHandlerAppStartedEvent(application2,
this.user, null, ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY,
this.acls, LogAggregationContextWithExcludePatterns));
ContainerId container2 = BuilderUtils.newContainerId(appAttemptId2, 1);
this.user, null, this.acls, LogAggregationContextWithExcludePatterns));
ContainerId container2 = createContainer(appAttemptId2, 1,
ContainerType.APPLICATION_MASTER);
writeContainerLogs(app2LogDir, container2, new String[] { "stdout",
"stderr", "syslog" });
@ -1262,10 +1322,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
File app3LogDir =
new File(localLogDir, ConverterUtils.toString(application3));
app3LogDir.mkdir();
context1.setLogAggregationPolicyClassName(
AMOnlyLogAggregationPolicy.class.getName());
logAggregationService.handle(new LogHandlerAppStartedEvent(application3,
this.user, null, ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY,
this.acls, context1));
ContainerId container3 = BuilderUtils.newContainerId(appAttemptId3, 1);
this.user, null, this.acls, context1));
ContainerId container3 = createContainer(appAttemptId3, 1,
ContainerType.APPLICATION_MASTER);
writeContainerLogs(app3LogDir, container3, new String[] { "stdout",
"sys.log", "std.log", "out.log", "err.log", "log" });
logAggregationService.handle(
@ -1285,10 +1347,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
File app4LogDir =
new File(localLogDir, ConverterUtils.toString(application4));
app4LogDir.mkdir();
context2.setLogAggregationPolicyClassName(
AMOnlyLogAggregationPolicy.class.getName());
logAggregationService.handle(new LogHandlerAppStartedEvent(application4,
this.user, null, ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY,
this.acls, context2));
ContainerId container4 = BuilderUtils.newContainerId(appAttemptId4, 1);
this.user, null, this.acls, context2));
ContainerId container4 = createContainer(appAttemptId4, 1,
ContainerType.APPLICATION_MASTER);
writeContainerLogs(app4LogDir, container4, new String[] { "stdout",
"sys.log", "std.log", "out.log", "err.log", "log" });
logAggregationService.handle(
@ -1346,6 +1410,471 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
"getApplicationID");
}
@Test (timeout = 50000)
@SuppressWarnings("unchecked")
public void testNoneContainerPolicy() throws Exception {
ApplicationId appId = createApplication();
// LogContext specifies policy to not aggregate any container logs
LogAggregationService logAggregationService = createLogAggregationService(
appId, NoneContainerLogAggregationPolicy.class, null);
String[] logFiles = new String[] { "stdout" };
ContainerId container1 = finishContainer(appId, logAggregationService,
ContainerType.APPLICATION_MASTER, 1, 0, logFiles);
finishApplication(appId, logAggregationService);
verifyContainerLogs(logAggregationService, appId,
new ContainerId[] { container1 }, logFiles, 0, false);
verifyLogAggFinishEvent(appId);
}
@Test (timeout = 50000)
@SuppressWarnings("unchecked")
public void testFailedContainerPolicy() throws Exception {
ApplicationId appId = createApplication();
LogAggregationService logAggregationService = createLogAggregationService(
appId, FailedContainerLogAggregationPolicy.class, null);
String[] logFiles = new String[] { "stdout" };
ContainerId container1 = finishContainer(
appId, logAggregationService, ContainerType.APPLICATION_MASTER, 1, 1,
logFiles);
finishContainer(appId, logAggregationService, ContainerType.TASK, 2, 0,
logFiles);
finishContainer(appId, logAggregationService, ContainerType.TASK, 3,
ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode(), logFiles);
finishApplication(appId, logAggregationService);
verifyContainerLogs(logAggregationService, appId,
new ContainerId[] { container1 }, logFiles, 1, false);
verifyLogAggFinishEvent(appId);
}
@Test (timeout = 50000)
@SuppressWarnings("unchecked")
public void testAMOrFailedContainerPolicy() throws Exception {
ApplicationId appId = createApplication();
LogAggregationService logAggregationService = createLogAggregationService(
appId, AMOrFailedContainerLogAggregationPolicy.class, null);
String[] logFiles = new String[] { "stdout" };
ContainerId container1 = finishContainer(
appId, logAggregationService, ContainerType.APPLICATION_MASTER, 1, 0,
logFiles);
ContainerId container2= finishContainer(appId,
logAggregationService, ContainerType.TASK, 2, 1, logFiles);
finishContainer(appId, logAggregationService, ContainerType.TASK, 3,
ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode(), logFiles);
finishApplication(appId, logAggregationService);
verifyContainerLogs(logAggregationService, appId,
new ContainerId[] { container1, container2 }, logFiles, 1, false);
verifyLogAggFinishEvent(appId);
}
@Test (timeout = 50000)
@SuppressWarnings("unchecked")
public void testFailedOrKilledContainerPolicy() throws Exception {
ApplicationId appId = createApplication();
LogAggregationService logAggregationService = createLogAggregationService(
appId, FailedOrKilledContainerLogAggregationPolicy.class, null);
String[] logFiles = new String[] { "stdout" };
finishContainer(appId, logAggregationService, ContainerType.APPLICATION_MASTER, 1, 0,
logFiles);
ContainerId container2 = finishContainer(appId,
logAggregationService, ContainerType.TASK, 2, 1, logFiles);
ContainerId container3 = finishContainer(appId, logAggregationService,
ContainerType.TASK, 3,
ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode(), logFiles);
finishApplication(appId, logAggregationService);
verifyContainerLogs(logAggregationService, appId,
new ContainerId[] { container2, container3 }, logFiles, 1, false);
verifyLogAggFinishEvent(appId);
}
@Test (timeout = 50000)
@SuppressWarnings("unchecked")
public void testAMOnlyContainerPolicy() throws Exception {
ApplicationId appId = createApplication();
LogAggregationService logAggregationService = createLogAggregationService(
appId, AMOnlyLogAggregationPolicy.class, null);
String[] logFiles = new String[] { "stdout" };
ContainerId container1 = finishContainer(appId, logAggregationService,
ContainerType.APPLICATION_MASTER, 1, 0, logFiles);
finishContainer(appId, logAggregationService, ContainerType.TASK, 2, 1,
logFiles);
finishContainer(appId, logAggregationService, ContainerType.TASK, 3, 0,
logFiles);
finishApplication(appId, logAggregationService);
verifyContainerLogs(logAggregationService, appId,
new ContainerId[] { container1 }, logFiles, 1, false);
verifyLogAggFinishEvent(appId);
}
// Test sample container policy with an app that has
// the same number of successful containers as
// SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD.
// and verify all those containers' logs are aggregated.
@Test (timeout = 50000)
@SuppressWarnings("unchecked")
public void testSampleContainerPolicyWithSmallApp() throws Exception {
setupAndTestSampleContainerPolicy(
SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD,
SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_RATE,
SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD,
false);
}
// Test sample container policy with an app that has
// more successful containers than
// SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD.
// and verify some of those containers' logs are aggregated.
@Test (timeout = 50000)
@SuppressWarnings("unchecked")
public void testSampleContainerPolicyWithLargeApp() throws Exception {
setupAndTestSampleContainerPolicy(
SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD * 10,
SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_RATE,
SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD,
false);
}
// Test sample container policy with zero sample rate.
// and verify there is no sampling beyond the MIN_THRESHOLD containers.
@Test (timeout = 50000)
@SuppressWarnings("unchecked")
public void testSampleContainerPolicyWithZeroSampleRate() throws Exception {
setupAndTestSampleContainerPolicy(
SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD * 10,
0, SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD,
false);
}
// Test sample container policy with 100 percent sample rate.
// and verify all containers' logs are aggregated.
@Test (timeout = 50000)
@SuppressWarnings("unchecked")
public void testSampleContainerPolicyWith100PercentSampleRate()
throws Exception {
setupAndTestSampleContainerPolicy(
SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD * 10,
1.0f,
SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD,
false);
}
// Test sample container policy with zero min threshold.
// and verify some containers' logs are aggregated.
@Test (timeout = 50000)
@SuppressWarnings("unchecked")
public void testSampleContainerPolicyWithZeroMinThreshold()
throws Exception {
setupAndTestSampleContainerPolicy(
SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD * 10,
SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_RATE, 0, false);
}
// Test sample container policy with customized settings.
// and verify some containers' logs are aggregated.
@Test (timeout = 50000)
@SuppressWarnings("unchecked")
public void testSampleContainerPolicyWithCustomizedSettings()
throws Exception {
setupAndTestSampleContainerPolicy(500, 0.5f, 50, false);
}
// Test cluster-wide sample container policy.
@Test (timeout = 50000)
@SuppressWarnings("unchecked")
public void testClusterSampleContainerPolicy()
throws Exception {
setupAndTestSampleContainerPolicy(500, 0.5f, 50, true);
}
// Test the default cluster-wide sample container policy.
@Test (timeout = 50000)
@SuppressWarnings("unchecked")
public void testDefaultClusterSampleContainerPolicy() throws Exception {
setupAndTestSampleContainerPolicy(
SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD * 10,
SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_RATE,
SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD,
true);
}
// The application specifies invalid policy class
// NM should fallback to the default policy which is to aggregate all
// containers.
@Test (timeout = 50000)
@SuppressWarnings("unchecked")
public void testInvalidPolicyClassName() throws Exception {
ApplicationId appId = createApplication();
LogAggregationService logAggregationService = createLogAggregationService(
appId, "foo", null, true);
verifyDefaultPolicy(appId, logAggregationService);
}
// The application specifies LogAggregationContext, but not policy class.
// NM should fallback to the default policy which is to aggregate all
// containers.
@Test (timeout = 50000)
@SuppressWarnings("unchecked")
public void testNullPolicyClassName() throws Exception {
ApplicationId appId = createApplication();
LogAggregationService logAggregationService = createLogAggregationService(
appId, null, null, true);
verifyDefaultPolicy(appId, logAggregationService);
}
// The application doesn't specifies LogAggregationContext.
// NM should fallback to the default policy which is to aggregate all
// containers.
@Test (timeout = 50000)
@SuppressWarnings("unchecked")
public void testDefaultPolicyWithoutLogAggregationContext()
throws Exception {
ApplicationId appId = createApplication();
LogAggregationService logAggregationService = createLogAggregationService(
appId, null, null, false);
verifyDefaultPolicy(appId, logAggregationService);
}
private void verifyDefaultPolicy(ApplicationId appId,
LogAggregationService logAggregationService) throws Exception {
String[] logFiles = new String[] { "stdout" };
ContainerId container1 = finishContainer(
appId, logAggregationService, ContainerType.APPLICATION_MASTER, 1, 0,
logFiles);
ContainerId container2 = finishContainer(appId,
logAggregationService, ContainerType.TASK, 2, 1, logFiles);
ContainerId container3 = finishContainer(appId, logAggregationService,
ContainerType.TASK, 3,
ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode(), logFiles);
finishApplication(appId, logAggregationService);
verifyContainerLogs(logAggregationService, appId,
new ContainerId[] { container1, container2, container3 },
logFiles, 1, false);
verifyLogAggFinishEvent(appId);
}
// If enableAtClusterLevel is false, the policy is set up via
// LogAggregationContext at the application level. If it is true,
// the policy is set up via Configuration at the cluster level.
private void setupAndTestSampleContainerPolicy(int successfulContainers,
float sampleRate, int minThreshold, boolean enableAtClusterLevel)
throws Exception {
ApplicationId appId = createApplication();
String policyParameters = null;
if (sampleRate != SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_RATE
|| minThreshold !=
SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD) {
policyParameters = SampleContainerLogAggregationPolicy.buildParameters(
sampleRate, minThreshold);
}
if (enableAtClusterLevel) {
this.conf.set(YarnConfiguration.NM_LOG_AGG_POLICY_CLASS,
SampleContainerLogAggregationPolicy.class.getName());
if (policyParameters != null) {
this.conf.set(YarnConfiguration.NM_LOG_AGG_POLICY_CLASS_PARAMETERS,
policyParameters);
}
}
LogAggregationService logAggregationService = createLogAggregationService(
appId, SampleContainerLogAggregationPolicy.class.getName(),
policyParameters, !enableAtClusterLevel);
ArrayList<ContainerId> containerIds = new ArrayList<ContainerId>();
String[] logFiles = new String[] { "stdout" };
int cid = 1;
// AM container
containerIds.add(finishContainer(appId, logAggregationService,
ContainerType.APPLICATION_MASTER, cid++, 0, logFiles));
// Successful containers
// We expect the minThreshold containers will be log aggregated.
if (minThreshold > 0) {
containerIds.addAll(finishContainers(appId, logAggregationService, cid,
(successfulContainers > minThreshold) ? minThreshold :
successfulContainers, 0, logFiles));
}
cid = containerIds.size() + 1;
if (successfulContainers > minThreshold) {
List<ContainerId> restOfSuccessfulContainers = finishContainers(appId,
logAggregationService, cid, successfulContainers - minThreshold, 0,
logFiles);
cid += successfulContainers - minThreshold;
// If the sample rate is 100 percent, restOfSuccessfulContainers will be
// all be log aggregated.
if (sampleRate == 1.0) {
containerIds.addAll(restOfSuccessfulContainers);
}
}
// Failed container
containerIds.add(finishContainer(appId, logAggregationService,
ContainerType.TASK, cid++, 1, logFiles));
// Killed container
containerIds.add(finishContainer(appId, logAggregationService,
ContainerType.TASK, cid++,
ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode(), logFiles));
finishApplication(appId, logAggregationService);
// The number of containers with logs should be 3(AM + failed + killed) +
// DEFAULT_SAMPLE_MIN_THRESHOLD +
// ( successfulContainers - DEFAULT_SAMPLE_MIN_THRESHOLD ) * SAMPLE_RATE
// Due to the sampling nature, the exact number could vary.
// So we only check for a range.
// For the cases where successfulContainers is the same as minThreshold
// or sampleRate is zero, minOfContainersWithLogs and
// maxOfContainersWithLogs will the same.
int minOfContainersWithLogs = 3 + minThreshold +
(int)((successfulContainers - minThreshold) * sampleRate / 2);
int maxOfContainersWithLogs = 3 + minThreshold +
(int)((successfulContainers - minThreshold) * sampleRate * 2);
verifyContainerLogs(logAggregationService, appId,
containerIds.toArray(new ContainerId[containerIds.size()]),
minOfContainersWithLogs, maxOfContainersWithLogs,
logFiles, 1, false);
verifyLogAggFinishEvent(appId);
}
private ApplicationId createApplication() {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
ApplicationId appId = BuilderUtils.newApplicationId(1234, 1);
Application mockApp = mock(Application.class);
when(mockApp.getContainers()).thenReturn(
new HashMap<ContainerId, Container>());
this.context.getApplications().put(appId, mockApp);
return appId;
}
private LogAggregationService createLogAggregationService(
ApplicationId appId,
Class<? extends ContainerLogAggregationPolicy> policy,
String parameters) {
return createLogAggregationService(appId, policy.getName(), parameters,
true);
}
private LogAggregationService createLogAggregationService(
ApplicationId appId, String className, String parameters,
boolean createLogAggContext) {
ConcurrentHashMap<ContainerId, Container> containers =
new ConcurrentHashMap<ContainerId, Container>();
LogAggregationService logAggregationService =
new LogAggregationService(dispatcher, this.context, this.delSrvc,
super.dirsHandler);
logAggregationService.init(this.conf);
logAggregationService.start();
LogAggregationContext logAggContext = null;
if (createLogAggContext) {
logAggContext = Records.newRecord(LogAggregationContext.class);
logAggContext.setLogAggregationPolicyClassName(className);
if (parameters != null) {
logAggContext.setLogAggregationPolicyParameters(parameters);
}
}
logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
this.user, null, this.acls, logAggContext));
return logAggregationService;
}
private ContainerId createContainer(ApplicationAttemptId appAttemptId1,
long cId, ContainerType containerType) {
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId1,
cId);
Resource r = BuilderUtils.newResource(1024, 1);
ContainerTokenIdentifier containerToken = new ContainerTokenIdentifier(
containerId, context.getNodeId().toString(), user, r,
System.currentTimeMillis() + 100000L, 123, DUMMY_RM_IDENTIFIER,
Priority.newInstance(0), 0, null, null, containerType);
Container container = mock(Container.class);
context.getContainers().put(containerId, container);
when(container.getContainerTokenIdentifier()).thenReturn(containerToken);
when(container.getContainerId()).thenReturn(containerId);
return containerId;
}
private ContainerId finishContainer(ApplicationId application1,
LogAggregationService logAggregationService, ContainerType containerType,
long cId, int exitCode, String[] logFiles) throws IOException {
ApplicationAttemptId appAttemptId1 =
BuilderUtils.newApplicationAttemptId(application1, 1);
ContainerId containerId = createContainer(appAttemptId1, cId,
containerType);
// Simulate log-file creation
File appLogDir1 =
new File(localLogDir, ConverterUtils.toString(application1));
appLogDir1.mkdir();
writeContainerLogs(appLogDir1, containerId, logFiles);
logAggregationService.handle(new LogHandlerContainerFinishedEvent(
containerId, exitCode));
return containerId;
}
private List<ContainerId> finishContainers(ApplicationId appId,
LogAggregationService logAggregationService, long startingCid, int count,
int exitCode, String[] logFiles) throws IOException {
ArrayList<ContainerId> containerIds = new ArrayList<ContainerId>();
for (long cid = startingCid; cid < startingCid + count; cid++) {
containerIds.add(finishContainer(
appId, logAggregationService, ContainerType.TASK, cid, exitCode,
logFiles));
}
return containerIds;
}
private void finishApplication(ApplicationId appId,
LogAggregationService logAggregationService) throws Exception {
dispatcher.await();
ApplicationEvent expectedInitEvents[] =
new ApplicationEvent[] { new ApplicationEvent(appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED) };
checkEvents(appEventHandler, expectedInitEvents, false, "getType",
"getApplicationID");
reset(appEventHandler);
logAggregationService.handle(new LogHandlerAppFinishedEvent(appId));
logAggregationService.stop();
assertEquals(0, logAggregationService.getNumAggregators());
}
private void verifyLogAggFinishEvent(ApplicationId appId) throws Exception {
dispatcher.await();
ApplicationEvent[] expectedFinishedEvents =
new ApplicationEvent[] { new ApplicationEvent(appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED) };
checkEvents(appEventHandler, expectedFinishedEvents, false, "getType",
"getApplicationID");
}
@Test (timeout = 50000)
public void testLogAggregationServiceWithInterval() throws Exception {
testLogAggregationService(false);
@ -1391,17 +1920,14 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
ApplicationId application = BuilderUtils.newApplicationId(123456, 1);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(application, 1);
ContainerId container = BuilderUtils.newContainerId(appAttemptId, 1);
ContainerId container = createContainer(appAttemptId, 1,
ContainerType.APPLICATION_MASTER);
Context context = spy(this.context);
ConcurrentMap<ApplicationId, Application> maps =
new ConcurrentHashMap<ApplicationId, Application>();
this.context.getApplications();
Application app = mock(Application.class);
Map<ContainerId, Container> containers = new HashMap<ContainerId, Container>();
containers.put(container, mock(Container.class));
maps.put(application, app);
when(app.getContainers()).thenReturn(containers);
when(context.getApplications()).thenReturn(maps);
when(app.getContainers()).thenReturn(this.context.getContainers());
LogAggregationService logAggregationService =
new LogAggregationService(dispatcher, context, this.delSrvc,
@ -1415,8 +1941,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
new File(localLogDir, ConverterUtils.toString(application));
appLogDir.mkdir();
logAggregationService.handle(new LogHandlerAppStartedEvent(application,
this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls,
logAggregationContextWithInterval));
this.user, null, this.acls, logAggregationContextWithInterval));
LogFileStatusInLastCycle logFileStatusInLastCycle = null;
// Simulate log-file creation
@ -1536,7 +2061,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
logAggregationService.init(this.conf);
logAggregationService.start();
logAggregationService.handle(new LogHandlerAppStartedEvent(application1,
this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls,
this.user, null, this.acls,
Records.newRecord(LogAggregationContext.class)));
// Inject new token for log-aggregation after app log-aggregator init

View File

@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@ -148,8 +147,7 @@ public class TestNonAggregatingLogHandler {
logHandler.init(conf);
logHandler.start();
logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, null));
logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
@ -189,8 +187,7 @@ public class TestNonAggregatingLogHandler {
logHandler.init(conf);
logHandler.start();
logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, null));
logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
@ -357,8 +354,7 @@ public class TestNonAggregatingLogHandler {
logHandler.init(conf);
logHandler.start();
logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, null));
logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
logHandler.handle(new LogHandlerAppFinishedEvent(appId));
@ -445,7 +441,7 @@ public class TestNonAggregatingLogHandler {
doReturn(localLogDirPaths).when(dirsHandler).getLogDirsForCleanup();
logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, appAcls));
appAcls));
// test case where some dirs have the log dir to delete
// mock some dirs throwing various exceptions

View File

@ -231,16 +231,22 @@ public class TestContainerAllocation {
LogAggregationContext.newInstance(
"includePattern", "excludePattern",
"rolledLogsIncludePattern",
"rolledLogsExcludePattern");
"rolledLogsExcludePattern",
"policyClass",
"policyParameters");
LogAggregationContext returned =
getLogAggregationContextFromContainerToken(rm1, nm2,
logAggregationContext);
Assert.assertEquals("includePattern", returned.getIncludePattern());
Assert.assertEquals("excludePattern", returned.getExcludePattern());
Assert.assertEquals("rolledLogsIncludePattern",
returned.getRolledLogsIncludePattern());
returned.getRolledLogsIncludePattern());
Assert.assertEquals("rolledLogsExcludePattern",
returned.getRolledLogsExcludePattern());
returned.getRolledLogsExcludePattern());
Assert.assertEquals("policyClass",
returned.getLogAggregationPolicyClassName());
Assert.assertEquals("policyParameters",
returned.getLogAggregationPolicyParameters());
rm1.stop();
}