NIFI-3065 Per Process Group logging (#7315)

* NIFI-3065 Per Process Group logging
This commit is contained in:
timeabarna 2023-06-24 03:14:34 +02:00 committed by GitHub
parent 7d6af0dbb0
commit ba6797bb94
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
55 changed files with 1002 additions and 116 deletions

View File

@ -47,6 +47,8 @@ public class VersionedProcessGroup extends VersionedComponent {
private Long defaultBackPressureObjectThreshold;
private String defaultBackPressureDataSizeThreshold;
private String logFileSuffix;
@ApiModelProperty("The child Process Groups")
public Set<VersionedProcessGroup> getProcessGroups() {
@ -205,4 +207,13 @@ public class VersionedProcessGroup extends VersionedComponent {
public void setDefaultBackPressureDataSizeThreshold(final String defaultBackPressureDataSizeThreshold) {
this.defaultBackPressureDataSizeThreshold = defaultBackPressureDataSizeThreshold;
}
@ApiModelProperty(value = "The log file suffix for this Process Group for dedicated logging.")
public String getLogFileSuffix() {
return logFileSuffix;
}
public void setLogFileSuffix(final String logFileSuffix) {
this.logFileSuffix = logFileSuffix;
}
}

View File

@ -138,6 +138,12 @@ language governing permissions and limitations under the License. -->
<artifactId>slf4j-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-per-process-group-logging</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>

View File

@ -30,6 +30,7 @@
<include>*:logback-core</include>
<include>*:nifi-api</include>
<include>*:nifi-property-protection-api</include>
<include>*:nifi-per-process-group-logging</include>
</includes>
</dependencySet>

View File

@ -0,0 +1,31 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-per-process-group-logging</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.logging;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.sift.Discriminator;
import org.slf4j.event.KeyValuePair;
public class NifiDiscriminator implements Discriminator<ILoggingEvent> {
private static final String KEY = "logFileSuffix";
private boolean started;
@Override
public String getDiscriminatingValue(final ILoggingEvent iLoggingEvent) {
for (KeyValuePair keyValuePair : iLoggingEvent.getKeyValuePairs()) {
if (keyValuePair.key.equals(getKey())) {
return keyValuePair.value.toString();
}
}
return null;
}
@Override
public String getKey() {
return KEY;
}
@Override
public void start() {
started = true;
}
@Override
public void stop() {
started = false;
}
@Override
public boolean isStarted() {
return started;
}
}

View File

@ -36,6 +36,7 @@
<module>nifi-logging-utils</module>
<module>nifi-metrics</module>
<module>nifi-parameter</module>
<module>nifi-per-process-group-logging</module>
<module>nifi-property-encryptor</module>
<module>nifi-property-utils</module>
<module>nifi-properties</module>

View File

@ -39,6 +39,7 @@ public class ProcessGroupDTO extends ComponentDTO {
private String defaultFlowFileExpiration;
private Long defaultBackPressureObjectThreshold;
private String defaultBackPressureDataSizeThreshold;
private String logFileSuffix;
private Integer runningCount;
private Integer stoppedCount;
@ -403,4 +404,13 @@ public class ProcessGroupDTO extends ComponentDTO {
public void setDefaultBackPressureDataSizeThreshold(final String defaultBackPressureDataSizeThreshold) {
this.defaultBackPressureDataSizeThreshold = defaultBackPressureDataSizeThreshold;
}
@ApiModelProperty(value = "The log file suffix for this Process Group for dedicated logging.")
public String getLogFileSuffix() {
return logFileSuffix;
}
public void setLogFileSuffix(final String logFileSuffix) {
this.logFileSuffix = logFileSuffix;
}
}

View File

@ -61,6 +61,7 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.InstanceClassLoader;
import org.apache.nifi.nar.NarCloseable;
@ -1572,7 +1573,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
SchedulingAgentCallback schedulingAgentCallback, boolean failIfStopping, ScheduledState desiredState, ScheduledState scheduledState) {
final Processor processor = processorRef.get().getProcessor();
final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor, new StandardLoggingContext(StandardProcessorNode.this));
LOG.info("Starting {}", this);
ScheduledState currentState;
@ -1714,7 +1715,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
AtomicLong startupAttemptCount, final Supplier<ProcessContext> processContextFactory, final SchedulingAgentCallback schedulingAgentCallback) {
final Processor processor = getProcessor();
final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor, new StandardLoggingContext(StandardProcessorNode.this));
// Completion Timestamp is set to MAX_VALUE because we don't want to timeout until the task has a chance to run.
final AtomicLong completionTimestampRef = new AtomicLong(Long.MAX_VALUE);

View File

@ -58,6 +58,7 @@ import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.util.CharacterFilterUtils;
import org.apache.nifi.util.FormatUtils;
@ -596,7 +597,8 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
enablingAttemptCount.incrementAndGet();
if (enablingAttemptCount.get() == 120 || enablingAttemptCount.get() % 3600 == 0) {
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this,
new StandardLoggingContext(StandardControllerServiceNode.this));
componentLog.error("Encountering difficulty enabling. (Validation State is {}: {}). Will continue trying to enable.",
validationState, validationState.getValidationErrors());
}
@ -635,7 +637,8 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
future.completeExceptionally(e);
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this,
new StandardLoggingContext(StandardControllerServiceNode.this));
componentLog.error("Failed to invoke @OnEnabled method", cause);
invokeDisable(configContext);
@ -717,7 +720,7 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
LOG.debug("Successfully disabled {}", this);
} catch (Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this, new StandardLoggingContext(StandardControllerServiceNode.this));
componentLog.error("Failed to invoke @OnDisabled method due to {}", cause);
LOG.error("Failed to invoke @OnDisabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
}

View File

@ -25,6 +25,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.logging.StandardLoggingContext;
import java.io.IOException;
import java.util.Map;
@ -52,7 +53,7 @@ public class StandardStateManager implements StateManager {
final LogRepository repo = LogRepositoryFactory.getRepository(componentId);
final ComponentLog logger = (repo == null) ? null : repo.getLogger();
if (repo == null || logger == null) {
return new SimpleProcessLogger(componentId, this);
return new SimpleProcessLogger(componentId, this, new StandardLoggingContext(null));
}
return logger;

View File

@ -51,6 +51,7 @@ import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.parameter.ParameterParser;
import org.apache.nifi.parameter.ParameterTokenList;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.processor.StandardValidationContext;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.security.util.SslContextFactory;
@ -341,7 +342,7 @@ public class StandardStateManagerProvider implements StateManagerProvider {
propertyMap.put(descriptor, new StandardPropertyValue(resourceContext, entry.getValue(),null, parameterLookup, variableRegistry));
}
final ComponentLog logger = new SimpleProcessLogger(providerConfig.getId(), provider);
final ComponentLog logger = new SimpleProcessLogger(providerConfig.getId(), provider, new StandardLoggingContext(null));
final StateProviderInitializationContext initContext = new StandardStateProviderInitializationContext(providerConfig.getId(), propertyMap, sslContext, logger);
synchronized (provider) {

View File

@ -317,6 +317,10 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
group.setDefaultBackPressureObjectThreshold(proposed.getDefaultBackPressureObjectThreshold());
group.setDefaultBackPressureDataSizeThreshold(proposed.getDefaultBackPressureDataSizeThreshold());
if (group.getLogFileSuffix() == null || group.getLogFileSuffix().isEmpty()) {
group.setLogFileSuffix(proposed.getLogFileSuffix());
}
final VersionedFlowCoordinates remoteCoordinates = proposed.getVersionedFlowCoordinates();
if (remoteCoordinates == null) {
group.disconnectVersionControl(false);
@ -1795,6 +1799,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
groupToUpdate.setComments(proposed.getComments());
groupToUpdate.setName(proposed.getName());
groupToUpdate.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
groupToUpdate.setLogFileSuffix(proposed.getLogFileSuffix());
if (processGroup == null) {
LOG.info("Successfully synchronized {} by adding it to the flow", groupToUpdate);

View File

@ -214,6 +214,8 @@ public final class StandardProcessGroup implements ProcessGroup {
private static final String DEFAULT_FLOWFILE_EXPIRATION = "0 sec";
private static final long DEFAULT_BACKPRESSURE_OBJECT = 10_000L;
private static final String DEFAULT_BACKPRESSURE_DATA_SIZE = "1 GB";
private static final Pattern INVALID_DIRECTORY_NAME_CHARACTERS = Pattern.compile("[\\s\\<\\>:\\'\\\"\\/\\\\\\|\\?\\*]");
private volatile String logFileSuffix;
public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler,
@ -244,6 +246,7 @@ public final class StandardProcessGroup implements ProcessGroup {
this.defaultFlowFileExpiration = new AtomicReference<>();
this.defaultBackPressureObjectThreshold = new AtomicReference<>();
this.defaultBackPressureDataSizeThreshold = new AtomicReference<>();
this.logFileSuffix = null;
// save only the nifi properties needed, and account for the possibility those properties are missing
if (nifiProperties == null) {
@ -4422,6 +4425,20 @@ public final class StandardProcessGroup implements ProcessGroup {
return new QueueSize(count, contentSize);
}
@Override
public String getLogFileSuffix() {
return logFileSuffix;
}
@Override
public void setLogFileSuffix(final String logFileSuffix) {
if (logFileSuffix != null && INVALID_DIRECTORY_NAME_CHARACTERS.matcher(logFileSuffix).find()) {
throw new IllegalArgumentException("Log file suffix can not contain the following characters: space, <, >, :, \', \", /, \\, |, ?, *");
} else {
this.logFileSuffix = logFileSuffix;
}
}
@Override
public String getDefaultBackPressureDataSizeThreshold() {
// Use value in this object if it has been set. Otherwise, inherit from parent group; if at root group, obtain from nifi properties.

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.logging;
import java.util.Optional;
public interface LoggingContext {
/**
* @return the log file name suffix. This will be the discriminating value for the dedicated logging.
*/
Optional<String> getLogFileSuffix();
/**
* @return The key under which the discriminating value should be exported into the host environment.
*/
String getDiscriminatorKey();
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.logging;
import org.apache.nifi.groups.ProcessGroup;
import java.util.Optional;
public class StandardLoggingContext implements LoggingContext {
private static final String KEY = "logFileSuffix";
private volatile GroupedComponent component;
public StandardLoggingContext(final GroupedComponent component) {
this.component = component;
}
@Override
public Optional<String> getLogFileSuffix() {
if (component != null) {
return getSuffix(component.getProcessGroup());
} else {
return Optional.empty();
}
}
@Override
public String getDiscriminatorKey() {
return KEY;
}
private Optional<String> getSuffix(final ProcessGroup group) {
if (group == null) {
return Optional.empty();
} else if (group.getLogFileSuffix() != null && !group.getLogFileSuffix().isEmpty()) {
return Optional.of(group.getLogFileSuffix());
} else if (group.isRootGroup()) {
return Optional.empty();
} else {
return getSuffix(group.getParent());
}
}
public void setComponent(final GroupedComponent component) {
this.component = component;
}
}

View File

@ -22,8 +22,10 @@ import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.LogMessage;
import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.logging.LoggingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import java.util.ArrayList;
import java.util.List;
@ -38,14 +40,17 @@ public class SimpleProcessLogger implements ComponentLog {
private final LogRepository logRepository;
private final Object component;
public SimpleProcessLogger(final String componentId, final Object component) {
this(component, LogRepositoryFactory.getRepository(componentId));
private final LoggingContext loggingContext;
public SimpleProcessLogger(final String componentId, final Object component, final LoggingContext loggingContext) {
this(component, LogRepositoryFactory.getRepository(componentId), loggingContext);
}
public SimpleProcessLogger(final Object component, final LogRepository logRepository) {
public SimpleProcessLogger(final Object component, final LogRepository logRepository, final LoggingContext loggingContext) {
this.logger = LoggerFactory.getLogger(component.getClass());
this.logRepository = logRepository;
this.component = component;
this.loggingContext = loggingContext;
}
@Override
@ -55,10 +60,10 @@ public class SimpleProcessLogger implements ComponentLog {
final Object[] repositoryArguments = getRepositoryArguments(t);
if (t == null) {
logger.warn(componentMessage, component);
log(Level.WARN, componentMessage, component);
logRepository.addLogMessage(LogLevel.WARN, componentMessage, repositoryArguments);
} else {
logger.warn(componentMessage, component, t);
log(Level.WARN, componentMessage, component, t);
logRepository.addLogMessage(LogLevel.WARN, getCausesMessage(msg), repositoryArguments, t);
}
}
@ -72,10 +77,10 @@ public class SimpleProcessLogger implements ComponentLog {
final Throwable lastThrowable = findLastThrowable(os);
if (lastThrowable == null) {
logger.warn(componentMessage, arguments);
log(Level.WARN, componentMessage, arguments);
logRepository.addLogMessage(LogLevel.WARN, componentMessage, arguments);
} else {
logger.warn(componentMessage, setFormattedThrowable(arguments, lastThrowable));
log(Level.WARN, componentMessage, setFormattedThrowable(arguments, lastThrowable));
logRepository.addLogMessage(LogLevel.WARN, getCausesMessage(msg), setCauses(arguments, lastThrowable), lastThrowable);
}
}
@ -88,10 +93,10 @@ public class SimpleProcessLogger implements ComponentLog {
final Object[] arguments = insertComponent(os);
if (t == null) {
logger.warn(componentMessage, arguments);
log(Level.WARN, componentMessage, arguments);
logRepository.addLogMessage(LogLevel.WARN, componentMessage, arguments);
} else {
logger.warn(componentMessage, addThrowable(arguments, t));
log(Level.WARN, componentMessage, addThrowable(arguments, t));
logRepository.addLogMessage(LogLevel.WARN, getCausesMessage(msg), addCauses(arguments, t), t);
}
}
@ -117,10 +122,10 @@ public class SimpleProcessLogger implements ComponentLog {
final Object[] repositoryArguments = getRepositoryArguments(t);
if (t == null) {
logger.trace(componentMessage, component);
log(Level.TRACE, componentMessage, component);
logRepository.addLogMessage(LogLevel.TRACE, componentMessage, repositoryArguments);
} else {
logger.trace(componentMessage, component, t);
log(Level.TRACE, componentMessage, component, t);
logRepository.addLogMessage(LogLevel.TRACE, getCausesMessage(msg), repositoryArguments, t);
}
}
@ -134,10 +139,10 @@ public class SimpleProcessLogger implements ComponentLog {
final Throwable lastThrowable = findLastThrowable(os);
if (lastThrowable == null) {
logger.trace(componentMessage, arguments);
log(Level.TRACE, componentMessage, arguments);
logRepository.addLogMessage(LogLevel.TRACE, componentMessage, arguments);
} else {
logger.trace(componentMessage, setFormattedThrowable(arguments, lastThrowable));
log(Level.TRACE, componentMessage, setFormattedThrowable(arguments, lastThrowable));
logRepository.addLogMessage(LogLevel.TRACE, getCausesMessage(msg), setCauses(arguments, lastThrowable), lastThrowable);
}
}
@ -155,10 +160,10 @@ public class SimpleProcessLogger implements ComponentLog {
final Object[] arguments = insertComponent(os);
if (t == null) {
logger.trace(componentMessage, arguments);
log(Level.TRACE, componentMessage, arguments);
logRepository.addLogMessage(LogLevel.TRACE, componentMessage, arguments);
} else {
logger.trace(componentMessage, addThrowable(arguments, t));
log(Level.TRACE, componentMessage, addThrowable(arguments, t));
logRepository.addLogMessage(LogLevel.TRACE, getCausesMessage(msg), addCauses(arguments, t), t);
}
}
@ -204,10 +209,10 @@ public class SimpleProcessLogger implements ComponentLog {
final Object[] repositoryArguments = getRepositoryArguments(t);
if (t == null) {
logger.info(componentMessage, component);
log(Level.INFO, componentMessage, component);
logRepository.addLogMessage(LogLevel.INFO, componentMessage, repositoryArguments);
} else {
logger.info(componentMessage, component, t);
log(Level.INFO, componentMessage, component, t);
logRepository.addLogMessage(LogLevel.INFO, getCausesMessage(msg), repositoryArguments, t);
}
}
@ -221,10 +226,10 @@ public class SimpleProcessLogger implements ComponentLog {
final Throwable lastThrowable = findLastThrowable(os);
if (lastThrowable == null) {
logger.info(componentMessage, arguments);
log(Level.INFO, componentMessage, arguments);
logRepository.addLogMessage(LogLevel.INFO, componentMessage, arguments);
} else {
logger.info(componentMessage, setFormattedThrowable(arguments, lastThrowable));
log(Level.INFO, componentMessage, setFormattedThrowable(arguments, lastThrowable));
logRepository.addLogMessage(LogLevel.INFO, getCausesMessage(msg), setCauses(arguments, lastThrowable), lastThrowable);
}
}
@ -242,10 +247,10 @@ public class SimpleProcessLogger implements ComponentLog {
final Object[] arguments = insertComponent(os);
if (t == null) {
logger.info(componentMessage, arguments);
log(Level.INFO, componentMessage, arguments);
logRepository.addLogMessage(LogLevel.INFO, componentMessage, arguments);
} else {
logger.info(componentMessage, addThrowable(arguments, t));
log(Level.INFO, componentMessage, addThrowable(arguments, t));
logRepository.addLogMessage(LogLevel.INFO, getCausesMessage(msg), addCauses(arguments, t), t);
}
}
@ -276,10 +281,10 @@ public class SimpleProcessLogger implements ComponentLog {
final Object[] repositoryArguments = getRepositoryArguments(t);
if (t == null) {
logger.error(componentMessage, component);
log(Level.ERROR, componentMessage, component);
logRepository.addLogMessage(LogLevel.ERROR, componentMessage, repositoryArguments);
} else {
logger.error(componentMessage, component, t);
log(Level.ERROR, componentMessage, component, t);
logRepository.addLogMessage(LogLevel.ERROR, getCausesMessage(msg), repositoryArguments, t);
}
}
@ -293,10 +298,10 @@ public class SimpleProcessLogger implements ComponentLog {
final Throwable lastThrowable = findLastThrowable(os);
if (lastThrowable == null) {
logger.error(componentMessage, arguments);
log(Level.ERROR, componentMessage, arguments);
logRepository.addLogMessage(LogLevel.ERROR, componentMessage, arguments);
} else {
logger.error(componentMessage, setFormattedThrowable(arguments, lastThrowable));
log(Level.ERROR, componentMessage, setFormattedThrowable(arguments, lastThrowable));
logRepository.addLogMessage(LogLevel.ERROR, getCausesMessage(msg), setCauses(arguments, lastThrowable), lastThrowable);
}
}
@ -309,10 +314,10 @@ public class SimpleProcessLogger implements ComponentLog {
final Object[] arguments = insertComponent(os);
if (t == null) {
logger.error(componentMessage, arguments);
log(Level.ERROR, componentMessage, arguments);
logRepository.addLogMessage(LogLevel.ERROR, componentMessage, arguments);
} else {
logger.error(componentMessage, addThrowable(arguments, t));
log(Level.ERROR, componentMessage, addThrowable(arguments, t));
logRepository.addLogMessage(LogLevel.ERROR, getCausesMessage(msg), addCauses(arguments, t), t);
}
}
@ -333,10 +338,10 @@ public class SimpleProcessLogger implements ComponentLog {
final Object[] repositoryArguments = getRepositoryArguments(t);
if (t == null) {
logger.debug(componentMessage, component);
log(Level.DEBUG, componentMessage, component);
logRepository.addLogMessage(LogLevel.DEBUG, componentMessage, repositoryArguments);
} else {
logger.debug(componentMessage, component, t);
log(Level.DEBUG, componentMessage, component, t);
logRepository.addLogMessage(LogLevel.DEBUG, getCausesMessage(msg), repositoryArguments, t);
}
}
@ -350,10 +355,10 @@ public class SimpleProcessLogger implements ComponentLog {
final Throwable lastThrowable = findLastThrowable(os);
if (lastThrowable == null) {
logger.debug(componentMessage, arguments);
log(Level.DEBUG, componentMessage, arguments);
logRepository.addLogMessage(LogLevel.DEBUG, componentMessage, arguments);
} else {
logger.debug(componentMessage, setFormattedThrowable(arguments, lastThrowable));
log(Level.DEBUG, componentMessage, setFormattedThrowable(arguments, lastThrowable));
logRepository.addLogMessage(LogLevel.DEBUG, getCausesMessage(msg), setCauses(arguments, lastThrowable), lastThrowable);
}
}
@ -366,10 +371,10 @@ public class SimpleProcessLogger implements ComponentLog {
final Object[] arguments = insertComponent(os);
if (t == null) {
logger.debug(componentMessage, arguments);
log(Level.DEBUG, componentMessage, arguments);
logRepository.addLogMessage(LogLevel.DEBUG, componentMessage, arguments);
} else {
logger.debug(componentMessage, addThrowable(arguments, t));
log(Level.DEBUG, componentMessage, addThrowable(arguments, t));
logRepository.addLogMessage(LogLevel.DEBUG, getCausesMessage(msg), addCauses(arguments, t), t);
}
}
@ -563,4 +568,18 @@ public class SimpleProcessLogger implements ComponentLog {
}
return lastThrowable;
}
private String getDiscriminatorKey() {
return loggingContext.getDiscriminatorKey();
}
private String getLogFileSuffix() {
return loggingContext.getLogFileSuffix().orElse(null);
}
private void log(final Level level, final String message, final Object... arguments) {
logger.makeLoggingEventBuilder(level)
.addKeyValue(getDiscriminatorKey(), getLogFileSuffix())
.log(message, arguments);
}
}

View File

@ -327,6 +327,7 @@ public final class StandardFlowRegistryClientNode extends AbstractComponentNode
group.setDefaultFlowFileExpiration(contents.getDefaultFlowFileExpiration());
group.setDefaultBackPressureObjectThreshold(contents.getDefaultBackPressureObjectThreshold());
group.setDefaultBackPressureDataSizeThreshold(contents.getDefaultBackPressureDataSizeThreshold());
group.setLogFileSuffix(contents.getLogFileSuffix());
coordinates.setLatest(snapshot.isLatest());
snapshotContainer.addChildSnapshot(snapshot, group);

View File

@ -258,6 +258,7 @@ public class NiFiRegistryFlowMapper {
versionedGroup.setDefaultFlowFileExpiration(group.getDefaultFlowFileExpiration());
versionedGroup.setDefaultBackPressureObjectThreshold(group.getDefaultBackPressureObjectThreshold());
versionedGroup.setDefaultBackPressureDataSizeThreshold(group.getDefaultBackPressureDataSizeThreshold());
versionedGroup.setLogFileSuffix(group.getLogFileSuffix());
final ParameterContext parameterContext = group.getParameterContext();
versionedGroup.setParameterContextName(parameterContext == null ? null : parameterContext.getName());

View File

@ -74,7 +74,8 @@ public class FlowDifferenceFilters {
|| isNewZIndexLabelConfigWithDefaultValue(difference, flowManager)
|| isNewZIndexConnectionConfigWithDefaultValue(difference, flowManager)
|| isRegistryUrlChange(difference)
|| isParameterContextChange(difference);
|| isParameterContextChange(difference)
|| isLogFileSuffixChange(difference);
}
private static boolean isSensitivePropertyDueToGhosting(final FlowDifference difference, final FlowManager flowManager) {
@ -529,4 +530,8 @@ public class FlowDifferenceFilters {
private static boolean isParameterContextChange(final FlowDifference flowDifference) {
return flowDifference.getDifferenceType() == DifferenceType.PARAMETER_CONTEXT_CHANGED;
}
private static boolean isLogFileSuffixChange(final FlowDifference flowDifference) {
return flowDifference.getDifferenceType() == DifferenceType.LOG_FILE_SUFFIX_CHANGED;
}
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.logging;
import org.apache.nifi.groups.ProcessGroup;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class TestStandardLoggingContext {
private static final String LOG_FILE_SUFFIX = "myGroup";
@Mock
private GroupedComponent processor;
@Mock
private ProcessGroup processGroup;
@Test
void testNullComponent_ShouldReturnOptionalEmpty() {
LoggingContext context = new StandardLoggingContext(null);
assertTrue(context.getLogFileSuffix().isEmpty());
}
@Test
void testComponentWithProcessGroups_WithoutPerProcessGroupLogging_ShouldReturnOptionalEmpty() {
//component with pg with no setting returns optional empty
LoggingContext context = new StandardLoggingContext(processor);
when(processor.getProcessGroup()).thenReturn(processGroup);
when(processGroup.getLogFileSuffix()).thenReturn(null, null);
when(processGroup.isRootGroup()).thenReturn(Boolean.FALSE, Boolean.TRUE);
when(processGroup.getParent()).thenReturn(processGroup);
assertTrue(context.getLogFileSuffix().isEmpty());
}
@Test
void testComponentWithProcessGroup_WithPerProcessGroupLogging_ShouldReturnLogFileSuffix() {
LoggingContext context = new StandardLoggingContext(processor);
when(processor.getProcessGroup()).thenReturn(processGroup);
when(processGroup.getLogFileSuffix()).thenReturn(LOG_FILE_SUFFIX);
assertEquals(LOG_FILE_SUFFIX, context.getLogFileSuffix().orElse(null));
}
@Test
void testComponentWithProcessGroups_WithPerProcessGroupLoggingSetOnParent_ShouldReturnLogFileSuffix() {
LoggingContext context = new StandardLoggingContext(processor);
when(processor.getProcessGroup()).thenReturn(processGroup);
when(processGroup.isRootGroup()).thenReturn(Boolean.FALSE);
when(processGroup.getParent()).thenReturn(processGroup);
when(processGroup.getLogFileSuffix()).thenReturn(null, LOG_FILE_SUFFIX);
assertEquals(LOG_FILE_SUFFIX, context.getLogFileSuffix().orElse(null));
}
}

View File

@ -20,14 +20,25 @@ import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.StandardLoggingContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.event.Level;
import org.slf4j.spi.LoggingEventBuilder;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -70,15 +81,25 @@ public class TestSimpleProcessLogger {
private static final String LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT_AND_CAUSES = String.format("{} %s: {}", LOG_ARGUMENTS_MESSAGE);
private static final String DISCRIMINATOR_KEY = "logFileSuffix";
private static final String LOG_FILE_SUFFIX = "myGroup";
@Mock
private ConfigurableComponent component;
@Mock
private LogRepository logRepository;
@Mock
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private Logger logger;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
LoggingEventBuilder loggingEventBuilder;
@Mock
StandardLoggingContext loggingContext;
private Object[] componentArguments;
private Object[] componentValueArguments;
@ -89,9 +110,11 @@ public class TestSimpleProcessLogger {
private SimpleProcessLogger componentLog;
private final ArgumentCaptor<Object[]> argumentCaptor = ArgumentCaptor.forClass(Object[].class);
@BeforeEach
public void setLogger() throws IllegalAccessException {
componentLog = new SimpleProcessLogger(component, logRepository);
componentLog = new SimpleProcessLogger(component, logRepository, loggingContext);
FieldUtils.writeDeclaredField(componentLog, "logger", logger, true);
componentArguments = new Object[]{component};
@ -104,6 +127,10 @@ public class TestSimpleProcessLogger {
when(logger.isInfoEnabled()).thenReturn(true);
when(logger.isWarnEnabled()).thenReturn(true);
when(logger.isErrorEnabled()).thenReturn(true);
when(logger.makeLoggingEventBuilder(any(Level.class))).thenReturn(loggingEventBuilder);
when(loggingContext.getDiscriminatorKey()).thenReturn(DISCRIMINATOR_KEY);
when(loggingContext.getLogFileSuffix()).thenReturn(Optional.of(LOG_FILE_SUFFIX));
when(loggingEventBuilder.addKeyValue(any(String.class), any(String.class))).thenReturn(loggingEventBuilder);
}
@Test
@ -113,19 +140,69 @@ public class TestSimpleProcessLogger {
switch (logLevel) {
case TRACE:
verify(logger).trace(eq(LOG_MESSAGE_WITH_COMPONENT), eq(component));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.TRACE));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(1, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
reset(loggingEventBuilder);
break;
case DEBUG:
verify(logger).debug(eq(LOG_MESSAGE_WITH_COMPONENT), eq(component));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.DEBUG));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(1, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
reset(loggingEventBuilder);
break;
case INFO:
verify(logger).info(eq(LOG_MESSAGE_WITH_COMPONENT), eq(component));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.INFO));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(1, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
reset(loggingEventBuilder);
break;
case WARN:
verify(logger).warn(eq(LOG_MESSAGE_WITH_COMPONENT), eq(component));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.WARN));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(1, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
reset(loggingEventBuilder);
break;
case ERROR:
verify(logger).error(eq(LOG_MESSAGE_WITH_COMPONENT), eq(component));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.ERROR));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(1, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
reset(loggingEventBuilder);
break;
default:
continue;
@ -142,19 +219,79 @@ public class TestSimpleProcessLogger {
switch (logLevel) {
case TRACE:
verify(logger).trace(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), eq(component), eq(FIRST), eq(SECOND));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.TRACE));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(3, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
assertEquals(FIRST, argumentCaptor.getValue()[1]);
assertEquals(SECOND, argumentCaptor.getValue()[2]);
reset(loggingEventBuilder);
break;
case DEBUG:
verify(logger).debug(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), eq(component), eq(FIRST), eq(SECOND));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.DEBUG));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(3, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
assertEquals(FIRST, argumentCaptor.getValue()[1]);
assertEquals(SECOND, argumentCaptor.getValue()[2]);
reset(loggingEventBuilder);
break;
case INFO:
verify(logger).info(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), eq(component), eq(FIRST), eq(SECOND));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.INFO));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(3, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
assertEquals(FIRST, argumentCaptor.getValue()[1]);
assertEquals(SECOND, argumentCaptor.getValue()[2]);
reset(loggingEventBuilder);
break;
case WARN:
verify(logger).warn(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), eq(component), eq(FIRST), eq(SECOND));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.WARN));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(3, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
assertEquals(FIRST, argumentCaptor.getValue()[1]);
assertEquals(SECOND, argumentCaptor.getValue()[2]);
reset(loggingEventBuilder);
break;
case ERROR:
verify(logger).error(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), eq(component), eq(FIRST), eq(SECOND));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.ERROR));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(3, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
assertEquals(FIRST, argumentCaptor.getValue()[1]);
assertEquals(SECOND, argumentCaptor.getValue()[2]);
reset(loggingEventBuilder);
break;
default:
continue;
@ -171,19 +308,74 @@ public class TestSimpleProcessLogger {
switch (logLevel) {
case TRACE:
verify(logger).trace(eq(LOG_MESSAGE_WITH_COMPONENT), eq(component), eq(EXCEPTION));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.TRACE));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(2, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
assertEquals(EXCEPTION, argumentCaptor.getValue()[1]);
reset(loggingEventBuilder);
break;
case DEBUG:
verify(logger).debug(eq(LOG_MESSAGE_WITH_COMPONENT), eq(component), eq(EXCEPTION));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.DEBUG));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(2, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
assertEquals(EXCEPTION, argumentCaptor.getValue()[1]);
reset(loggingEventBuilder);
break;
case INFO:
verify(logger).info(eq(LOG_MESSAGE_WITH_COMPONENT), eq(component), eq(EXCEPTION));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.INFO));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(2, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
assertEquals(EXCEPTION, argumentCaptor.getValue()[1]);
reset(loggingEventBuilder);
break;
case WARN:
verify(logger).warn(eq(LOG_MESSAGE_WITH_COMPONENT), eq(component), eq(EXCEPTION));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.WARN));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(2, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
assertEquals(EXCEPTION, argumentCaptor.getValue()[1]);
reset(loggingEventBuilder);
break;
case ERROR:
verify(logger).error(eq(LOG_MESSAGE_WITH_COMPONENT), eq(component), eq(EXCEPTION));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.ERROR));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(2, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
assertEquals(EXCEPTION, argumentCaptor.getValue()[1]);
reset(loggingEventBuilder);
break;
default:
continue;
@ -200,19 +392,89 @@ public class TestSimpleProcessLogger {
switch (logLevel) {
case TRACE:
verify(logger).trace(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), eq(component), eq(FIRST), eq(SECOND), eq(EXCEPTION_STRING), eq(EXCEPTION));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.TRACE));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(5, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
assertEquals(FIRST, argumentCaptor.getValue()[1]);
assertEquals(SECOND, argumentCaptor.getValue()[2]);
assertEquals(EXCEPTION_STRING, argumentCaptor.getValue()[3]);
assertEquals(EXCEPTION, argumentCaptor.getValue()[4]);
reset(loggingEventBuilder);
break;
case DEBUG:
verify(logger).debug(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), eq(component), eq(FIRST), eq(SECOND), eq(EXCEPTION_STRING), eq(EXCEPTION));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.DEBUG));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(5, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
assertEquals(FIRST, argumentCaptor.getValue()[1]);
assertEquals(SECOND, argumentCaptor.getValue()[2]);
assertEquals(EXCEPTION_STRING, argumentCaptor.getValue()[3]);
assertEquals(EXCEPTION, argumentCaptor.getValue()[4]);
reset(loggingEventBuilder);
break;
case INFO:
verify(logger).info(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), eq(component), eq(FIRST), eq(SECOND), eq(EXCEPTION_STRING), eq(EXCEPTION));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.INFO));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(5, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
assertEquals(FIRST, argumentCaptor.getValue()[1]);
assertEquals(SECOND, argumentCaptor.getValue()[2]);
assertEquals(EXCEPTION_STRING, argumentCaptor.getValue()[3]);
assertEquals(EXCEPTION, argumentCaptor.getValue()[4]);
reset(loggingEventBuilder);
break;
case WARN:
verify(logger).warn(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), eq(component), eq(FIRST), eq(SECOND), eq(EXCEPTION_STRING), eq(EXCEPTION));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.WARN));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(5, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
assertEquals(FIRST, argumentCaptor.getValue()[1]);
assertEquals(SECOND, argumentCaptor.getValue()[2]);
assertEquals(EXCEPTION_STRING, argumentCaptor.getValue()[3]);
assertEquals(EXCEPTION, argumentCaptor.getValue()[4]);
reset(loggingEventBuilder);
break;
case ERROR:
verify(logger).error(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), eq(component), eq(FIRST), eq(SECOND), eq(EXCEPTION_STRING), eq(EXCEPTION));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.ERROR));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(5, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
assertEquals(FIRST, argumentCaptor.getValue()[1]);
assertEquals(SECOND, argumentCaptor.getValue()[2]);
assertEquals(EXCEPTION_STRING, argumentCaptor.getValue()[3]);
assertEquals(EXCEPTION, argumentCaptor.getValue()[4]);
reset(loggingEventBuilder);
break;
default:
continue;
@ -229,19 +491,84 @@ public class TestSimpleProcessLogger {
switch (logLevel) {
case TRACE:
verify(logger).trace(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), eq(component), eq(FIRST), eq(SECOND), eq(EXCEPTION));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.TRACE));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(4, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
assertEquals(FIRST, argumentCaptor.getValue()[1]);
assertEquals(SECOND, argumentCaptor.getValue()[2]);
assertEquals(EXCEPTION, argumentCaptor.getValue()[3]);
reset(loggingEventBuilder);
break;
case DEBUG:
verify(logger).debug(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), eq(component), eq(FIRST), eq(SECOND), eq(EXCEPTION));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.DEBUG));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(4, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
assertEquals(FIRST, argumentCaptor.getValue()[1]);
assertEquals(SECOND, argumentCaptor.getValue()[2]);
assertEquals(EXCEPTION, argumentCaptor.getValue()[3]);
reset(loggingEventBuilder);
break;
case INFO:
verify(logger).info(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), eq(component), eq(FIRST), eq(SECOND), eq(EXCEPTION));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.INFO));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(4, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
assertEquals(FIRST, argumentCaptor.getValue()[1]);
assertEquals(SECOND, argumentCaptor.getValue()[2]);
assertEquals(EXCEPTION, argumentCaptor.getValue()[3]);
reset(loggingEventBuilder);
break;
case WARN:
verify(logger).warn(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), eq(component), eq(FIRST), eq(SECOND), eq(EXCEPTION));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.WARN));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(4, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
assertEquals(FIRST, argumentCaptor.getValue()[1]);
assertEquals(SECOND, argumentCaptor.getValue()[2]);
assertEquals(EXCEPTION, argumentCaptor.getValue()[3]);
reset(loggingEventBuilder);
break;
case ERROR:
verify(logger).error(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), eq(component), eq(FIRST), eq(SECOND), eq(EXCEPTION));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.ERROR));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(4, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
assertEquals(FIRST, argumentCaptor.getValue()[1]);
assertEquals(SECOND, argumentCaptor.getValue()[2]);
assertEquals(EXCEPTION, argumentCaptor.getValue()[3]);
reset(loggingEventBuilder);
break;
default:
continue;
@ -258,19 +585,69 @@ public class TestSimpleProcessLogger {
switch (logLevel) {
case TRACE:
verify(logger).trace(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), eq(component), eq(FIRST), eq(SECOND));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.TRACE));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(3, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
assertEquals(FIRST, argumentCaptor.getValue()[1]);
assertEquals(SECOND, argumentCaptor.getValue()[2]);
reset(loggingEventBuilder);
break;
case DEBUG:
verify(logger).debug(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), eq(component), eq(FIRST), eq(SECOND));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.DEBUG));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(3, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
assertEquals(FIRST, argumentCaptor.getValue()[1]);
assertEquals(SECOND, argumentCaptor.getValue()[2]);
reset(loggingEventBuilder);
break;
case INFO:
verify(logger).info(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), eq(component), eq(FIRST), eq(SECOND));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.INFO));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(3, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
assertEquals(FIRST, argumentCaptor.getValue()[1]);
assertEquals(SECOND, argumentCaptor.getValue()[2]);
reset(loggingEventBuilder);
break;
case WARN:
verify(logger).warn(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), eq(component), eq(FIRST), eq(SECOND));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.WARN));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(3, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
assertEquals(FIRST, argumentCaptor.getValue()[1]);
assertEquals(SECOND, argumentCaptor.getValue()[2]);
reset(loggingEventBuilder);
break;
case ERROR:
verify(logger).error(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), eq(component), eq(FIRST), eq(SECOND));
verify(logger, times(1)).makeLoggingEventBuilder(eq(Level.ERROR));
verify(loggingEventBuilder, times(1))
.addKeyValue(eq(DISCRIMINATOR_KEY), eq(LOG_FILE_SUFFIX));
verify(loggingEventBuilder
.addKeyValue(DISCRIMINATOR_KEY, LOG_FILE_SUFFIX), times(1))
.log(eq(LOG_ARGUMENTS_MESSAGE_WITH_COMPONENT), argumentCaptor.capture());
assertEquals(3, argumentCaptor.getValue().length);
assertEquals(component, argumentCaptor.getValue()[0]);
assertEquals(FIRST, argumentCaptor.getValue()[1]);
assertEquals(SECOND, argumentCaptor.getValue()[2]);
reset(loggingEventBuilder);
break;
default:
continue;

View File

@ -22,6 +22,7 @@ import org.apache.nifi.components.VersionedComponent;
import org.apache.nifi.controller.BackoffMechanism;
import org.apache.nifi.controller.Triggerable;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.GroupedComponent;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.scheduling.SchedulingStrategy;
@ -34,7 +35,7 @@ import java.util.concurrent.TimeUnit;
/**
* Represents a connectable component to which or from which data can flow.
*/
public interface Connectable extends Triggerable, ComponentAuthorizable, Positionable, VersionedComponent {
public interface Connectable extends Triggerable, ComponentAuthorizable, Positionable, VersionedComponent, GroupedComponent {
/**
* @return the unique identifier for this <code>Connectable</code>

View File

@ -27,6 +27,7 @@ import org.apache.nifi.controller.LoggableComponent;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.GroupedComponent;
import org.apache.nifi.nar.ExtensionManager;
import java.util.List;
@ -36,7 +37,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public interface ControllerServiceNode extends ComponentNode, VersionedComponent {
public interface ControllerServiceNode extends ComponentNode, VersionedComponent, GroupedComponent {
/**
* @return the Process Group that this Controller Service belongs to, or <code>null</code> if the Controller Service

View File

@ -1233,4 +1233,16 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
* @return the QueueSize of this Process Group and all child Process Groups
*/
QueueSize getQueueSize();
/**
* @return the log file suffix of the ProcessGroup for dedicated logging
*/
String getLogFileSuffix();
/**
* Updates the log file suffix of this ProcessGroup for dedicated logging
*
* @param logFileSuffix new log file suffix
*/
void setLogFileSuffix(String logFileSuffix);
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.logging;
import org.apache.nifi.groups.ProcessGroup;
public interface GroupedComponent {
ProcessGroup getProcessGroup();
}

View File

@ -44,6 +44,8 @@ import org.apache.nifi.controller.service.StandardControllerServiceInitializatio
import org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler;
import org.apache.nifi.controller.service.StandardControllerServiceNode;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LoggingContext;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.nar.PythonBundle;
@ -228,9 +230,10 @@ public class ExtensionBuilder {
}
boolean creationSuccessful = true;
final StandardLoggingContext loggingContext = new StandardLoggingContext(null);
LoggableComponent<Processor> loggableComponent;
try {
loggableComponent = createLoggableProcessor();
loggableComponent = createLoggableProcessor(loggingContext);
} catch (final ProcessorInstantiationException pie) {
logger.error("Could not create Processor of type {} for ID {} due to: {}; creating \"Ghost\" implementation", type, identifier, pie.getMessage(), pie);
@ -252,6 +255,7 @@ public class ExtensionBuilder {
}
final ProcessorNode processorNode = createProcessorNode(loggableComponent, componentType, !creationSuccessful);
loggingContext.setComponent(processorNode);
return processorNode;
}
@ -419,9 +423,9 @@ public class ExtensionBuilder {
if (stateManagerProvider == null) {
throw new IllegalStateException("State Manager Provider must be specified");
}
final StandardLoggingContext loggingContext = new StandardLoggingContext(null);
try {
return createControllerServiceNode();
return createControllerServiceNode(loggingContext);
} catch (final Exception e) {
logger.error("Could not create Controller Service of type " + type + " for ID " + identifier + " due to: " + e.getMessage() + "; creating \"Ghost\" implementation");
if (logger.isDebugEnabled()) {
@ -561,7 +565,8 @@ public class ExtensionBuilder {
}
}
private ControllerServiceNode createControllerServiceNode() throws ClassNotFoundException, IllegalAccessException, InstantiationException, InitializationException {
private ControllerServiceNode createControllerServiceNode(final StandardLoggingContext loggingContext)
throws ClassNotFoundException, IllegalAccessException, InstantiationException, InitializationException {
final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
try {
final Bundle bundle = extensionManager.getBundle(bundleCoordinate);
@ -590,7 +595,7 @@ public class ExtensionBuilder {
}
logger.info("Created Controller Service of type {} with identifier {}", type, identifier);
final ComponentLog serviceLogger = new SimpleProcessLogger(identifier, serviceImpl);
final ComponentLog serviceLogger = new SimpleProcessLogger(identifier, serviceImpl, new StandardLoggingContext(null));
final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(serviceLogger);
final StateManager stateManager = stateManagerProvider.getStateManager(identifier);
@ -608,6 +613,7 @@ public class ExtensionBuilder {
final ControllerServiceNode serviceNode = new StandardControllerServiceNode(originalLoggableComponent, proxiedLoggableComponent, invocationHandler,
identifier, validationContextFactory, serviceProvider, componentVarRegistry, reloadComponent, extensionManager, validationTrigger);
serviceNode.setName(rawClass.getSimpleName());
loggingContext.setComponent(serviceNode);
invocationHandler.setServiceNode(serviceNode);
return serviceNode;
@ -697,13 +703,13 @@ public class ExtensionBuilder {
return serviceNode;
}
private LoggableComponent<Processor> createLoggableProcessor() throws ProcessorInstantiationException {
private LoggableComponent<Processor> createLoggableProcessor(final LoggingContext loggingContext) throws ProcessorInstantiationException {
try {
final LoggableComponent<Processor> processorComponent;
if (PythonBundle.isPythonCoordinate(bundleCoordinate)) {
processorComponent = createLoggablePythonProcessor();
} else {
processorComponent = createLoggableComponent(Processor.class);
processorComponent = createLoggableComponent(Processor.class, loggingContext);
}
final Processor processor = processorComponent.getComponent();
@ -724,7 +730,7 @@ public class ExtensionBuilder {
private LoggableComponent<ReportingTask> createLoggableReportingTask() throws ReportingTaskInstantiationException {
try {
final LoggableComponent<ReportingTask> taskComponent = createLoggableComponent(ReportingTask.class);
final LoggableComponent<ReportingTask> taskComponent = createLoggableComponent(ReportingTask.class, new StandardLoggingContext(null));
final String taskName = taskComponent.getComponent().getClass().getSimpleName();
final ReportingInitializationContext config = new StandardReportingInitializationContext(identifier, taskName,
@ -743,7 +749,7 @@ public class ExtensionBuilder {
private LoggableComponent<FlowRegistryClient> createLoggableFlowRegistryClient() throws FlowRepositoryClientInstantiationException {
try {
final LoggableComponent<FlowRegistryClient> clientComponent = createLoggableComponent(FlowRegistryClient.class);
final LoggableComponent<FlowRegistryClient> clientComponent = createLoggableComponent(FlowRegistryClient.class, new StandardLoggingContext(null));
final FlowRegistryClientInitializationContext context = new StandardFlowRegistryClientInitializationContext(
identifier, clientComponent.getLogger(), systemSslContext);
@ -759,7 +765,7 @@ public class ExtensionBuilder {
private LoggableComponent<ParameterProvider> createLoggableParameterProvider() throws ParameterProviderInstantiationException {
try {
final LoggableComponent<ParameterProvider> providerComponent = createLoggableComponent(ParameterProvider.class);
final LoggableComponent<ParameterProvider> providerComponent = createLoggableComponent(ParameterProvider.class, new StandardLoggingContext(null));
final String taskName = providerComponent.getComponent().getClass().getSimpleName();
final ParameterProviderInitializationContext config = new StandardParameterProviderInitializationContext(identifier, taskName,
@ -793,7 +799,7 @@ public class ExtensionBuilder {
final PythonProcessorBridge processorBridge = pythonBridge.createProcessor(identifier, processorType, bundleCoordinate.getVersion(), true);
final Processor processor = processorBridge.getProcessorProxy();
final ComponentLog componentLog = new SimpleProcessLogger(identifier, processor);
final ComponentLog componentLog = new SimpleProcessLogger(identifier, processor, new StandardLoggingContext(null));
final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLog);
final PythonProcessorInitializationContext initContext = new PythonProcessorInitializationContext() {
@ -817,7 +823,8 @@ public class ExtensionBuilder {
}
}
private <T extends ConfigurableComponent> LoggableComponent<T> createLoggableComponent(Class<T> nodeType) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
private <T extends ConfigurableComponent> LoggableComponent<T> createLoggableComponent(Class<T> nodeType, LoggingContext loggingContext)
throws ClassNotFoundException, IllegalAccessException, InstantiationException {
final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
try {
final Bundle bundle = extensionManager.getBundle(bundleCoordinate);
@ -832,7 +839,7 @@ public class ExtensionBuilder {
final Object extensionInstance = rawClass.newInstance();
final ComponentLog componentLog = new SimpleProcessLogger(identifier, extensionInstance);
final ComponentLog componentLog = new SimpleProcessLogger(identifier, extensionInstance, loggingContext);
final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLog);
final T cast = nodeType.cast(extensionInstance);

View File

@ -533,6 +533,11 @@ public class StandardFlowSnippet implements FlowSnippet {
childGroup.setDefaultBackPressureDataSizeThreshold(defaultBackPressureDataSizeThreshold);
}
final String logFileSuffix = groupDTO.getLogFileSuffix();
if (logFileSuffix != null) {
childGroup.setLogFileSuffix(logFileSuffix);
}
// If this Process Group is 'top level' then we do not set versioned component ID's.
// We do this only if this component is the child of a Versioned Component.
if (!topLevel) {

View File

@ -32,6 +32,7 @@ import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.parameter.ParameterProvider;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.FlowRegistryClientNode;
@ -93,7 +94,7 @@ public class StandardReloadComponent implements ReloadComponent {
final ProcessorNode newNode = flowController.getFlowManager().createProcessor(newType, id, bundleCoordinate, additionalUrls, true, false, classloaderIsolationKey);
// set the new processor in the existing node
final ComponentLog componentLogger = new SimpleProcessLogger(id, newNode.getProcessor());
final ComponentLog componentLogger = new SimpleProcessLogger(id, newNode.getProcessor(), new StandardLoggingContext(newNode));
final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLogger);
LogRepositoryFactory.getRepository(id).setLogger(terminationAwareLogger);
@ -152,7 +153,7 @@ public class StandardReloadComponent implements ReloadComponent {
invocationHandler.setServiceNode(existingNode);
// create LoggableComponents for the proxy and implementation
final ComponentLog componentLogger = new SimpleProcessLogger(id, newNode.getControllerServiceImplementation());
final ComponentLog componentLogger = new SimpleProcessLogger(id, newNode.getControllerServiceImplementation(), new StandardLoggingContext(newNode));
final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLogger);
LogRepositoryFactory.getRepository(id).setLogger(terminationAwareLogger);
@ -203,7 +204,7 @@ public class StandardReloadComponent implements ReloadComponent {
final ReportingTaskNode newNode = flowController.getFlowManager().createReportingTask(newType, id, bundleCoordinate, additionalUrls, true, false, classloaderIsolationKey);
// set the new reporting task into the existing node
final ComponentLog componentLogger = new SimpleProcessLogger(id, existingNode.getReportingTask());
final ComponentLog componentLogger = new SimpleProcessLogger(id, existingNode.getReportingTask(), new StandardLoggingContext(null));
final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLogger);
LogRepositoryFactory.getRepository(id).setLogger(terminationAwareLogger);
@ -250,7 +251,7 @@ public class StandardReloadComponent implements ReloadComponent {
}
// set the new parameter provider into the existing node
final ComponentLog componentLogger = new SimpleProcessLogger(id, existingNode.getParameterProvider());
final ComponentLog componentLogger = new SimpleProcessLogger(id, existingNode.getParameterProvider(), new StandardLoggingContext(null));
final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLogger);
LogRepositoryFactory.getRepository(id).setLogger(terminationAwareLogger);
@ -291,7 +292,7 @@ public class StandardReloadComponent implements ReloadComponent {
extensionManager.closeURLClassLoader(id, existingInstanceClassLoader);
// set the new flow registyr client into the existing node
final ComponentLog componentLogger = new SimpleProcessLogger(id, existingNode.getComponent());
final ComponentLog componentLogger = new SimpleProcessLogger(id, existingNode.getComponent(), new StandardLoggingContext(null));
final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLogger);
LogRepositoryFactory.getRepository(id).setLogger(terminationAwareLogger);

View File

@ -1292,6 +1292,7 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
final String defaultFlowFileExpiration = dto.getDefaultFlowFileExpiration();
final Long defaultBackPressureObjectThreshold = dto.getDefaultBackPressureObjectThreshold();
final String defaultBackPressureDataSizeThreshold = dto.getDefaultBackPressureDataSizeThreshold();
final String logFileSuffix = dto.getLogFileSuffix();
if (name != null) {
group.setName(name);
@ -1333,6 +1334,10 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
if (defaultBackPressureDataSizeThreshold != null) {
group.setDefaultBackPressureDataSizeThreshold(defaultBackPressureDataSizeThreshold);
}
if (logFileSuffix != null) {
group.setLogFileSuffix(logFileSuffix);
}
}
private <T extends Connectable & Triggerable> ScheduledState getScheduledState(final T component, final FlowController flowController) {
@ -1468,6 +1473,8 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
processGroup.setDefaultBackPressureObjectThreshold(processGroupDTO.getDefaultBackPressureObjectThreshold());
processGroup.setDefaultBackPressureDataSizeThreshold(processGroupDTO.getDefaultBackPressureDataSizeThreshold());
processGroup.setLogFileSuffix(processGroupDTO.getLogFileSuffix());
final String parameterContextId = getString(processGroupElement, "parameterContextId");
if (parameterContextId != null) {
final ParameterContext parameterContext = controller.getFlowManager().getParameterContextManager().getParameterContext(parameterContextId);

View File

@ -42,6 +42,7 @@ import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.Connectables;
@ -363,11 +364,11 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(extensionManager, worker.getProcessor().getClass(), worker.getIdentifier())) {
worker.onTrigger(processContext, sessionFactory);
} catch (final ProcessException pe) {
final ComponentLog procLog = new SimpleProcessLogger(worker.getIdentifier(), worker.getProcessor());
final ComponentLog procLog = new SimpleProcessLogger(worker.getIdentifier(), worker.getProcessor(), new StandardLoggingContext(worker));
procLog.error("Failed to process session due to {}", new Object[]{pe});
} catch (final Throwable t) {
// Use ComponentLog to log the event so that a bulletin will be created for this processor
final ComponentLog procLog = new SimpleProcessLogger(worker.getIdentifier(), worker.getProcessor());
final ComponentLog procLog = new SimpleProcessLogger(worker.getIdentifier(), worker.getProcessor(), new StandardLoggingContext(worker));
procLog.error("{} failed to process session due to {}", new Object[]{worker.getProcessor(), t});
procLog.warn("Processor Administratively Yielded for {} due to processing failure", new Object[]{adminYieldDuration});
logger.warn("Administratively Yielding {} due to uncaught Exception: ", worker.getProcessor());

View File

@ -43,6 +43,7 @@ import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Processor;
@ -244,7 +245,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
}
} catch (final Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask, new StandardLoggingContext(null));
componentLog.error("Failed to invoke @OnScheduled method due to {}", cause);
LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this "
@ -297,7 +298,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, configurationContext);
} catch (final Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask, new StandardLoggingContext(null));
componentLog.error("Failed to invoke @OnUnscheduled method due to {}", cause);
LOG.error("Failed to invoke the @OnUnscheduled methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",

View File

@ -296,6 +296,7 @@ public class FlowFromDOMFactory {
dto.setDefaultFlowFileExpiration(getString(element, "defaultFlowFileExpiration"));
dto.setDefaultBackPressureObjectThreshold(getLong(element, "defaultBackPressureObjectThreshold"));
dto.setDefaultBackPressureDataSizeThreshold(getString(element, "defaultBackPressureDataSizeThreshold"));
dto.setLogFileSuffix(getString(element, "logFileSuffix"));
final Map<String, String> variables = new HashMap<>();
final NodeList variableList = DomUtils.getChildNodesByTagName(element, "variable");

View File

@ -256,6 +256,7 @@ public class StandardFlowSerializer implements FlowSerializer<Document> {
addTextElement(element, "defaultFlowFileExpiration", group.getDefaultFlowFileExpiration());
addTextElement(element, "defaultBackPressureObjectThreshold", group.getDefaultBackPressureObjectThreshold());
addTextElement(element, "defaultBackPressureDataSizeThreshold", group.getDefaultBackPressureDataSizeThreshold());
addTextElement(element, "logFileSuffix", group.getLogFileSuffix());
final VersionControlInformation versionControlInfo = group.getVersionControlInformation();
if (versionControlInfo != null) {

View File

@ -40,6 +40,7 @@ import org.apache.nifi.controller.scheduling.LifecycleState;
import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
import org.apache.nifi.controller.scheduling.SchedulingAgent;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
@ -286,7 +287,7 @@ public class ConnectableTask {
} finally {
try {
if (batch) {
final ComponentLog procLog = new SimpleProcessLogger(connectable.getIdentifier(), connectable.getRunnableComponent());
final ComponentLog procLog = new SimpleProcessLogger(connectable.getIdentifier(), connectable.getRunnableComponent(), new StandardLoggingContext(connectable));
try {
rawSession.commitAsync(null, t -> {
@ -377,7 +378,7 @@ public class ConnectableTask {
}
private ComponentLog getComponentLog() {
return new SimpleProcessLogger(connectable.getIdentifier(), connectable.getRunnableComponent());
return new SimpleProcessLogger(connectable.getIdentifier(), connectable.getRunnableComponent(), new StandardLoggingContext(connectable));
}
private static class SampledMetrics {

View File

@ -23,6 +23,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.util.ReflectionUtils;
public class ReportingTaskWrapper implements Runnable {
@ -45,7 +46,7 @@ public class ReportingTaskWrapper implements Runnable {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, taskNode.getReportingTask().getClass(), taskNode.getIdentifier())) {
taskNode.getReportingTask().onTrigger(taskNode.getReportingContext());
} catch (final Throwable t) {
final ComponentLog componentLog = new SimpleProcessLogger(taskNode.getIdentifier(), taskNode.getReportingTask());
final ComponentLog componentLog = new SimpleProcessLogger(taskNode.getIdentifier(), taskNode.getReportingTask(), new StandardLoggingContext(null));
componentLog.error("Error running task {} due to {}", new Object[]{taskNode.getReportingTask(), t.toString()});
if (componentLog.isDebugEnabled()) {
componentLog.error("", t);

View File

@ -392,6 +392,7 @@ public class FingerprintFactory {
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "defaultFlowFileExpiration"));
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "defaultBackPressureObjectThreshold"));
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "defaultBackPressureDataSizeThreshold"));
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "logFileSuffix"));
final Element versionControlInfo = DomUtils.getChild(processGroupElem, "versionControlInformation");
if (versionControlInfo == null) {

View File

@ -207,6 +207,7 @@
<xs:element name="defaultBackPressureObjectThreshold" type="xs:long" minOccurs="0" maxOccurs="1" />
<xs:element name="defaultBackPressureDataSizeThreshold" type="xs:string" minOccurs="0" maxOccurs="1" />
<xs:element name="versionControlInformation" type="VersionControlInformation" minOccurs="0" maxOccurs="1" />
<xs:element name="logFileSuffix" type="xs:string" minOccurs="0" maxOccurs="1"/>
<!-- Each "processor" defines the actual dataflow work horses that make dataflow happen-->
<xs:element name="processor" type="ProcessorType" minOccurs="0" maxOccurs="unbounded"/>
@ -273,6 +274,7 @@
<xs:element name="defaultFlowFileExpiration" type="xs:string" minOccurs="0" maxOccurs="1" />
<xs:element name="defaultBackPressureObjectThreshold" type="xs:long" minOccurs="0" maxOccurs="1" />
<xs:element name="defaultBackPressureDataSizeThreshold" type="xs:string" minOccurs="0" maxOccurs="1" />
<xs:element name="logFileSuffix" type="xs:string" minOccurs="0" maxOccurs="1"/>
<!-- Each "processor" defines the actual dataflow work horses that make dataflow happen-->
<xs:element name="processor" type="ProcessorType" minOccurs="0" maxOccurs="unbounded"/>

View File

@ -858,6 +858,16 @@ public class MockProcessGroup implements ProcessGroup {
return null;
}
@Override
public String getLogFileSuffix() {
return null;
}
@Override
public void setLogFileSuffix(String logFileSuffix) {
}
@Override
public void terminateProcessor(ProcessorNode processor) {
}

View File

@ -101,6 +101,31 @@
</encoder>
</appender>
<appender name="DEDICATED_LOGGING" class="ch.qos.logback.classic.sift.SiftingAppender">
<discriminator class="org.apache.nifi.logging.NifiDiscriminator"/>
<sift>
<appender name="APP-${logFileSuffix}_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${org.apache.nifi.bootstrap.config.log.dir}/nifi-app-${logFileSuffix}.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<!--
For daily rollover, use 'app_%d.log'.
For hourly rollover, use 'app_%d{yyyy-MM-dd_HH}.log'.
To GZIP rolled files, replace '.log' with '.log.gz'.
To ZIP rolled files, replace '.log' with '.log.zip'.
-->
<fileNamePattern>${org.apache.nifi.bootstrap.config.log.dir}/nifi-app-${logFileSuffix}_%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
<maxFileSize>100MB</maxFileSize>
<!-- keep 30 log files worth of history -->
<maxHistory>30</maxHistory>
</rollingPolicy>
<immediateFlush>true</immediateFlush>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%date %level [%thread] %logger{40} %msg%n</pattern>
</encoder>
</appender>
</sift>
</appender>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%date %level [%thread] %logger{40} %msg%n</pattern>
@ -226,4 +251,8 @@
<appender-ref ref="APP_FILE" />
</root>
<root level="INFO">
<appender-ref ref="DEDICATED_LOGGING" />
</root>
</configuration>

View File

@ -2686,6 +2686,7 @@ public final class DtoFactory {
dto.setDefaultFlowFileExpiration(group.getDefaultFlowFileExpiration());
dto.setDefaultBackPressureObjectThreshold(group.getDefaultBackPressureObjectThreshold());
dto.setDefaultBackPressureDataSizeThreshold(group.getDefaultBackPressureDataSizeThreshold());
dto.setLogFileSuffix(group.getLogFileSuffix());
final ParameterContext parameterContext = group.getParameterContext();
if (parameterContext != null) {
@ -4599,6 +4600,7 @@ public final class DtoFactory {
copy.setDefaultFlowFileExpiration(original.getDefaultFlowFileExpiration());
copy.setDefaultBackPressureObjectThreshold(original.getDefaultBackPressureObjectThreshold());
copy.setDefaultBackPressureDataSizeThreshold(original.getDefaultBackPressureDataSizeThreshold());
copy.setLogFileSuffix(original.getLogFileSuffix());
copy.setRunningCount(original.getRunningCount());
copy.setStoppedCount(original.getStoppedCount());

View File

@ -40,6 +40,7 @@ import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.util.BundleUtils;
import org.apache.nifi.web.NiFiCoreException;
@ -413,7 +414,7 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
final ControllerServiceNode serviceNode = locateControllerService(controllerServiceId);
final LogRepository logRepository = new NopLogRepository();
final ComponentLog configVerificationLog = new SimpleProcessLogger(serviceNode.getControllerServiceImplementation(), logRepository);
final ComponentLog configVerificationLog = new SimpleProcessLogger(serviceNode.getControllerServiceImplementation(), logRepository, new StandardLoggingContext(serviceNode));
final ExtensionManager extensionManager = flowController.getExtensionManager();
final ParameterLookup parameterLookup = serviceNode.getProcessGroup() == null ? ParameterLookup.EMPTY : serviceNode.getProcessGroup().getParameterContext();

View File

@ -36,6 +36,7 @@ import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.parameter.ParameterGroupConfiguration;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.util.BundleUtils;
import org.apache.nifi.web.NiFiCoreException;
import org.apache.nifi.web.ResourceNotFoundException;
@ -233,7 +234,7 @@ public class StandardParameterProviderDAO extends ComponentDAO implements Parame
final ParameterProviderNode parameterProviderNode = locateParameterProvider(parameterProviderId);
final LogRepository logRepository = new NopLogRepository();
final ComponentLog configVerificationLog = new SimpleProcessLogger(parameterProviderNode.getParameterProvider(), logRepository);
final ComponentLog configVerificationLog = new SimpleProcessLogger(parameterProviderNode.getParameterProvider(), logRepository, new StandardLoggingContext(null));
final ExtensionManager extensionManager = flowController.getExtensionManager();
final ParameterLookup parameterLookup = ParameterLookup.EMPTY;

View File

@ -347,6 +347,8 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
final Long defaultBackPressureObjectThreshold = processGroupDTO.getDefaultBackPressureObjectThreshold();
final String defaultBackPressureDataSizeThreshold = processGroupDTO.getDefaultBackPressureDataSizeThreshold();
final String logFileSuffix = processGroupDTO.getLogFileSuffix();
final ParameterContextReferenceEntity parameterContextReference = processGroupDTO.getParameterContext();
if (parameterContextReference != null) {
final String parameterContextId = parameterContextReference.getId();
@ -392,6 +394,10 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
group.setDefaultBackPressureDataSizeThreshold(processGroupDTO.getDefaultBackPressureDataSizeThreshold());
}
if (logFileSuffix != null) {
group.setLogFileSuffix(logFileSuffix);
}
group.onComponentModified();
return group;
}

View File

@ -35,6 +35,7 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.logging.repository.NopLogRepository;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.components.ConfigVerificationResult;
@ -482,7 +483,7 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
new NopStateManager(), () -> false, flowController);
final LogRepository logRepository = new NopLogRepository();
final ComponentLog configVerificationLog = new SimpleProcessLogger(processor, logRepository);
final ComponentLog configVerificationLog = new SimpleProcessLogger(processor, logRepository, new StandardLoggingContext(processor));
final ExtensionManager extensionManager = flowController.getExtensionManager();
final List<ConfigVerificationResult> verificationResults = processor.verifyConfiguration(processContext, configVerificationLog, attributes, extensionManager);

View File

@ -38,6 +38,7 @@ import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.BundleUtils;
import org.apache.nifi.util.FormatUtils;
@ -266,7 +267,7 @@ public class StandardReportingTaskDAO extends ComponentDAO implements ReportingT
final ReportingTaskNode taskNode = locateReportingTask(reportingTaskId);
final LogRepository logRepository = new NopLogRepository();
final ComponentLog configVerificationLog = new SimpleProcessLogger(taskNode.getReportingTask(), logRepository);
final ComponentLog configVerificationLog = new SimpleProcessLogger(taskNode.getReportingTask(), logRepository, new StandardLoggingContext(null));
final ExtensionManager extensionManager = flowController.getExtensionManager();
final ParameterLookup parameterLookup = ParameterLookup.EMPTY;

View File

@ -461,6 +461,28 @@ public final class SnippetUtils {
groupNames.add(groupDTO.getName());
}
}
// get a list of all log file suffix
final List<String> existingLogFileSuffixes = new ArrayList<>();
for (final ProcessGroup processGroup : group.getProcessGroups()) {
if (processGroup.getLogFileSuffix() != null) {
existingLogFileSuffixes.add(processGroup.getLogFileSuffix());
}
}
// rename log file suffixes
if (snippetContents.getProcessGroups() != null) {
for (final ProcessGroupDTO processGroupDTO : snippetContents.getProcessGroups()) {
String logFileSuffix = processGroupDTO.getLogFileSuffix();
if (logFileSuffix != null) {
while (existingLogFileSuffixes.contains(logFileSuffix)) {
logFileSuffix = "Copy_of_" + logFileSuffix;
}
processGroupDTO.setLogFileSuffix(logFileSuffix);
existingLogFileSuffixes.add(processGroupDTO.getLogFileSuffix());
}
}
}
}
private FlowSnippetDTO copyContentsForGroup(final FlowSnippetDTO snippetContents, final String groupId, final Map<String, ConnectableDTO> parentConnectableMap,

View File

@ -99,6 +99,17 @@
<span id="read-only-process-group-default-back-pressure-data-size-threshold" class="unset"></span>
</div>
</div>
<div class="setting">
<div class="setting-name">Log File Suffix
<div class="fa fa-question-circle" alt="Info" title="Turns on dedicated logging. When left empty log messages will be logged only to the primary app log. When set messages logged by components in this group will be sent to the standard app log, as well as a separate log file, with the provided name, specific to this group."></div>
</div>
<div class="editable setting-field">
<input type="text" id="process-group-log-file-suffix" class="setting-input"/>
</div>
<div class="read-only setting-field">
<span id="read-only-process-group-log-file-suffix" class="unset"></span>
</div>
</div>
<div class="editable settings-buttons">
<div id="process-group-configuration-save" class="button">Apply</div>

View File

@ -119,4 +119,8 @@
#upload-process-group-link {
float: right;
}
#process-group-log-to-own-file-combo {
width: 328px;
}

View File

@ -111,7 +111,8 @@
'flowfileOutboundPolicy': $('#process-group-outbound-policy-combo').combo('getSelectedOption').value,
'defaultFlowFileExpiration': $('#process-group-default-flowfile-expiration').val(),
'defaultBackPressureObjectThreshold': $('#process-group-default-back-pressure-object-threshold').val(),
'defaultBackPressureDataSizeThreshold': $('#process-group-default-back-pressure-data-size-threshold').val()
'defaultBackPressureDataSizeThreshold': $('#process-group-default-back-pressure-data-size-threshold').val(),
'logFileSuffix': $('#process-group-log-file-suffix').val()
}
};
@ -271,7 +272,7 @@
$('#process-group-default-flowfile-expiration').removeClass('unset').val(processGroup.defaultFlowFileExpiration);
$('#process-group-default-back-pressure-object-threshold').removeClass('unset').val(processGroup.defaultBackPressureObjectThreshold);
$('#process-group-default-back-pressure-data-size-threshold').removeClass('unset').val(processGroup.defaultBackPressureDataSizeThreshold);
$('#process-group-log-file-suffix').removeClass('unset').val(processGroup.logFileSuffix);
// populate the header
$('#process-group-configuration-header-text').text(processGroup.name + ' Configuration');
@ -316,6 +317,9 @@
$('#read-only-process-group-default-flowfile-expiration').text(processGroup.defaultFlowFileExpiration);
$('#read-only-process-group-default-back-pressure-object-threshold').text(processGroup.defaultBackPressureObjectThreshold);
$('#read-only-process-group-default-back-pressure-data-size-threshold').text(processGroup.defaultBackPressureDataSizeThreshold);
$('#read-only-process-group-log-file-suffix').text(processGroup.logFileSuffix);
} else {
setUnauthorizedText();
}

View File

@ -368,7 +368,12 @@ public enum DifferenceType {
/**
* The Process Group's Default value for connections' FlowFile Expiration is different in each of the flows
*/
DEFAULT_FLOWFILE_EXPIRATION_CHANGED("Default FlowFile Expiration Changed")
DEFAULT_FLOWFILE_EXPIRATION_CHANGED("Default FlowFile Expiration Changed"),
/**
* The Process Group's Log File Suffix value is different in each of the flows
*/
LOG_FILE_SUFFIX_CHANGED("Log File Suffix Changed")
;
private final String description;

View File

@ -522,6 +522,7 @@ public class StandardFlowComparator implements FlowComparator {
addIfDifferent(differences, DifferenceType.DEFAULT_BACKPRESSURE_OBJECT_COUNT_CHANGED, groupA, groupB, VersionedProcessGroup::getDefaultBackPressureObjectThreshold, true, 10_000L);
addIfDifferent(differences, DifferenceType.DEFAULT_FLOWFILE_EXPIRATION_CHANGED, groupA, groupB, VersionedProcessGroup::getDefaultFlowFileExpiration, true, "0 sec");
addIfDifferent(differences, DifferenceType.PARAMETER_CONTEXT_CHANGED, groupA, groupB, VersionedProcessGroup::getParameterContextName, true, null);
addIfDifferent(differences, DifferenceType.LOG_FILE_SUFFIX_CHANGED, groupA, groupB, VersionedProcessGroup::getLogFileSuffix, true, null);
final VersionedFlowCoordinates groupACoordinates = groupA.getVersionedFlowCoordinates();
final VersionedFlowCoordinates groupBCoordinates = groupB.getVersionedFlowCoordinates();

View File

@ -155,6 +155,7 @@ public class RegistryUtil {
group.setDefaultFlowFileExpiration(contents.getDefaultFlowFileExpiration());
group.setDefaultBackPressureObjectThreshold(contents.getDefaultBackPressureObjectThreshold());
group.setDefaultBackPressureDataSizeThreshold(contents.getDefaultBackPressureDataSizeThreshold());
group.setLogFileSuffix(contents.getLogFileSuffix());
coordinates.setLatest(snapshot.isLatest());
}

View File

@ -52,6 +52,7 @@ import org.apache.nifi.controller.service.StandardControllerServiceInitializatio
import org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler;
import org.apache.nifi.controller.service.StandardControllerServiceNode;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.ParameterProvider;
import org.apache.nifi.parameter.ParameterProviderInitializationContext;
@ -165,7 +166,7 @@ public class ComponentBuilder {
private LoggableComponent<FlowRegistryClient> createLoggableFlowRegistryClient() throws FlowRepositoryClientInstantiationException {
try {
final ComponentLog componentLog = new SimpleProcessLogger(identifier, InMemoryFlowRegistry.class.newInstance());
final ComponentLog componentLog = new SimpleProcessLogger(identifier, InMemoryFlowRegistry.class.newInstance(), new StandardLoggingContext(null));
final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLog);
final InMemoryFlowRegistry registryClient = new InMemoryFlowRegistry();
final LoggableComponent<FlowRegistryClient> nodeComponent = new LoggableComponent<>(registryClient, bundleCoordinate, terminationAwareLogger);
@ -288,7 +289,7 @@ public class ComponentBuilder {
}
logger.info("Created Controller Service of type {} with identifier {}", type, identifier);
final ComponentLog serviceLogger = new SimpleProcessLogger(identifier, serviceImpl);
final ComponentLog serviceLogger = new SimpleProcessLogger(identifier, serviceImpl, new StandardLoggingContext(null));
final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(serviceLogger);
final StateManager stateManager = stateManagerProvider.getStateManager(identifier);
@ -352,7 +353,7 @@ public class ComponentBuilder {
Thread.currentThread().setContextClassLoader(detectedClassLoader);
final Object extensionInstance = rawClass.newInstance();
final ComponentLog componentLog = new SimpleProcessLogger(identifier, extensionInstance);
final ComponentLog componentLog = new SimpleProcessLogger(identifier, extensionInstance, new StandardLoggingContext(null));
final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLog);
final T cast = nodeType.cast(extensionInstance);

View File

@ -41,6 +41,7 @@ import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.parameter.ParameterProvider;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.FlowRegistryClientNode;
@ -94,7 +95,7 @@ public class StatelessReloadComponent implements ReloadComponent {
}
// set the new processor in the existing node
final ComponentLog componentLogger = new SimpleProcessLogger(id, newNode.getProcessor());
final ComponentLog componentLogger = new SimpleProcessLogger(id, newNode.getProcessor(), new StandardLoggingContext(null));
final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLogger);
LogRepositoryFactory.getRepository(id).setLogger(terminationAwareLogger);
@ -154,7 +155,7 @@ public class StatelessReloadComponent implements ReloadComponent {
invocationHandler.setServiceNode(existingNode);
// create LoggableComponents for the proxy and implementation
final ComponentLog componentLogger = new SimpleProcessLogger(id, newNode.getControllerServiceImplementation());
final ComponentLog componentLogger = new SimpleProcessLogger(id, newNode.getControllerServiceImplementation(), new StandardLoggingContext(null));
final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLogger);
LogRepositoryFactory.getRepository(id).setLogger(terminationAwareLogger);
@ -202,7 +203,7 @@ public class StatelessReloadComponent implements ReloadComponent {
}
// set the new reporting task into the existing node
final ComponentLog componentLogger = new SimpleProcessLogger(id, existingNode.getReportingTask());
final ComponentLog componentLogger = new SimpleProcessLogger(id, existingNode.getReportingTask(), new StandardLoggingContext(null));
final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLogger);
LogRepositoryFactory.getRepository(id).setLogger(terminationAwareLogger);
@ -247,7 +248,7 @@ public class StatelessReloadComponent implements ReloadComponent {
}
// set the new reporting task into the existing node
final ComponentLog componentLogger = new SimpleProcessLogger(id, existingNode.getParameterProvider());
final ComponentLog componentLogger = new SimpleProcessLogger(id, existingNode.getParameterProvider(), new StandardLoggingContext(null));
final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLogger);
LogRepositoryFactory.getRepository(id).setLogger(terminationAwareLogger);
@ -285,7 +286,7 @@ public class StatelessReloadComponent implements ReloadComponent {
extensionManager.closeURLClassLoader(id, existingInstanceClassLoader);
// set the new flow registry client into the existing node
final ComponentLog componentLogger = new SimpleProcessLogger(id, existingNode.getComponent());
final ComponentLog componentLogger = new SimpleProcessLogger(id, existingNode.getComponent(), new StandardLoggingContext(null));
final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLogger);
LogRepositoryFactory.getRepository(id).setLogger(terminationAwareLogger);

View File

@ -25,6 +25,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.logging.StandardLoggingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -92,7 +93,7 @@ public class StatelessSchedulingAgent implements SchedulingAgent {
}
} catch (final Throwable t) {
final ComponentLog componentLog = new SimpleProcessLogger(taskNode.getIdentifier(), taskNode.getReportingTask());
final ComponentLog componentLog = new SimpleProcessLogger(taskNode.getIdentifier(), taskNode.getReportingTask(), new StandardLoggingContext(null));
componentLog.error("Error running task {} due to {}", new Object[]{taskNode.getReportingTask(), t.toString()});
if (componentLog.isDebugEnabled()) {
componentLog.error("", t);